[ 
https://issues.apache.org/jira/browse/TEZ-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308873#comment-15308873
 ] 

Hitesh Shah edited comment on TEZ-3230 at 5/31/16 11:38 PM:
------------------------------------------------------------

Some comments on the patch: 

CartesianProductFilter
  - the API needs to be changed to be more explicit instead of using a combination 
object. 

CartesianProductConfig
  - private/final member vars? 
  - more docs
  - Why is the same config object being used for both VM and EM? 
  - String filterClassName - shouldn’t this be an entity descriptor? How does a 
user configure the filter to implement the correct filtering rules? At this 
point, it seems like the class impl needs to be hardcoded for every 
permutation? 
  - slowStartMinFraction/slowStartMaxFraction - can these be driven via vertex 
specific configuration i.e. make them non-optional via the constructor?
  - Preconditions.checkArgument calls should have clearer error messages for when the 
check fails.
  - I think the private c’tor could be changed to accept a map of String to Int 
for vertex name to partition count mapping and be made the public API.
  - How does this API work for non-partitioned case where numTasks could be 
determined at run-time? A user should not need to specify the numTasks for 
non-partitioned case anyway. A separate c’tor is needed for that?

{code}
Preconditions.checkArgument(payload != null);

Preconditions.checkArgument(payload.getPayload() != null);
{code}
  - above could be combined? 
  - class does not need “implements Serializable“ ?
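
To illustrate the constructor and error-message points above, here is a minimal sketch, not the patch's code. PartitionedConfigSketch and its local checkArgument helper are hypothetical names; the helper only mirrors the two-argument form of Guava's Preconditions.checkArgument so the snippet compiles without Guava.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for CartesianProductConfig; not the class from the patch.
final class PartitionedConfigSketch {
  private final Map<String, Integer> numPartitions;

  // Public constructor taking a source vertex name -> partition count mapping.
  PartitionedConfigSketch(Map<String, Integer> vertexToPartitions) {
    // Null and empty checks combined into one call with a descriptive message.
    checkArgument(vertexToPartitions != null && !vertexToPartitions.isEmpty(),
        "vertex to partition-count map must be non-null and non-empty");
    for (Map.Entry<String, Integer> e : vertexToPartitions.entrySet()) {
      checkArgument(e.getValue() != null && e.getValue() > 0,
          "partition count for source vertex " + e.getKey()
              + " must be > 0, got " + e.getValue());
    }
    this.numPartitions =
        Collections.unmodifiableMap(new LinkedHashMap<>(vertexToPartitions));
  }

  Map<String, Integer> getNumPartitions() {
    return numPartitions;
  }

  // Local helper (assumption): same contract as Guava's checkArgument(boolean, Object).
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }
}
```

With something like this, a failed check names the offending vertex and value instead of surfacing a bare IllegalArgumentException.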

CartesianProductCombination
  - what is positionId? 
  - could you clarify what Integer[] combination and Integer[] factor are? 
  - how does this class work for 3 vertices? 

{code}
public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask,
                                                     int taskId) {
{code}
  - why is this function needed? 
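
For reference, this is how I would expect a task id to map to a combination for any number of source vertices, including 3. It is only a sketch under an assumed mixed-radix layout (last source varies fastest); CombinationSketch and toTaskId are hypothetical names, and the patch may use a different ordering.

```java
// Hypothetical sketch; the real CartesianProductCombination in the patch may differ.
final class CombinationSketch {
  // Decode a destination task id into one index per source vertex.
  // sizes[i] is the partition (or task) count of source i.
  static int[] fromTaskId(int[] sizes, int taskId) {
    int[] combination = new int[sizes.length];
    for (int i = sizes.length - 1; i >= 0; i--) {
      combination[i] = taskId % sizes[i];
      taskId /= sizes[i];
    }
    return combination;
  }

  // Inverse mapping: encode one index per source vertex back into a task id.
  static int toTaskId(int[] sizes, int[] combination) {
    int taskId = 0;
    for (int i = 0; i < sizes.length; i++) {
      taskId = taskId * sizes[i] + combination[i];
    }
    return taskId;
  }
}
```

Under this assumption, three sources with sizes {2, 3, 4} give 24 destination tasks, and task 23 decodes to {1, 2, 3}. Documenting the layout the patch actually uses would answer the questions above.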

CartesianProductVertexManager
  - It would be better if the partitioned and non-partitioned impls were not 
using the same base class as CartesianProductVertexManager especially 
considering that CartesianProductVertexManager wraps the impl.
  - any reason why initialize() calls another 
initialize(CartesianProductConfig) instead of directly invoking vm.initialize()?
  - partitioned vs non-partitioned could be an explicit boolean in the config 
instead of a numPartitions check. Additional check to verify all partition 
counts > 0? Do all sources need to have the same no. of partitions?

  
{code}
@Override
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {}
{code}
  - I don’t think this should be a no-op


CartesianProductVertexManagerPartitioned
  - more docs please. 
  
{code}
try {
  filter = ReflectionUtils.createClazzInstance(config.filterClassName);
} catch (TezReflectionException e) {
  throw new RuntimeException("Creating filter failed", e);
}
{code}
  - why not rethrow the same exception? A better error message would indicate 
what filter class name failed to initialize. This could be done by a simple log 
and then a rethrow. 
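
A rough sketch of the log-and-rethrow idea, using plain Java reflection and java.util.logging rather than Tez's ReflectionUtils and logger so that it stands alone. FilterLoaderSketch and createFilter are hypothetical names, not part of the patch.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch using plain Java reflection instead of Tez's ReflectionUtils.
final class FilterLoaderSketch {
  private static final Logger LOG =
      Logger.getLogger(FilterLoaderSketch.class.getName());

  static Object createFilter(String filterClassName)
      throws ReflectiveOperationException {
    try {
      return Class.forName(filterClassName).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // Log which class failed, then rethrow the original exception unchanged.
      LOG.log(Level.SEVERE,
          "Failed to create filter instance of class " + filterClassName, e);
      throw e;
    }
  }
}
```

The caller still sees the original exception type, and the log line identifies the misconfigured filter class.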

  - What happens if the config is misconfigured to have a vertex which does not 
have an inbound edge to this vertex? Will the DAG hang? 
  - initialize() should do config validation checks too. 

  - getContext().getInputVertexEdgeProperties().size() doesn’t change so could 
be cached in a local field for comparing to numSourceVertexConfigured.
  
{code}
public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException {
  Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED);
  if (!vertexReconfigured) {
    reconfigureVertex();
  }
{code}
  - shouldn’t reconfigureVertex() be called only once after all upstream 
vertices are configured? 
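
Something along these lines is what I had in mind: track which sources have reported CONFIGURED and reconfigure exactly once when the last one arrives. This is a standalone sketch with hypothetical names (ReconfigureOnceSketch, onSourceConfigured, reconfigureCalls); it does not use the Tez VertexManagerPlugin API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; names do not come from the patch.
final class ReconfigureOnceSketch {
  private final int numSourceVertices;
  private final Set<String> configuredSources = new HashSet<>();
  private boolean vertexReconfigured = false;
  int reconfigureCalls = 0; // exposed only so the behaviour can be observed

  ReconfigureOnceSketch(int numSourceVertices) {
    this.numSourceVertices = numSourceVertices;
  }

  // Called once per CONFIGURED state update from a source vertex.
  synchronized void onSourceConfigured(String sourceVertexName) {
    configuredSources.add(sourceVertexName);
    if (!vertexReconfigured && configuredSources.size() == numSourceVertices) {
      vertexReconfigured = true;
      reconfigureCalls++; // stands in for reconfigureVertex()
    }
  }
}
```

Using a set of vertex names rather than a bare counter also makes duplicate state updates for the same source harmless.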


CartesianProductVertexManagerUnpartitioned
  - logic of onSourceTaskCompleted(TaskAttemptIdentifier attempt) seems 
incorrect if the vertex has already been started. 
 
in handleCompletedSrcTask:
{code}
CartesianProductCombination combination =
    new CartesianProductCombination(numTasks, sourceVertices.indexOf(vertex));
{code}
  - why is the combination class created each time a source task completes? 

{code}
s/requests.size() > 0/!requests.isEmpty()/
{code}

{code}
for (EdgeProperty edgeProperty : edgeProperties.values()) {
  EdgeManagerPluginDescriptor descriptor = edgeProperty.getEdgeManagerDescriptor();
  if (payload == null) {
    CartesianProductConfig config =
      CartesianProductConfig.fromUserPayload(descriptor.getUserPayload());
    config.numTasks = numTasks;
    payload = config.toUserPayload();
  }
  descriptor.setUserPayload(payload);
}
{code}
  - what if payload is not null? Will the correct numTasks never be set? 

CartesianProductEdgeManager
  - Similar comments to VM. Wrapper and impl should be different class types. 
  - initialize() calling initialize(config) could be simplified
  - partitioned vs non-partitioned could be an explicit boolean in the config 
instead of a numPartitions check. Additional check to verify all partition 
counts > 0? Do all sources need to have the same no. of partitions?

CartesianProductEdgeManager impls
  - what is the overhead of constructing 
CartesianProductCombination.fromTaskId(numTasks, destTaskId) each time an event 
needs to be routed? 




> Implement vertex manager and edge manager of cartesian product edge
> -------------------------------------------------------------------
>
>                 Key: TEZ-3230
>                 URL: https://issues.apache.org/jira/browse/TEZ-3230
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Zhiyuan Yang
>            Assignee: Zhiyuan Yang
>         Attachments: TEZ-3230.1.patch, TEZ-3230.2.patch, TEZ-3230.WIP.1.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
