[
https://issues.apache.org/jira/browse/TEZ-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695976#comment-14695976
]
Gopal V commented on TEZ-1997:
------------------------------
bq. A parallel spill and collect is possible if I'm not mistaken.
You're right - I'm thinking of PipelinedSorter (as the default).
bq. We may be better off passing in the value last seen by collect (that misses
any increments by a parallel collect, but at least gives some guarantees on
what will be read by the spill thread).
Yes. It's better to decide based on the state at the moment the spill thread kicks
in (compute the boolean right there).
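A minimal Java sketch of that approach follows. Every name in it is hypothetical: `RleSnapshotSorter`, `sameKeyCount`, `totalKeyCount`, `startSpill()` and the 10% repeated-key threshold are illustrative, not DefaultSorter's actual fields or heuristic. The RLE decision is computed once, when a spill is kicked off, and published through a `volatile` field that the spill thread reads without taking the sorter's monitor. As noted in the quoted comment, collects that happen after the snapshot are not reflected in the current spill.

```java
/**
 * Hypothetical sketch, not Tez's DefaultSorter. The RLE decision is computed
 * once when a spill is kicked off and published through a volatile field, so
 * the spill thread never needs the sorter's monitor to read it.
 */
class RleSnapshotSorter {
    // Hypothetical counters, updated only by the collecting thread under the monitor.
    private long sameKeyCount;
    private long totalKeyCount;

    // Decision as of the last spill start; volatile so the spill thread sees
    // the latest published value without locking.
    private volatile boolean rleNeededAtSpillStart;

    // Hypothetical heuristic: use RLE when more than 10% of keys repeat.
    static boolean rleNeeded(long sameKeys, long totalKeys) {
        return totalKeys > 0 && sameKeys * 10 > totalKeys;
    }

    // Collecting thread: record whether this key repeats the previous one.
    synchronized void collect(boolean sameAsPreviousKey) {
        totalKeyCount++;
        if (sameAsPreviousKey) {
            sameKeyCount++;
        }
    }

    // Collecting side, when it hands a full buffer to the spill thread. In a
    // real sorter this would run inside collect(), which already holds the
    // monitor, so the synchronized re-entry does not block.
    synchronized void startSpill() {
        rleNeededAtSpillStart = rleNeeded(sameKeyCount, totalKeyCount);
    }

    // Spill thread: deliberately NOT synchronized. Collects that happen after
    // startSpill() are not reflected until the next spill.
    boolean isRLENeeded() {
        return rleNeededAtSpillStart;
    }
}
```

For example, after ten collects of which five repeat the previous key, `startSpill()` publishes `true`; further non-repeating collects do not change what the in-flight spill reads until `startSpill()` runs again.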
> Remove synchronization from DefaultSorter::isRLENeeded() (Causes sorter to hang
> indefinitely in large jobs)
> -------------------------------------------------------------------------------------------------------
>
> Key: TEZ-1997
> URL: https://issues.apache.org/jira/browse/TEZ-1997
> Project: Apache Tez
> Issue Type: Bug
> Affects Versions: 0.7.0
> Reporter: Rajesh Balamohan
> Assignee: Rajesh Balamohan
> Fix For: 0.7.0
>
> Attachments: TEZ-1997.1.patch, TEZ-1997.2.patch, TEZ-1997.3.patch,
> TEZ-1997.4.patch
>
>
> {code}
> Thread 21822: (state = BLOCKED)
> - org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.isRLENeeded() @bci=0, line=724 (Interpreted frame)
> - org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.spill(int, int) @bci=99, line=754 (Compiled frame)
> - org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.sortAndSpill() @bci=29, line=732 (Interpreted frame)
> - org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter$SpillThread.run() @bci=69, line=660 (Interpreted frame)
> Thread 19983: (state = BLOCKED)
> - sun.misc.Unsafe.park(boolean, long) @bci=0 (Compiled frame; information may be imprecise)
> - java.util.concurrent.locks.LockSupport.park(java.lang.Object) @bci=14, line=175 (Compiled frame)
> - java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await() @bci=42, line=2039 (Compiled frame)
> - org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter$Buffer.write(byte[], int, int) @bci=451, line=562 (Compiled frame)
> - java.io.DataOutputStream.write(byte[], int, int) @bci=7, line=107 (Compiled frame)
> - org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization$TezBytesWritableSerializer.serialize(org.apache.hadoop.io.Writable) @bci=18, line=123 (Compiled frame)
> - org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization$TezBytesWritableSerializer.serialize(java.lang.Object) @bci=5, line=110 (Compiled frame)
> - org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.collect(java.lang.Object, java.lang.Object, int) @bci=544, line=283 (Compiled frame)
> - org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.write(java.lang.Object, java.lang.Object) @bci=18, line=185 (Compiled frame)
> - org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput$1.write(java.lang.Object, java.lang.Object) @bci=9, line=126 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.tez.TezProcessor$TezKVOutputCollector.collect(java.lang.Object, java.lang.Object) @bci=6, line=211 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.collect(org.apache.hadoop.io.BytesWritable, org.apache.hadoop.io.Writable) @bci=94, line=534 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.processOp(java.lang.Object, int) @bci=662, line=380 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator.processOp(java.lang.Object, int) @bci=30, line=77 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.Operator.forward(java.lang.Object, org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector) @bci=63, line=815 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator.processOp(java.lang.Object, int) @bci=165, line=138 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.Operator.forward(java.lang.Object, org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector) @bci=63, line=815 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(java.lang.Object, int) @bci=64, line=95 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(java.lang.Object) @bci=18, line=157 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.process(org.apache.hadoop.io.Writable) @bci=53, line=45 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(java.lang.Object) @bci=20, line=83 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord() @bci=40, line=68 (Compiled frame)
> - org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run() @bci=9, line=294 (Interpreted frame)
> - org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(java.util.Map, java.util.Map) @bci=224, line=163 (Interpreted frame)
> - org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(java.util.Map, java.util.Map) @bci=86, line=138 (Interpreted frame)
> - org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run() @bci=65, line=328 (Interpreted frame)
> - org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run() @bci=119, line=179 (Interpreted frame)
> - org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run() @bci=1, line=171 (Interpreted frame)
> - java.security.AccessController.doPrivileged(java.security.PrivilegedExceptionAction, java.security.AccessControlContext) @bci=0 (Compiled frame)
> - javax.security.auth.Subject.doAs(javax.security.auth.Subject, java.security.PrivilegedExceptionAction) @bci=42, line=422 (Interpreted frame)
> - org.apache.hadoop.security.UserGroupInformation.doAs(java.security.PrivilegedExceptionAction) @bci=14, line=1656 (Interpreted frame)
> - org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call() @bci=15, line=171 (Interpreted frame)
> - org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call() @bci=1, line=166 (Interpreted frame)
> - java.util.concurrent.FutureTask.run() @bci=42, line=266 (Compiled frame)
> - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1142 (Interpreted frame)
> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=617 (Interpreted frame)
> - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)
> {code}
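> "Thread 19983" enters "synchronized void collect(Object key, Object value,
> final int partition)" and, while still holding the sorter's monitor, parks in
> DefaultSorter$Buffer.write() waiting on a Condition for the spill to free
> buffer space. "Thread 21822" (the spill thread) calls "private synchronized
> boolean isRLENeeded()" and blocks waiting for that same monitor, so the spill
> never finishes and neither thread makes progress.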
> Note: This can easily be reproduced with Hive on Tez (with the TPC-H dataset): "set
> tez.runtime.sort.threads=1; create table testData as select * from lineitem
> distribute by l_shipdate".
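The lock pattern in the thread dump can be modeled in a few lines. The following Java sketch is a simplified model, not Tez source: `SorterHangDemo` and its fields (`spillLock`, `spillDone`, `collectEntered`, `spillFinished`) are hypothetical stand-ins. `collect()` holds the object monitor while awaiting a `Condition`, and the spill side must take the same monitor (through a synchronized `isRLENeeded()`) before it can signal that `Condition`. In the real trace `ConditionObject.await()` is unbounded; the sketch uses `awaitNanos` with a timeout only so the demo terminates. `Thread.getState()` is meant for monitoring, so the BLOCKED check is a best-effort observation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical model of the hang; names do not come from Tez. */
class SorterHangDemo {
    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillDone = spillLock.newCondition();
    private final CountDownLatch collectEntered = new CountDownLatch(1);
    private boolean spillFinished; // guarded by spillLock

    // Like "synchronized void collect(...)": waits for buffer space while
    // holding the monitor. awaitNanos releases spillLock, NOT the monitor.
    synchronized void collect(long maxWaitMillis) throws InterruptedException {
        collectEntered.countDown();
        spillLock.lock();
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
            while (!spillFinished && nanos > 0) {
                nanos = spillDone.awaitNanos(nanos);
            }
        } finally {
            spillLock.unlock();
        }
    }

    // Like the old "private synchronized boolean isRLENeeded()".
    synchronized boolean isRLENeeded() {
        return false;
    }

    // Spill side: needs the monitor before it can signal the collector.
    void spill() {
        isRLENeeded();
        spillLock.lock();
        try {
            spillFinished = true;
            spillDone.signalAll();
        } finally {
            spillLock.unlock();
        }
    }

    // True if the spill thread is seen BLOCKED on the monitor while
    // collect() is parked, i.e. the state shown in the thread dump.
    static boolean spillThreadBlocks() {
        SorterHangDemo sorter = new SorterHangDemo();
        Thread collector = new Thread(() -> {
            try {
                sorter.collect(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread spiller = new Thread(sorter::spill);
        boolean blocked = false;
        try {
            collector.start();
            sorter.collectEntered.await(); // collector now owns the monitor
            spiller.start();
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(800);
            while (System.nanoTime() < deadline) {
                if (spiller.getState() == Thread.State.BLOCKED) {
                    blocked = true;
                    break;
                }
                Thread.sleep(10);
            }
            collector.join(); // collect() times out and releases the monitor
            spiller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return blocked;
    }
}
```

If `isRLENeeded()` were not synchronized (for instance, reading a value published by collect, as proposed in the comment), `spill()` would not need the monitor and could signal the parked collector immediately.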
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)