[ https://issues.apache.org/jira/browse/IMPALA-12831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844280#comment-17844280 ]
ASF subversion and git services commented on IMPALA-12831:
----------------------------------------------------------

Commit 5d32919f46117213249c60574f77e3f9bb66ed90 in impala's branch refs/heads/branch-4.4.0 from stiga-huang
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=5d32919f4 ]

IMPALA-13009: Fix catalogd not sending deletion updates for some dropped partitions

*Background*
Since IMPALA-3127, catalogd sends incremental partition updates based on the last sent table snapshot ('maxSentPartitionId_' to be specific). Partitions dropped since the last catalog update are tracked in 'droppedPartitions_' of HdfsTable. They are collected when catalogd gathers the next catalog update, after which HdfsTable clears the set. See details in CatalogServiceCatalog#addHdfsPartitionsToCatalogDelta().

If an HdfsTable is invalidated, it is replaced with an IncompleteTable, which doesn't track any partitions. The HdfsTable object is then added to the deleteLog so catalogd can send deletion updates for all its partitions. The same happens if the HdfsTable is dropped. However, the previously dropped partitions are not collected in this case, which results in a leak in the catalog topic if the partition name is never reused. Note that in the catalog topic, the key of a partition update consists of the table name and the partition name, so if the partition is later added back to the table, the topic key is reused and the leak resolves itself.

The leak becomes visible when a coordinator restarts. In the initial catalog update sent from the statestore, the coordinator finds partition updates that are not referenced by the HdfsTable (assuming the table is used again after the INVALIDATE). A Precondition check then fails and the table is not added to the coordinator.

*Overview of the patch*
This patch fixes the leak by also collecting the dropped partitions when adding the HdfsTable to the deleteLog. A new field, dropped_partitions, is added in THdfsTable to collect them.
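The bookkeeping described above (pending drops must be retracted even when the whole table is invalidated) can be modeled in a few lines. This is a hedged sketch in Python with invented names (HdfsTableModel, collect_on_invalidate), not Impala's actual Java code, assuming a deletion update is simply the set of partition names to retract from the catalog topic:

```python
# Toy model (invented names; Impala's real code is Java) of the partition
# delta bookkeeping described in the commit message.

class HdfsTableModel:
    """Stand-in for HdfsTable's partition delta tracking."""

    def __init__(self, partitions):
        self.partitions = set(partitions)  # live partition names
        self.dropped_partitions = set()    # models droppedPartitions_

    def drop_partition(self, name):
        self.partitions.discard(name)
        self.dropped_partitions.add(name)

    def collect_delta(self):
        """Normal catalog update: report pending drops, then clear the set."""
        drops = sorted(self.dropped_partitions)
        self.dropped_partitions.clear()
        return drops

    def collect_on_invalidate(self):
        """The fix: when the table goes to the deleteLog, retract BOTH the
        live partitions and the pending dropped ones. Before the fix, the
        pending dropped set was silently discarded here, leaking topic
        entries for names that are never reused."""
        return sorted(self.partitions | self.dropped_partitions)

tbl = HdfsTableModel({"p=0", "p=1", "p=2"})
tbl.drop_partition("p=2")
# An INVALIDATE arrives before the next catalog update is collected:
deletions = tbl.collect_on_invalidate()
print(deletions)  # ['p=0', 'p=1', 'p=2'] -- 'p=2' is retracted, no leak
```

In this model, the pre-patch behavior corresponds to collect_on_invalidate() returning only the live partitions, so the entry for 'p=2' would stay in the topic until its name was reused.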
It's only used when catalogd collects catalog updates. The patch also removes the Precondition check in the coordinator and just reports the stale partitions, since IMPALA-12831 could also introduce them. It also adds a log line in CatalogOpExecutor.alterTableDropPartition() to show the dropped partition names for better diagnostics.

Tests:
- Added e2e tests

Change-Id: I12a68158dca18ee48c9564ea16b7484c9f5b5d21
Reviewed-on: http://gerrit.cloudera.org:8080/21326
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
(cherry picked from commit ee21427d26620b40d38c706b4944d2831f84f6f5)

> HdfsTable.toMinimalTCatalogObject() should hold table read lock to generate
> incremental updates
> -----------------------------------------------------------------------------------------------
>
>                 Key: IMPALA-12831
>                 URL: https://issues.apache.org/jira/browse/IMPALA-12831
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Catalog
>    Affects Versions: Impala 4.0.0, Impala 4.1.0, Impala 4.2.0, Impala 4.1.1, Impala 4.1.2, Impala 4.3.0
>            Reporter: Quanlong Huang
>            Assignee: Quanlong Huang
>            Priority: Critical
>             Fix For: Impala 4.4.0
>
> When enable_incremental_metadata_updates=true (default), catalogd sends
> incremental partition updates to coordinators, which go through
> HdfsTable.toMinimalTCatalogObject():
> {code:java}
>   public TCatalogObject toMinimalTCatalogObject() {
>     TCatalogObject catalogObject = super.toMinimalTCatalogObject();
>     if (!BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()) {
>       return catalogObject;
>     }
>     catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE);
>     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
>         nullPartitionKeyValue_, nullColumnValue_,
>         /*idToPartition=*/ new HashMap<>(),
>         /*prototypePartition=*/ new THdfsPartition());
>     for (HdfsPartition part : partitionMap_.values()) {
>       hdfsTable.partitions.put(part.getId(),
>           part.toMinimalTHdfsPartition());
>     }
>     hdfsTable.setHas_full_partitions(false);
>     // The minimal catalog object of partitions contains the partition names.
>     hdfsTable.setHas_partition_names(true);
>     catalogObject.getTable().setHdfs_table(hdfsTable);
>     return catalogObject;
>   }{code}
> Accessing table fields without holding the table read lock can fail under
> concurrent DDLs. All workloads that use this method (e.g. INVALIDATE
> commands) could hit this issue. We've seen the event-processor fail while
> processing a RELOAD event that wants to invalidate an HdfsTable:
> {noformat}
> E0216 16:23:44.283689   253 MetastoreEventsProcessor.java:899] Unexpected exception received while processing event
> Java exception follows:
> java.util.ConcurrentModificationException
>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
>     at java.util.ArrayList$Itr.next(ArrayList.java:861)
>     at org.apache.impala.catalog.Column.toColumnNames(Column.java:148)
>     at org.apache.impala.catalog.Table.getColumnNames(Table.java:844)
>     at org.apache.impala.catalog.HdfsTable.toMinimalTCatalogObject(HdfsTable.java:2132)
>     at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2221)
>     at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2202)
>     at org.apache.impala.catalog.CatalogServiceCatalog.invalidateTable(CatalogServiceCatalog.java:2797)
>     at org.apache.impala.catalog.events.MetastoreEvents$ReloadEvent.processTableInvalidate(MetastoreEvents.java:2734)
>     at org.apache.impala.catalog.events.MetastoreEvents$ReloadEvent.process(MetastoreEvents.java:2656)
>     at org.apache.impala.catalog.events.MetastoreEvents$MetastoreEvent.processIfEnabled(MetastoreEvents.java:522)
>     at org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:1052)
>     at org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:881)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750){noformat}
> I can reproduce the issue using the following test:
> {code:python}
>   @CustomClusterTestSuite.with_args(
>     catalogd_args="--enable_incremental_metadata_updates=true")
>   def test_concurrent_invalidate_metadata_with_refresh(self, unique_database):
>     # Create a wide table with some partitions
>     tbl = unique_database + ".wide_tbl"
>     create_stmt = "create table {} (".format(tbl)
>     for i in range(600):
>       create_stmt += "col{} int, ".format(i)
>     create_stmt += "col600 int) partitioned by (p int) stored as textfile"
>     self.execute_query(create_stmt)
>     for i in range(10):
>       self.execute_query("alter table {} add partition (p={})".format(tbl, i))
>     refresh_stmt = "refresh " + tbl
>     handle = self.client.execute_async(refresh_stmt)
>     for i in range(10):
>       self.execute_query("invalidate metadata " + tbl)
>       # Always keep a concurrent REFRESH statement running
>       if self.client.get_state(handle) == self.client.QUERY_STATES['FINISHED']:
>         handle = self.client.execute_async(refresh_stmt){code}
> and see a similar exception:
> {noformat}
> E0222 10:44:40.912338  6833 JniUtil.java:183] da4099ef24bb1f03:01c8f5d200000000] Error in INVALIDATE TABLE test_concurrent_invalidate_metadata_with_refresh_65c57cb0.wide_tbl issued by quanlong.
Time spent: 32ms
> I0222 10:44:40.912528  6833 jni-util.cc:302] da4099ef24bb1f03:01c8f5d200000000] java.util.ConcurrentModificationException
>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
>     at java.util.ArrayList$Itr.next(ArrayList.java:861)
>     at org.apache.impala.catalog.Column.toColumnNames(Column.java:148)
>     at org.apache.impala.catalog.Table.getColumnNames(Table.java:875)
>     at org.apache.impala.catalog.HdfsTable.toMinimalTCatalogObject(HdfsTable.java:2132)
>     at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2264)
>     at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2245)
>     at org.apache.impala.catalog.CatalogServiceCatalog.invalidateTable(CatalogServiceCatalog.java:2840)
>     at org.apache.impala.service.CatalogOpExecutor.execResetMetadataImpl(CatalogOpExecutor.java:6676)
>     at org.apache.impala.service.CatalogOpExecutor.execResetMetadata(CatalogOpExecutor.java:6612)
>     at org.apache.impala.service.JniCatalog.lambda$resetMetadata$4(JniCatalog.java:327)
>     at org.apache.impala.service.JniCatalogOp.lambda$execAndSerialize$1(JniCatalogOp.java:90)
>     at org.apache.impala.service.JniCatalogOp.execOp(JniCatalogOp.java:58)
>     at org.apache.impala.service.JniCatalogOp.execAndSerialize(JniCatalogOp.java:89)
>     at org.apache.impala.service.JniCatalogOp.execAndSerialize(JniCatalogOp.java:100)
>     at org.apache.impala.service.JniCatalog.execAndSerialize(JniCatalog.java:243)
>     at org.apache.impala.service.JniCatalog.execAndSerialize(JniCatalog.java:257)
>     at org.apache.impala.service.JniCatalog.resetMetadata(JniCatalog.java:326){noformat}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-all-unsubscr...@impala.apache.org
For additional commands, e-mail: issues-all-h...@impala.apache.org
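Postscript: the ConcurrentModificationException in the traces above is the classic iterate-while-mutating race, and the fix proposed in the issue title (holding the table read lock while generating the minimal snapshot) is the standard remedy. A standalone, hedged sketch with invented names (TableModel, to_minimal_snapshot), in Python rather than Impala's Java:

```python
# Not Impala code: a minimal model of the race and the locking fix.
# Mutating a collection while another code path iterates it is what throws
# ConcurrentModificationException in Java; Python's dict raises an
# analogous RuntimeError, shown deterministically here:
import threading

meta = {"col0": "int", "col1": "int"}
try:
    for name in meta:          # snapshot generation iterating table fields
        meta["p=new"] = "int"  # a "concurrent DDL" mutating the structure
except RuntimeError as exc:
    print("analogue of ConcurrentModificationException:", exc)

# The fix pattern: hold a lock while copying fields into the snapshot, so
# DDLs serialize against snapshot generation instead of racing with it.
class TableModel:
    def __init__(self):
        self.columns = ["col%d" % i for i in range(5)]
        self.lock = threading.RLock()  # stands in for the table lock

    def add_column(self, name):        # the "DDL" path
        with self.lock:
            self.columns.append(name)

    def to_minimal_snapshot(self):
        with self.lock:                # the lock missing in the bug
            return list(self.columns)  # copy under the lock

tbl = TableModel()
snap = tbl.to_minimal_snapshot()
tbl.add_column("col5")
# The snapshot is a consistent copy, unaffected by the later DDL.
```

In the real codebase a read/write lock would let many snapshot readers proceed concurrently while still excluding writers; the RLock above is just the simplest stand-in.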