[
https://issues.apache.org/jira/browse/TEZ-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695875#comment-14695875
]
Siddharth Seth commented on TEZ-1997:
-------------------------------------
bq. Yes, because it is read when a spill() happens - the mutator threads are
already waiting for the spill to finish (since it doesn't do any parallel
collect + spills).
A parallel spill and collect is possible, if I'm not mistaken, as long as the
new k-v being written out fits in the remainder of the buffer. While numRecords
is updated, only the $this lock is held (the spilllock may not have been
obtained at all, if the conditions for requiring a spill are not met).
Taking the first spill as an example: totalRecords has been updated throughout
under the $this lock. The spill kicks in (new thread), triggered by a collect
(still in the same thread that did all the past increments). The spill never
obtains a lock on $this, so I don't think there are any guarantees on it getting
an updated value of totalRecords. We may be better off passing in the value last
seen by collect (that misses any increments by a parallel collect, but at least
gives some guarantees on what will be read by the spill thread: the total of
what has been collected so far).
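To illustrate the suggestion of passing in the value last seen by collect, here
is a minimal Java sketch. It is not the DefaultSorter code: the class
SnapshotSpillSketch, the spillEvery trigger, and the helper awaitLastSpill are
all hypothetical names made up for this example, and a single-thread executor
stands in for the spill thread. The point it tries to show is that the counter
is only read under the $this lock, and the spill side works from a copy handed
over at trigger time instead of reading the shared field.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for a sorter; NOT the Tez DefaultSorter. */
final class SnapshotSpillSketch {
    private long totalRecords;     // only touched while holding "this"
    private final int spillEvery;  // assumed spill trigger, for illustration
    private Future<Long> lastSpill; // only touched while holding "this"
    // Stand-in for the spill thread.
    private final ExecutorService spillThread =
            Executors.newSingleThreadExecutor();

    SnapshotSpillSketch(int spillEvery) {
        this.spillEvery = spillEvery;
    }

    /** Mutator path: increments the counter under the "this" lock. */
    synchronized void collect(Object key, Object value) {
        totalRecords++;
        if (totalRecords % spillEvery == 0) {
            // Copy the counter while still holding the lock, then hand the
            // copy to the spill task. submit() safely publishes the value.
            final long seenByCollect = totalRecords;
            Callable<Long> task = () -> spill(seenByCollect);
            lastSpill = spillThread.submit(task);
        }
    }

    /** Spill path: never locks "this" and never reads totalRecords. */
    private long spill(long recordsAtTrigger) {
        // A real spill would decide things like RLE from this value.
        return recordsAtTrigger;
    }

    /** Returns the count seen by the last spill, or -1 if none ran. */
    long awaitLastSpill() {
        Future<Long> pending;
        synchronized (this) {
            pending = lastSpill;
        }
        spillThread.shutdown(); // already submitted tasks still complete
        if (pending == null) {
            return -1L;
        }
        try {
            return pending.get();
        } catch (Exception e) {
            throw new IllegalStateException("spill task failed", e);
        }
    }
}
```

With spillEvery set to 3 and seven calls to collect, the spill task is triggered
at records 3 and 6, and the last spill reports 6 even though the counter has
since moved on to 7. That is the trade-off described above: increments made by
a parallel collect are missed, but what the spill thread reads is well defined.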
> Remove synchronization DefaultSorter::isRLENeeded() (Causes sorter to hang
> indefinitely in large jobs)
> -------------------------------------------------------------------------------------------------------
>
> Key: TEZ-1997
> URL: https://issues.apache.org/jira/browse/TEZ-1997
> Project: Apache Tez
> Issue Type: Bug
> Affects Versions: 0.7.0
> Reporter: Rajesh Balamohan
> Assignee: Rajesh Balamohan
> Fix For: 0.7.0
>
> Attachments: TEZ-1997.1.patch, TEZ-1997.2.patch, TEZ-1997.3.patch,
> TEZ-1997.4.patch
>
>
> {code}
> Thread 21822: (state = BLOCKED)
> -
> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.isRLENeeded()
> @bci=0, line=724 (Interpreted frame)
> -
> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.spill(int,
> int) @bci=99, line=754 (Compiled frame)
> -
> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.sortAndSpill()
> @bci=29, line=732 (Interpreted frame)
> -
> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter$SpillThread.run()
> @bci=69, line=660 (Interpreted frame)
> Thread 19983: (state = BLOCKED)
> - sun.misc.Unsafe.park(boolean, long) @bci=0 (Compiled frame; information
> may be imprecise)
> - java.util.concurrent.locks.LockSupport.park(java.lang.Object) @bci=14,
> line=175 (Compiled frame)
> -
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await()
> @bci=42, line=2039 (Compiled frame)
> -
> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter$Buffer.write(byte[],
> int, int) @bci=451, line=562 (Compiled frame)
> - java.io.DataOutputStream.write(byte[], int, int) @bci=7, line=107
> (Compiled frame)
> -
> org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization$TezBytesWritableSerializer.serialize(org.apache.hadoop.io.Writable)
> @bci=18, line=123 (Compiled frame)
> -
> org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization$TezBytesWritableSerializer.serialize(java.lang.Object)
> @bci=5, line=110 (Compiled frame)
> -
> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.collect(java.lang.Object,
> java.lang.Object, int) @bci=544, line=283 (Compiled frame)
> -
> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.write(java.lang.Object,
> java.lang.Object) @bci=18, line=185 (Compiled frame)
> -
> org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput$1.write(java.lang.Object,
> java.lang.Object) @bci=9, line=126 (Compiled frame)
> -
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor$TezKVOutputCollector.collect(java.lang.Object,
> java.lang.Object) @bci=6, line=211 (Compiled frame)
> -
> org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.collect(org.apache.hadoop.io.BytesWritable,
> org.apache.hadoop.io.Writable) @bci=94, line=534 (Compiled frame)
> -
> org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.processOp(java.lang.Object,
> int) @bci=662, line=380 (Compiled frame)
> -
> org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator.processOp(java.lang.Object,
> int) @bci=30, line=77 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.Operator.forward(java.lang.Object,
> org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector) @bci=63,
> line=815 (Compiled frame)
> -
> org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator.processOp(java.lang.Object,
> int) @bci=165, line=138 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.Operator.forward(java.lang.Object,
> org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector) @bci=63,
> line=815 (Compiled frame)
> -
> org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(java.lang.Object,
> int) @bci=64, line=95 (Compiled frame)
> -
> org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(java.lang.Object)
> @bci=18, line=157 (Compiled frame)
> -
> org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.process(org.apache.hadoop.io.Writable)
> @bci=53, line=45 (Compiled frame)
> -
> org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(java.lang.Object)
> @bci=20, line=83 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord() @bci=40,
> line=68 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run() @bci=9,
> line=294 (Interpreted frame)
> -
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(java.util.Map,
> java.util.Map) @bci=224, line=163 (Interpreted frame)
> - org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(java.util.Map,
> java.util.Map) @bci=86, line=138 (Interpreted frame)
> - org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run() @bci=65,
> line=328 (Interpreted frame)
> - org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run()
> @bci=119, line=179 (Interpreted frame)
> - org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run()
> @bci=1, line=171 (Interpreted frame)
> -
> java.security.AccessController.doPrivileged(java.security.PrivilegedExceptionAction,
> java.security.AccessControlContext) @bci=0 (Compiled frame)
> - javax.security.auth.Subject.doAs(javax.security.auth.Subject,
> java.security.PrivilegedExceptionAction) @bci=42, line=422 (Interpreted frame)
> -
> org.apache.hadoop.security.UserGroupInformation.doAs(java.security.PrivilegedExceptionAction)
> @bci=14, line=1656 (Interpreted frame)
> - org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call()
> @bci=15, line=171 (Interpreted frame)
> - org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call()
> @bci=1, line=166 (Interpreted frame)
> - java.util.concurrent.FutureTask.run() @bci=42, line=266 (Compiled frame)
> -
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
> @bci=95, line=1142 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617
> (Interpreted frame)
> - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
> {code}
> "Thread 19983" invokes "synchronized void collect(Object key, Object value,
> final int partition)" and gets into waiting state.
> "Thread 21822" invokes "private synchronized boolean isRLENeeded()" and keeps
> waiting for the lock.
> Note: This can easily be reproduced with hive on tez (with tpch dataset) "set
> tez.runtime.sort.threads=1; create testData as select * from lineitem
> distribute by l_shipdate".