[
https://issues.apache.org/jira/browse/TEZ-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308873#comment-15308873
]
Hitesh Shah edited comment on TEZ-3230 at 5/31/16 11:38 PM:
------------------------------------------------------------
Some comments on the patch:
CartesianProductFilter
- the API should be made more explicit instead of using a combination
object.
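One possible reading of "more explicit": the filter receives a map from source vertex name to the partition (or task) index of one candidate combination, instead of an opaque combination object. This is a minimal sketch; the interface `ExplicitCartesianProductFilter`, its method `isValidCombination`, and the example `UpperTriangleFilter` are hypothetical names, not the patch's API.

```java
import java.util.Map;

// Hypothetical filter API: the caller passes an explicit map of
// source vertex name -> partition (or task) index for one candidate combination.
interface ExplicitCartesianProductFilter {
  boolean isValidCombination(Map<String, Integer> vertexPartitions);
}

// Hypothetical example filter: keep only combinations where the index of the
// "left" vertex is <= the index of the "right" vertex (e.g. to skip symmetric
// pairs in a self-join). The vertex names would come from user configuration.
class UpperTriangleFilter implements ExplicitCartesianProductFilter {
  private final String left;
  private final String right;

  UpperTriangleFilter(String left, String right) {
    this.left = left;
    this.right = right;
  }

  @Override
  public boolean isValidCombination(Map<String, Integer> vertexPartitions) {
    Integer l = vertexPartitions.get(left);
    Integer r = vertexPartitions.get(right);
    if (l == null || r == null) {
      // Name the missing vertices so a misconfiguration is easy to diagnose.
      throw new IllegalArgumentException("Combination must contain both " + left
          + " and " + right + ", got " + vertexPartitions.keySet());
    }
    return l <= r;
  }
}
```

With a named map, a filter implementation does not need to know how the vertex manager orders source vertices internally.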
CartesianProductConfig
- private/final member vars?
- more docs
- Why is the same config object being used for both VM and EM?
- String filterClassName - shouldn’t this be an entity descriptor? How does a
user configure the filter to implement the correct filtering rules? At this
point, it seems like the class impl needs to be hardcoded for every
permutation?
- slowStartMinFraction/slowStartMaxFraction - can these be driven via
vertex-specific configuration, i.e. made non-optional via the constructor?
- Preconditions.checkArgument should have clearer error messages for when
the check fails.
- I think the private c’tor could be changed to accept a map of String to Int
for vertex name to partition count mapping and be made the public API.
- How does this API work for non-partitioned case where numTasks could be
determined at run-time? A user should not need to specify the numTasks for
non-partitioned case anyway. A separate c’tor is needed for that?
{code}
Preconditions.checkArgument(payload != null);
Preconditions.checkArgument(payload.getPayload() != null);
{code}
- above could be combined?
- does this class really need to implement Serializable?
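A minimal sketch of how the constructor-related suggestions above could fit together: private final fields, an explicit partitioned flag, a public constructor taking a vertex-name-to-partition-count map, a separate constructor for the unpartitioned case that needs no task counts, and combined argument checks whose messages name the offending vertex. The class name `CartesianProductConfigSketch` and its accessors are hypothetical; `checkArgument` is a local stand-in for Guava's `Preconditions.checkArgument(boolean, Object)` so the sketch has no dependencies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical config shape; not the patch's actual class.
final class CartesianProductConfigSketch {
  private final boolean isPartitioned;
  // Source vertex order defines the cartesian product layout.
  private final List<String> sourceVertices;
  // Partition count per source vertex; empty for the unpartitioned case.
  private final Map<String, Integer> numPartitions;

  // Partitioned case. Pass a LinkedHashMap so the vertex order is deterministic.
  CartesianProductConfigSketch(Map<String, Integer> vertexPartitionMap) {
    // One combined check instead of two separate null checks.
    checkArgument(vertexPartitionMap != null && !vertexPartitionMap.isEmpty(),
        "vertexPartitionMap must be non-null and non-empty");
    for (Map.Entry<String, Integer> e : vertexPartitionMap.entrySet()) {
      checkArgument(e.getValue() != null && e.getValue() > 0,
          "Partition count for vertex " + e.getKey() + " must be > 0, got " + e.getValue());
    }
    this.isPartitioned = true;
    this.sourceVertices =
        Collections.unmodifiableList(new ArrayList<>(vertexPartitionMap.keySet()));
    this.numPartitions = Collections.unmodifiableMap(new LinkedHashMap<>(vertexPartitionMap));
  }

  // Unpartitioned case: only vertex names; task counts are discovered at run time.
  CartesianProductConfigSketch(List<String> sourceVertices) {
    checkArgument(sourceVertices != null && !sourceVertices.isEmpty(),
        "sourceVertices must be non-null and non-empty");
    this.isPartitioned = false;
    this.sourceVertices = Collections.unmodifiableList(new ArrayList<>(sourceVertices));
    this.numPartitions = Collections.emptyMap();
  }

  boolean isPartitioned() { return isPartitioned; }
  List<String> getSourceVertices() { return sourceVertices; }
  Map<String, Integer> getNumPartitions() { return numPartitions; }

  // Local stand-in for Guava's Preconditions.checkArgument(boolean, Object).
  private static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }
}
```

An explicit flag also removes the need to infer partitioned vs. non-partitioned from whether numPartitions happens to be set.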
CartesianProductCombination
- what is positionId?
- could you clarify what Integer[] combination and Integer[] factor are?
- how does this class work for 3 vertices?
{code}
public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask,
int taskId) {
{code}
- why is this function needed?
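If `fromTaskId` maps a destination task id to one index per source vertex, a plausible reading (an assumption about the patch) is that destination tasks enumerate the product in row-major order. The task id is then a mixed-radix number whose digits are the per-source indices and whose place values play the role of the `factor` array, which works the same way for 3 or more vertices. The class `MixedRadixCombination` and its methods are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch: row-major (mixed-radix) encoding of a cartesian product
// combination. numPartitionOrTask[i] is the partition (or task) count of source i;
// factor[i] is the product of the counts of all sources after i.
final class MixedRadixCombination {
  private MixedRadixCombination() {}

  static int[] factors(int[] numPartitionOrTask) {
    int[] factor = new int[numPartitionOrTask.length];
    int acc = 1;
    for (int i = numPartitionOrTask.length - 1; i >= 0; i--) {
      factor[i] = acc;
      acc *= numPartitionOrTask[i];
    }
    return factor;
  }

  // Destination task id -> one index per source vertex.
  static int[] fromTaskId(int[] numPartitionOrTask, int taskId) {
    int[] factor = factors(numPartitionOrTask);
    int[] combination = new int[numPartitionOrTask.length];
    for (int i = 0; i < combination.length; i++) {
      combination[i] = (taskId / factor[i]) % numPartitionOrTask[i];
    }
    return combination;
  }

  // One index per source vertex -> destination task id (inverse of fromTaskId).
  static int toTaskId(int[] numPartitionOrTask, int[] combination) {
    int[] factor = factors(numPartitionOrTask);
    int taskId = 0;
    for (int i = 0; i < combination.length; i++) {
      taskId += combination[i] * factor[i];
    }
    return taskId;
  }

  public static void main(String[] args) {
    int[] counts = {2, 3, 4}; // three source vertices -> 2 * 3 * 4 = 24 destination tasks
    System.out.println(Arrays.toString(fromTaskId(counts, 17))); // prints [1, 1, 1]
  }
}
```

Here task 17 decodes to `[1, 1, 1]` because 17 = 1*12 + 1*4 + 1*1.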
CartesianProductVertexManager
- It would be better if the partitioned and non-partitioned impls did not
share the same base class as CartesianProductVertexManager, especially
considering that CartesianProductVertexManager wraps the impl.
- any reason why initialize() calls another
initialize(CartesianProductConfig) instead of directly invoking vm.initialize()?
- partitioned vs non-partitioned could be an explicit boolean in the config
instead of a numPartitions check. Add a check to verify that all partition
counts are > 0? Do all sources need to have the same number of partitions?
{code}
@Override
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
throws Exception {}
{code}
- I don’t think this should be a no-op
CartesianProductVertexManagerPartitioned
- more docs please.
{code}
try {
  filter = ReflectionUtils.createClazzInstance(config.filterClassName);
} catch (TezReflectionException e) {
  throw new RuntimeException("Creating filter failed", e);
}
{code}
- why not rethrow the same exception? A better error message would indicate
which filter class name failed to initialize. This could be done with a
simple log followed by a rethrow.
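A minimal sketch of the log-and-rethrow pattern. To stay runnable on its own it uses plain Java reflection and `java.util.logging` in place of Tez's `ReflectionUtils.createClazzInstance` and `TezReflectionException`; `FilterLoader` and `createFilter` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class FilterLoader {
  private static final Logger LOG = Logger.getLogger(FilterLoader.class.getName());

  private FilterLoader() {}

  // Instantiates the named class via its no-arg constructor. On failure, logs
  // the offending class name and rethrows the original exception unchanged.
  static Object createFilter(String filterClassName) throws ReflectiveOperationException {
    try {
      return Class.forName(filterClassName).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      LOG.log(Level.SEVERE, "Failed to create cartesian product filter " + filterClassName, e);
      throw e;
    }
  }
}
```

The caller still sees the original exception type and cause, while the log line records exactly which configured class name was at fault.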
- What happens if the config is misconfigured to include a vertex that does
not have an inbound edge to this vertex? Will the DAG hang?
- initialize() should do config validation checks too.
- getContext().getInputVertexEdgeProperties().size() doesn’t change, so it
could be cached in a member field for comparison with numSourceVertexConfigured.
{code}
public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate)
throws IOException{
Preconditions.checkArgument(stateUpdate.getVertexState()
== VertexState.CONFIGURED);
if (!vertexReconfigured) {
reconfigureVertex();
}
{code}
- shouldn’t reconfigureVertex() be called only once after all upstream
vertices are configured?
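A sketch of the ordering suggested above: cache the expected number of source vertices once, track which distinct sources have reached CONFIGURED, and reconfigure exactly once when all have. `ReconfigureOnceTracker`, `onSourceConfigured`, and the `Runnable` callback are hypothetical stand-ins for the vertex manager, `onVertexStateUpdated()`, and `reconfigureVertex()`.

```java
import java.util.HashSet;
import java.util.Set;

final class ReconfigureOnceTracker {
  // Cached once, analogous to getContext().getInputVertexEdgeProperties().size().
  private final int numSourceVertices;
  private final Runnable reconfigureVertex;
  // Distinct sources reported as CONFIGURED; a set guards against duplicate updates.
  private final Set<String> configuredSources = new HashSet<>();
  private boolean vertexReconfigured = false;

  ReconfigureOnceTracker(int numSourceVertices, Runnable reconfigureVertex) {
    if (numSourceVertices <= 0) {
      throw new IllegalArgumentException(
          "numSourceVertices must be > 0, got " + numSourceVertices);
    }
    this.numSourceVertices = numSourceVertices;
    this.reconfigureVertex = reconfigureVertex;
  }

  // Called for each CONFIGURED state update of a source vertex.
  synchronized void onSourceConfigured(String vertexName) {
    configuredSources.add(vertexName);
    if (!vertexReconfigured && configuredSources.size() == numSourceVertices) {
      vertexReconfigured = true;
      reconfigureVertex.run();
    }
  }

  synchronized boolean isReconfigured() {
    return vertexReconfigured;
  }
}
```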
CartesianProductVertexManagerUnpartitioned
- logic of onSourceTaskCompleted(TaskAttemptIdentifier attempt) seems
incorrect if the vertex has already been started.
in handleCompletedSrcTask:
{code}
CartesianProductCombination combination = new
CartesianProductCombination(numTasks, sourceVertices.indexOf(vertex));
{code}
- why is a new combination object created each time a source task completes?
{code}
s/requests.size() > 0/!requests.isEmpty()/
{code}
{code}
for (EdgeProperty edgeProperty : edgeProperties.values()) {
EdgeManagerPluginDescriptor descriptor =
edgeProperty.getEdgeManagerDescriptor();
if (payload == null) {
CartesianProductConfig config =
CartesianProductConfig.fromUserPayload(descriptor.getUserPayload());
config.numTasks = numTasks;
payload = config.toUserPayload();
}
descriptor.setUserPayload(payload);
}
{code}
- what if payload is not null? Will the correct numTasks never be set?
CartesianProductEdgeManager
- Similar comments to VM. Wrapper and impl should be different class types.
- initialize() calling initialize(config) could be simplified
- partitioned vs non-partitioned could be an explicit boolean in the config
instead of a numPartitions check. Add a check to verify that all partition
counts are > 0? Do all sources need to have the same number of partitions?
CartesianProductEdgeManager impls
- what is the overhead of constructing
CartesianProductCombination.fromTaskId(numTasks, destTaskId) each time an event
needs to be routed?
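On the routing overhead: assuming destination task ids enumerate the cartesian product in row-major order (an assumption about the patch), an edge manager only needs the index for its own source vertex. That is one division and one modulo against a stride computed once at initialization, with no per-event object allocation. `SourceIndexRouter` and `sourcePosition` are hypothetical names.

```java
// Hypothetical per-edge routing helper. The stride and count for this edge's
// source vertex are computed once, so each routing call is O(1) and allocation-free.
final class SourceIndexRouter {
  private final int stride; // product of the counts of all sources after this one
  private final int count;  // partition (or task) count of this source

  SourceIndexRouter(int[] numPartitionOrTask, int sourceIndex) {
    if (sourceIndex < 0 || sourceIndex >= numPartitionOrTask.length) {
      throw new IllegalArgumentException("sourceIndex out of range: " + sourceIndex);
    }
    int s = 1;
    for (int i = numPartitionOrTask.length - 1; i > sourceIndex; i--) {
      s *= numPartitionOrTask[i];
    }
    this.stride = s;
    this.count = numPartitionOrTask[sourceIndex];
  }

  // Index (partition or task) of this source vertex that feeds destTaskId.
  int sourcePosition(int destTaskId) {
    return (destTaskId / stride) % count;
  }
}
```

Built once per edge in initialize(), this would replace calling CartesianProductCombination.fromTaskId(numTasks, destTaskId) for every routed event.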
> Implement vertex manager and edge manager of cartesian product edge
> -------------------------------------------------------------------
>
> Key: TEZ-3230
> URL: https://issues.apache.org/jira/browse/TEZ-3230
> Project: Apache Tez
> Issue Type: Sub-task
> Reporter: Zhiyuan Yang
> Assignee: Zhiyuan Yang
> Attachments: TEZ-3230.1.patch, TEZ-3230.2.patch, TEZ-3230.WIP.1.patch
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)