Repository: impala Updated Branches: refs/heads/2.x fb876f7e3 -> e7d48a3da
IMPALA-6131: Track time of last statistics update in metadata. The timestamp of the last COMPUTE STATS operation is saved to the table property "impala.lastComputeStatsTime". Its format is the same as that of "transient_lastDdlTime", so the two can be compared to check whether the schema has changed since statistics were computed. Other changes: - Handling of "transient_lastDdlTime" is simplified. The old logic set it to the current time + 1 if the old value was equal to the current time, to ensure that every DDL operation increased it. This was useful in the past, as IMPALA-387 used lastDdlTime to check whether partition data needed to be reloaded, but since IMPALA-1480 Impala does not rely on lastDdlTime at all. - Computing / setting stats on HDFS tables no longer increases "transient_lastDdlTime". - When Kudu tables are (re)loaded, Impala checks whether their HMS representation is up to date and, if so, skips the call to IMetaStoreClient.alter_table(). The old logic always called alter_table() after loading metadata from Kudu. This change was needed to ensure that "transient_lastDdlTime" behaves the same way in HDFS and Kudu tables, and it should also make (re)loading Kudu tables faster. Notes: - Kudu will be able to sync its tables to HMS in the near future (see KUDU-2191), so the Kudu metadata handling in Impala may need to be redesigned. Testing: tests/metadata/test_last_ddl_time_update.py is extended with: - checks of "impala.lastComputeStatsTime" - more SQL statements - tests for Kudu tables. Note that test_last_ddl_time_update.py is run only in exhaustive testing. 
Change-Id: Ibda49725d3e76456f2d1b3edd1bf117b0174e234 Reviewed-on: http://gerrit.cloudera.org:8080/10484 Reviewed-by: Alex Behm <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c98c01c5 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c98c01c5 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c98c01c5 Branch: refs/heads/2.x Commit: c98c01c55d7f6af7e536347986c5b22841bc78e7 Parents: fb876f7 Author: Csaba Ringhofer <[email protected]> Authored: Tue Mar 20 13:29:33 2018 +0100 Committer: Impala Public Jenkins <[email protected]> Committed: Thu May 24 03:59:48 2018 +0000 ---------------------------------------------------------------------- .../org/apache/impala/analysis/ToSqlUtils.java | 4 +- .../impala/catalog/CatalogServiceCatalog.java | 29 --- .../org/apache/impala/catalog/HdfsTable.java | 18 +- .../org/apache/impala/catalog/KuduTable.java | 12 +- .../java/org/apache/impala/catalog/Table.java | 30 +-- .../impala/service/CatalogOpExecutor.java | 92 +++----- tests/metadata/test_last_ddl_time_update.py | 228 ++++++++++++++----- 7 files changed, 233 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/c98c01c5/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java index a3c084d..a4129c4 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java +++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java @@ -224,8 +224,8 @@ public class ToSqlUtils { org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable(); // Use a LinkedHashMap to preserve the ordering of the table 
properties. LinkedHashMap<String, String> properties = Maps.newLinkedHashMap(msTable.getParameters()); - if (properties.containsKey("transient_lastDdlTime")) { - properties.remove("transient_lastDdlTime"); + if (properties.containsKey(Table.TBL_PROP_LAST_DDL_TIME)) { + properties.remove(Table.TBL_PROP_LAST_DDL_TIME); } boolean isExternal = msTable.getTableType() != null && msTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString()); http://git-wip-us.apache.org/repos/asf/impala/blob/c98c01c5/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 20a3e2f..19006ae 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -1401,35 +1401,6 @@ public class CatalogServiceCatalog extends Catalog { } /** - * Returns the table parameter 'transient_lastDdlTime', or -1 if it's not set. - * TODO: move this to a metastore helper class. - */ - public static long getLastDdlTime(org.apache.hadoop.hive.metastore.api.Table msTbl) { - Preconditions.checkNotNull(msTbl); - Map<String, String> params = msTbl.getParameters(); - String lastDdlTimeStr = params.get("transient_lastDdlTime"); - if (lastDdlTimeStr != null) { - try { - return Long.parseLong(lastDdlTimeStr); - } catch (NumberFormatException e) {} - } - return -1; - } - - /** - * Updates the cached lastDdlTime for the given table. The lastDdlTime is used during - * the metadata refresh() operations to determine if there have been any external - * (outside of Impala) modifications to the table. 
- */ - public void updateLastDdlTime(TTableName tblName, long ddlTime) { - Db db = getDb(tblName.getDb_name()); - if (db == null) return; - Table tbl = db.getTable(tblName.getTable_name()); - if (tbl == null) return; - tbl.updateLastDdlTime(ddlTime); - } - - /** * Renames a table. Equivalent to an atomic drop + add of the table. Returns * a pair of tables containing the removed table (or null if the table drop was not * successful) and the new table (or null if either the drop of the old one or the http://git-wip-us.apache.org/repos/asf/impala/blob/c98c01c5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index b309156..9910eb8 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -1232,12 +1232,10 @@ public class HdfsTable extends Table { * * If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore. * - * There are several cases where existing file descriptors might be reused incorrectly: - * 1. an ALTER TABLE ADD PARTITION or dynamic partition insert is executed through - * Hive. This does not update the lastDdlTime. - * 2. Hdfs rebalancer is executed. This changes the block locations but doesn't update - * the mtime (file modification time). - * If any of these occur, user has to execute "invalidate metadata" to invalidate the + * Existing file descriptors might be reused incorrectly if Hdfs rebalancer was + * executed, as it changes the block locations but doesn't update the mtime (file + * modification time). + * If this occurs, user has to execute "invalidate metadata" to invalidate the * metadata cache of the table and trigger a fresh load. 
*/ public void load(boolean reuseMetadata, IMetaStoreClient client, @@ -1339,13 +1337,13 @@ public class HdfsTable extends Table { * any, or for all the table partitions if 'partitionsToUpdate' is null. */ private void updatePartitionsFromHms(IMetaStoreClient client, - Set<String> partitionsToUpdate, boolean loadParitionFileMetadata) + Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata) throws Exception { if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName()); org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); Preconditions.checkNotNull(msTbl); Preconditions.checkState(msTbl.getPartitionKeysSize() != 0); - Preconditions.checkState(loadParitionFileMetadata || partitionsToUpdate == null); + Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null); // Retrieve all the partition names from the Hive Metastore. We need this to // identify the delta between partitions of the local HdfsTable and the table entry @@ -1378,7 +1376,7 @@ public class HdfsTable extends Table { // list and loading them from the Hive Metastore. dirtyPartitions.add(partition); } else { - if (partitionsToUpdate == null && loadParitionFileMetadata) { + if (partitionsToUpdate == null && loadPartitionFileMetadata) { Path partitionPath = partition.getLocationPath(); List<HdfsPartition> partitions = partitionsToUpdateFileMdByPath.get(partitionPath); @@ -1412,7 +1410,7 @@ public class HdfsTable extends Table { // Load file metadata. Until we have a notification mechanism for when a // file changes in hdfs, it is sometimes required to reload all the file // descriptors and block metadata of a table (e.g. REFRESH statement). 
- if (loadParitionFileMetadata) { + if (loadPartitionFileMetadata) { if (partitionsToUpdate != null) { // Only reload file metadata of partitions specified in 'partitionsToUpdate' Preconditions.checkState(partitionsToUpdateFileMdByPath.isEmpty()); http://git-wip-us.apache.org/repos/asf/impala/blob/c98c01c5/fe/src/main/java/org/apache/impala/catalog/KuduTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java index 8296ed0..5d6a10d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java @@ -219,7 +219,8 @@ public class KuduTable extends Table { final Timer.Context context = getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time(); try { - msTable_ = msTbl; + // Copy the table to check later if anything has changed. + msTable_ = msTbl.deepCopy(); kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME); Preconditions.checkNotNull(kuduTableName_); kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS); @@ -234,10 +235,15 @@ public class KuduTable extends Table { kuduTableName_, e); } + // Avoid updating HMS if the schema didn't change. + if (msTable_.equals(msTbl)) return; + // Update the table schema in HMS. try { - long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_); - msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime)); + // HMS would fill this table property if it was not set, but as metadata written + // with alter_table is not read back in case of Kudu tables, it has to be set here + // to ensure that HMS and catalogd have the same timestamp. 
+ updateTimestampProperty(msTable_, TBL_PROP_LAST_DDL_TIME); msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_); http://git-wip-us.apache.org/repos/asf/impala/blob/c98c01c5/fe/src/main/java/org/apache/impala/catalog/Table.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index aca9409..1584ca1 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -97,9 +97,6 @@ public abstract class Table extends CatalogObjectImpl { // Type of this table (array of struct) that mirrors the columns. Useful for analysis. protected final ArrayType type_ = new ArrayType(new StructType()); - // The lastDdlTime for this table; -1 if not set - protected long lastDdlTime_; - // True if this object is stored in an Impalad catalog cache. protected boolean storedInImpaladCatalogCache_ = false; @@ -109,14 +106,19 @@ public abstract class Table extends CatalogObjectImpl { public static final String ALTER_DURATION_METRIC = "alter-duration"; public static final String LOAD_DURATION_METRIC = "load-duration"; + // Table property key for storing the time of the last DDL operation. + public static final String TBL_PROP_LAST_DDL_TIME = "transient_lastDdlTime"; + + // Table property key for storing the last time when Impala executed COMPUTE STATS. + public static final String TBL_PROP_LAST_COMPUTE_STATS_TIME = + "impala.lastComputeStatsTime"; + protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db, String name, String owner) { msTable_ = msTable; db_ = db; name_ = name.toLowerCase(); owner_ = owner; - lastDdlTime_ = (msTable_ != null) ? 
- CatalogServiceCatalog.getLastDdlTime(msTable_) : -1; tableStats_ = new TTableStats(-1); tableStats_.setTotal_file_bytes(-1); initMetrics(); @@ -185,16 +187,6 @@ public abstract class Table extends CatalogObjectImpl { ((StructType) type_.getItemType()).clearFields(); } - /** - * Updates the lastDdlTime for this Table, if the new value is greater - * than the existing value. Does nothing if the new value is less than - * or equal to the existing value. - */ - public void updateLastDdlTime(long ddlTime) { - // Ensure the lastDdlTime never goes backwards. - if (ddlTime > lastDdlTime_) lastDdlTime_ = ddlTime; - } - // Returns a list of all column names for this table which we expect to have column // stats in the HMS. This exists because, when we request the column stats from HMS, // including a column name that does not have stats causes the @@ -586,4 +578,12 @@ public abstract class Table extends CatalogObjectImpl { if (!(obj instanceof Table)) return false; return getFullName().equals(((Table) obj).getFullName()); } + + /** + * Updates a table property with the current system time in seconds precision. 
+ */ + public static void updateTimestampProperty( + org.apache.hadoop.hive.metastore.api.Table msTbl, String propertyKey) { + msTbl.putToParameters(propertyKey, Long.toString(System.currentTimeMillis() / 1000)); + } } http://git-wip-us.apache.org/repos/asf/impala/blob/c98c01c5/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index ea67389..82422ab 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -711,7 +711,7 @@ public class CatalogOpExecutor { if (LOG.isTraceEnabled()) { LOG.trace(String.format("Altering view %s", tableName)); } - applyAlterTable(msTbl); + applyAlterTable(msTbl, true); try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { tbl.load(true, msClient.getHiveClient(), msTbl); } @@ -729,6 +729,8 @@ public class CatalogOpExecutor { * in batches of size 'MAX_PARTITION_UPDATES_PER_RPC'. * This function is used by COMPUTE STATS, COMPUTE INCREMENTAL STATS and * ALTER TABLE SET COLUMN STATS. + * Updates table property 'impala.lastComputeStatsTime' for COMPUTE (INCREMENTAL) STATS, + * but not for ALTER TABLE SET COLUMN STATS. * Returns the number of updated partitions and columns in 'numUpdatedPartitions' * and 'numUpdatedColumns', respectively. */ @@ -780,10 +782,15 @@ public class CatalogOpExecutor { bulkAlterPartitions(table.getDb().getName(), table.getName(), modifiedParts); } - // Update table row count and total file bytes. Apply table alteration to HMS last to - // ensure the lastDdlTime is as accurate as possible. - if (params.isSetTable_stats()) updateTableStats(params, msTbl); - applyAlterTable(msTbl); + if (params.isSetTable_stats()) { + // Update table row count and total file bytes. 
+ updateTableStats(params, msTbl); + // Set impala.lastComputeStatsTime just before alter_table to ensure that it is as + // accurate as possible. + Table.updateTimestampProperty(msTbl, HdfsTable.TBL_PROP_LAST_COMPUTE_STATS_TIME); + } + + applyAlterTable(msTbl, false); numUpdatedPartitions.setRef(Long.valueOf(0)); if (modifiedParts != null) { @@ -1232,7 +1239,7 @@ public class CatalogOpExecutor { boolean droppedTotalSize = msTbl.getParameters().remove(StatsSetupConst.TOTAL_SIZE) != null; if (droppedRowCount || droppedTotalSize) { - applyAlterTable(msTbl); + applyAlterTable(msTbl, false); ++numTargetedPartitions; } @@ -1778,7 +1785,7 @@ public class CatalogOpExecutor { cacheOp.getCache_pool_name(), replication); catalog_.watchCacheDirs(Lists.<Long>newArrayList(id), new TTableName(newTable.getDbName(), newTable.getTableName())); - applyAlterTable(newTable); + applyAlterTable(newTable, true); } Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName()); addTableToCatalogUpdate(newTbl, response.result); @@ -1968,7 +1975,7 @@ public class CatalogOpExecutor { msTbl.getSd().addToCols(fs); } } - applyAlterTable(msTbl); + applyAlterTable(msTbl, true); } /** @@ -2005,7 +2012,7 @@ public class CatalogOpExecutor { "Column name %s not found in table %s.", colName, tbl.getFullName())); } } - applyAlterTable(msTbl); + applyAlterTable(msTbl, true); } /** @@ -2276,7 +2283,7 @@ public class CatalogOpExecutor { String alteredColumns = MetaStoreUtil.removeValueFromCsvList(oldColumns, colName); msTbl.getParameters().put(sortByKey, alteredColumns); } - applyAlterTable(msTbl); + applyAlterTable(msTbl, true); } /** @@ -2337,7 +2344,7 @@ public class CatalogOpExecutor { // The default partition must be updated if the file format is changed so that new // partitions are created with the new file format. 
if (tbl instanceof HdfsTable) ((HdfsTable) tbl).addDefaultPartition(msTbl.getSd()); - applyAlterTable(msTbl); + applyAlterTable(msTbl, true); reloadFileMetadata = true; } else { Preconditions.checkArgument(tbl instanceof HdfsTable); @@ -2376,7 +2383,7 @@ public class CatalogOpExecutor { // The default partition must be updated if the row format is changed so that new // partitions are created with the new file format. ((HdfsTable) tbl).addDefaultPartition(msTbl.getSd()); - applyAlterTable(msTbl); + applyAlterTable(msTbl, true); reloadFileMetadata = true; } else { List<HdfsPartition> partitions = @@ -2418,7 +2425,7 @@ public class CatalogOpExecutor { tbl.getMetaStoreTable().deepCopy(); if (msTbl.getPartitionKeysSize() == 0) reloadFileMetadata = true; msTbl.getSd().setLocation(location); - applyAlterTable(msTbl); + applyAlterTable(msTbl, true); } else { TableName tableName = tbl.getTableName(); HdfsPartition partition = catalog_.getHdfsPartition( @@ -2503,7 +2510,7 @@ public class CatalogOpExecutor { throw new UnsupportedOperationException( "Unknown target TTablePropertyType: " + params.getTarget()); } - applyAlterTable(msTbl); + applyAlterTable(msTbl, true); } } @@ -2630,7 +2637,7 @@ public class CatalogOpExecutor { } // Update the table metadata. - applyAlterTable(msTbl); + applyAlterTable(msTbl, true); return loadFileMetadata; } @@ -2869,13 +2876,19 @@ public class CatalogOpExecutor { * command if the metadata is not completely in-sync. This affects both Hive and * Impala, but is more important in Impala because the metadata is cached for a * longer period of time. + * If 'setLastDdlTime' is true, then table property 'transient_lastDdlTime' is updated + * to the current time. 
*/ - private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl) + private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl, + boolean setLastDdlTime) throws ImpalaRuntimeException { - long lastDdlTime = -1; try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { - lastDdlTime = calculateDdlTime(msTbl); - msTbl.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime)); + if (setLastDdlTime) { + // It would be enough to remove this table property, as HMS would fill it, but + // this would make it necessary to reload the table after alter_table in order to + // remain consistent with HMS. + Table.updateTimestampProperty(msTbl, Table.TBL_PROP_LAST_DDL_TIME); + } // Avoid computing/setting stats on the HMS side because that may reset the // 'numRows' table property (see HIVE-15653). The DO_NOT_UPDATE_STATS flag // tells the HMS not to recompute/reset any statistics on its own. Any @@ -2886,9 +2899,6 @@ public class CatalogOpExecutor { } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e); - } finally { - catalog_.updateLastDdlTime( - new TTableName(msTbl.getDbName(), msTbl.getTableName()), lastDdlTime); } } @@ -2906,7 +2916,6 @@ public class CatalogOpExecutor { try { MetastoreShim.alterPartitions( msClient.getHiveClient(), tableName.getDb(), tableName.getTbl(), hmsPartitions); - updateLastDdlTime(msTbl, msClient); } catch (TException e) { throw new ImpalaRuntimeException( String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partitions"), e); @@ -3111,42 +3120,6 @@ public class CatalogOpExecutor { return fsList; } - /** - * Sets the table parameter 'transient_lastDdlTime' to System.currentTimeMillis()/1000 - * in the given msTbl. 'transient_lastDdlTime' is guaranteed to be changed. - * If msClient is not null then this method applies alter_table() to update the - * Metastore. Otherwise, the caller is responsible for the final update. 
- */ - private long updateLastDdlTime(org.apache.hadoop.hive.metastore.api.Table msTbl, - MetaStoreClient msClient) throws MetaException, NoSuchObjectException, TException { - Preconditions.checkNotNull(msTbl); - if (LOG.isTraceEnabled()) { - LOG.trace("Updating lastDdlTime for table: " + msTbl.getTableName()); - } - Map<String, String> params = msTbl.getParameters(); - long lastDdlTime = calculateDdlTime(msTbl); - params.put("transient_lastDdlTime", Long.toString(lastDdlTime)); - msTbl.setParameters(params); - if (msClient != null) { - msClient.getHiveClient().alter_table( - msTbl.getDbName(), msTbl.getTableName(), msTbl); - } - catalog_.updateLastDdlTime( - new TTableName(msTbl.getDbName(), msTbl.getTableName()), lastDdlTime); - return lastDdlTime; - } - - /** - * Calculates the next transient_lastDdlTime value. - */ - public static long calculateDdlTime( - org.apache.hadoop.hive.metastore.api.Table msTbl) { - long existingLastDdlTime = CatalogServiceCatalog.getLastDdlTime(msTbl); - long currentTime = System.currentTimeMillis() / 1000; - if (existingLastDdlTime == currentTime) ++currentTime; - return currentTime; - } - /** * Executes a TResetMetadataRequest and returns the result as a * TResetMetadataResponse. Based on the request parameters, this operation @@ -3269,7 +3242,6 @@ public class CatalogOpExecutor { * If the insert touched any pre-existing partitions that were cached, a request to * watch the associated cache directives will be submitted. This will result in an * async table refresh once the cache request completes. - * Updates the lastDdlTime of the table if new partitions were created. 
*/ public TUpdateCatalogResponse updateCatalog(TUpdateCatalogRequest update) throws ImpalaException { http://git-wip-us.apache.org/repos/asf/impala/blob/c98c01c5/tests/metadata/test_last_ddl_time_update.py ---------------------------------------------------------------------- diff --git a/tests/metadata/test_last_ddl_time_update.py b/tests/metadata/test_last_ddl_time_update.py index b1bcfbb..e9b48fc 100644 --- a/tests/metadata/test_last_ddl_time_update.py +++ b/tests/metadata/test_last_ddl_time_update.py @@ -21,7 +21,7 @@ import time from tests.common.impala_test_suite import ImpalaTestSuite from tests.util.filesystem_utils import WAREHOUSE, IS_S3 -# Checks that ALTER and INSERT statements update the last DDL time of the modified table. +# Checks different statements' effect on last DDL time and last compute stats time. class TestLastDdlTimeUpdate(ImpalaTestSuite): @classmethod @@ -41,82 +41,188 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite): # to regress here. cls.ImpalaTestMatrix.add_constraint(lambda v: False) + # Convenience class to make calls to TestLastDdlTimeUpdate.run_test() shorter by + # storing common arguments as members and substituting table name and HDFS warehouse + # path to the query string. + class TestHelper: + def __init__(self, test_suite, db_name, tbl_name): + self.test_suite = test_suite + self.db_name = db_name + self.tbl_name = tbl_name + self.fq_tbl_name = "%s.%s" % (self.db_name, self.tbl_name) + + def expect_no_time_change(self, query): + """Neither transient_lastDdlTime or impala.lastComputeStatsTime should be + changed by running the query. + The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s" + """ + self.run_test(query, False, False) + + def expect_ddl_time_change(self, query): + """Running the query should increase transient_lastDdlTime but + not impala.lastComputeStatsTime. 
+ The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s" + """ + self.run_test(query, True, False) + + def expect_stat_time_change(self, query): + """Running the query should increase impala.lastComputeStatsTime but + not transient_lastDdlTime. + The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s" + """ + self.run_test(query, False, True) + + def run_test(self, query, expect_changed_ddl_time, expect_changed_stats_time): + """ + Runs the query and compares the last ddl/compute stats time before and after + executing the query. If expect_changed_ddl_time or expect_changed_stat_time + is true, then we expect the given table property to be increased, otherwise we + expect it to be unchanged. + The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s" + """ + + HIVE_LAST_DDL_TIME_PARAM_KEY = "transient_lastDdlTime" + LAST_COMPUTE_STATS_TIME_KEY = "impala.lastComputeStatsTime" + + # Get timestamps before executing query. + table = self.test_suite.hive_client.get_table(self.db_name, self.tbl_name) + assert table is not None + beforeDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY] + beforeStatsTime = table.parameters[LAST_COMPUTE_STATS_TIME_KEY] + # HMS uses a seconds granularity on the last ddl time - sleeping 1100 ms should be + # enough to ensure that the new timestamps are strictly greater than the old ones. + time.sleep (1.1) + + self.test_suite.execute_query(query % + {'TBL': self.fq_tbl_name, 'WAREHOUSE': WAREHOUSE}) + + # Get timestamps after executing query. + table = self.test_suite.hive_client.get_table(self.db_name, self.tbl_name) + afterDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY] + afterStatsTime = table.parameters[LAST_COMPUTE_STATS_TIME_KEY] + + if expect_changed_ddl_time: + # check that the new ddlTime is strictly greater than the old one. 
+ assert long(afterDdlTime) > long(beforeDdlTime) + else: + assert long(afterDdlTime) == long(beforeDdlTime) + + if expect_changed_stats_time: + # check that the new statsTime is strictly greater than the old one. + assert long(afterStatsTime) > long(beforeStatsTime) + else: + assert long(afterStatsTime) == long(beforeStatsTime) + def test_alter(self, vector, unique_database): TBL_NAME = "alter_test_tbl" FQ_TBL_NAME = unique_database + "." + TBL_NAME + self.execute_query("create external table %s (i int) " "partitioned by (j int, s string)" % FQ_TBL_NAME) + # compute statistics to fill table property impala.lastComputeStatsTime + self.execute_query("compute stats %s" % FQ_TBL_NAME) + + h = TestLastDdlTimeUpdate.TestHelper(self, unique_database, TBL_NAME) + # add/drop partitions - self.run_test("alter table %s add partition (j=1, s='2012')" % FQ_TBL_NAME, - unique_database, TBL_NAME, False) - self.run_test("alter table %s add if not exists " - "partition (j=1, s='2012')" % FQ_TBL_NAME, - unique_database, TBL_NAME, False) - self.run_test("alter table %s drop partition (j=1, s='2012')" % FQ_TBL_NAME, - unique_database, TBL_NAME, False) - self.run_test("alter table %s drop if exists " - "partition (j=2, s='2012')" % FQ_TBL_NAME, - unique_database, TBL_NAME, False) - # rename columns - self.run_test("alter table %s change column i k int" % FQ_TBL_NAME, - unique_database, TBL_NAME, True) - self.run_test("alter table %s change column k i int" % FQ_TBL_NAME, - unique_database, TBL_NAME, True) + h.expect_no_time_change("alter table %(TBL)s add partition (j=1, s='2012')") + h.expect_no_time_change("alter table %(TBL)s add if not exists " + "partition (j=1, s='2012')") + h.expect_no_time_change("alter table %(TBL)s drop partition (j=1, s='2012')") + h.expect_no_time_change("alter table %(TBL)s drop if exists " + "partition (j=2, s='2012')") # change location of table - self.run_test("alter table %s set location " - "'%s'" % (FQ_TBL_NAME, WAREHOUSE), - unique_database, 
TBL_NAME, True) + h.expect_ddl_time_change("alter table %(TBL)s set location '%(WAREHOUSE)s'") # change format of table - self.run_test("alter table %s set fileformat textfile" % FQ_TBL_NAME, - unique_database, TBL_NAME, True) + h.expect_ddl_time_change("alter table %(TBL)s set fileformat textfile") + + self.run_common_test_cases(h) + + # prepare for incremental statistics tests + self.execute_query("drop stats %s" % FQ_TBL_NAME) + self.execute_query("alter table %s add partition (j=1, s='2012')" % FQ_TBL_NAME) + + # compute incremental statistics + h.expect_stat_time_change("compute incremental stats %(TBL)s") + h.expect_stat_time_change("compute incremental stats %(TBL)s " + "partition (j=1, s='2012')") + + # drop incremental statistics + h.expect_no_time_change("drop incremental stats %(TBL)s partition (j=1, s='2012')") + + # prepare for table sample statistics tests + self.execute_query( + "alter table %s set tblproperties ('impala.enable.stats.extrapolation'='true')" + % FQ_TBL_NAME) + + # compute sampled statistics + h.expect_stat_time_change("compute stats %(TBL)s tablesample system(20)") + def test_insert(self, vector, unique_database): TBL_NAME = "insert_test_tbl" FQ_TBL_NAME = unique_database + "." 
+ TBL_NAME self.execute_query("create external table %s (i int) " "partitioned by (j int, s string)" % FQ_TBL_NAME) + + # initialize compute stats time + self.execute_query("compute stats %s" % FQ_TBL_NAME) + + h = TestLastDdlTimeUpdate.TestHelper(self, unique_database, TBL_NAME) + # static partition insert - self.run_test("insert into %s partition(j=1, s='2012') " - "select 10" % FQ_TBL_NAME, unique_database, TBL_NAME, False) + h.expect_no_time_change("insert into %(TBL)s partition(j=1, s='2012') select 10") # dynamic partition insert - self.run_test("insert into %s partition(j, s) " - "select 10, 2, '2013'" % FQ_TBL_NAME, unique_database, TBL_NAME, False) + h.expect_no_time_change("insert into %(TBL)s partition(j, s) select 10, 2, '2013'") + # dynamic partition insert changing no partitions (empty input) - self.run_test("insert into %s partition(j, s) " - "select * from (select 10 as i, 2 as j, '2013' as s) as t " - "where t.i < 10" % FQ_TBL_NAME, unique_database, TBL_NAME, False) + h.expect_no_time_change("insert into %(TBL)s partition(j, s) " + "select * from (select 10 as i, 2 as j, '2013' as s) as t " + "where t.i < 10") # dynamic partition insert modifying an existing partition - self.run_test("insert into %s partition(j, s) " - "select 20, 1, '2012'" % FQ_TBL_NAME, unique_database, TBL_NAME, False) - - def run_test(self, query, db_name, table_name, expect_changed): - """ - Runs the given query (expected to be an ALTER or INSERT statement) - and compares the last ddl time before and after executing the query. - If expect_change is true then we expect the last ddl time to increase, - otherwise we expect no change. - """ - - HIVE_LAST_DDL_TIME_PARAM_KEY = "transient_lastDdlTime" - - # Get last DDL time before executing query. - table = self.hive_client.get_table(db_name, table_name) - assert table is not None - beforeDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY] - - # Sleep for 2s to make sure the new ddlTime is strictly greater than the old one. 
- # Hive uses a seconds granularity on the last ddl time. - time.sleep (2) - - self.execute_query(query) - - # Get last ddl time after executing query . - table = self.hive_client.get_table(db_name, table_name) - afterDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY] - - if expect_changed: - # check that the new ddlTime is strictly greater than the old one. - assert long(afterDdlTime) > long(beforeDdlTime) - else: - assert long(afterDdlTime) == long(beforeDdlTime) + h.expect_no_time_change("insert into %(TBL)s partition(j, s) select 20, 1, '2012'") + + def test_kudu(self, vector, unique_database): + TBL_NAME = "kudu_test_tbl" + FQ_TBL_NAME = unique_database + "." + TBL_NAME + self.execute_query("create table %s (i int primary key) " + "partition by hash(i) partitions 3 stored as kudu" % FQ_TBL_NAME) + + # initialize last compute stats time + self.execute_query("compute stats %s" % FQ_TBL_NAME) + + h = TestLastDdlTimeUpdate.TestHelper(self, unique_database, TBL_NAME) + + # insert + h.expect_no_time_change("insert into %s values (1)" % FQ_TBL_NAME) + + self.run_common_test_cases(h) + + # Tests that should behave the same with HDFS and Kudu tables. 
+ def run_common_test_cases(self, test_helper): + h = test_helper + # rename columns + h.expect_ddl_time_change("alter table %(TBL)s change column i k int") + h.expect_ddl_time_change("alter table %(TBL)s change column k i int") + # change table property + h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('a'='b')") + + # changing table statistics manually + h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('numRows'='1')") + + # changing column statistics manually + h.expect_no_time_change("alter table %(TBL)s set column stats i ('numdvs'='3')") + + # compute statistics + h.expect_stat_time_change("compute stats %(TBL)s") + # compute statistics for a single column + h.expect_stat_time_change("compute stats %(TBL)s (i)") + + # drop statistics + h.expect_no_time_change("drop stats %(TBL)s") + + # invalidate metadata and reload table + self.execute_query("invalidate metadata %s" % h.fq_tbl_name) + # run any query to reload the table + h.expect_no_time_change("describe %(TBL)s")
