[ https://issues.apache.org/jira/browse/IMPALA-6671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16563974#comment-16563974 ]

Todd Lipcon commented on IMPALA-6671:
-------------------------------------

Been brainstorming a possible route to fixing this...

Currently, the flow for operations that mutate a table object is:

{code}
lock version number (write)
|  lock table
| |  increment catalog version number and remember the newVersion
|-+-unlock version number
  |
  | modify table in place
  | set version number of table to newVersion
  unlock table
{code}

The "update collector" thread looks something like:

{code}
fromVersion = last version sent out
lock versionLock
  toVersion = get version number
unlock versionLock

foreach table:
  if table version > toVersion:
    continue (skip the table)
  lock table (blocking here causes IMPALA-6671)
    if the version number is within the range we are collecting,
    copy it as thrift into the delta
  unlock table
{code}
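To make the locking interplay concrete, here is a minimal Java sketch of both flows above. The class and member names ({{CurrentFlowSketch}}, {{Table}}, {{collect}}) are hypothetical stand-ins, not Impala's actual CatalogServiceCatalog code:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the current flow; names are illustrative only.
class CurrentFlowSketch {
  final ReentrantLock versionLock = new ReentrantLock();
  long catalogVersion = 0;

  static class Table {
    final ReentrantLock lock = new ReentrantLock();
    volatile long version = 0;
  }

  // Mutator: hand-over-hand locking. Note that newVersion lives only in a
  // local variable until the mutation finishes.
  void mutate(Table t, Runnable modification) {
    versionLock.lock();            // lock version number (write)
    t.lock.lock();                 // lock table
    long newVersion = ++catalogVersion;
    versionLock.unlock();          // unlock version number
    try {
      modification.run();          // modify in place (may be slow: HMS RPCs etc.)
      t.version = newVersion;
    } finally {
      t.lock.unlock();             // unlock table
    }
  }

  // Collector: the blocking lock() here is where unrelated topic updates
  // stall behind a slow mutation (IMPALA-6671).
  long collect(Table t, long fromVersion, long toVersion) {
    if (t.version > toVersion) {
      return -1;                   // skip; will be collected next cycle
    }
    t.lock.lock();                 // blocks for the whole mutation
    try {
      if (t.version > fromVersion && t.version <= toVersion) {
        return t.version;          // would copy the table as thrift into the delta
      }
      return -1;
    } finally {
      t.lock.unlock();
    }
  }
}
```

The blocking {{lock()}} in {{collect()}} is exactly where the "Thread-7" stack trace quoted below is parked while the mutator waits on the metastore.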

Essentially, when the update collector hits a table that is locked (being 
updated), it doesn't know whether the "newVersion" that the table will have 
post-lock is within the range to be collected or not, because that 
"newVersion" is stored only in a local variable of the mutating thread. 
Additionally, it can't _affect_ the new version. If it were to just do a 
"trylock" and skip this table on failure, then in the next collection loop it 
would incorrectly conclude that it had already sent this table, and we'd end 
up missing the propagation of an update -- no good.

So, I've been toying with an idea loosely inspired by CockroachDB's handling of 
read-write conflicts. Consider the "write" transaction here to be the mutating 
thread, and the "read" transaction to be the "collector" thread. In the current 
design, the writing transaction is essentially assigning a timestamp at its 
start. But, there is no requirement that it commit with that same timestamp 
that it started with -- it would be equally happy with any higher timestamp.

So, what if we have the read transaction "push" the write transaction's commit 
timestamp out of the range to be read, essentially deferring it into the next 
collection loop? The flow would look something like:

Mutator:

{code}
lock version number (write)
|  lock table
| |  table.pendingWriteVersion = catalog.incrementVersionNumber()
|-+-unlock version number
  |
  | modify table in place
  | table.setVersionNumber(table.pendingWriteVersion)
  | table.pendingWriteVersion = -1
  unlock table
{code}

Update collector:

{code}
fromVersion = last version sent out
lock versionLock
  toVersion = get version number
unlock versionLock

foreach table:
  if table version > toVersion:
    continue (skip the table)
  try to lock table:
    if successful, handle normally
  else:
    # push the write transaction to commit in the next update
    table.pendingWriteVersion = max(table.pendingWriteVersion, toVersion + 1)
{code}
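Here is a hedged Java sketch of the proposed push scheme, assuming {{pendingWriteVersion}} is an {{AtomicLong}} (with -1 meaning "no write in flight") that the collector pushes with a CAS-max loop. Names are illustrative, not Impala's actual API, and the sketch deliberately leaves the lock/{{pendingWriteVersion}} atomicity races unsolved, matching the caveat in the next paragraph:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the proposed scheme; names are illustrative only.
class ProposedFlowSketch {
  final ReentrantLock versionLock = new ReentrantLock();
  final AtomicLong catalogVersion = new AtomicLong(0);

  static class Table {
    final ReentrantLock lock = new ReentrantLock();
    // -1 means "no write in flight"
    final AtomicLong pendingWriteVersion = new AtomicLong(-1);
    volatile long version = 0;
  }

  // Mutator: takes a tentative commit version, but commits at whatever
  // version the collector may have pushed it to in the meantime.
  void mutate(Table t, Runnable modification) {
    versionLock.lock();
    t.lock.lock();
    t.pendingWriteVersion.set(catalogVersion.incrementAndGet());
    versionLock.unlock();
    try {
      modification.run();
      t.version = t.pendingWriteVersion.getAndSet(-1);
    } finally {
      t.lock.unlock();
    }
  }

  // Collector: never blocks on a table lock; instead pushes an in-flight
  // writer's commit version past toVersion, deferring it to the next cycle.
  long collect(Table t, long fromVersion, long toVersion) {
    if (t.version > toVersion) {
      return -1;                   // will be collected next cycle
    }
    if (t.lock.tryLock()) {
      try {
        if (t.version > fromVersion && t.version <= toVersion) {
          return t.version;        // would copy the table into the delta here
        }
      } finally {
        t.lock.unlock();
      }
    } else {
      // CAS-max: push pendingWriteVersion to at least toVersion + 1.
      long cur = t.pendingWriteVersion.get();
      while (cur != -1 && cur <= toVersion
             && !t.pendingWriteVersion.compareAndSet(cur, toVersion + 1)) {
        cur = t.pendingWriteVersion.get();
      }
    }
    return -1;
  }
}
```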

The pseudocode as written above has some races, since we need to ensure that 
the lock and the 'pendingWriteVersion', etc., are operated upon atomically, 
but I think that's pretty solvable. We'd also need similar treatment of the 
"max skipped updates" mechanism to ensure that a constantly-refreshed table 
_eventually_ gets sent out, but we already have that code path today.

Thoughts?



> Metadata operations that modify a table blocks topic updates for other 
> unrelated operations
> -------------------------------------------------------------------------------------------
>
>                 Key: IMPALA-6671
>                 URL: https://issues.apache.org/jira/browse/IMPALA-6671
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Catalog
>    Affects Versions: Impala 2.10.0, Impala 2.11.0, Impala 2.12.0
>            Reporter: Mostafa Mokhtar
>            Priority: Critical
>              Labels: catalog-server, performance
>
> Metadata operations that mutate the state of a table, like "compute stats foo" 
> or "alter recover partitions", block topic updates for read-only operations 
> against unrelated tables, such as "describe bar".
> Thread for blocked operation
> {code}
> "Thread-7" prio=10 tid=0x0000000011613000 nid=0x21b3b waiting on condition 
> [0x00007f5f2ef52000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f6f57ff0240> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
>         at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
>         at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
>         at 
> org.apache.impala.catalog.CatalogServiceCatalog.addTableToCatalogDeltaHelper(CatalogServiceCatalog.java:639)
>         at 
> org.apache.impala.catalog.CatalogServiceCatalog.addTableToCatalogDelta(CatalogServiceCatalog.java:611)
>         at 
> org.apache.impala.catalog.CatalogServiceCatalog.addDatabaseToCatalogDelta(CatalogServiceCatalog.java:567)
>         at 
> org.apache.impala.catalog.CatalogServiceCatalog.getCatalogDelta(CatalogServiceCatalog.java:449)
>         at 
> org.apache.impala.service.JniCatalog.getCatalogDelta(JniCatalog.java:126)
> {code}
> Thread for blocking operation 
> {code}
> "Thread-130" prio=10 tid=0x00000000113d5800 nid=0x2499d runnable 
> [0x00007f5ef80d0000]
>    java.lang.Thread.State: RUNNABLE
>         at java.net.SocketInputStream.socketRead0(Native Method)
>         at java.net.SocketInputStream.read(SocketInputStream.java:152)
>         at java.net.SocketInputStream.read(SocketInputStream.java:122)
>         at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
>         at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
>         at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>         - locked <0x00007f5fffcd9f18> (a java.io.BufferedInputStream)
>         at 
> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
>         at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>         at 
> org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:346)
>         at 
> org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:423)
>         at 
> org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:405)
>         at 
> org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37)
>         at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>         at 
> org.apache.hadoop.hive.thrift.TFilterTransport.readAll(TFilterTransport.java:62)
>         at 
> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
>         at 
> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
>         at 
> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
>         at 
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_add_partitions_req(ThriftHiveMetastore.java:1639)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.add_partitions_req(ThriftHiveMetastore.java:1626)
>         at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.add_partitions(HiveMetaStoreClient.java:609)
>         at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at 
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:101)
>         at com.sun.proxy.$Proxy10.add_partitions(Unknown Source)
>         at 
> org.apache.impala.service.CatalogOpExecutor.alterTableRecoverPartitions(CatalogOpExecutor.java:2651)
>         at 
> org.apache.impala.service.CatalogOpExecutor.alterTable(CatalogOpExecutor.java:525)
>         at 
> org.apache.impala.service.CatalogOpExecutor.execDdlRequest(CatalogOpExecutor.java:262)
>         at org.apache.impala.service.JniCatalog.execDdl(JniCatalog.java:146)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
