This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit b9c2e00a6ba1eaf1ed3f63013be60408409152b6 Author: stiga-huang <[email protected]> AuthorDate: Fri Mar 1 13:08:31 2024 +0800 IMPALA-12855: Fix NPE in firing RELOAD events when the partition doesn't exist When --enable_reload_events is set to true, catalogd will fire RELOAD events for INVALIDATE/REFRESH statements. When the RELOAD event is fired successfully for a REFRESH statement, we also update lastRefreshEventId of the table/partition. This part could hit NullPointerException when the partition is dropped by concurrent DDLs. This patch ignores updating lastRefreshEventId if the partition doesn't exist. Note that ideally we should hold the table lock of REFRESH until we finish firing the RELOAD events and updating lastRefreshEventId. So no concurrent operations can drop the partition. However, when the table is loaded from scratch, we don't actually hold the table write lock. We just load the table and take a read lock to get the thrift object. The partition could still be dropped concurrently after the load and before taking the read lock. So ignoring missing partitions is a simpler solution. Refactors some code of fireReloadEventAndUpdateRefreshEventId to save some indentation and avoid acquiring the table lock if no events are fired. Adds error messages in some Precondition checks in methods used by this feature. Also refactors Table.getFullName() to not always construct the result. Improves logs of not reloading a partition for an event. 
Tests: - Add e2e test Change-Id: I01af3624bf7cf5cd69935cffa28d54f6a6807504 Reviewed-on: http://gerrit.cloudera.org:8080/21096 Reviewed-by: Csaba Ringhofer <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/compat/MetastoreShim.java | 2 +- .../java/org/apache/impala/catalog/FeFsTable.java | 10 ++- .../main/java/org/apache/impala/catalog/Table.java | 4 +- .../impala/catalog/events/MetastoreEvents.java | 7 +- .../apache/impala/service/CatalogOpExecutor.java | 82 ++++++++++++---------- tests/custom_cluster/test_events_custom_configs.py | 27 +++++++ 6 files changed, 86 insertions(+), 46 deletions(-) diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 2f11da0c3..c004f8b9a 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -545,7 +545,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase { LOG.error("FireEventResponse does not have event ids set for table {}.{}. 
This " + "may cause the table to unnecessarily be refreshed when the " + "refresh/invalidate event is received.", dbName, tableName); - return Collections.EMPTY_LIST; + return Collections.emptyList(); } return response.getEventIds(); } diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java index 47847348f..e5853251f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java @@ -595,7 +595,7 @@ public interface FeFsTable extends FeTable { Set<String> keys = new HashSet<>(); for (FieldSchema fs: table.getMetaStoreTable().getPartitionKeys()) { for (TPartitionKeyValue kv: partitionSpec) { - if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) { + if (fs.getName().equalsIgnoreCase(kv.getName())) { targetValues.add(kv.getValue()); // Same key was specified twice if (!keys.add(kv.getName().toLowerCase())) { @@ -615,7 +615,9 @@ public interface FeFsTable extends FeTable { // match the values being searched for. 
for (PrunablePartition partition: table.getPartitions()) { List<LiteralExpr> partitionValues = partition.getPartitionValues(); - Preconditions.checkState(partitionValues.size() == targetValues.size()); + Preconditions.checkState(partitionValues.size() == targetValues.size(), + "Partition values not match in table %s: %s != %s", + table.getFullName(), partitionValues.size(), targetValues.size()); boolean matchFound = true; for (int i = 0; i < targetValues.size(); ++i) { String value; @@ -623,7 +625,9 @@ public interface FeFsTable extends FeTable { value = table.getNullPartitionKeyValue(); } else { value = partitionValues.get(i).getStringValue(); - Preconditions.checkNotNull(value); + Preconditions.checkNotNull(value, + "Got null string from non-null partition value of table %s: i=%s", + table.getFullName(), i); // See IMPALA-252: we deliberately map empty strings on to // NULL when they're in partition columns. This is for // backwards compatibility with Hive, and is clearly broken. diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 15ec5784b..4dbd4fcfe 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -83,6 +83,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { protected org.apache.hadoop.hive.metastore.api.Table msTable_; protected final Db db_; protected final String name_; + protected final String full_name_; protected final String owner_; protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE; // Lock protecting this table. A read lock must be table when we are serializing @@ -224,6 +225,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { msTable_ = msTable; db_ = db; name_ = name.toLowerCase(); + full_name_ = (db_ != null ? db_.getName() + "." 
: "") + name_; owner_ = owner; tableStats_ = new TTableStats(-1); tableStats_.setTotal_file_bytes(-1); @@ -850,7 +852,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { public String getName() { return name_; } @Override // FeTable - public String getFullName() { return (db_ != null ? db_.getName() + "." : "") + name_; } + public String getFullName() { return full_name_; } @Override // FeTable public TableName getTableName() { diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index b5b1f6db0..a344e2231 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -1055,7 +1055,8 @@ public class MetastoreEvents { throws CatalogException { try { int numPartsRefreshed = catalogOpExecutor_.reloadPartitionsIfExist(getEventId(), - dbName_, tblName_, partitions, reason, fileMetadataLoadOpts); + getEventType().toString(), dbName_, tblName_, partitions, reason, + fileMetadataLoadOpts); if (numPartsRefreshed > 0) { metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES) .inc(numPartsRefreshed); @@ -1098,8 +1099,8 @@ public class MetastoreEvents { FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException { try { int numPartsRefreshed = catalogOpExecutor_.reloadPartitionsFromNamesIfExists( - getEventId(), dbName_, tblName_, partitionNames, reason, - fileMetadataLoadOpts); + getEventId(), getEventType().toString(), dbName_, tblName_, partitionNames, + reason, fileMetadataLoadOpts); if (numPartsRefreshed > 0) { metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES) .inc(numPartsRefreshed); diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 3e620aa51..1e5aa1802 100644 --- 
a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -4717,7 +4717,8 @@ public class CatalogOpExecutor { * Reloads the given partitions if they exist and have not been removed since the event * was generated. * - * @param eventId EventId being processed. + * @param eventId EventId of the event being processed. + * @param eventType EventType of the event being processed. * @param dbName Database name for the partition * @param tblName Table name for the partition * @param partsFromEvent List of {@link Partition} objects from the events to be @@ -4727,8 +4728,8 @@ public class CatalogOpExecutor { * @return the number of partitions which were reloaded. If the table does not exist, * returns 0. Some partitions could be skipped if they don't exist anymore. */ - public int reloadPartitionsIfExist(long eventId, String dbName, String tblName, - List<Partition> partsFromEvent, String reason, + public int reloadPartitionsIfExist(long eventId, String eventType, String dbName, + String tblName, List<Partition> partsFromEvent, String reason, FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException { List<String> partNames = new ArrayList<>(); Table table = catalog_.getTable(dbName, tblName); @@ -4739,8 +4740,8 @@ public class CatalogOpExecutor { part.getValues(), null)); } } - return reloadPartitionsFromNamesIfExists(eventId, dbName, tblName, partNames, - reason, fileMetadataLoadOpts); + return reloadPartitionsFromNamesIfExists(eventId, eventType, dbName, tblName, + partNames, reason, fileMetadataLoadOpts); } /** @@ -4756,8 +4757,8 @@ public class CatalogOpExecutor { * @return the number of partitions which were reloaded. If the table does not exist, * returns 0. Some partitions could be skipped if they don't exist anymore. 
*/ - public int reloadPartitionsFromNamesIfExists (long eventId, String dbName, - String tblName, List<String> partNames, String reason, + public int reloadPartitionsFromNamesIfExists(long eventId, String eventType, + String dbName, String tblName, List<String> partNames, String reason, FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException { Table table = catalog_.getTable(dbName, tblName); if (table == null) { @@ -4765,9 +4766,8 @@ public class CatalogOpExecutor { .getDeleteEventLog(); if (deleteEventLog .wasRemovedAfter(eventId, DeleteEventLog.getTblKey(dbName, tblName))) { - LOG.info( - "Not reloading the partition of table {} since it was removed " - + "later in catalog", new TableName(dbName, tblName)); + LOG.info("EventId: {} EventType: {} Not reloading the partition of table {}.{} " + + "since it was removed later in catalog", eventId, eventType, dbName, tblName); return 0; } else { throw new TableNotFoundException( @@ -4775,8 +4775,8 @@ public class CatalogOpExecutor { } } if (table instanceof IncompleteTable) { - LOG.info("Table {} is not loaded. Skipping drop partition event {}", - table.getFullName(), eventId); + LOG.info("Table {} is not loaded. 
Skipping {} event {}", + table.getFullName(), eventType, eventId); return 0; } if (!(table instanceof HdfsTable)) { @@ -4791,9 +4791,9 @@ public class CatalogOpExecutor { long newCatalogVersion = catalog_.incrementAndGetCatalogVersion(); catalog_.getLock().writeLock().unlock(); if (syncToLatestEventId && table.getLastSyncedEventId() >= eventId) { - LOG.info("Not reloading partition from event id: {} since table {} is already " - + "synced till event id {}", eventId, table.getFullName(), - table.getLastSyncedEventId()); + LOG.info("EventId: {} EventType: {} Not reloading partition since table {} is " + + "already synced till event id {}", + eventId, eventType, table.getFullName(), table.getLastSyncedEventId()); return 0; } HdfsTable hdfsTable = (HdfsTable) table; @@ -4808,8 +4808,8 @@ public class CatalogOpExecutor { hdfsTable.setCatalogVersion(newCatalogVersion); return numOfPartsReloaded; } catch (TableLoadingException e) { - LOG.info("Could not reload {} partitions of table {}", partNames.size(), - table.getFullName(), e); + LOG.info("EventId: {} EventType: {} Could not reload {} partitions of table {}", + eventId, eventType, partNames.size(), table.getFullName(), e); } catch (InternalException e) { errorOccured = true; throw new CatalogException( @@ -6668,8 +6668,9 @@ public class CatalogOpExecutor { Reference<Boolean> dbWasAdded = new Reference<Boolean>(false); // Thrift representation of the result of the invalidate/refresh operation. TCatalogObject updatedThriftTable = null; - TableName tblName = TableName.fromThrift(req.getTable_name()); + // Result table of the invalidate/refresh operation. Table tbl = null; + TableName tblName = TableName.fromThrift(req.getTable_name()); if (!req.isIs_refresh()) { // For INVALIDATE METADATA <db>.<table>, the db might be unloaded. // So we can't update 'tbl' here. @@ -6786,39 +6787,44 @@ public class CatalogOpExecutor { * Helper class for refresh event. 
* This class invokes metastore shim's fireReloadEvent to fire event to HMS * and update the last refresh event id in the cache - * @param req - request object for TResetMetadataRequest. - * @param tblName - * @param tbl */ private void fireReloadEventAndUpdateRefreshEventId( TResetMetadataRequest req, TableName tblName, Table tbl) { List<String> partVals = null; if (req.isSetPartition_spec()) { partVals = req.getPartition_spec().stream(). - map(partSpec -> partSpec.getValue()).collect(Collectors.toList()); + map(TPartitionKeyValue::getValue).collect(Collectors.toList()); } try { List<Long> eventIds = MetastoreShim.fireReloadEventHelper( catalog_.getMetaStoreClient(), req.isIs_refresh(), partVals, tblName.getDb(), tblName.getTbl(), Collections.emptyMap()); - if (req.isIs_refresh()) { - if (catalog_.tryLock(tbl, true, 600000)) { - if (!eventIds.isEmpty()) { - if (req.isSetPartition_spec()) { - HdfsPartition partition = ((HdfsTable) tbl) - .getPartitionFromThriftPartitionSpec(req.getPartition_spec()); - HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition); - partBuilder.setLastRefreshEventId(eventIds.get(0)); - ((HdfsTable) tbl).updatePartition(partBuilder); - } else { - tbl.setLastRefreshEventId(eventIds.get(0)); - } - } + LOG.info("Fired {} RELOAD events for table {}: {}", eventIds.size(), + tbl.getFullName(), StringUtils.join(",", eventIds)); + // Update the lastRefreshEventId accordingly + if (!req.isIs_refresh() || eventIds.isEmpty()) return; + if (!catalog_.tryLock(tbl, true, 600000)) { + LOG.warn("Couldn't obtain a version lock for the table: {}. 
" + + "Self events may go undetected in that case", + tbl.getFullName()); + return; + } + if (req.isSetPartition_spec()) { + HdfsTable hdfsTbl = (HdfsTable) tbl; + HdfsPartition partition = hdfsTbl + .getPartitionFromThriftPartitionSpec(req.getPartition_spec()); + if (partition != null) { + HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition); + partBuilder.setLastRefreshEventId(eventIds.get(0)); + hdfsTbl.updatePartition(partBuilder); } else { - LOG.warn(String.format("Couldn't obtain a version lock for the table: %s. " + - "Self events may go undetected in that case", - tbl.getName())); + LOG.warn("Partition {} no longer exists in table {}. It might be " + + "dropped by a concurrent operation.", + FeCatalogUtils.getPartitionName(hdfsTbl, partVals), + hdfsTbl.getFullName()); } + } else { + tbl.setLastRefreshEventId(eventIds.get(0)); } } catch (TException | CatalogException e) { LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index eda2e6163..dafcb5854 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -21,6 +21,7 @@ from os import getenv import pytest +from beeswaxd.BeeswaxService import QueryState from hive_metastore.ttypes import FireEventRequest from hive_metastore.ttypes import FireEventRequestData from hive_metastore.ttypes import InsertEventRequestData @@ -327,6 +328,32 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite): EventProcessorUtils.wait_for_event_processing(self) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" + @CustomClusterTestSuite.with_args( + catalogd_args="--enable_reload_events=true") + def test_reload_events_with_transient_partitions(self, unique_database): + tbl = unique_database + ".tbl" + create_stmt = "create table {} (i int) partitioned by(p int)".format(tbl) + add_part_stmt = "alter table {} 
add if not exists partition(p=0)".format(tbl) + drop_part_stmt = "alter table {} drop if exists partition(p=0)".format(tbl) + refresh_stmt = "refresh {} partition(p=0)".format(tbl) + end_states = [self.client.QUERY_STATES['FINISHED'], + self.client.QUERY_STATES['EXCEPTION']] + + self.execute_query(create_stmt) + self.execute_query(add_part_stmt) + # Run REFRESH partition in the background so we can drop the partition concurrently. + refresh_handle = self.client.execute_async(refresh_stmt) + # Before IMPALA-12855, REFRESH usually fails in 2-3 rounds. + for i in range(100): + self.execute_query(drop_part_stmt) + refresh_state = self.wait_for_any_state(refresh_handle, end_states, 10) + assert refresh_state == self.client.QUERY_STATES['FINISHED'],\ + "REFRESH state: {}. Error log: {}".format( + QueryState._VALUES_TO_NAMES[refresh_state], + self.client.get_log(refresh_handle)) + self.execute_query(add_part_stmt) + refresh_handle = self.client.execute_async(refresh_stmt) + @CustomClusterTestSuite.with_args( catalogd_args="--hms_event_polling_interval_s=10" " --enable_skipping_older_events=true"
