This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit c54d8ad4692768ff270947bc9a2f0f6fe629c701 Author: stiga-huang <[email protected]> AuthorDate: Sun Sep 29 18:42:42 2024 +0800 IMPALA-13340: Fix missing partitions in COPY TESTCASE of LocalCatalog mode There are 3 places that we should fix: *Exporting testcase files* In LocalCatalog mode coordinators, to export the testcase file, LocalFsTable objects are converted to THdfsTable objects. In this step, coordinators should set the 'has_full_partitions' field to true. Otherwise, the partition map will be ignored when catalogd imports the THdfsTable object. *Importing testcase files* When importing the THdfsTable object, catalogd should regenerate the partition ids since those in the testcase file are usually generated by another catalogd instance (of another cluster). Reusing them might conflict with the existing partition ids. Note that partition ids are incremental ids generated by catalogd itself (starting from 0 at bootstrap). Table.loadFromThrift() is primarily used on the coordinator side to load metadata from catalogd. Previously, we always set 'storedInImpaladCatalogCache_' to true in this method. However, this method is also used in catalogd to import metadata from a testcase file. This patch adds a parameter to Table.fromThrift(), the factory method that calls loadFromThrift(), to distinguish where it's used, so that we can decide whether to reuse the partition ids or generate new ones. *Fetching metadata from catalogd* When catalogd processes the getPartialCatalog requests on the imported partitions, HdfsPartition#setPartitionMetadata() is used to update the TPartialPartitionInfo instance. Previously this method used 'cachedMsPartitionDescriptor_ == null' to detect prototype partitions or the only partition of unpartitioned tables. This is incorrect now since HdfsPartitions imported from testcase files won't have 'cachedMsPartitionDescriptor_' set. The values of this field come from msPartition objects from HMS and are not passed to the coordinators, thus do not exist in the exported testcase files. 
This patch fixes the condition to check for the prototype partition and unpartitioned tables correctly. Tests - Added e2e tests to dump the partitioned table and verify the partition and file metadata after importing it back. The test also verifies that we can get the same query plan after importing the testcase file. - Moved the util method __get_partition_id_set() from test_reuse_partitions.py to ImpalaTestSuite so we can reuse it in the new test. Also renamed it to get_partition_id_set(). Change-Id: Icc2e8b71564ad37973ddfca92801afea8e26ff73 Reviewed-on: http://gerrit.cloudera.org:8080/21864 Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/catalog/HdfsPartition.java | 7 ++- .../java/org/apache/impala/catalog/HdfsTable.java | 18 +++++- .../org/apache/impala/catalog/ImpaladCatalog.java | 2 +- .../main/java/org/apache/impala/catalog/Table.java | 5 +- .../impala/catalog/local/CatalogdMetaProvider.java | 2 +- .../apache/impala/catalog/local/LocalFsTable.java | 1 + .../apache/impala/service/CatalogOpExecutor.java | 17 +++++- .../catalog/CatalogObjectToFromThriftTest.java | 9 ++- tests/common/impala_test_suite.py | 17 ++++++ tests/metadata/test_reuse_partitions.py | 40 ++++--------- tests/metadata/test_testcase_builder.py | 70 +++++++++++++++++----- 11 files changed, 126 insertions(+), 62 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 038b4ee47..e194a05e7 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -1071,8 +1071,11 @@ public class HdfsPartition extends CatalogObjectImpl public void setPartitionMetadata(TPartialPartitionInfo tPart) { // The special "prototype partition" or the only partition of an unpartitioned table - // have a null cachedMsPartitionDescriptor. 
- if (cachedMsPartitionDescriptor_ == null) return; + // don't have partition metadata. + if (id_ == CatalogObjectsConstants.PROTOTYPE_PARTITION_ID + || !table_.isPartitioned()) { + return; + } // Don't need to make a copy here since the caller should not modify the parameters. tPart.hms_parameters = getParameters(); tPart.write_id = writeId_; diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 009e702f9..40e0750bb 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -2098,9 +2098,21 @@ public class HdfsTable extends Table implements FeFsTable { try { if (hdfsTable.has_full_partitions) { for (THdfsPartition tPart : hdfsTable.getPartitions().values()) { - addPartition(new HdfsPartition.Builder(this, tPart.id) - .fromThrift(tPart) - .build()); + HdfsPartition.Builder builder; + // This method is used in two places: + // - legacy-catalog mode coordinators applying updates from catalogd + // - catalogd importing a testcase file + // In the first case, coordinator should reuse the partition ids to be + // consistent with catalogd. + // In the second case, catalogd should generate the partition ids by itself + // since the ids generated by other catalogds might conflict with the local + // ones. 
+ if (storedInImpaladCatalogCache_) { + builder = new HdfsPartition.Builder(this, tPart.id); + } else { + builder = new HdfsPartition.Builder(this); + } + addPartition(builder.fromThrift(tPart).build()); } } prototypePartition_ = diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java index a13f28f76..a5237008d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java @@ -486,7 +486,7 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { Preconditions.checkNotNull(newPartitions); Table existingTable = db.getTable(thriftTable.tbl_name); - Table newTable = Table.fromThrift(db, thriftTable); + Table newTable = Table.fromThrift(db, thriftTable, true); newTable.setCatalogVersion(catalogVersion); newTable.setLastLoadedTimeMs(lastLoadedTime); if (existingTable != null && existingTable.getCatalogVersion() >= catalogVersion) { diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index d8866613d..e93ed0153 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -566,7 +566,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { * Factory method that creates a new Table from its Thrift representation. * Determines the type of table to create based on the Thrift table provided. 
*/ - public static Table fromThrift(Db parentDb, TTable thriftTable) + public static Table fromThrift(Db parentDb, TTable thriftTable, boolean loadedInImpalad) throws TableLoadingException { CatalogInterners.internFieldsInPlace(thriftTable); Table newTable; @@ -587,6 +587,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { IncompleteTable.createUninitializedTable(parentDb, thriftTable.getTbl_name(), tblType, MetadataOp.getTableComment(thriftTable.getMetastore_table())); } + newTable.storedInImpaladCatalogCache_ = loadedInImpalad; try { newTable.loadFromThrift(thriftTable); } catch (IcebergTableLoadingException e) { @@ -637,8 +638,6 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { TAccessLevel.READ_WRITE; storageMetadataLoadTime_ = thriftTable.getStorage_metadata_load_time_ns(); - - storedInImpaladCatalogCache_ = true; } /** diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index a00b5be2a..275ebb4a5 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -1106,7 +1106,7 @@ public class CatalogdMetaProvider implements MetaProvider { } else { checkResponse(table.msTable_.getPartitionKeysSize() == 0, req, "Should not return a partition with missing partition meta unless " + - "the table is unpartitioned"); + "the table is unpartitioned: %s", part); // For the only partition of a nonpartitioned table, reuse table-level metadata. 
try { hdfsStorageDescriptor = HdfsStorageDescriptor.fromStorageDescriptor( diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java index a60ba7c23..c9d10d5c0 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java @@ -347,6 +347,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable { THdfsTable hdfsTable = new THdfsTable(getHdfsBaseDir(), getColumnNames(), getNullPartitionKeyValue(), nullColumnValue_, idToPartition, tPrototypePartition); + hdfsTable.setHas_full_partitions(true); if (avroSchema_ != null) { hdfsTable.setAvroSchema(avroSchema_); diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 954f87b4a..20937c57b 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -127,6 +127,7 @@ import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.catalog.PartitionNotFoundException; import org.apache.impala.catalog.PartitionStatsUtil; +import org.apache.impala.catalog.PrunablePartition; import org.apache.impala.catalog.RowFormat; import org.apache.impala.catalog.ScalarFunction; import org.apache.impala.catalog.Table; @@ -146,7 +147,6 @@ import org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey; import org.apache.impala.catalog.events.MetastoreEventsProcessor; -import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus; import org.apache.impala.catalog.events.MetastoreNotificationException; import 
org.apache.impala.catalog.monitor.CatalogMonitor; import org.apache.impala.catalog.monitor.CatalogOperationTracker; @@ -717,12 +717,14 @@ public class CatalogOpExecutor { int numTblsAdded = 0; int numViewsAdded = 0; + int numPartsAdded = 0; + int numFilesAdded = 0; if (testCaseData.getTables_and_views() != null) { for (TTable tTable : testCaseData.tables_and_views) { Db db = catalog_.getDb(tTable.db_name); // Db should have been created by now. Preconditions.checkNotNull(db, String.format("Missing db %s", tTable.db_name)); - Table t = Table.fromThrift(db, tTable); + Table t = Table.fromThrift(db, tTable, /*loadedInImpalad*/false); // Set a new version to force an overwrite if a table already exists with the same // name. t.setCatalogVersion(catalog_.incrementAndGetCatalogVersion()); @@ -738,6 +740,14 @@ public class CatalogOpExecutor { t.takeReadLock(); try { addTableToCatalogUpdate(t, wantMinimalResult, response.result); + if (t instanceof HdfsTable) { + HdfsTable hdfsTable = (HdfsTable) t; + for (PrunablePartition p : hdfsTable.getPartitions()) { + HdfsPartition part = (HdfsPartition) p; + numFilesAdded += part.getNumFileDescriptors(); + ++numPartsAdded; + } + } } finally { t.releaseReadLock(); } @@ -750,7 +760,8 @@ public class CatalogOpExecutor { "%d db(s), %d table(s) and %d view(s) imported for query: ", numDbsAdded, numTblsAdded, numViewsAdded)); responseStr.append("\n\n").append(testCaseData.getQuery_stmt()); - LOG.info(String.format("%s. Testcase path: %s", responseStr, inputPath)); + LOG.info("{}\n\nTotal partitions: {}. Total files: {}. 
Testcase path: {}", + responseStr, numPartsAdded, numFilesAdded, inputPath); addSummary(response, responseStr.toString()); return testCaseData.getQuery_stmt(); } diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java index 9789bc62c..8b4f14bde 100644 --- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java @@ -19,7 +19,6 @@ package org.apache.impala.catalog; import static org.junit.Assert.fail; -import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -90,7 +89,7 @@ public class CatalogObjectToFromThriftTest { } // Now try to load the thrift struct. - Table newTable = Table.fromThrift(catalog_.getDb(dbName), thriftTable); + Table newTable = Table.fromThrift(catalog_.getDb(dbName), thriftTable, true); Assert.assertTrue(newTable instanceof HdfsTable); Assert.assertEquals(newTable.name_, thriftTable.tbl_name); Assert.assertEquals(newTable.numClusteringCols_, 2); @@ -141,7 +140,7 @@ public class CatalogObjectToFromThriftTest { // Now try to load the thrift struct. Table newTable = Table.fromThrift(catalog_.getDb("functional_avro_snap"), - thriftTable); + thriftTable, true); Assert.assertEquals(newTable.getColumns().size(), 9); // The table schema does not match the Avro schema - it has only 2 columns. 
@@ -167,7 +166,7 @@ public class CatalogObjectToFromThriftTest { Assert.assertTrue(!isBinaryEncoded); } - Table newTable = Table.fromThrift(catalog_.getDb(dbName), thriftTable); + Table newTable = Table.fromThrift(catalog_.getDb(dbName), thriftTable, true); Assert.assertTrue(newTable instanceof HBaseTable); HBaseTable newHBaseTable = (HBaseTable) newTable; Assert.assertEquals(newHBaseTable.getColumns().size(), 13); @@ -201,7 +200,7 @@ public class CatalogObjectToFromThriftTest { // Verify that creating a table from this thrift struct results in a valid // Table. - Table newTable = Table.fromThrift(catalog_.getDb(dbName), thriftTable); + Table newTable = Table.fromThrift(catalog_.getDb(dbName), thriftTable, true); Assert.assertTrue(newTable instanceof HBaseTable); HBaseTable newHBaseTable = (HBaseTable) newTable; Assert.assertEquals(newHBaseTable.getColumns().size(), 13); diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index f092a4375..e81b47f44 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -165,6 +165,10 @@ SET_PATTERN = re.compile( METRICS_URL = 'http://{0}:25000/metrics?json'.format(IMPALAD_HOSTNAME) VARZ_URL = 'http://{0}:25000/varz?json'.format(IMPALAD_HOSTNAME) +JSON_TABLE_OBJECT_URL =\ + "http://{0}:25020/catalog_object?".format(IMPALAD_HOSTNAME) +\ + "json&object_type=TABLE&object_name={0}.{1}" + GROUP_NAME = grp.getgrgid(pwd.getpwnam(getuser()).pw_gid).gr_name EXEC_OPTION_NAMES = set([val.lower() @@ -474,6 +478,19 @@ class ImpalaTestSuite(BaseTestSuite): result.append(tuple(result_fields)) return result + def get_partition_id_set(self, db_name, tbl_name): + obj_url = JSON_TABLE_OBJECT_URL.format(db_name, tbl_name) + response = requests.get(obj_url) + assert response.status_code == requests.codes.ok + json_response = json.loads(response.text) + assert "json_string" in json_response, json_response + catalog_obj = json.loads(json_response["json_string"]) + assert "table" in 
catalog_obj, catalog_obj + assert "hdfs_table" in catalog_obj["table"], catalog_obj["table"] + tbl_obj = catalog_obj["table"]["hdfs_table"] + assert "partitions" in tbl_obj, tbl_obj + return set(tbl_obj["partitions"].keys()) + def get_debug_page(self, page_url): """Returns the content of the debug page 'page_url' as json.""" response = requests.get(page_url) diff --git a/tests/metadata/test_reuse_partitions.py b/tests/metadata/test_reuse_partitions.py index e732f8423..815683056 100644 --- a/tests/metadata/test_reuse_partitions.py +++ b/tests/metadata/test_reuse_partitions.py @@ -25,9 +25,6 @@ from tests.common.test_dimensions import create_uncompressed_text_dimension class TestReusePartitions(ImpalaTestSuite): """Tests for catalogd reusing unchanged partition instances for DDL/DMLs""" - JSON_TABLE_OBJECT_URL = "http://localhost:25020/catalog_object?" \ - "json&object_type=TABLE&object_name={0}.{1}" - @classmethod def get_workload(self): return 'functional-query' @@ -40,19 +37,6 @@ class TestReusePartitions(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension( create_uncompressed_text_dimension(cls.get_workload())) - def __get_partition_id_set(self, db_name, tbl_name): - obj_url = self.JSON_TABLE_OBJECT_URL.format(db_name, tbl_name) - response = requests.get(obj_url) - assert response.status_code == requests.codes.ok - json_response = json.loads(response.text) - assert "json_string" in json_response, json_response - catalog_obj = json.loads(json_response["json_string"]) - assert "table" in catalog_obj, catalog_obj - assert "hdfs_table" in catalog_obj["table"], catalog_obj["table"] - tbl_obj = catalog_obj["table"]["hdfs_table"] - assert "partitions" in tbl_obj, tbl_obj - return set(tbl_obj["partitions"].keys()) - def test_reuse_partitions_nontransactional(self, unique_database): self.__test_reuse_partitions_helper(unique_database, transactional=False) @@ -73,22 +57,22 @@ class TestReusePartitions(ImpalaTestSuite): self.client.execute(create_tbl_ddl) 
self.client.execute("insert into %s.%s partition (p) values (1, 1), (2, 2), (3, 3)" % (unique_database, tbl_name)) - part_ids = self.__get_partition_id_set(unique_database, tbl_name) + part_ids = self.get_partition_id_set(unique_database, tbl_name) assert len(part_ids) == 3 # REFRESH can reuse the existing partition instances. self.client.execute("refresh %s.%s" % (unique_database, tbl_name)) - assert self.__get_partition_id_set(unique_database, tbl_name) == part_ids + assert self.get_partition_id_set(unique_database, tbl_name) == part_ids # INSERT query that only touches one partition will reuse the other partitions. self.client.execute("insert into %s.%s partition (p) values (1, 1)" % (unique_database, tbl_name)) - new_part_ids = self.__get_partition_id_set(unique_database, tbl_name) + new_part_ids = self.get_partition_id_set(unique_database, tbl_name) assert len(part_ids.intersection(new_part_ids)) == 2 part_ids = new_part_ids # INSERT query that adds a new partition will reuse the existing partitions. self.client.execute("insert into %s.%s partition(p) values (4, 4)" % (unique_database, tbl_name)) - new_part_ids = self.__get_partition_id_set(unique_database, tbl_name) + new_part_ids = self.get_partition_id_set(unique_database, tbl_name) assert len(part_ids.intersection(new_part_ids)) == 3 part_ids = new_part_ids @@ -97,31 +81,31 @@ class TestReusePartitions(ImpalaTestSuite): # ALTER statements that don't touch data will reuse the existing partitions. 
self.client.execute("alter table %s.%s set tblproperties('numRows'='4')" % (unique_database, tbl_name)) - assert self.__get_partition_id_set(unique_database, tbl_name) == part_ids + assert self.get_partition_id_set(unique_database, tbl_name) == part_ids self.client.execute("alter table %s.%s add column name string" % (unique_database, tbl_name)) - assert self.__get_partition_id_set(unique_database, tbl_name) == part_ids + assert self.get_partition_id_set(unique_database, tbl_name) == part_ids self.client.execute("alter table %s.%s drop column name" % (unique_database, tbl_name)) - assert self.__get_partition_id_set(unique_database, tbl_name) == part_ids + assert self.get_partition_id_set(unique_database, tbl_name) == part_ids # ALTER statements that modify a partition will reuse other partitions. self.client.execute("alter table %s.%s add partition (p=5)" % (unique_database, tbl_name)) - new_part_ids = self.__get_partition_id_set(unique_database, tbl_name) + new_part_ids = self.get_partition_id_set(unique_database, tbl_name) assert len(new_part_ids) == 5 assert len(part_ids.intersection(new_part_ids)) == 4 self.client.execute("alter table %s.%s drop partition (p=5)" % (unique_database, tbl_name)) - new_part_ids = self.__get_partition_id_set(unique_database, tbl_name) + new_part_ids = self.get_partition_id_set(unique_database, tbl_name) assert part_ids == new_part_ids # Updating stats will also update partition stats so no instances can be reused. 
self.client.execute("compute stats %s.%s" % (unique_database, tbl_name)) - new_part_ids = self.__get_partition_id_set(unique_database, tbl_name) + new_part_ids = self.get_partition_id_set(unique_database, tbl_name) assert len(new_part_ids) == 4 assert len(part_ids.intersection(new_part_ids)) == 0 self.client.execute("compute incremental stats %s.%s" % (unique_database, tbl_name)) - new_part_ids = self.__get_partition_id_set(unique_database, tbl_name) + new_part_ids = self.get_partition_id_set(unique_database, tbl_name) assert len(new_part_ids) == 4 assert len(part_ids.intersection(new_part_ids)) == 0 part_ids = new_part_ids @@ -130,5 +114,5 @@ class TestReusePartitions(ImpalaTestSuite): # Drop incremental stats of one partition can reuse the other 3 partitions. self.client.execute("drop incremental stats %s.%s partition (p=1)" % (unique_database, tbl_name)) - new_part_ids = self.__get_partition_id_set(unique_database, tbl_name) + new_part_ids = self.get_partition_id_set(unique_database, tbl_name) assert len(part_ids.intersection(new_part_ids)) == 3 diff --git a/tests/metadata/test_testcase_builder.py b/tests/metadata/test_testcase_builder.py index eea71a308..d3050b19d 100644 --- a/tests/metadata/test_testcase_builder.py +++ b/tests/metadata/test_testcase_builder.py @@ -20,6 +20,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import ( create_single_exec_option_dimension, create_uncompressed_text_dimension) +from tests.util.event_processor_utils import EventProcessorUtils from tests.util.filesystem_utils import get_fs_path @@ -44,23 +45,45 @@ class TestTestcaseBuilder(ImpalaTestSuite): """Verify the basic usage. 
Use a unique database so the import won't impact the metadata used by other tests""" self.client.execute( - "create table {0}.alltypes like functional.alltypes".format(unique_database)) + "create table {0}.alltypes partitioned by (`year`, `month`) as " + "select * from functional.alltypes".format(unique_database)) self.client.execute( "create view {0}.alltypes_view as select * from {0}.alltypes" .format(unique_database)) + original_part_ids = self.get_partition_id_set(unique_database, "alltypes") + # Test SELECT on a view. The view will be expanded and the underlying table will also # be exported. - self._test_export_and_import(1, 1, 1, - "select count(*) from {0}.alltypes_view".format(unique_database)) + self._test_export_and_import( + 1, 1, 1, + "select count(*) from {0}.alltypes_view".format(unique_database), + [unique_database + ".alltypes"], [unique_database + ".alltypes_view"]) + + res = self.execute_query( + "show partitions {0}.alltypes".format(unique_database), + {"PLANNER_TESTCASE_MODE": True}) + # 24 partitions with a Total line + assert len(res.data) == 25 + + res = self.execute_query( + "show files in {0}.alltypes".format(unique_database), + {"PLANNER_TESTCASE_MODE": True}) + # Each partition shoule have exactly one file + assert len(res.data) == 24 + + # Verify the partition ids changed + new_part_ids = self.get_partition_id_set(unique_database, "alltypes") + assert new_part_ids != original_part_ids - def _test_export_and_import(self, num_dbs, num_tbls, num_views, query): + def _test_export_and_import(self, num_dbs, num_tbls, num_views, query, + tbls=(), views=()): tmp_path = get_fs_path("/tmp") # Make sure /tmp dir exists if not self.filesystem_client.exists(tmp_path): self.filesystem_client.make_dir(tmp_path) # Generate Testcase Data for query without table reference testcase_generate_query = "COPY TESTCASE TO '%s' %s" % (tmp_path, query) - result = self.execute_query_expect_success(self.client, testcase_generate_query) + result = 
self.execute_query(testcase_generate_query) assert len(result.data) == 1, "Testcase builder wrong result: {0}".format(result.data) # Check file exists @@ -70,14 +93,29 @@ class TestTestcaseBuilder(ImpalaTestSuite): assert self.filesystem_client.exists(hdfs_path), \ "File not generated {0}".format(hdfs_path) - try: - # Test load testcase works - testcase_load_query = "COPY TESTCASE FROM {0}".format(testcase_path) - result = self.execute_query_expect_success(self.client, testcase_load_query) - expected_msg = "{0} db(s), {1} table(s) and {2} view(s) imported for query".format( - num_dbs, num_tbls, num_views) - assert expected_msg in result.get_data() - finally: - # Delete testcase file from tmp - status = self.filesystem_client.delete_file_dir(hdfs_path) - assert status, "Delete generated testcase file failed with {0}".format(status) + result = self.execute_query("explain " + query, {"explain_level": 2}) + original_plan = result.data + + # Remove the original tables and views to make sure we use the imported ones. + for t in tbls: + self.execute_query("drop table if exists " + t) + for v in views: + self.execute_query("drop view if exists " + v) + # Make sure the DROP events are processed so the imported ones won't be dropped + # by them + EventProcessorUtils.wait_for_event_processing(self) + + # Test load testcase works + testcase_load_query = "COPY TESTCASE FROM {0}".format(testcase_path) + result = self.execute_query(testcase_load_query) + expected_msg = "{0} db(s), {1} table(s) and {2} view(s) imported for query".format( + num_dbs, num_tbls, num_views) + assert expected_msg in result.get_data() + + result = self.execute_query("explain " + query, {"explain_level": 2}) + new_plan = result.data + assert new_plan == original_plan + + # Only delete testcase file if everything works fine. + status = self.filesystem_client.delete_file_dir(hdfs_path) + assert status, "Delete generated testcase file failed with {0}".format(status)
