Quanlong Huang created IMPALA-12831:
---------------------------------------
Summary: HdfsTable.toMinimalTCatalogObject() should hold table
read lock to generate incremental updates
Key: IMPALA-12831
URL: https://issues.apache.org/jira/browse/IMPALA-12831
Project: IMPALA
Issue Type: Bug
Components: Catalog
Reporter: Quanlong Huang
Assignee: Quanlong Huang
When enable_incremental_metadata_updates=true (the default), catalogd sends
incremental partition updates to coordinators. These updates are generated in
HdfsTable.toMinimalTCatalogObject():
{code:java}
public TCatalogObject toMinimalTCatalogObject() {
  TCatalogObject catalogObject = super.toMinimalTCatalogObject();
  if (!BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()) {
    return catalogObject;
  }
  catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE);
  THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
      nullPartitionKeyValue_, nullColumnValue_,
      /*idToPartition=*/ new HashMap<>(),
      /*prototypePartition=*/ new THdfsPartition());
  for (HdfsPartition part : partitionMap_.values()) {
    hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition());
  }
  hdfsTable.setHas_full_partitions(false);
  // The minimal catalog object of partitions contain the partition names.
  hdfsTable.setHas_partition_names(true);
  catalogObject.getTable().setHdfs_table(hdfsTable);
  return catalogObject;
}{code}
Accessing table fields without holding the table read lock can fail when
concurrent DDLs modify the table. We have seen the event-processor fail while
processing a RELOAD event that tries to invalidate an HdfsTable:
{noformat}
E0216 16:23:44.283689 253 MetastoreEventsProcessor.java:899] Unexpected exception received while processing event
Java exception follows:
java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
    at java.util.ArrayList$Itr.next(ArrayList.java:861)
    at org.apache.impala.catalog.Column.toColumnNames(Column.java:148)
    at org.apache.impala.catalog.Table.getColumnNames(Table.java:844)
    at org.apache.impala.catalog.HdfsTable.toMinimalTCatalogObject(HdfsTable.java:2132)
    at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2221)
    at org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2202)
    at org.apache.impala.catalog.CatalogServiceCatalog.invalidateTable(CatalogServiceCatalog.java:2797)
    at org.apache.impala.catalog.events.MetastoreEvents$ReloadEvent.processTableInvalidate(MetastoreEvents.java:2734)
    at org.apache.impala.catalog.events.MetastoreEvents$ReloadEvent.process(MetastoreEvents.java:2656)
    at org.apache.impala.catalog.events.MetastoreEvents$MetastoreEvent.processIfEnabled(MetastoreEvents.java:522)
    at org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:1052)
    at org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:881)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750){noformat}
I can reproduce the issue using the following test:
{code:python}
@CustomClusterTestSuite.with_args(
  catalogd_args="--enable_incremental_metadata_updates=true")
def test_concurrent_invalidate_metadata_with_refresh(self, unique_database):
  # Create a wide table with some partitions
  tbl = unique_database + ".wide_tbl"
  create_stmt = "create table {} (".format(tbl)
  for i in range(600):
    create_stmt += "col{} int, ".format(i)
  create_stmt += "col600 int) partitioned by (p int) stored as textfile"
  self.execute_query(create_stmt)
  for i in range(10):
    self.execute_query("alter table {} add partition (p={})".format(tbl, i))
  refresh_stmt = "refresh " + tbl
  handle = self.client.execute_async(refresh_stmt)
  for i in range(10):
    self.execute_query("invalidate metadata " + tbl)
    # Always keep a concurrent REFRESH statement running
    if self.client.get_state(handle) == self.client.QUERY_STATES['FINISHED']:
      handle = self.client.execute_async(refresh_stmt){code}
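For illustration only, here is a minimal standalone sketch of the locking pattern the summary asks for. It is not the Impala patch: ReadLockSketch, SketchTable, addColumn() and snapshotColumnNames() are hypothetical names, and a plain java.util.concurrent ReentrantReadWriteLock stands in for the table lock. The reader iterates the column list only while holding the read lock, so a concurrent writer that takes the write lock cannot modify the list mid-iteration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockSketch {
  // Hypothetical stand-in for a catalog table; not Impala's Table/HdfsTable.
  static class SketchTable {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<String> columns = new ArrayList<>();

    // Writer path, playing the role of a DDL that changes the schema.
    void addColumn(String name) {
      lock.writeLock().lock();
      try {
        columns.add(name);
      } finally {
        lock.writeLock().unlock();
      }
    }

    // Reader path, playing the role of building a minimal catalog object.
    // The iteration happens entirely under the read lock.
    List<String> snapshotColumnNames() {
      lock.readLock().lock();
      try {
        List<String> names = new ArrayList<>();
        for (String c : columns) {
          names.add(c);
        }
        return names;
      } finally {
        lock.readLock().unlock();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    SketchTable table = new SketchTable();
    // One thread keeps mutating the column list while the main thread reads it.
    Thread writer = new Thread(() -> {
      for (int i = 0; i < 10000; i++) {
        table.addColumn("col" + i);
      }
    });
    writer.start();
    for (int i = 0; i < 1000; i++) {
      table.snapshotColumnNames();
    }
    writer.join();
    System.out.println(table.snapshotColumnNames().size());
  }
}
```

Without the read lock in snapshotColumnNames(), the same loop can hit the ArrayList iterator's comodification check, which is the exception shown in the stack trace above. How the real fix acquires the lock inside catalogd (timeouts, lock ordering with the catalog version lock) is outside the scope of this sketch.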