[ 
https://issues.apache.org/jira/browse/TEZ-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005671#comment-16005671
 ] 

Siddharth Seth commented on TEZ-3708:
-------------------------------------

- {code}
+    minNumRecordForEstimation =
+      (long) Math.pow(minOpsPerWorker * maxParallelism, 1.0 / sourceList.size());
{code}
I think this needs to be made configurable: -1 for the default, otherwise a 
value between 0.0 and 1.0 indicating the fraction of tasks that must complete 
before estimation runs. Given the high cost of a CartesianProduct, I suspect 
that waiting for all sources to complete in order to get a better estimate is 
worthwhile.
- Related to the same config: this many records are required from every vertex 
before estimation is done. Not all vertices may produce that much data. I think 
the check needs to compare the total estimated record count against this value, 
rather than checking each vertex separately.
- {code}for (SrcVertex srcV : srcVerticesByName.values()) {
      if (srcV.numRecord < minNumRecordForEstimation
        && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
        return false;
      }
    }{code}
Related to the previous comment. This likely needs a check for cases where the 
source has completed. (SourceComplete || recordCount -> estimation possible)
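
The three points above could be combined into a single readiness check. The 
following is only a rough sketch under stated assumptions: EstimationGate, 
Source, minFraction and minTotalRecords are made-up names and not part of the 
patch, and reading -1 as "fall back to a record-count threshold" is one 
possible interpretation of the suggested default.

```java
import java.util.List;

// Hypothetical sketch: EstimationGate, Source and both parameters are
// illustrative names, not identifiers from the patch.
final class EstimationGate {

  /** Minimal stand-in for the per-source bookkeeping of the vertex manager. */
  static final class Source {
    final long numRecord;     // records reported so far through VM events
    final int tasksReported;  // tasks that have sent a VM event
    final int numTask;        // total tasks in the source vertex

    Source(long numRecord, int tasksReported, int numTask) {
      this.numRecord = numRecord;
      this.tasksReported = tasksReported;
      this.numTask = numTask;
    }

    boolean isComplete() {
      return tasksReported >= numTask;
    }
  }

  /**
   * Assumed semantics: minFraction == -1 selects the record-count threshold,
   * applied to the total across all sources; a value in [0.0, 1.0] instead
   * requires that fraction of tasks to have reported in every source.
   * Completed sources always allow estimation.
   */
  static boolean canEstimate(List<Source> sources, double minFraction,
      long minTotalRecords) {
    boolean useDefault = minFraction == -1.0;
    if (!useDefault && !(minFraction >= 0.0 && minFraction <= 1.0)) {
      throw new IllegalArgumentException(
          "minFraction must be -1 or within [0.0, 1.0]");
    }
    long totalRecords = 0;
    boolean allComplete = true;
    boolean fractionReached = true;
    for (Source s : sources) {
      totalRecords += s.numRecord;
      allComplete &= s.isComplete();
      fractionReached &= s.tasksReported >= Math.ceil(minFraction * s.numTask);
    }
    if (allComplete) {
      return true; // SourceComplete -> estimation possible
    }
    return useDefault ? totalRecords >= minTotalRecords : fractionReached;
  }
}
```

With this shape, a source that produces few or no records no longer blocks 
estimation on its own: either the combined record count reaches the threshold, 
or every source has finished.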

- {code}
    src.numRecord = src.estimateNumRecord();
      if (src.numRecord == 0) {
        reconfigureWithZeroTask();
      }
{code}
I think this will end up skipping data. An estimated value of 0 cannot be 
relied on to skip all data. If the estimate is 0, I think the system needs to 
wait until all tasks complete (or at least until some data has been generated); 
otherwise it cannot guarantee that there is no data.
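
One way to express the concern above is to treat a zero estimate as final only 
once every task of the source has reported. This is a minimal sketch, not code 
from the patch: ZeroEstimateGuard, Decision and the parameter names are 
hypothetical.

```java
// Hypothetical sketch: none of these names exist in the patch.
final class ZeroEstimateGuard {

  enum Decision {
    PROCEED,               // some data is expected, continue with estimation
    WAIT,                  // estimate is 0 but tasks are still outstanding
    RECONFIGURE_ZERO_TASK  // every task reported and no records were produced
  }

  static Decision decide(long estimatedRecords, int tasksReported, int numTask) {
    if (estimatedRecords > 0) {
      return Decision.PROCEED;
    }
    // A zero estimate from a subset of tasks says nothing about the
    // remaining tasks, so only a fully reported source may be skipped.
    return tasksReported >= numTask
        ? Decision.RECONFIGURE_ZERO_TASK
        : Decision.WAIT;
  }
}
```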

- {code}
+      if (disableGrouping) {
+        src.numChunk = Math.min(src.getSrcVertexWithMostOutput().numTask * numPartitions,
+          Math.max(1, (int) (src.numRecord * k)));
{code}
Is the condition reversed?
- onVertexManagerEventReceived - does this need to process VM events only from 
edges with type = CartesianProduct (an extension of the current restriction 
that events must come from a connected source)?
- Grouper - restrictions on numRecords > numGroups - is this required?
- TestFairCartesianProductVertexManager - parallelismCaptor not used. Why is 
setPartitioned(false) set multiple times?
- TestFairCartesianProductVertexManager - think some more unit tests are 
required to exercise corner cases with estimation, etc.
- RoundRobinPartitioner - Any concerns with Integer overflow?
- Is it required that initial parallelism be -1? If so, there should be a 
PreCondition check for this.
- TEZ_CARTESIAN_PRODUCT_DISABLE_GROUPING. Can this be called enable grouping 
instead? (Instead of a double negative)
- Nit: Missing Override annotation on some methods like 
onVertexManagerEventReceived
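
To illustrate the kind of integer overflow asked about for 
RoundRobinPartitioner: the sketch below assumes a partitioner that derives the 
partition from an int counter, which may or may not match what the patch does. 
RoundRobinIndex and its methods are hypothetical names. In Java, an int counter 
that runs past Integer.MAX_VALUE wraps to a negative value, and the % operator 
keeps the sign of the dividend, so a plain modulo can yield a negative 
partition index.

```java
// Hypothetical illustration; this is not the RoundRobinPartitioner from the patch.
final class RoundRobinIndex {
  private final int numPartitions;
  private int next; // always kept within [0, numPartitions)

  RoundRobinIndex(int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    this.numPartitions = numPartitions;
  }

  /** Wraps the counter on every call, so it can never overflow. */
  int nextPartition() {
    int current = next;
    next = (next + 1) % numPartitions;
    return current;
  }

  /** The hazard: a wrapped (negative) counter gives a negative result. */
  static int naivePartition(int counter, int numPartitions) {
    return counter % numPartitions;
  }

  /** Math.floorMod returns a non-negative result for a positive divisor. */
  static int safePartition(int counter, int numPartitions) {
    return Math.floorMod(counter, numPartitions);
  }
}
```

Either keeping the counter bounded or using Math.floorMod avoids the negative 
index.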

> Improve parallelism and auto grouping of unpartitioned cartesian product
> ------------------------------------------------------------------------
>
>                 Key: TEZ-3708
>                 URL: https://issues.apache.org/jira/browse/TEZ-3708
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Zhiyuan Yang
>            Assignee: Zhiyuan Yang
>         Attachments: TEZ-3708.1.patch, TEZ-3708.2.patch, TEZ-3708.3.patch
>
>
> Current unpartitioned cartesian product has a few limitations:
> 1. parallelism may be insufficient in case of large splits and a small # of src tasks
> 2. parallelism may be excessive in case of a large # of src tasks
> 3. workload is not ideally distributed across the workers. Even with auto 
> grouping, grouping by size may not be accurate because the same size can mean 
> a different #record and different cartesian product ops.


