[ https://issues.apache.org/jira/browse/TEZ-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005671#comment-16005671 ]
Siddharth Seth commented on TEZ-3708:
-------------------------------------
- {code}
+ minNumRecordForEstimation =
+     (long) Math.pow(minOpsPerWorker * maxParallelism, 1.0 / sourceList.size());
{code}
I think this needs to be made configurable: -1 for the default, otherwise a
value between 0.0 and 1.0 indicating the fraction of tasks that need to
complete before estimation runs. Given the high cost of a CartesianProduct, I
suspect that waiting for all sources to complete, to get a better estimate, is
worthwhile.
- Related to the same config: this many records are required from every vertex
before estimation is done, but not all vertices may produce that much data. I
think the check needs to be on the total estimate reaching this value, rather
than per vertex.
- {code}
for (SrcVertex srcV : srcVerticesByName.values()) {
  if (srcV.numRecord < minNumRecordForEstimation
      && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
    return false;
  }
}
{code}
Related to the previous comment: this likely needs a check for the case where
the source has completed (SourceComplete || recordCount -> estimation possible).
- {code}
src.numRecord = src.estimateNumRecord();
if (src.numRecord == 0) {
  reconfigureWithZeroTask();
}
{code}
I think this will end up skipping data. We cannot rely on an estimate of 0 to
skip all data. If the estimate is 0, I think the system needs to wait until all
tasks complete (or at least until some data is generated); otherwise it cannot
guarantee that there is no data.
- {code}
+ if (disableGrouping) {
+   src.numChunk = Math.min(src.getSrcVertexWithMostOutput().numTask * numPartitions,
+       Math.max(1, (int) (src.numRecord * k)));
{code}
Is the condition reversed?
- onVertexManagerEventReceived: does this need to process VM events only from
edges of type CartesianProduct (an extension of the current restriction that
events come from a connected source)?
- Grouper: is the restriction numRecords > numGroups required?
- TestFairCartesianProductVertexManager: parallelismCaptor is not used. Why is
setPartitioned(false) called multiple times?
- TestFairCartesianProductVertexManager: I think more unit tests are required to
exercise corner cases (estimation, etc.).
- RoundRobinPartitioner: any concerns with Integer overflow?
- Is it required that the initial parallelism be -1? If so, there should be a
precondition check for this.
- TEZ_CARTESIAN_PRODUCT_DISABLE_GROUPING: can this be an "enable grouping"
setting instead, to avoid a double negative?
- Nit: missing @Override annotation on some methods, such as
onVertexManagerEventReceived.
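The readiness rules proposed in the comments on minNumRecordForEstimation (a configurable fraction where -1 keeps the default record-count behavior, a threshold on the total estimate instead of per vertex, and an allowance for completed sources) might fit together roughly as follows. This is a minimal sketch under stated assumptions, not code from the patch: `EstimationReadinessSketch`, `SourceStats`, `canEstimate`, and `minFractionCompleted` are hypothetical names, and the "total" is assumed to be the product of per-source record counts, since minNumRecordForEstimation is the n-th root of minOpsPerWorker * maxParallelism.

```java
import java.util.List;

/**
 * Hypothetical sketch of an estimation-readiness check; not code from the
 * TEZ-3708 patch. All class, field, and parameter names are illustrative.
 */
final class EstimationReadinessSketch {

  /** Illustrative per-source statistics (a stand-in for the patch's SrcVertex). */
  static final class SourceStats {
    final long recordsSoFar;  // records reported so far by this source's tasks
    final int completedTasks; // tasks of this source that have completed
    final int numTasks;       // total tasks of this source

    SourceStats(long recordsSoFar, int completedTasks, int numTasks) {
      this.recordsSoFar = recordsSoFar;
      this.completedTasks = completedTasks;
      this.numTasks = numTasks;
    }

    boolean isComplete() {
      return completedTasks >= numTasks;
    }
  }

  /** Assumed sentinel meaning "use the default record-count rule". */
  static final double USE_RECORD_COUNT = -1.0;

  /**
   * @param minFractionCompleted USE_RECORD_COUNT (-1), or the fraction in
   *     [0.0, 1.0] of each source's tasks that must complete before estimating
   * @param minTotalRecords threshold on the total, e.g. minOpsPerWorker * maxParallelism
   */
  static boolean canEstimate(List<SourceStats> sources, double minFractionCompleted,
      long minTotalRecords) {
    if (minFractionCompleted != USE_RECORD_COUNT) {
      // Written this way so that NaN is rejected as well.
      if (!(minFractionCompleted >= 0.0 && minFractionCompleted <= 1.0)) {
        throw new IllegalArgumentException("fraction must be -1 or in [0.0, 1.0]");
      }
      // Fraction rule: every source must have completed enough of its tasks.
      for (SourceStats s : sources) {
        if (s.completedTasks < minFractionCompleted * s.numTasks) {
          return false;
        }
      }
      return true;
    }
    // Record-count rule, checked on the total instead of per vertex.
    boolean allComplete = true;
    double total = 1.0; // product of per-source counts; double avoids long overflow
    for (SourceStats s : sources) {
      allComplete &= s.isComplete();
      total *= s.recordsSoFar;
    }
    // Once every source has completed, the counts are final, so estimate regardless.
    return allComplete || total >= minTotalRecords;
  }
}
```

For example, with two sources and minOpsPerWorker * maxParallelism = 5000, the per-vertex form sets minNumRecordForEstimation to (long) Math.pow(5000, 0.5) = 70, so a source that has reported 10 records blocks estimation (until all of its tasks have sent VM events) even when the other has reported 1000 and the total of 10 * 1000 = 10000 already exceeds 5000. `canEstimate` with -1 and 5000 returns true for that case.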
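The concern about reconfigureWithZeroTask() can be stated as a small decision function. A cartesian product with an empty source is itself empty, which is why a zero count can justify zero tasks at all; the point is that a zero *estimate* from partial results is not proof of an empty source. This is a hypothetical sketch: `ZeroEstimateSketch`, `Decision`, and `decide` are illustrative names, not the patch's API, and it follows the rule suggested in the review (wait until all tasks complete unless some data has been seen).

```java
/**
 * Hypothetical sketch of guarding the zero-task reconfiguration for one source;
 * not code from the TEZ-3708 patch.
 */
final class ZeroEstimateSketch {

  /** Illustrative outcomes; the names are made up for this sketch. */
  enum Decision { WAIT, RECONFIGURE_WITH_ZERO_TASKS, RECONFIGURE_NORMALLY }

  static Decision decide(long estimatedRecords, long observedRecords,
      int completedTasks, int numTasks) {
    if (estimatedRecords > 0 || observedRecords > 0) {
      // Data exists or is expected, so reconfiguring with zero tasks would drop it.
      return Decision.RECONFIGURE_NORMALLY;
    }
    // An estimate of 0 from partial results proves nothing; only a source whose
    // tasks have all completed without output is known to be empty.
    return completedTasks >= numTasks
        ? Decision.RECONFIGURE_WITH_ZERO_TASKS
        : Decision.WAIT;
  }
}
```

Note that observedRecords > 0 with an estimate of 0 can happen if the estimate is truncated to a long, which is one more reason not to key the decision on the estimate alone.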
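The Integer overflow question on RoundRobinPartitioner can be made concrete. The patch's partitioner is not shown here, so this sketch assumes a per-record int counter taken modulo the number of partitions; `RoundRobinSketch`, `getPartition`, and `naivePartition` are hypothetical names. If such a counter keeps growing, it wraps to Integer.MIN_VALUE after Integer.MAX_VALUE, and Java's `%` then returns a negative (invalid) partition, because the sign of the result follows the dividend. Keeping the counter itself in [0, numPartitions) avoids the wrap entirely.

```java
/**
 * Hypothetical round-robin partition assignment; not the patch's RoundRobinPartitioner.
 */
final class RoundRobinSketch {
  private final int numPartitions;
  private int next; // invariant: 0 <= next < numPartitions

  RoundRobinSketch(int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    this.numPartitions = numPartitions;
  }

  /** Returns the next partition; next + 1 <= numPartitions, so it never overflows. */
  int getPartition() {
    int p = next;
    next = (next + 1) % numPartitions;
    return p;
  }

  /** Overflow-prone variant: a wrapped (negative) counter yields a negative partition. */
  static int naivePartition(int counter, int numPartitions) {
    return counter % numPartitions;
  }
}
```

`Math.floorMod(counter, numPartitions)` would also keep results non-negative, but with a wrapping counter the round-robin order can repeat a partition at the wrap point unless numPartitions is a power of two, so bounding the counter seems the simpler choice.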
> Improve parallelism and auto grouping of unpartitioned cartesian product
> ------------------------------------------------------------------------
>
> Key: TEZ-3708
> URL: https://issues.apache.org/jira/browse/TEZ-3708
> Project: Apache Tez
> Issue Type: Sub-task
> Reporter: Zhiyuan Yang
> Assignee: Zhiyuan Yang
> Attachments: TEZ-3708.1.patch, TEZ-3708.2.patch, TEZ-3708.3.patch
>
>
> The current unpartitioned cartesian product has a few limitations:
> 1. Parallelism may be insufficient with large splits and a small number of
> source tasks.
> 2. Parallelism may be too high with a large number of source tasks.
> 3. The workload is not ideally distributed across workers. Even with auto
> grouping, grouping by size may not be accurate, because the same size can mean
> a different number of records and a different number of cartesian product ops.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)