[ https://issues.apache.org/jira/browse/IMPALA-6671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16563974#comment-16563974 ]
Todd Lipcon commented on IMPALA-6671:
-------------------------------------
Been brainstorming a possible route to fixing this...
Currently, the flow for operations that mutate a table object is:
{code}
lock version number (write)
| lock table
| | increment catalog version number and remember the newVersion
|-+-unlock version number
|
| modify table in place
| set version number of table to newVersion
unlock table
{code}
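To make that locking discipline concrete, here's a minimal Java sketch of the flow. It is not the real CatalogServiceCatalog code; the class, the Table inner class, and the method names are stand-ins, and I'm assuming a global ReentrantReadWriteLock for the catalog version plus a per-table ReentrantLock, which matches the behavior described above.
{code}
// Minimal sketch of the current mutation flow, NOT the real catalog code.
// All names here are hypothetical stand-ins.
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class CatalogSketch {
  final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock();
  long catalogVersion_ = 0;   // protected by versionLock_

  static class Table {
    final ReentrantLock lock = new ReentrantLock();
    volatile long catalogVersion = 0;
    void modifyInPlace() { /* apply the DDL change */ }
  }

  void mutateTable(Table tbl) {
    long newVersion;
    versionLock_.writeLock().lock();       // lock version number (write)
    try {
      tbl.lock.lock();                     // lock table while still holding the version lock
      newVersion = ++catalogVersion_;      // increment and remember newVersion
    } finally {
      versionLock_.writeLock().unlock();   // unlock version number; table lock stays held
    }
    try {
      tbl.modifyInPlace();                 // the slow part: metastore RPCs, file listings, ...
      tbl.catalogVersion = newVersion;     // set version number of table to newVersion
    } finally {
      tbl.lock.unlock();                   // unlock table
    }
  }
}
{code}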
The "update collector" thread looks something like:
{code}
fromVersion = last version sent out
lock versionLock
toVersion = get version number
unlock versionLock
foreach table:
  if table version > toVersion:
    continue (skip the table)
  lock table (blocking here causes IMPALA-6671)
  if the version number is within the range we are collecting, copy it as thrift into the delta
  unlock table
{code}
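And a matching Java sketch of that loop, reusing the hypothetical CatalogSketch/Table classes from the previous snippet (same package assumed). The blocking lock() call marked below is exactly where IMPALA-6671 shows up when a mutator holds a table lock across a slow metastore operation.
{code}
// Sketch of the current collector loop; builds on the hypothetical classes above.
import java.util.ArrayList;
import java.util.List;

class CollectorSketch {
  long lastSentVersion_ = 0;

  List<CatalogSketch.Table> collectDelta(CatalogSketch catalog,
                                         List<CatalogSketch.Table> tables) {
    long fromVersion = lastSentVersion_;
    long toVersion;
    catalog.versionLock_.readLock().lock();
    try {
      toVersion = catalog.catalogVersion_;   // toVersion = get version number
    } finally {
      catalog.versionLock_.readLock().unlock();
    }

    List<CatalogSketch.Table> delta = new ArrayList<>();
    for (CatalogSketch.Table tbl : tables) {
      if (tbl.catalogVersion > toVersion) continue;   // skip; collected next round

      tbl.lock.lock();   // <-- blocks behind any in-flight mutation (IMPALA-6671)
      try {
        if (tbl.catalogVersion > fromVersion && tbl.catalogVersion <= toVersion) {
          delta.add(tbl);   // the real code serializes the table to thrift here
        }
      } finally {
        tbl.lock.unlock();
      }
    }
    lastSentVersion_ = toVersion;
    return delta;
  }
}
{code}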
Essentially, when the update collector hits a table that is locked (being
updated), it doesn't know whether the "newVersion" that table will have
post-lock is within the range to be collected or not, because that "newVersion"
is only stored in a local variable of the mutating thread. Additionally, it
can't _affect_ the new version. If it were to just do a "trylock" and skip this
table on failure, then in the next collection loop it would incorrectly
determine that it had already sent this table, and we'd end up with a case where
we miss propagating an update -- no good.
So, I've been toying with an idea loosely inspired by CockroachDB's handling of
read-write conflicts. Consider the "write" transaction here to be the mutating
thread, and the "read" transaction to be the "collector" thread. In the current
design, the writing transaction is essentially assigning a timestamp at its
start. But, there is no requirement that it commit with that same timestamp
that it started with -- it would be equally happy with any higher timestamp.
So, what if we have the read transaction "push" the write transaction's commit
timestamp out of the range to be read, essentially deferring it into the next
collection loop? The flow would look something like:
Mutator:
{code}
lock version number (write)
| lock table
| | table.pendingWriteVersion = catalog.incrementVersionNumber()
|-+-unlock version number
|
| modify table in place
| table.setVersionNumber(table.pendingWriteVersion)
| table.pendingWriteVersion = -1
unlock table
{code}
Update collector:
{code}
fromVersion = last version sent out
lock versionLock
toVersion = get version number
unlock versionLock
foreach table:
  if table version > toVersion:
    continue (skip the table)
  try to lock table:
    if successful, handle normally
    else:
      # push the write transaction to commit in the next update
      table.pendingWriteVersion = max(table.pendingWriteVersion, toVersion + 1)
{code}
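Here's a rough, self-contained Java sketch of how both sides could look with the pendingWriteVersion push. Again, everything here is hypothetical scaffolding rather than real catalog code; in particular, the per-table versionMonitor is just one possible way to make the tryLock outcome and the pendingWriteVersion update atomic with respect to the writer's commit, and it doesn't claim to close every race noted below.
{code}
// Self-contained sketch of the "push" idea; all names are hypothetical.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class PushSketch {
  final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock();
  long catalogVersion_ = 0;
  long lastSentVersion_ = 0;

  static class Table {
    final ReentrantLock lock = new ReentrantLock();
    final Object versionMonitor = new Object();   // guards the two fields below
    long catalogVersion = 0;
    long pendingWriteVersion = -1;                 // -1 means no mutation in flight
    void modifyInPlace() { /* apply the DDL change */ }
  }

  // Mutator: as today, except the allocated version is parked in
  // pendingWriteVersion so the collector can push it forward.
  void mutateTable(Table tbl) {
    versionLock_.writeLock().lock();
    try {
      tbl.lock.lock();
      synchronized (tbl.versionMonitor) {
        tbl.pendingWriteVersion = ++catalogVersion_;
      }
    } finally {
      versionLock_.writeLock().unlock();
    }
    try {
      tbl.modifyInPlace();                         // slow part: metastore RPCs etc.
      synchronized (tbl.versionMonitor) {
        // Commit with whatever version we ended up with; the collector may have
        // pushed it beyond the delta it was assembling.
        tbl.catalogVersion = tbl.pendingWriteVersion;
        tbl.pendingWriteVersion = -1;
      }
    } finally {
      tbl.lock.unlock();
    }
  }

  // Collector: never blocks on a table lock; a locked table has its in-flight
  // commit version pushed into the next collection round instead.
  List<Table> collectDelta(List<Table> tables) {
    long fromVersion = lastSentVersion_;
    long toVersion;
    versionLock_.readLock().lock();
    try {
      toVersion = catalogVersion_;
    } finally {
      versionLock_.readLock().unlock();
    }

    List<Table> delta = new ArrayList<>();
    for (Table tbl : tables) {
      if (tbl.catalogVersion > toVersion) continue;   // will be picked up next round

      if (tbl.lock.tryLock()) {
        try {
          if (tbl.catalogVersion > fromVersion && tbl.catalogVersion <= toVersion) {
            delta.add(tbl);   // the real code serializes the table to thrift here
          }
        } finally {
          tbl.lock.unlock();
        }
      } else {
        synchronized (tbl.versionMonitor) {
          // Push the write transaction to commit in the next update. If the
          // writer has already committed and only the unlock remains,
          // pendingWriteVersion is -1 here; that is one of the races mentioned
          // below that a real implementation still has to close.
          tbl.pendingWriteVersion = Math.max(tbl.pendingWriteVersion, toVersion + 1);
        }
      }
    }
    lastSentVersion_ = toVersion;
    return delta;
  }
}
{code}
The key property is that collectDelta() never blocks behind a slow DDL: a table whose lock is held simply has its in-flight commit version pushed to toVersion + 1, so the mutation lands in the next delta instead of stalling this one.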
The pseudocode as written above has some races, since we need to ensure that
the lock and the 'pendingWriteVersion', etc., are operated upon atomically, but
I think that's pretty solvable. We also need to have similar treatment of the
"max skipped updates" thing to ensure that a constantly-refreshed table
_eventually_ gets sent out, but we already have that code path today.
Thoughts?
> Metadata operations that modify a table blocks topic updates for other
> unrelated operations
> -------------------------------------------------------------------------------------------
>
> Key: IMPALA-6671
> URL: https://issues.apache.org/jira/browse/IMPALA-6671
> Project: IMPALA
> Issue Type: Bug
> Components: Catalog
> Affects Versions: Impala 2.10.0, Impala 2.11.0, Impala 2.12.0
> Reporter: Mostafa Mokhtar
> Priority: Critical
> Labels: catalog-server, perfomance
>
> Metadata operations that mutate the state of a table, like "compute stats foo"
> or "alter recover partitions", block topic updates for read-only operations
> against unrelated tables such as "describe bar".
> Thread for blocked operation
> {code}
> "Thread-7" prio=10 tid=0x0000000011613000 nid=0x21b3b waiting on condition
> [0x00007f5f2ef52000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00007f6f57ff0240> (a
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
> at
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
> at
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
> at
> org.apache.impala.catalog.CatalogServiceCatalog.addTableToCatalogDeltaHelper(CatalogServiceCatalog.java:639)
> at
> org.apache.impala.catalog.CatalogServiceCatalog.addTableToCatalogDelta(CatalogServiceCatalog.java:611)
> at
> org.apache.impala.catalog.CatalogServiceCatalog.addDatabaseToCatalogDelta(CatalogServiceCatalog.java:567)
> at
> org.apache.impala.catalog.CatalogServiceCatalog.getCatalogDelta(CatalogServiceCatalog.java:449)
> at
> org.apache.impala.service.JniCatalog.getCatalogDelta(JniCatalog.java:126)
> {code}
> Thread for blocking operation
> {code}
> "Thread-130" prio=10 tid=0x00000000113d5800 nid=0x2499d runnable
> [0x00007f5ef80d0000]
> java.lang.Thread.State: RUNNABLE
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.read(SocketInputStream.java:152)
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
> - locked <0x00007f5fffcd9f18> (a java.io.BufferedInputStream)
> at
> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> at
> org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:346)
> at
> org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:423)
> at
> org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:405)
> at
> org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37)
> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
> at
> org.apache.hadoop.hive.thrift.TFilterTransport.readAll(TFilterTransport.java:62)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
> at
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_add_partitions_req(ThriftHiveMetastore.java:1639)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.add_partitions_req(ThriftHiveMetastore.java:1626)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.add_partitions(HiveMetaStoreClient.java:609)
> at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:101)
> at com.sun.proxy.$Proxy10.add_partitions(Unknown Source)
> at
> org.apache.impala.service.CatalogOpExecutor.alterTableRecoverPartitions(CatalogOpExecutor.java:2651)
> at
> org.apache.impala.service.CatalogOpExecutor.alterTable(CatalogOpExecutor.java:525)
> at
> org.apache.impala.service.CatalogOpExecutor.execDdlRequest(CatalogOpExecutor.java:262)
> at org.apache.impala.service.JniCatalog.execDdl(JniCatalog.java:146)
> {code}