This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5ed6c665d190dbe5303e241afbc50e0eacb0a6af
Author: Bharath Vissapragada <[email protected]>
AuthorDate: Sun Feb 10 17:03:19 2019 -0800

    IMPALA-7961: Avoid adding unmodified objects to DDL response

    When a DDL is processed, we typically add the affected (added/removed)
    objects to the response TCatalogUpdateResult struct. This response is
    processed on the coordinator and the changes are applied locally. When
    SYNC_DDL is enabled, the Catalog server also includes a topic version
    number that covers all the affected objects, so that the coordinator
    can wait for that minimum topic version to be applied on all other
    coordinators before returning control to the user. This covering topic
    version is calculated by looking at the topic update log, which
    contains all the in-flight updates (and, to an extent, past updates)
    that are periodically GC'ed.

    Bug: In certain cases, like CREATE TABLE IF NOT EXISTS, we could end up
    adding objects to the DDL response that haven't been modified in a
    while (> TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates) and hence could
    already have been GC'ed from the TopicUpdateLog. This means that the
    Catalog server wouldn't be able to find a covering topic update version
    and eventually gives up and throws an error, as described in the jira.

    Fix: Bump the version of any object that already exists when IF NOT
    EXISTS is used in conjunction with SYNC_DDL. This makes sure that the
    object is included in the upcoming topic updates, so that
    waitForSyncDdlVersion() can find a covering topic update that includes
    it. This is a hack and could cause false-positive invalidations, but
    that is definitely better than breaking the SYNC_DDL semantics. Also
    added some additional diagnostic logging that could've simplified
    debugging an issue like this.

    Testing: Since this is a racy bug, I could only repro it by forcing
    frequent topic update log GCs along with a specific sequence of
    actions. Couldn't reproduce it with the patch.

    Change-Id: If3e914b70ba796c9b224e9dea559b8c40aa25d83
    Reviewed-on: http://gerrit.cloudera.org:8080/12428
    Reviewed-by: Bharath Vissapragada <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/CatalogServiceCatalog.java      | 11 ++-
 .../org/apache/impala/catalog/FeCatalogUtils.java  | 15 ++++
 .../org/apache/impala/catalog/TopicUpdateLog.java  | 16 ++--
 .../apache/impala/service/CatalogOpExecutor.java   | 81 +++++++++++++++++-----
 4 files changed, 92 insertions(+), 31 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 274ca35..0904714 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -80,7 +80,7 @@ import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.SentryProxy;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -91,6 +91,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.slf4j.LoggerFactory;
 
 /**
@@ -171,7 +172,7 @@ import com.google.common.collect.Sets;
  * loading thread pool.
  */
 public class CatalogServiceCatalog extends Catalog {
-  public static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
+  public static final Logger LOG = LoggerFactory.getLogger(CatalogServiceCatalog.class);
 
   private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
   private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
@@ -1279,7 +1280,7 @@ public class CatalogServiceCatalog extends Catalog {
         tableLoadingMgr_.backgroundLoad(tblName);
       }
     } catch (Exception e) {
-      LOG.error(e);
+      LOG.error("Error initializing Catalog", e);
       throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
     } finally {
       versionLock_.writeLock().unlock();
@@ -2237,6 +2238,10 @@ public class CatalogServiceCatalog extends Catalog {
       if (lastSentTopicUpdate != currentTopicUpdate) {
         ++numAttempts;
         if (numAttempts > maxNumAttempts) {
+          LOG.error(String.format("Couldn't retrieve the covering topic version for " +
+              "catalog objects. Updated objects: %s, deleted objects: %s",
+              FeCatalogUtils.debugString(result.updated_catalog_objects),
+              FeCatalogUtils.debugString(result.removed_catalog_objects)));
           throw new CatalogException("Couldn't retrieve the catalog topic version " +
               "for the SYNC_DDL operation after " + maxNumAttempts + " attempts." +
               "The operation has been successfully executed but its effects may have " +
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index d10dadc..7e55d73 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -40,6 +40,7 @@ import org.apache.impala.catalog.local.CatalogdMetaProvider;
 import org.apache.impala.catalog.local.LocalCatalog;
 import org.apache.impala.catalog.local.MetaProvider;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.TGetCatalogMetricsResult;
 import org.apache.impala.thrift.THdfsPartition;
@@ -387,4 +388,18 @@ public abstract class FeCatalogUtils {
   }
 
+  /**
+   * Returns a debug string for a given list of TCatalogObjects. Includes the unique key
+   * and version number for each object.
+   */
+  public static String debugString(List<TCatalogObject> objects) {
+    if (objects == null || objects.size() == 0) return "[]";
+    List<String> catalogObjs = new ArrayList<>();
+    for (TCatalogObject object: objects) {
+      catalogObjs.add(String.format("%s version: %d",
+          Catalog.toCatalogObjectKey(object), object.catalog_version));
+    }
+    return "[" + Joiner.on(",").join(catalogObjs) + "]";
+  }
+
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
index 779d8f7..be80b3b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
@@ -20,7 +20,8 @@ package org.apache.impala.catalog;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -37,7 +38,7 @@ import com.google.common.base.Strings;
 // by the catalog for at least TOPIC_UPDATE_LOG_GC_FREQUENCY updates to be removed from
 // the log.
 public class TopicUpdateLog {
-  private static final Logger LOG = Logger.getLogger(TopicUpdateLog.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TopicUpdateLog.class);
   // Frequency at which the entries of the topic update log are garbage collected.
   // An entry may survive for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.
   private final static int TOPIC_UPDATE_LOG_GC_FREQUENCY = 1000;
@@ -104,9 +105,8 @@ public class TopicUpdateLog {
       return;
     }
     if (numTopicUpdatesToGc_ == 0) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Topic update log GC started.");
-      }
+      LOG.info("Topic update log GC started. GC-ing topics with versions " +
+          "<= {}", oldestTopicUpdateToGc_);
       Preconditions.checkState(oldestTopicUpdateToGc_ > 0);
       int numEntriesRemoved = 0;
       for (Map.Entry<String, Entry> entry:
@@ -120,10 +120,8 @@ public class TopicUpdateLog {
       }
       numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
       oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Topic update log GC finished. Removed " + numEntriesRemoved +
-            " entries.");
-      }
+      LOG.info("Topic update log GC finished. Removed {} entries.",
+          numEntriesRemoved);
     } else {
       --numTopicUpdatesToGc_;
     }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 137e682..19021b0 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -169,7 +169,7 @@ import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.MetaStoreUtil;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
 import org.apache.thrift.TException;
 
 import com.codahale.metrics.Timer;
@@ -179,6 +179,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class used to execute Catalog Operations, including DDL and refresh/invalidate
@@ -249,7 +250,7 @@ import com.google.common.collect.Sets;
  * metastore out of this class.
  */
 public class CatalogOpExecutor {
-  private static final Logger LOG = Logger.getLogger(CatalogOpExecutor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogOpExecutor.class);
   // Format string for exceptions returned by Hive Metastore RPCs.
   private final static String HMS_RPC_ERROR_FORMAT_STR =
       "Error making '%s' RPC to Hive Metastore: ";
@@ -280,6 +281,7 @@ public class CatalogOpExecutor {
       requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
     }
+    boolean syncDdl = ddlRequest.isSync_ddl();
     switch (ddlRequest.ddl_type) {
       case ALTER_TABLE:
         alterTable(ddlRequest.getAlter_table_params(), response);
         break;
@@ -291,14 +293,14 @@ public class CatalogOpExecutor {
         createDatabase(ddlRequest.getCreate_db_params(), response);
         break;
       case CREATE_TABLE_AS_SELECT:
-        response.setNew_table_created(
-            createTable(ddlRequest.getCreate_table_params(), response));
+        response.setNew_table_created(createTable(
+            ddlRequest.getCreate_table_params(), response, syncDdl));
         break;
       case CREATE_TABLE:
-        createTable(ddlRequest.getCreate_table_params(), response);
+        createTable(ddlRequest.getCreate_table_params(), response, syncDdl);
         break;
       case CREATE_TABLE_LIKE:
-        createTableLike(ddlRequest.getCreate_table_like_params(), response);
+        createTableLike(ddlRequest.getCreate_table_like_params(), response, syncDdl);
         break;
       case CREATE_VIEW:
         createView(ddlRequest.getCreate_view_params(), response);
         break;
@@ -363,7 +365,7 @@ public class CatalogOpExecutor {
     // operation. The version of this catalog update is returned to the requesting
     // impalad which will wait until this catalog update has been broadcast to all the
     // coordinators.
-    if (ddlRequest.isSync_ddl()) {
+    if (syncDdl) {
       response.getResult().setVersion(
           catalog_.waitForSyncDdlVersion(response.getResult()));
     }
@@ -1762,9 +1764,12 @@ public class CatalogOpExecutor {
    * lazily load the new metadata on the next access. If this is a managed Kudu table,
    * the table is also created in the Kudu storage engine. Re-throws any HMS or Kudu
    * exceptions encountered during the create.
+   * @param syncDdl tells if SYNC_DDL option is enabled on this DDL request.
+   * @return true if a new table has been created with the given params, false
+   * otherwise.
    */
-  private boolean createTable(TCreateTableParams params, TDdlExecResponse response)
-      throws ImpalaException {
+  private boolean createTable(TCreateTableParams params, TDdlExecResponse response,
+      boolean syncDdl) throws ImpalaException {
     Preconditions.checkNotNull(params);
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && tableName.isFullyQualified());
@@ -1774,18 +1779,36 @@ public class CatalogOpExecutor {
     Table existingTbl = catalog_.getTableNoThrow(tableName.getDb(), tableName.getTbl());
     if (params.if_not_exists && existingTbl != null) {
       addSummary(response, "Table already exists.");
-      LOG.trace(String.format("Skipping table creation because %s already exists and " +
-          "IF NOT EXISTS was specified.", tableName));
-      existingTbl.getLock().lock();
+      LOG.trace("Skipping table creation because {} already exists and " +
+          "IF NOT EXISTS was specified.", tableName);
+      tryLock(existingTbl);
       try {
-        addTableToCatalogUpdate(existingTbl, response.getResult());
-        return false;
+        if (syncDdl) {
+          // When SYNC_DDL is enabled and the table already exists, we force a version
+          // bump on it so that it is added to the next statestore update. Without this
+          // we could potentially be referring to a table object that has already been
+          // GC'ed from the TopicUpdateLog and waitForSyncDdlVersion() cannot find a
+          // covering topic version (IMPALA-7961).
+          //
+          // This is a conservative hack to not break the SYNC_DDL semantics and could
+          // possibly result in false-positive invalidates on this table. However, that is
+          // better than breaking the SYNC_DDL semantics and the subsequent queries
+          // referring to this table failing with "table not found" errors.
+          long newVersion = catalog_.incrementAndGetCatalogVersion();
+          existingTbl.setCatalogVersion(newVersion);
+          LOG.trace("Table {} version bumped to {} because SYNC_DDL is enabled.",
+              tableName, newVersion);
+        }
+        addTableToCatalogUpdate(existingTbl, response.result);
       } finally {
+        // Release the locks held in tryLock().
+        catalog_.getLock().writeLock().unlock();
         existingTbl.getLock().unlock();
       }
+      return false;
     }
     org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
-    LOG.trace(String.format("Creating table %s", tableName));
+    LOG.trace("Creating table {}", tableName);
     if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params, response);
     Preconditions.checkState(params.getColumns().size() > 0,
         "Empty column list given as argument to Catalog.createTable");
@@ -2015,8 +2038,10 @@ public class CatalogOpExecutor {
    * No data is copied as part of this process, it is a metadata only operation. If the
    * creation succeeds, an entry is added to the metadata cache to lazily load the new
    * table's metadata on the next access.
+   * @param syncDdl tells if SYNC_DDL is enabled for this DDL request.
    */
-  private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response
+  private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response
+      , boolean syncDdl)
       throws ImpalaException {
     Preconditions.checkNotNull(params);
 
@@ -2033,13 +2058,31 @@ public class CatalogOpExecutor {
       addSummary(response, "Table already exists.");
       LOG.trace(String.format("Skipping table creation because %s already exists and " +
           "IF NOT EXISTS was specified.", tblName));
-      existingTbl.getLock().lock();
+      tryLock(existingTbl);
       try {
-        addTableToCatalogUpdate(existingTbl, response.getResult());
-        return;
+        if (syncDdl) {
+          // When SYNC_DDL is enabled and the table already exists, we force a version
+          // bump on it so that it is added to the next statestore update. Without this
+          // we could potentially be referring to a table object that has already been
+          // GC'ed from the TopicUpdateLog and waitForSyncDdlVersion() cannot find a
+          // covering topic version (IMPALA-7961).
+          //
+          // This is a conservative hack to not break the SYNC_DDL semantics and could
+          // possibly result in false-positive invalidates on this table. However, that is
+          // better than breaking the SYNC_DDL semantics and the subsequent queries
+          // referring to this table failing with "table not found" errors.
+          long newVersion = catalog_.incrementAndGetCatalogVersion();
+          existingTbl.setCatalogVersion(newVersion);
+          LOG.trace("Table {} version bumped to {} because SYNC_DDL is enabled.",
+              existingTbl.getFullName(), newVersion);
+        }
+        addTableToCatalogUpdate(existingTbl, response.result);
       } finally {
+        // Release the locks held in tryLock().
+        catalog_.getLock().writeLock().unlock();
         existingTbl.getLock().unlock();
       }
+      return;
     }
     Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
     org.apache.hadoop.hive.metastore.api.Table tbl =
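
For readers skimming the patch, the minimal sketch below models the core of the fix in isolation: when CREATE TABLE IF NOT EXISTS finds that the table already exists and SYNC_DDL is enabled, the existing object's catalog version is bumped so that it is re-sent in an upcoming statestore topic update and waitForSyncDdlVersion() can find a covering topic version. The classes and names here (SimpleCatalog, CatalogTable, SyncDdlVersionBumpSketch, handleExistingTable) are simplified stand-ins for illustration only, not Impala's actual CatalogServiceCatalog/Table API; the real change is in CatalogOpExecutor.createTable()/createTableLike() in the diff above.

import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins, not Impala's real classes (those are
// CatalogServiceCatalog, Table and CatalogOpExecutor in the diff above).
class CatalogTable {
  private volatile long catalogVersion;
  CatalogTable(long version) { catalogVersion = version; }
  long getCatalogVersion() { return catalogVersion; }
  void setCatalogVersion(long v) { catalogVersion = v; }
}

class SimpleCatalog {
  private final AtomicLong version = new AtomicLong(0);
  long incrementAndGetCatalogVersion() { return version.incrementAndGet(); }
}

public class SyncDdlVersionBumpSketch {
  // Models the IF NOT EXISTS path: without SYNC_DDL the existing table is left
  // untouched; with SYNC_DDL its version is bumped so the next topic update
  // re-sends the object and a covering topic version can always be found.
  static void handleExistingTable(SimpleCatalog catalog, CatalogTable existingTbl,
      boolean syncDdl) {
    if (syncDdl) {
      long newVersion = catalog.incrementAndGetCatalogVersion();
      existingTbl.setCatalogVersion(newVersion);
    }
    // In the real patch the (possibly re-versioned) object is then added to the
    // DDL response, e.g. addTableToCatalogUpdate(existingTbl, response.result).
  }

  public static void main(String[] args) {
    SimpleCatalog catalog = new SimpleCatalog();
    CatalogTable tbl = new CatalogTable(catalog.incrementAndGetCatalogVersion());
    long before = tbl.getCatalogVersion();
    handleExistingTable(catalog, tbl, /* syncDdl= */ true);
    System.out.println("catalog version: " + before + " -> " + tbl.getCatalogVersion());
  }
}

The trade-off, as the commit message notes, is a possible false-positive invalidation of the already-existing table on other coordinators, which is considered cheaper than breaking the SYNC_DDL guarantee.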
