This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.4.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0140a15a044e719e052e2fedfe01efe81f65fe06 Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Tue May 14 18:40:04 2024 -0700 IMPALA-12680: Fix NullPointerException during AlterTableAddPartitions When a global INVALIDATE METADATA runs concurrently with an AlterTableAddPartition statement, a precondition check in addHmsPartitions() could throw a NullPointerException. This happens because the Map<String, Long> partitionToEventId was initialized to null whenever the event processor was not active. 'partitionToEventId' is now always initialized to an empty hash map, regardless of the event processor's state. If the event processor is not active, addHmsPartitions() adds the partitions that are fetched directly from the metastore. Note: the same issue, which could potentially occur in AlterTableRecoverPartitions, is also fixed there. Testing: - Manually verified that the NullPointerException scenario is avoided. - Added a unit test to verify the above use case. Change-Id: I730fed311ebc09762dccc152d9583d5394b0b9b3 Reviewed-on: http://gerrit.cloudera.org:8080/21430 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../apache/impala/service/CatalogOpExecutor.java | 28 ++++++++------ .../java/org/apache/impala/util/DebugUtils.java | 3 ++ .../events/MetastoreEventsProcessorTest.java | 45 +++++++++++++++++++++- 3 files changed, 64 insertions(+), 12 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 6b5ffa01d..17210b657 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -145,6 +145,7 @@ import org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent; import 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey; import org.apache.impala.catalog.events.MetastoreEventsProcessor; +import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus; import org.apache.impala.catalog.events.MetastoreNotificationException; import org.apache.impala.catalog.monitor.CatalogMonitor; import org.apache.impala.catalog.monitor.CatalogOperationTracker; @@ -1270,7 +1271,7 @@ public class CatalogOpExecutor { format = params.getSet_file_format_params().file_format; } alterTableAddPartitions(tbl, params.getAdd_partition_params(), format, - catalogTimeline, modification); + catalogTimeline, modification, debugAction); reloadMetadata = false; responseSummaryMsg = "New partition has been added to the table."; break; @@ -4600,8 +4601,8 @@ public class CatalogOpExecutor { */ private void alterTableAddPartitions(Table tbl, TAlterTableAddPartitionParams addPartParams, THdfsFileFormat fileFormat, - EventSequence catalogTimeline, InProgressTableModification modification) - throws ImpalaException { + EventSequence catalogTimeline, InProgressTableModification modification, + String debugAction) throws ImpalaException { Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); TableName tableName = tbl.getTableName(); @@ -4639,10 +4640,10 @@ public class CatalogOpExecutor { List<Partition> difference = null; try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { - Map<String, Long> partitionToEventId = catalog_.isEventProcessingActive() ? - Maps.newHashMap() : null; + Map<String, Long> partitionToEventId = Maps.newHashMap(); List<Partition> addedHmsPartitions = addHmsPartitionsInTransaction(msClient, - tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists, catalogTimeline); + tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists, catalogTimeline, + debugAction); // Handle HDFS cache. 
This is done in a separate round bacause we have to apply // caching only to newly added partitions. alterTableCachePartitions(msTbl, msClient, tableName, addedHmsPartitions, @@ -5299,7 +5300,7 @@ public class CatalogOpExecutor { */ private List<Partition> addHmsPartitions(MetaStoreClient msClient, Table tbl, List<Partition> allHmsPartitionsToAdd, - @Nullable Map<String, Long> partitionToEventId, boolean ifNotExists, + Map<String, Long> partitionToEventId, boolean ifNotExists, EventSequence catalogTimeline) throws ImpalaRuntimeException, CatalogException { long eventId = getCurrentEventId(msClient, catalogTimeline); List<Partition> addedHmsPartitions = Lists @@ -5332,7 +5333,6 @@ public class CatalogOpExecutor { // add_partitions call above. addedHmsPartitions.addAll(addedPartitions); } else { - Preconditions.checkNotNull(partitionToEventId); addedHmsPartitions.addAll(partitionToEventSubMap.keySet()); // we cannot keep a mapping of Partition to event ids because the // partition objects are changed later in the cachePartitions code path. @@ -5361,8 +5361,12 @@ public class CatalogOpExecutor { */ private List<Partition> addHmsPartitionsInTransaction(MetaStoreClient msClient, Table tbl, List<Partition> partitions, Map<String, Long> partitionToEventId, - boolean ifNotExists, EventSequence catalogTimeline) throws ImpalaException { + boolean ifNotExists, EventSequence catalogTimeline, String debugAction) + throws ImpalaException { if (!AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) { + if (DebugUtils.hasDebugAction(debugAction, DebugUtils.ENABLE_EVENT_PROCESSOR)) { + catalog_.startEventsProcessor(); + } return addHmsPartitions(msClient, tbl, partitions, partitionToEventId, ifNotExists, catalogTimeline); } @@ -6302,10 +6306,12 @@ public class CatalogOpExecutor { } // Add partitions to metastore. - Map<String, Long> partitionToEventId = catalog_.isEventProcessingActive() ? 
- Maps.newHashMap() : null; + Map<String, Long> partitionToEventId = Maps.newHashMap(); String annotation = String.format("Recovering %d partitions for %s", hmsPartitions.size(), tbl.getFullName()); + if (DebugUtils.hasDebugAction(debugAction, DebugUtils.ENABLE_EVENT_PROCESSOR)) { + catalog_.startEventsProcessor(); + } try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation); MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { List<Partition> addedPartitions = addHmsPartitions(msClient, tbl, hmsPartitions, diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index f1b150fbc..67b05e4e4 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -83,6 +83,9 @@ public class DebugUtils { // debug action label for introducing delay in loading table metadata. public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay"; + // debug action to enable eventProcessor + public static final String ENABLE_EVENT_PROCESSOR = "enable_event_processor"; + /** * Returns true if the label of action is set in the debugActions */ diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index 69b8a084d..22fd09fb5 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -3962,6 +3962,40 @@ public class MetastoreEventsProcessorTest { } } + @Test + public void testAlterTableWithEpDisabled() throws Exception { + try { + createDatabaseFromImpala(TEST_DB_NAME, null); + String testTable = "testAlterTableNoError"; + createTableFromImpala(TEST_DB_NAME, testTable, true); + eventsProcessor_.processEvents(); + // set EP to paused state and execute Alter 
table add partition query + eventsProcessor_.pause(); + long numberOfSelfEventsBefore = + eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount(); + TPartitionDef partitionDef = new TPartitionDef(); + partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "100")); + partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "200")); + alterTableAddPartition(TEST_DB_NAME, testTable, partitionDef, + "enable_event_processor"); + eventsProcessor_.processEvents(); + assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus()); + long numberOfSelfEventsAfter = + eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount(); + // expect ADD_PARTITION event to be skipped as self-event + assertEquals("Unexpected self events skipped: ", numberOfSelfEventsAfter, + numberOfSelfEventsBefore + 1); + } catch (NullPointerException ex) { + throw new CatalogException("Exception occured while applying AlterTableEvent", ex); + } finally { + if (eventsProcessor_.getStatus() != EventProcessorStatus.ACTIVE) { + eventsProcessor_.start(); + } + } + } + private void createDatabase(String catName, String dbName, Map<String, String> params) throws TException { try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) { @@ -4259,8 +4293,17 @@ public class MetastoreEventsProcessorTest { */ private void alterTableAddPartition( String dbName, String tblName, TPartitionDef partitionDef) throws ImpalaException { + alterTableAddPartition(dbName, tblName,partitionDef, null); + } + + private void alterTableAddPartition(String dbName, String tblName, + TPartitionDef partitionDef, String debugActions) throws ImpalaException { TDdlExecRequest req = new TDdlExecRequest(); - req.setQuery_options(new TDdlQueryOptions()); + TDdlQueryOptions queryOptions = new TDdlQueryOptions(); + if (debugActions != null) { + queryOptions.setDebug_action(debugActions); + } + req.setQuery_options(queryOptions); 
req.setDdl_type(TDdlType.ALTER_TABLE); TAlterTableParams alterTableParams = new TAlterTableParams(); alterTableParams.setTable_name(new TTableName(dbName, tblName));
