TEZ-3654. Make CartesianProduct edge work with GroupInputEdge (zhiyuany)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f355a050 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f355a050 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f355a050 Branch: refs/heads/master Commit: f355a050cf012854d1f7145d05f1981a3cf97e67 Parents: e0ee28a Author: Zhiyuan Yang <[email protected]> Authored: Mon Apr 10 11:18:33 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Mon Apr 10 11:18:33 2017 -0700 ---------------------------------------------------------------------- .../tez/dag/api/EdgeManagerPluginContext.java | 4 + .../tez/dag/api/VertexManagerPluginContext.java | 9 + .../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +- .../org/apache/tez/dag/app/dag/impl/Edge.java | 12 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 4 +- .../tez/dag/app/dag/impl/VertexManager.java | 14 + .../apache/tez/dag/app/dag/impl/TestEdge.java | 32 ++ .../tez/dag/app/dag/impl/TestVertexImpl.java | 6 +- .../tez/dag/app/dag/impl/TestVertexManager.java | 26 + .../CartesianProductCombination.java | 78 ++- .../CartesianProductConfig.java | 60 +- .../CartesianProductEdgeManagerConfig.java | 39 +- .../CartesianProductEdgeManagerPartitioned.java | 8 +- .../CartesianProductEdgeManagerReal.java | 1 - ...artesianProductEdgeManagerUnpartitioned.java | 57 +- .../CartesianProductVertexManager.java | 51 +- .../CartesianProductVertexManagerConfig.java | 40 +- ...artesianProductVertexManagerPartitioned.java | 10 +- ...tesianProductVertexManagerUnpartitioned.java | 316 +++++++---- .../main/proto/CartesianProductPayload.proto | 9 +- .../TestShuffleVertexManagerUtils.java | 5 + .../TestCartesianProductCombination.java | 30 +- .../TestCartesianProductConfig.java | 6 +- .../TestCartesianProductEdgeManager.java | 6 +- .../TestCartesianProductEdgeManagerConfig.java | 15 +- ...tCartesianProductEdgeManagerPartitioned.java | 13 +- ...artesianProductEdgeManagerUnpartitioned.java | 408 +++++++------ 
.../TestCartesianProductVertexManager.java | 19 + ...TestCartesianProductVertexManagerConfig.java | 8 +- ...tesianProductVertexManagerUnpartitioned.java | 565 +++++++++++-------- 30 files changed, 1099 insertions(+), 754 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java index 79f685d..ef6925b 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java @@ -58,4 +58,8 @@ public interface EdgeManagerPluginContext { */ public int getDestinationVertexNumTasks(); + /** + * @return the name of vertex group that source vertex belongs to, or null + */ + String getVertexGroupName(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java index aa99745..b858a65 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java @@ -350,4 +350,13 @@ public interface VertexManagerPluginContext { // TODO must be done later after TEZ-1714 //public void vertexManagerDone(); + /** + * Get input vertex groups of this vertex, including vertex group name and + * all members vertex name + * + * @return map whose key is vertex group name and value is list of members' 
name, + * or empty map if there is no input vertex group. + */ + Map<String, List<String>> getInputVertexGroups(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 1b3b39c..51847d4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -149,7 +149,7 @@ public interface Vertex extends Comparable<Vertex> { List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException; List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException; - List<GroupInputSpec> getGroupInputSpecList(int taskIndex); + List<GroupInputSpec> getGroupInputSpecList(); void addSharedOutputs(Set<String> outputs); Set<String> getSharedOutputs(); http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index 690df63..f78c9a5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -28,6 +28,7 @@ import java.util.zip.Deflater; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,6 +130,17 @@ public class Edge { return 
destinationVertex.getTotalTasks(); } + @Override + public String getVertexGroupName() { + if (destinationVertex.getGroupInputSpecList() != null) { + for (GroupInputSpec group : destinationVertex.getGroupInputSpecList()) { + if (group.getGroupVertices().contains(getSourceVertexName())) { + return group.getGroupName(); + } + } + } + return null; + } } private EdgeProperty edgeProperty; http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 1ab3da8..ab17fe4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1581,7 +1581,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return TaskSpec.createBaseTaskSpec(getDAG().getName(), getName(), getTotalTasks(), getProcessorDescriptor(), getInputSpecList(taskIndex), getOutputSpecList(taskIndex), - getGroupInputSpecList(taskIndex), vertexOnlyConf); + getGroupInputSpecList(), vertexOnlyConf); } @Override @@ -4422,7 +4422,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @Override - public List<GroupInputSpec> getGroupInputSpecList(int taskIndex) { + public List<GroupInputSpec> getGroupInputSpecList() { readLock.lock(); try { return groupInputSpecList; http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 45f72bd..b7d3428 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -24,6 +24,7 @@ import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,6 +36,7 @@ import javax.annotation.Nullable; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.dag.app.dag.event.DAGEventInternalError; +import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -360,6 +362,18 @@ public class VertexManager { } @Override + public Map<String, List<String>> getInputVertexGroups() { + checkAndThrowIfDone(); + Map<String, List<String>> inputGroups = Maps.newHashMap(); + if (managedVertex.getGroupInputSpecList() != null) { + for (GroupInputSpec group : managedVertex.getGroupInputSpecList()) { + inputGroups.put(group.getGroupName(), Collections.unmodifiableList(group.getGroupVertices())); + } + } + return inputGroups; + } + + @Override public void onStateUpdated(VertexStateUpdate event) { // this is not called by the vertex manager plugin. // no need to synchronize this. 
similar to other external notification methods http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java index f53e505..1143395 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; @@ -34,6 +35,7 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashMap; @@ -67,7 +69,9 @@ import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.test.EdgeManagerForTest; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -476,4 +480,32 @@ public class TestEdge { return emConf.sourceTaskIndex; } } + + @Test(timeout = 5000) + public void testEdgeManagerPluginCtxGetVertexGroupName() throws TezException { + EdgeManagerPluginDescriptor edgeManagerDescriptor = + EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName()); + EdgeProperty edgeProp = EdgeProperty.create(edgeManagerDescriptor, + 
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), + InputDescriptor.create("In")); + Edge edge = new Edge(edgeProp, null, null); + + Vertex srcV = mock(Vertex.class), destV = mock(Vertex.class); + String srcName = "srcV", destName = "destV"; + when(srcV.getName()).thenReturn(srcName); + when(destV.getName()).thenReturn(destName); + edge.setSourceVertex(srcV); + edge.setDestinationVertex(destV); + + assertNull(edge.edgeManager.getContext().getVertexGroupName()); + + String group = "group"; + when(destV.getGroupInputSpecList()) + .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList("v1", "v3"), null))); + assertNull(edge.edgeManager.getContext().getVertexGroupName()); + + when(destV.getGroupInputSpecList()) + .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList(srcName, "v3"), null))); + assertEquals(group, edge.edgeManager.getContext().getVertexGroupName()); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 90d675e..bc06fd0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -5940,10 +5940,10 @@ public class TestVertexImpl { VertexEventType.V_INIT)); dispatcher.await(); - Assert.assertNull(vA.getGroupInputSpecList(0)); - Assert.assertNull(vB.getGroupInputSpecList(0)); + Assert.assertNull(vA.getGroupInputSpecList()); + Assert.assertNull(vB.getGroupInputSpecList()); - List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList(0); + List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList(); Assert.assertEquals(1, groupInSpec.size()); Assert.assertEquals("Group", 
groupInSpec.get(0).getGroupName()); assertTrue(groupInSpec.get(0).getGroupVertices().contains("A")); http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java index b93b298..6bec26e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java @@ -20,6 +20,8 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; @@ -30,6 +32,8 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -55,6 +59,7 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.Before; import org.junit.Test; @@ -205,6 +210,27 @@ public class TestVertexManager { assertEquals(Sets.newHashSet("input1","input2"), edgeVertexSet); } + @Test(timeout = 5000) + public void testVMPluginCtxGetInputVertexGroup() throws Exception { + VertexManager vm = + new VertexManager( + 
VertexManagerPluginDescriptor.create(CustomVertexManager.class + .getName()), UserGroupInformation.getCurrentUser(), + mockVertex, mockAppContext, mock(StateChangeNotifier.class)); + + assertTrue(vm.pluginContext.getInputVertexGroups().isEmpty()); + + String group = "group", v1 = "v1", v2 = "v2"; + when(mockVertex.getGroupInputSpecList()) + .thenReturn(Arrays.asList(new GroupInputSpec(group, Arrays.asList(v1, v2), null))); + Map<String, List<String>> groups = vm.pluginContext.getInputVertexGroups(); + assertEquals(1, groups.size()); + assertTrue(groups.containsKey(group)); + assertEquals(2, groups.get(group).size()); + assertTrue(groups.get(group).contains(v1)); + assertTrue(groups.get(group).contains(v2)); + } + public static class CustomVertexManager extends VertexManagerPlugin { private Map<String,List<Event>> cachedEventMap = new HashMap<String, List<Event>>(); http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java index c6c95f2..97f3eb2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java @@ -25,49 +25,47 @@ import java.util.Collections; import java.util.List; /** - * Represent the combination of source partitions or tasks. + * Represent the combination of source chunks. A chunk is one or more source tasks or partitions. 
* - * For example, if we have two source vertices and each generates two partition, we will have 2*2=4 - * destination tasks. The mapping from source partition/task to destination task is like this: + * For example, if we have two source vertices and each generates two chunks, we will have 2*2=4 + * destination tasks. The mapping from source chunks to destination task is like this: * <0, 0> -> 0, <0, 1> -> 1, <1, 0> -> 2, <1, 1> -> 3; * - * Basically, it stores the source partition/task combination and can compute corresponding + * Basically, it stores the source chunk id combination and can compute corresponding * destination task. It can also figure out the source combination from a given destination task. - * Task id is mapped in the ascending order of combinations, starting from 0. <field>factor</field> - * is the helper array to computer task id, so task id = (combination) dot-product (factor) + * Task id is mapped in the ascending order of combinations, starting from 0. * * You can traverse all combinations with <method>firstTask</method> and <method>nextTask</method>, * like <0, 0> -> <0, 1> -> <1, 0> -> <1, 1>. * - * Or you can also traverse all combinations that has one specific partition with - * <method>firstTaskWithFixedPartition</method> and <method>nextTaskWithFixedPartition</method>, + * Or you can also traverse all combinations that has one specific chunk with + * <method>firstTaskWithFixedChunk</method> and <method>nextTaskWithFixedChunk</method>, * like <0, 1, 0> -> <0, 1, 1> -> <1, 1, 0> -> <1, 1, 1> (all combinations with 2nd vertex's 2nd - * partition. + * chunk. 
*/ class CartesianProductCombination { - // numPartitions for partitioned case, numTasks for unpartitioned case - private int[] numPartitionOrTask; - // at which position (in source vertices array) our vertex is + private int[] numChunk; + // which position (in source vertices array) we care about private int positionId = -1; - // The i-th element Ci represents partition/task Ci of source vertex i. + // The i-th element Ci represents chunk Ci of source vertex i. private final Integer[] combination; - // the weight of each vertex when computing the task id + // helper array to compute task id: task id = (combination) dot-product (factor) private final Integer[] factor; - public CartesianProductCombination(int[] numPartitionOrTask) { - Preconditions.checkArgument(!Ints.contains(numPartitionOrTask, 0), - "CartesianProductCombination doesn't allow zero partition or task"); - this.numPartitionOrTask = Arrays.copyOf(numPartitionOrTask, numPartitionOrTask.length); - combination = new Integer[numPartitionOrTask.length]; - factor = new Integer[numPartitionOrTask.length]; + public CartesianProductCombination(int[] numChunk) { + Preconditions.checkArgument(!Ints.contains(numChunk, 0), + "CartesianProductCombination doesn't allow zero chunk"); + this.numChunk = Arrays.copyOf(numChunk, numChunk.length); + combination = new Integer[numChunk.length]; + factor = new Integer[numChunk.length]; factor[factor.length-1] = 1; for (int i = combination.length-2; i >= 0; i--) { - factor[i] = factor[i+1]*numPartitionOrTask[i+1]; + factor[i] = factor[i+1]* numChunk[i+1]; } } - public CartesianProductCombination(int[] numPartitionOrTask, int positionId) { - this(numPartitionOrTask); + public CartesianProductCombination(int[] numChunk, int positionId) { + this(numChunk); this.positionId = positionId; } @@ -79,24 +77,24 @@ class CartesianProductCombination { } /** - * first combination with given partition id in current position - * @param partition + * first combination with given chunk id in 
current position + * @param chunkId */ - public void firstTaskWithFixedPartition(int partition) { + public void firstTaskWithFixedChunk(int chunkId) { Preconditions.checkArgument(positionId >= 0 && positionId < combination.length); Arrays.fill(combination, 0); - combination[positionId] = partition; + combination[positionId] = chunkId; } /** - * next combination without current partition in current position + * next combination without current chunk in current position * @return false if there is no next combination */ - public boolean nextTaskWithFixedPartition() { + public boolean nextTaskWithFixedChunk() { Preconditions.checkArgument(positionId >= 0 && positionId < combination.length); int i; for (i = combination.length-1; i >= 0; i--) { - if (i != positionId && combination[i] != numPartitionOrTask[i]-1) { + if (i != positionId && combination[i] != numChunk[i]-1) { break; } } @@ -117,20 +115,20 @@ class CartesianProductCombination { } /** - * first combination with given partition id in current position + * first combination with given chunk id in current position */ public void firstTask() { Arrays.fill(combination, 0); } /** - * next combination without current partition in current position + * next combination without current chunk in current position * @return false if there is no next combination */ public boolean nextTask() { int i; for (i = combination.length-1; i >= 0; i--) { - if (combination[i] != numPartitionOrTask[i]-1) { + if (combination[i] != numChunk[i]-1) { break; } } @@ -145,19 +143,19 @@ class CartesianProductCombination { } /** - * @return corresponding task id for current combination + * @return corresponding chunk id for current combination */ - public int getTaskId() { - int taskId = 0; + public int getChunkId() { + int chunkId = 0; for (int i = 0; i < combination.length; i++) { - taskId += combination[i]*factor[i]; + chunkId += combination[i]*factor[i]; } - return taskId; + return chunkId; } - public static CartesianProductCombination 
fromTaskId(int[] numPartitionOrTask, + public static CartesianProductCombination fromTaskId(int[] numChunk, int taskId) { - CartesianProductCombination result = new CartesianProductCombination(numPartitionOrTask); + CartesianProductCombination result = new CartesianProductCombination(numChunk); for (int i = 0; i < result.combination.length; i++) { result.combination[i] = taskId/result.factor[i]; taskId %= result.factor[i]; http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java index b57ed84..12a17cb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java @@ -46,21 +46,23 @@ import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUs @Evolving public class CartesianProductConfig { private final boolean isPartitioned; - private final String[] sourceVertices; + private final String[] sources; + // numPartition[i] means how many partitions sourceVertices[i] will generate + // (not used in unpartitioned case) private final int[] numPartitions; private final CartesianProductFilterDescriptor filterDescriptor; /** * create config for unpartitioned case - * @param sourceVertices list of source vertices names + * @param sources list of names of source vertices or vertex groups */ - public CartesianProductConfig(List<String> sourceVertices) { - Preconditions.checkArgument(sourceVertices != null, "source vertices list cannot be null"); - 
Preconditions.checkArgument(sourceVertices.size() > 1, - "there must be more than 1 source " + "vertices, currently only " + sourceVertices.size()); + public CartesianProductConfig(List<String> sources) { + Preconditions.checkArgument(sources != null, "source list cannot be null"); + Preconditions.checkArgument(sources.size() > 1, + "there must be more than 1 source " + ", currently only " + sources.size()); this.isPartitioned = false; - this.sourceVertices = sourceVertices.toArray(new String[sourceVertices.size()]); + this.sources = sources.toArray(new String[sources.size()]); this.numPartitions = null; this.filterDescriptor = null; } @@ -86,12 +88,12 @@ public class CartesianProductConfig { this.isPartitioned = true; this.numPartitions = new int[vertexPartitionMap.size()]; - this.sourceVertices = new String[vertexPartitionMap.size()]; + this.sources = new String[vertexPartitionMap.size()]; this.filterDescriptor = filterDescriptor; int i = 0; for (Map.Entry<String, Integer> entry : vertexPartitionMap.entrySet()) { - this.sourceVertices[i] = entry.getKey(); + this.sources[i] = entry.getKey(); this.numPartitions[i] = entry.getValue(); i++; } @@ -102,23 +104,23 @@ public class CartesianProductConfig { /** * create config for partitioned case, with specified source vertices order * @param numPartitions - * @param sourceVertices + * @param sources * @param filterDescriptor */ @VisibleForTesting - protected CartesianProductConfig(int[] numPartitions, String[] sourceVertices, + protected CartesianProductConfig(int[] numPartitions, String[] sources, CartesianProductFilterDescriptor filterDescriptor) { Preconditions.checkArgument(numPartitions != null, "partitions count array can't be null"); - Preconditions.checkArgument(sourceVertices != null, "source vertices array can't be null"); - Preconditions.checkArgument(numPartitions.length == sourceVertices.length, - "partitions count array(length: " + numPartitions.length + ") and source vertices array " + - "(length: " + 
sourceVertices.length + ") cannot have different length"); - Preconditions.checkArgument(sourceVertices.length > 1, - "there must be more than 1 source " + "vertices, currently only " + sourceVertices.length); + Preconditions.checkArgument(sources != null, "source array can't be null"); + Preconditions.checkArgument(numPartitions.length == sources.length, + "partitions count array(length: " + numPartitions.length + ") and source array " + + "(length: " + sources.length + ") cannot have different length"); + Preconditions.checkArgument(sources.length > 1, + "there must be more than 1 source " + ", currently only " + sources.length); this.isPartitioned = true; this.numPartitions = numPartitions; - this.sourceVertices = sourceVertices; + this.sources = sources; this.filterDescriptor = filterDescriptor; checkNumPartitions(); @@ -128,11 +130,11 @@ public class CartesianProductConfig { * create config for both cases, used by subclass */ protected CartesianProductConfig(boolean isPartitioned, int[] numPartitions, - String[] sourceVertices, + String[] sources, CartesianProductFilterDescriptor filterDescriptor) { this.isPartitioned = isPartitioned; this.numPartitions = numPartitions; - this.sourceVertices = sourceVertices; + this.sources = sources; this.filterDescriptor = filterDescriptor; } @@ -142,11 +144,11 @@ public class CartesianProductConfig { boolean isUnpartitioned = true; for (int i = 0; i < numPartitions.length; i++) { Preconditions.checkArgument(this.numPartitions[i] > 0, - "Vertex " + sourceVertices[i] + "has negative (" + numPartitions[i] + ") partitions"); + "Vertex " + sources[i] + "has negative (" + numPartitions[i] + ") partitions"); isUnpartitioned = isUnpartitioned && numPartitions[i] == 1; } Preconditions.checkArgument(!isUnpartitioned, - "every source vertex has 1 partition in a partitioned case"); + "every source has 1 partition in a partitioned case"); } else { Preconditions.checkArgument(this.numPartitions == null, "partition counts should be null 
in unpartitioned case"); @@ -154,10 +156,10 @@ public class CartesianProductConfig { } /** - * @return the array of source vertices names + * @return the array of source vertices (or source vertex group) names */ public List<String> getSourceVertices() { - return Collections.unmodifiableList(Arrays.asList(sourceVertices)); + return Collections.unmodifiableList(Arrays.asList(sources)); } /** @@ -187,7 +189,7 @@ public class CartesianProductConfig { CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); builder.setIsPartitioned(this.isPartitioned) - .addAllSourceVertices(Arrays.asList(sourceVertices)); + .addAllSources(Arrays.asList(sources)); if (isPartitioned) { builder.addAllNumPartitions(Ints.asList(numPartitions)); @@ -220,7 +222,7 @@ public class CartesianProductConfig { String desiredBytesPerGroup = conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP); if (desiredBytesPerGroup != null) { - builder.setDesiredBytesPerGroup(Long.parseLong(desiredBytesPerGroup)); + builder.setDesiredBytesPerChunk(Long.parseLong(desiredBytesPerGroup)); } } Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(), @@ -246,10 +248,10 @@ public class CartesianProductConfig { protected static CartesianProductConfig fromProto( CartesianProductConfigProto proto) { if (!proto.getIsPartitioned()) { - return new CartesianProductConfig(proto.getSourceVerticesList()); + return new CartesianProductConfig(proto.getSourcesList()); } else { - String[] sourceVertices = new String[proto.getSourceVerticesList().size()]; - proto.getSourceVerticesList().toArray(sourceVertices); + String[] sourceVertices = new String[proto.getSourcesList().size()]; + proto.getSourcesList().toArray(sourceVertices); CartesianProductFilterDescriptor filterDescriptor = null; if (proto.hasFilterClassName()) { filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName()); 
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java index 0347f67..df0bcfa 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java @@ -27,23 +27,18 @@ import java.nio.ByteBuffer; import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; class CartesianProductEdgeManagerConfig extends CartesianProductConfig { - private final int[] numTasks; - private final int[] numGroups; + final int[] numChunksPerSrc; + final int numChunk; + final int chunkIdOffset; protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices, - int[] numPartitions, int[] numTasks, int[] numGroups, - CartesianProductFilterDescriptor filterDescriptor) { + int[] numPartitions, int[] numChunksPerSrc, int numChunk, + int chunkIdOffset, + CartesianProductFilterDescriptor filterDescriptor) { super(isPartitioned, numPartitions, sourceVertices, filterDescriptor); - this.numTasks = numTasks; - this.numGroups = numGroups; - } - - public int[] getNumTasks() { - return this.numTasks; - } - - public int[] getNumGroups() { - return this.numGroups; + this.numChunksPerSrc = numChunksPerSrc; + this.numChunk = numChunk; + this.chunkIdOffset = chunkIdOffset; } public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload) @@ -52,8 +47,8 @@ class 
CartesianProductEdgeManagerConfig extends CartesianProductConfig { CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload())); boolean isPartitioned = proto.getIsPartitioned(); - String[] sourceVertices = new String[proto.getSourceVerticesList().size()]; - proto.getSourceVerticesList().toArray(sourceVertices); + String[] sources = new String[proto.getSourcesList().size()]; + proto.getSourcesList().toArray(sources); int[] numPartitions = proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList()); CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName() @@ -62,11 +57,11 @@ class CartesianProductEdgeManagerConfig extends CartesianProductConfig { filterDescriptor.setUserPayload( UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray()))); } - int[] numTasks = - proto.getNumTasksCount() == 0 ? null : Ints.toArray(proto.getNumTasksList()); - int[] numGroups = - proto.getNumGroupsCount() == 0 ? null : Ints.toArray(proto.getNumGroupsList()); - return new CartesianProductEdgeManagerConfig(isPartitioned, sourceVertices, numPartitions, - numTasks, numGroups, filterDescriptor); + int[] humChunksPerSrc = + proto.getNumChunksCount() == 0 ? 
null : Ints.toArray(proto.getNumChunksList()); + int numChunk = proto.getNumChunk(); + int chunkIdOffset = proto.getChunkIdOffset(); + return new CartesianProductEdgeManagerConfig(isPartitioned, sources, numPartitions, + humChunksPerSrc, numChunk, chunkIdOffset, filterDescriptor); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java index 068da81..5ece5cf 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java @@ -107,13 +107,13 @@ class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManager CartesianProductCombination combination = new CartesianProductCombination(numPartitions); combination.firstTask(); - List<String> sourceVertices = config.getSourceVertices(); + List<String> sources = config.getSourceVertices(); do { - for (int i = 0; i < sourceVertices.size(); i++) { - vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i)); + for (int i = 0; i < sources.size(); i++) { + vertexPartitionMap.put(sources.get(i), combination.getCombination().get(i)); } if (filter == null || filter.isValidCombination(vertexPartitionMap)) { - idealTaskId.add(combination.getTaskId()); + idealTaskId.add(combination.getChunkId()); } } while (combination.nextTask()); this.taskIdMapping = Ints.toArray(idealTaskId); 
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java index 3e1407c..f22035b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java @@ -18,7 +18,6 @@ package org.apache.tez.runtime.library.cartesianproduct; import org.apache.tez.dag.api.EdgeManagerPluginContext; -import org.apache.tez.dag.api.EdgeManagerPluginOnDemand; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java index b9cb155..80d7dc1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java @@ -27,7 +27,9 @@ import 
javax.annotation.Nullable; class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal { private int positionId; - private int[] numGroups; + private int numChunk; + private int chunkIdOffset; + private int[] numChunkPerSrc; private int numDestinationConsumerTasks; private Grouper grouper = new Grouper(); @@ -36,32 +38,38 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag } public void initialize(CartesianProductEdgeManagerConfig config) { - positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName()); - this.numGroups = config.getNumGroups(); + String groupName = getContext().getVertexGroupName(); + String srcName = groupName != null ? groupName : getContext().getSourceVertexName(); + this.positionId = config.getSourceVertices().indexOf(srcName); + this.numChunkPerSrc = config.numChunksPerSrc; + this.numChunk = config.numChunk; + this.chunkIdOffset = config.chunkIdOffset; - if (numGroups != null && numGroups[positionId] != 0) { - grouper.init(config.getNumTasks()[positionId], numGroups[positionId]); + if (numChunk != 0) { + grouper.init(getContext().getSourceVertexNumTasks(), numChunk); numDestinationConsumerTasks = 1; - for (int numGroup : numGroups) { + for (int numGroup : numChunkPerSrc) { numDestinationConsumerTasks *= numGroup; } - numDestinationConsumerTasks /= numGroups[positionId]; + numDestinationConsumerTasks /= numChunkPerSrc[positionId]; } } @Override public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception { return failedInputId + grouper.getFirstTaskInGroup( - CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId)); + CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) + - chunkIdOffset); } @Override public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId, int destTaskId) throws Exception { - int groupId = - 
CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId); - if (grouper.isInGroup(srcTaskId, groupId)) { - int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId); + int chunkId = + CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) + - chunkIdOffset; + if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) { + int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId); return EventRouteMetadata.create(1, new int[] {idx}); } return null; @@ -72,10 +80,11 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, int destTaskId) throws Exception { - int groupId = - CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId); - if (grouper.isInGroup(srcTaskId, groupId)) { - int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId); + int chunkId = + CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) + - chunkIdOffset; + if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) { + int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId); return CompositeEventRouteMetadata.create(1, idx, 0); } return null; @@ -86,10 +95,11 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId, int destTaskId) throws Exception { - int groupId = - CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId); - if (grouper.isInGroup(srcTaskId, groupId)) { - int idx = srcTaskId - grouper.getFirstTaskInGroup(groupId); + int chunkId = + CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) + - chunkIdOffset; + if (0 <= chunkId && chunkId < numChunk && 
grouper.isInGroup(srcTaskId, chunkId)) { + int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId); return EventRouteMetadata.create(1, new int[] {idx}); } return null; @@ -97,9 +107,10 @@ class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManag @Override public int getNumDestinationTaskPhysicalInputs(int destTaskId) { - int groupId = - CartesianProductCombination.fromTaskId(numGroups, destTaskId).getCombination().get(positionId); - return grouper.getNumTasksInGroup(groupId); + int chunkId = + CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) + - chunkIdOffset; + return 0 <= chunkId && chunkId < numChunk ? grouper.getNumTasksInGroup(chunkId) : 0; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java index a104904..857f11e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java @@ -30,6 +30,8 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.events.VertexManagerEvent; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -45,6 +47,13 @@ import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM; * * Predefined parallelism isn't allowed for cartesian product vertex. 
Parallelism has to be * determined by vertex manager. + * + * If a vertex uses this vertex manager, its input edges must be either cartesian product edges or broadcast + * edges. + * + * Sources can be either vertices or vertex groups (only in unpartitioned case). + * + * Slow start only works in partitioned case. Auto grouping only works in unpartitioned case. */ public class CartesianProductVertexManager extends VertexManagerPlugin { /** @@ -95,26 +104,37 @@ public class CartesianProductVertexManager extends VertexManagerPlugin { // check whether DAG and config are consistent Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties(); Set<String> sourceVerticesDAG = edgePropertyMap.keySet(); - Set<String> sourceVerticesConfig = new HashSet<>(); - sourceVerticesConfig.addAll(config.getSourceVertices()); + Set<String> sourceVerticesConfig = new HashSet<>(config.getSourceVertices()); + + Map<String, List<String>> vertexGroups = getContext().getInputVertexGroups(); + Map<String, String> vertexToGroup = new HashMap<>(); + for (Map.Entry<String, List<String>> group : vertexGroups.entrySet()) { + for (String vertex : group.getValue()) { + vertexToGroup.put(vertex, group.getKey()); + } + } for (Map.Entry<String, EdgeProperty> entry : edgePropertyMap.entrySet()) { String vertex = entry.getKey(); + String group = vertexToGroup.get(vertex); EdgeProperty edgeProperty = entry.getValue(); EdgeManagerPluginDescriptor empDescriptor = edgeProperty.getEdgeManagerDescriptor(); if (empDescriptor != null && empDescriptor.getClassName().equals(CartesianProductEdgeManager.class.getName())) { - Preconditions.checkArgument(sourceVerticesConfig.contains(vertex), + Preconditions.checkArgument( + sourceVerticesConfig.contains(vertex) || sourceVerticesConfig.contains(group), vertex + " has CartesianProductEdgeManager but isn't in " + "CartesianProductVertexManagerConfig"); } else { - Preconditions.checkArgument(!sourceVerticesConfig.contains(vertex), +
Preconditions.checkArgument( + !sourceVerticesConfig.contains(vertex) && !sourceVerticesConfig.contains(group), vertex + " has no CartesianProductEdgeManager but is in " + "CartesianProductVertexManagerConfig"); } if (edgeProperty.getDataMovementType() == CUSTOM) { - Preconditions.checkArgument(sourceVerticesConfig.contains(vertex), + Preconditions.checkArgument( + sourceVerticesConfig.contains(vertex) || sourceVerticesConfig.contains(group), "Only broadcast and cartesian product edges are allowed in cartesian product vertex"); } else { Preconditions.checkArgument(edgeProperty.getDataMovementType() == BROADCAST, @@ -122,14 +142,19 @@ public class CartesianProductVertexManager extends VertexManagerPlugin { } } - for (String vertex : sourceVerticesConfig) { - Preconditions.checkArgument(sourceVerticesDAG.contains(vertex), - vertex + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG"); - Preconditions.checkArgument( - edgePropertyMap.get(vertex).getEdgeManagerDescriptor().getClassName() - .equals(CartesianProductEdgeManager.class.getName()), - vertex + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " + - "CartesianProductEdgeManager"); + for (String src : sourceVerticesConfig) { + List<String> vertices = + vertexGroups.containsKey(src) ? 
vertexGroups.get(src) : Collections.singletonList(src); + for (String v : vertices) { + Preconditions.checkArgument( + sourceVerticesDAG.contains(v), + v + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG"); + Preconditions.checkArgument( + edgePropertyMap.get(v).getEdgeManagerDescriptor().getClassName() + .equals(CartesianProductEdgeManager.class.getName()), + v + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " + + "CartesianProductEdgeManager"); + } } vertexManagerReal = config.getIsPartitioned() http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java index f43f494..e082ec3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java @@ -28,32 +28,24 @@ import java.nio.ByteBuffer; import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*; class CartesianProductVertexManagerConfig extends CartesianProductConfig { - private final float minFraction; - private final float maxFraction; - private final boolean enableAutoGrouping; - private final long desiredBytesPerGroup; + final float minFraction; + final float maxFraction; + final boolean enableAutoGrouping; + final long desiredBytesPerChunk; - public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sourceVertices, + public 
CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sources, int[] numPartitions, float minFraction, float maxFraction, - boolean enableAutoGrouping, long desiredBytesPerGroup, + boolean enableAutoGrouping, long desiredBytesPerChunk, CartesianProductFilterDescriptor filterDescriptor) { - super(isPartitioned, numPartitions, sourceVertices, filterDescriptor); + super(isPartitioned, numPartitions, sources, filterDescriptor); Preconditions.checkArgument(minFraction <= maxFraction, "min fraction(" + minFraction + ") should be less than max fraction(" + maxFraction + ") in cartesian product slow start"); this.minFraction = minFraction; this.maxFraction = maxFraction; this.enableAutoGrouping = enableAutoGrouping; - this.desiredBytesPerGroup = desiredBytesPerGroup; - } - - public float getMinFraction() { - return minFraction; - } - - public float getMaxFraction() { - return maxFraction; + this.desiredBytesPerChunk = desiredBytesPerChunk; } public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload) @@ -62,8 +54,8 @@ class CartesianProductVertexManagerConfig extends CartesianProductConfig { CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload())); boolean isPartitioned = proto.getIsPartitioned(); - String[] sourceVertices = new String[proto.getSourceVerticesList().size()]; - proto.getSourceVerticesList().toArray(sourceVertices); + String[] sources = new String[proto.getSourcesList().size()]; + proto.getSourcesList().toArray(sources); int[] numPartitions = proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList()); CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName() @@ -77,17 +69,9 @@ class CartesianProductVertexManagerConfig extends CartesianProductConfig { boolean enableAutoGrouping = proto.hasEnableAutoGrouping() ? 
proto.getEnableAutoGrouping() : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT; - long desiredBytesPerGroup = proto.hasDesiredBytesPerGroup() ? proto.getDesiredBytesPerGroup() + long desiredBytesPerGroup = proto.hasDesiredBytesPerChunk() ? proto.getDesiredBytesPerChunk() : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT; - return new CartesianProductVertexManagerConfig(isPartitioned, sourceVertices, numPartitions, + return new CartesianProductVertexManagerConfig(isPartitioned, sources, numPartitions, minFraction, maxFraction, enableAutoGrouping, desiredBytesPerGroup, filterDescriptor); } - - public boolean isEnableAutoGrouping() { - return enableAutoGrouping; - } - - public long getDesiredBytesPerGroup() { - return desiredBytesPerGroup; - } } http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java index 85c04d2..ddff37d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java @@ -174,12 +174,12 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan // determine the destination task with largest id to schedule float percentFinishedSrcTask = numFinishedSrcTasks*1f/totalNumSrcTasks; int numTaskToSchedule; - if (percentFinishedSrcTask < config.getMinFraction()) { + if (percentFinishedSrcTask < 
config.minFraction) { numTaskToSchedule = 0; - } else if (config.getMinFraction() <= percentFinishedSrcTask && - percentFinishedSrcTask <= config.getMaxFraction()) { - numTaskToSchedule = (int) ((percentFinishedSrcTask-config.getMinFraction()) - /(config.getMaxFraction()-config.getMinFraction())*parallelism); + } else if (config.minFraction <= percentFinishedSrcTask && + percentFinishedSrcTask <= config.maxFraction) { + numTaskToSchedule = (int) ((percentFinishedSrcTask-config.minFraction) + /(config.maxFraction-config.minFraction)*parallelism); } else { numTaskToSchedule = parallelism; } http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java index 993cb40..46ea76e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java @@ -19,7 +19,6 @@ package org.apache.tez.runtime.library.cartesianproduct; import com.google.common.primitives.Ints; import com.google.protobuf.ByteString; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexManagerPluginContext; @@ -36,7 +35,6 @@ import org.slf4j.Logger; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; 
import java.util.HashSet; @@ -50,31 +48,132 @@ import java.util.Set; import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM; import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; +/** + * In unpartitioned case, we have one destination task for each source chunk combination. A source + * is a source vertex or a source vertex group. A chunk is one source task (without auto grouping) + * or a group of source tasks (with auto grouping). A chunk may contain multiple tasks across + * vertices. The mapping from source chunk to destination task id is done by + * {@link CartesianProductCombination}. + * + * If auto grouping is enabled, this vertex manager will estimate output size of each source and + * group source tasks of each source into chunks according to the desired chunk size configured by user. + * + * + */ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexManagerReal { + /** + * a cartesian product source + */ + static class Source { + // list of source vertices of this source + List<SrcVertex> srcVertices = new ArrayList<>(); + // position of this source in all sources + int position; + // name of source vertex or vertex group + String name; + + // total number of chunks in this source + public int getNumChunk() { + int numChunk = 0; + for (SrcVertex srcV : srcVertices) { + numChunk += srcV.numChunk; + } + return numChunk; + } + + // whether this source has any task completed + public boolean hasTaskCompleted() { + for (SrcVertex srcV : srcVertices) { + if (!srcV.taskCompleted.isEmpty()) { + return true; + } + } + return false; + } + + public String toString(boolean afterReconfigure) { + StringBuilder sb = new StringBuilder(); + sb.append("Source at position "); + sb.append(position); + if (name != null) { + sb.append(", "); + sb.append("vertex group "); + sb.append(name); + + } + sb.append(": {"); + for (SrcVertex srcV : srcVertices) { +
sb.append("["); + sb.append(srcV.toString(afterReconfigure)); + sb.append("], "); + } + sb.deleteCharAt(sb.length() - 1); + sb.setCharAt(sb.length() - 1, '}'); + return sb.toString(); + } + } + + /** + * a cartesian product source vertex + */ + class SrcVertex { + // which source this vertex belongs to + Source source; + // vertex name + String name; + int numTask; + // num chunks of this source vertex + int numChunk; + // offset of chunk id in vertex group + // we need to number chunks sequentially within the vertex group so that they look like they come from a single vertex + int chunkIdOffset = 0; + RoaringBitmap taskCompleted = new RoaringBitmap(); + RoaringBitmap taskWithVMEvent = new RoaringBitmap(); + long outputBytes; + + public void doGrouping() { + numChunk = numTask; + if (config.enableAutoGrouping) { + outputBytes = outputBytes * numTask / taskWithVMEvent.getCardinality(); + numChunk = Math.min(numChunk, + (int) ((outputBytes + config.desiredBytesPerChunk - 1) / config.desiredBytesPerChunk)); + } + } + + public String toString(boolean afterReconfigure) { + StringBuilder sb = new StringBuilder(); + sb.append("vertex ").append(name).append(", "); + if (afterReconfigure) { + sb.append("estimated output ").append(outputBytes).append(" bytes, "); + sb.append(numChunk).append(" chunks"); + } else { + sb.append(numTask).append(" tasks, "); + sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, "); + sb.append("output ").append(outputBytes).append(" bytes"); + } + return sb.toString(); + } + } + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CartesianProductVertexManagerUnpartitioned.class); - List<String> sourceVertices; - private int parallelism = 1; + CartesianProductVertexManagerConfig config; + Map<String, Source> sourcesByName = new HashMap<>(); + Map<String, SrcVertex> srcVerticesByName = new HashMap<>(); + private boolean vertexReconfigured = false; private boolean vertexStarted = false; private boolean vertexStartSchedule = false; private int
numCPSrcNotInConfigureState = 0; private int numBroadcastSrcNotInRunningState = 0; - private int[] numTasks; - private Queue<TaskAttemptIdentifier> completedSrcTaskToProcess = new LinkedList<>(); - private Map<String, RoaringBitmap> sourceTaskCompleted = new HashMap<>(); private RoaringBitmap scheduledTasks = new RoaringBitmap(); - private CartesianProductConfig config; /* auto reduce related */ - private int[] numGroups; + // num of chunks of source at the corresponding position in source list + private int[] numChunksPerSrc; private Set<String> vertexSentVME = new HashSet<>(); - private long[] vertexOutputBytes; - private int[] numVertexManagerEventsReceived; - private long desiredBytesPerGroup; - private boolean enableGrouping; private Grouper grouper = new Grouper(); public CartesianProductVertexManagerUnpartitioned(VertexManagerPluginContext context) { @@ -83,29 +182,39 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM @Override public void initialize(CartesianProductVertexManagerConfig config) throws Exception { - sourceVertices = config.getSourceVertices(); - numTasks = new int[sourceVertices.size()]; - numGroups = new int[sourceVertices.size()]; - vertexOutputBytes = new long[sourceVertices.size()]; - numVertexManagerEventsReceived = new int[sourceVertices.size()]; - - enableGrouping = config.isEnableAutoGrouping(); - desiredBytesPerGroup = config.getDesiredBytesPerGroup(); - - for (String vertex : sourceVertices) { - sourceTaskCompleted.put(vertex, new RoaringBitmap()); - } - - for (String vertex : getContext().getInputVertexEdgeProperties().keySet()) { - if (sourceVertices.indexOf(vertex) != -1) { - sourceTaskCompleted.put(vertex, new RoaringBitmap()); - getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED)); + for (Map.Entry<String, EdgeProperty> e : getContext().getInputVertexEdgeProperties().entrySet()) { + if (e.getValue().getDataMovementType() == CUSTOM + && 
e.getValue().getEdgeManagerDescriptor().getClassName() + .equals(CartesianProductEdgeManager.class.getName())) { + srcVerticesByName.put(e.getKey(), new SrcVertex()); + srcVerticesByName.get(e.getKey()).name = e.getKey(); + getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.CONFIGURED)); numCPSrcNotInConfigureState++; } else { - getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.RUNNING)); + getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.RUNNING)); numBroadcastSrcNotInRunningState++; } } + + Map<String, List<String>> srcGroups = getContext().getInputVertexGroups(); + for (int i = 0; i < config.getSourceVertices().size(); i++) { + String srcName = config.getSourceVertices().get(i); + Source source = new Source(); + source.position = i; + if (srcGroups.containsKey(srcName)) { + source.name = srcName; + for (String srcVName : srcGroups.get(srcName)) { + source.srcVertices.add(srcVerticesByName.get(srcVName)); + srcVerticesByName.get(srcVName).source = source; + } + } else { + source.srcVertices.add(srcVerticesByName.get(srcName)); + srcVerticesByName.get(srcName).source = source; + } + sourcesByName.put(srcName, source); + } + + numChunksPerSrc = new int[sourcesByName.size()]; this.config = config; getContext().vertexReconfigurationPlanned(); } @@ -128,7 +237,7 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM VertexState state = stateUpdate.getVertexState(); if (state == VertexState.CONFIGURED) { - numTasks[sourceVertices.indexOf(vertex)] = getContext().getVertexNumTasks(vertex); + srcVerticesByName.get(vertex).numTask = getContext().getVertexNumTasks(vertex); numCPSrcNotInConfigureState--; } else if (state == VertexState.RUNNING) { numBroadcastSrcNotInRunningState--; @@ -145,22 +254,20 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM private void addCompletedSrcTaskToProcess(TaskAttemptIdentifier attempt) { int taskId 
= attempt.getTaskIdentifier().getIdentifier(); String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName(); - if (sourceVertices.indexOf(vertex) == -1) { - return; - } - if (sourceTaskCompleted.get(vertex).contains(taskId)) { - return; + SrcVertex srcV = srcVerticesByName.get(vertex); + if (srcV != null && !srcV.taskCompleted.contains(taskId)) { + srcV.taskCompleted.add(taskId); + completedSrcTaskToProcess.add(attempt); } - sourceTaskCompleted.get(vertex).add(taskId); - completedSrcTaskToProcess.add(attempt); } private boolean tryStartSchedule() { if (!vertexReconfigured || !vertexStarted || numBroadcastSrcNotInRunningState > 0) { return false; } - for (RoaringBitmap bitmap: sourceTaskCompleted.values()) { - if (bitmap.isEmpty()) { + + for (Source src : sourcesByName.values()) { + if (!src.hasTaskCompleted()) { return false; } } @@ -178,15 +285,17 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM if (vmEvent.getUserPayload() != null) { String srcVertex = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getVertexIdentifier().getName(); - int position = sourceVertices.indexOf(srcVertex); + SrcVertex srcV = srcVerticesByName.get(srcVertex); + // vmEvent from non-cp vertex doesn't matter - if (position == -1) { + if (srcV == null) { return; } + VertexManagerEventPayloadProto proto = VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload())); - vertexOutputBytes[position] += proto.getOutputSize(); - numVertexManagerEventsReceived[position]++; + srcV.outputBytes += proto.getOutputSize(); + srcV.taskWithVMEvent.add(vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getIdentifier()); vertexSentVME.add(srcVertex); } @@ -197,60 +306,65 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM if (numCPSrcNotInConfigureState > 0) { return false; } - if (enableGrouping) { - if (vertexSentVME.size() != sourceVertices.size()) { + if 
(config.enableAutoGrouping) { + if (vertexSentVME.size() != srcVerticesByName.size()) { return false; } - for (int i = 0; i < vertexOutputBytes.length; i++) { - if (vertexOutputBytes[i] < desiredBytesPerGroup - && numVertexManagerEventsReceived[i] < numTasks[i]) { + // every src v must output at least one chunk size + for (SrcVertex srcV : srcVerticesByName.values()) { + if (srcV.outputBytes < config.desiredBytesPerChunk + && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) { return false; } } } - LOG.info("Start reconfigure, grouping: " + enableGrouping - + ", group size: " + desiredBytesPerGroup); - LOG.info("src vertices: " + sourceVertices); - LOG.info("number of source tasks in each src: " + Arrays.toString(numTasks)); - LOG.info("number of vmEvent from each src: " - + Arrays.toString(numVertexManagerEventsReceived)); - LOG.info("output stats of each src: " + Arrays.toString(vertexOutputBytes)); - - for (int i = 0; i < numTasks.length; i++) { - if (enableGrouping) { - vertexOutputBytes[i] = - vertexOutputBytes[i] * numTasks[i] / numVertexManagerEventsReceived[i]; - int desiredNumGroup = - (int) ((vertexOutputBytes[i] + desiredBytesPerGroup - 1) / desiredBytesPerGroup); - numGroups[i] = Math.min(numTasks[i], desiredNumGroup); - } else { - numGroups[i] = numTasks[i]; + LOG.info("Start reconfigure, grouping: " + config.enableAutoGrouping + + ", chunk size: " + config.desiredBytesPerChunk + " bytes."); + for (String srcName : config.getSourceVertices()) { + LOG.info(sourcesByName.get(srcName).toString(false)); + } + + for (Source src : sourcesByName.values()) { + for (int i = 0; i < src.srcVertices.size(); i++) { + src.srcVertices.get(i).doGrouping(); + if (i > 0) { + src.srcVertices.get(i).chunkIdOffset += src.srcVertices.get(i-1).numChunk; + } } - parallelism *= numGroups[i]; + numChunksPerSrc[src.position] = src.getNumChunk(); } - LOG.info("estimated output size of each src: " + Arrays.toString(vertexOutputBytes)); - LOG.info("number of groups for each 
src: " + Arrays.toString(numGroups)); + int parallelism = 1; + for (Source src : sourcesByName.values()) { + parallelism *= src.getNumChunk(); + } + + LOG.info("After reconfigure, "); + for (String srcName : config.getSourceVertices()) { + LOG.info(sourcesByName.get(srcName).toString(true)); + } LOG.info("Final parallelism: " + parallelism); - UserPayload payload = null; + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + for (int i = 0; i < numChunksPerSrc.length; i++) { + numChunksPerSrc[i] = sourcesByName.get(config.getSourceVertices().get(i)).getNumChunk(); + } + builder.setIsPartitioned(false).addAllSources(config.getSourceVertices()) + .addAllNumChunks(Ints.asList(this.numChunksPerSrc)); + Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties(); Iterator<Map.Entry<String,EdgeProperty>> iter = edgeProperties.entrySet().iterator(); while (iter.hasNext()) { - EdgeProperty edgeProperty = iter.next().getValue(); - if (edgeProperty.getDataMovementType() != CUSTOM) { + Map.Entry<String, EdgeProperty> e = iter.next(); + if (e.getValue().getDataMovementType() != CUSTOM) { iter.remove(); - continue; - } - EdgeManagerPluginDescriptor descriptor = edgeProperty.getEdgeManagerDescriptor(); - if (payload == null) { - CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); - builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks)) - .addAllNumGroups(Ints.asList(numGroups)).addAllSourceVertices(config.getSourceVertices()); - payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())); + } else { + SrcVertex srcV = srcVerticesByName.get(e.getKey()); + builder.setNumChunk(srcV.numChunk).setChunkIdOffset(srcV.chunkIdOffset); + e.getValue().getEdgeManagerDescriptor() + .setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()))); } - descriptor.setUserPayload(payload); } getContext().reconfigureVertex(parallelism, null, 
edgeProperties); vertexReconfigured = true; @@ -267,32 +381,42 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM } while (!completedSrcTaskToProcess.isEmpty()) { - scheduledTasksDependOnCompletion(completedSrcTaskToProcess.poll()); + scheduleTasksDependOnCompletion(completedSrcTaskToProcess.poll()); } } - private void scheduledTasksDependOnCompletion(TaskAttemptIdentifier attempt) { + private void scheduleTasksDependOnCompletion(TaskAttemptIdentifier attempt) { int taskId = attempt.getTaskIdentifier().getIdentifier(); String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName(); - int position = sourceVertices.indexOf(vertex); + SrcVertex srcV = srcVerticesByName.get(vertex); + Source src = srcV.source; List<ScheduleTaskRequest> requests = new ArrayList<>(); CartesianProductCombination combination = - new CartesianProductCombination(numGroups, position); - grouper.init(numTasks[position], numGroups[position]); - combination.firstTaskWithFixedPartition(grouper.getGroupId(taskId)); + new CartesianProductCombination(numChunksPerSrc, src.position); + grouper.init(srcV.numTask, srcV.numChunk); + combination.firstTaskWithFixedChunk(grouper.getGroupId(taskId) + srcV.chunkIdOffset); do { List<Integer> list = combination.getCombination(); - if (scheduledTasks.contains(combination.getTaskId())) { + if (scheduledTasks.contains(combination.getChunkId())) { continue; } boolean readyToSchedule = true; for (int i = 0; i < list.size(); i++) { - int group = list.get(i); - grouper.init(numTasks[i], numGroups[i]); - for (int j = grouper.getFirstTaskInGroup(group); j <= grouper.getLastTaskInGroup(group); j++) { - if (!sourceTaskCompleted.get(sourceVertices.get(i)).contains(j)) { + int chunkId = list.get(i); + SrcVertex srcVHasGroup = null; + for (SrcVertex v : sourcesByName.get(config.getSourceVertices().get(i)).srcVertices) { + if (v.chunkIdOffset <= chunkId && chunkId < v.chunkIdOffset + v.numChunk) { + srcVHasGroup = v; + break; + } 
+ } + assert srcVHasGroup != null; + grouper.init(srcVHasGroup.numTask, srcVHasGroup.numChunk); + chunkId -= srcVHasGroup.chunkIdOffset; + for (int j = grouper.getFirstTaskInGroup(chunkId); j <= grouper.getLastTaskInGroup(chunkId); j++) { + if (!srcVHasGroup.taskCompleted.contains(j)) { readyToSchedule = false; break; } @@ -303,10 +427,10 @@ class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexM } if (readyToSchedule) { - requests.add(ScheduleTaskRequest.create(combination.getTaskId(), null)); - scheduledTasks.add(combination.getTaskId()); + requests.add(ScheduleTaskRequest.create(combination.getChunkId(), null)); + scheduledTasks.add(combination.getChunkId()); } - } while (combination.nextTaskWithFixedPartition()); + } while (combination.nextTaskWithFixedChunk()); if (!requests.isEmpty()) { getContext().scheduleTasks(requests); } http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/main/proto/CartesianProductPayload.proto ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto index dd7d06f..cb503ea 100644 --- a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto +++ b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto @@ -21,14 +21,15 @@ option java_outer_classname = "CartesianProductUserPayload"; message CartesianProductConfigProto { required bool isPartitioned = 1; - repeated string sourceVertices = 2; + repeated string sources = 2; repeated int32 numPartitions = 3; optional string filterClassName = 4; optional bytes filterUserPayload = 5; optional float minFraction = 6; optional float maxFraction = 7; optional bool enableAutoGrouping = 8; - optional int64 desiredBytesPerGroup = 9; - repeated int32 numTasks = 10; - repeated int32 numGroups = 11; + optional int64 desiredBytesPerChunk = 9; + repeated int32 numChunks = 10; + 
optional int32 numChunk = 11; + optional int32 chunkIdOffset = 12; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java index 9a5614e..439d650 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerUtils.java @@ -331,6 +331,11 @@ public class TestShuffleVertexManagerUtils { public int getDestinationVertexNumTasks() { return numTasks; } + + @Override + public String getVertexGroupName() { + return null; + } }; if (newEdgeManagers != null) { EdgeManagerPlugin edgeManager = ReflectionUtils http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java index 06d3e90..3755ac8 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java @@ -30,44 +30,44 @@ import static org.junit.Assert.assertTrue; public class 
TestCartesianProductCombination { private void verifyCombination(CartesianProductCombination combination, int[] result, int taskId) { assertArrayEquals(result, Ints.toArray(combination.getCombination())); - assertEquals(taskId, combination.getTaskId()); + assertEquals(taskId, combination.getChunkId()); } private void testCombinationTwoWayVertex0() { CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 0); - combination.firstTaskWithFixedPartition(1); + combination.firstTaskWithFixedChunk(1); verifyCombination(combination, new int[]{1,0}, 3); - assertTrue(combination.nextTaskWithFixedPartition()); + assertTrue(combination.nextTaskWithFixedChunk()); verifyCombination(combination, new int[]{1,1}, 4); - assertTrue(combination.nextTaskWithFixedPartition()); + assertTrue(combination.nextTaskWithFixedChunk()); verifyCombination(combination, new int[]{1,2}, 5); - assertFalse(combination.nextTaskWithFixedPartition()); + assertFalse(combination.nextTaskWithFixedChunk()); } private void testCombinationTwoWayVertex1() { CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,3}, 1); - combination.firstTaskWithFixedPartition(1); + combination.firstTaskWithFixedChunk(1); verifyCombination(combination, new int[]{0,1}, 1); - assertTrue(combination.nextTaskWithFixedPartition()); + assertTrue(combination.nextTaskWithFixedChunk()); verifyCombination(combination, new int[]{1,1}, 4); - assertFalse(combination.nextTaskWithFixedPartition()); + assertFalse(combination.nextTaskWithFixedChunk()); } private void testCombinationThreeWay() { CartesianProductCombination combination = new CartesianProductCombination(new int[]{2,2,2}, 1); - combination.firstTaskWithFixedPartition(1); + combination.firstTaskWithFixedChunk(1); verifyCombination(combination, new int[]{0,1,0}, 2); - assertTrue(combination.nextTaskWithFixedPartition()); + assertTrue(combination.nextTaskWithFixedChunk()); verifyCombination(combination, new int[]{0,1,1}, 
3); - assertTrue(combination.nextTaskWithFixedPartition()); + assertTrue(combination.nextTaskWithFixedChunk()); verifyCombination(combination, new int[]{1,1,0}, 6); - assertTrue(combination.nextTaskWithFixedPartition()); + assertTrue(combination.nextTaskWithFixedChunk()); verifyCombination(combination, new int[]{1,1,1}, 7); - assertFalse(combination.nextTaskWithFixedPartition()); + assertFalse(combination.nextTaskWithFixedChunk()); } @Test(timeout = 5000) @@ -110,9 +110,9 @@ public class TestCartesianProductCombination { @Test(timeout = 5000) public void testRejectZero() { - int[] numTasks = new int[] {0 ,1}; + int[] numChunk = new int[] {0 ,1}; try { - new CartesianProductCombination(numTasks); + new CartesianProductCombination(numChunk); assertTrue(false); } catch (Exception ignored) {} } http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java index c9e49a3..4857749 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java @@ -122,15 +122,15 @@ public class TestCartesianProductConfig { // auto grouping conf not set CartesianProductConfigProto proto = config.toProto(conf); assertFalse(proto.hasEnableAutoGrouping()); - assertFalse(proto.hasDesiredBytesPerGroup()); + assertFalse(proto.hasDesiredBytesPerChunk()); // auto groupinig conf not set conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true); 
conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000); proto = config.toProto(conf); assertTrue(proto.hasEnableAutoGrouping()); - assertTrue(proto.hasDesiredBytesPerGroup()); + assertTrue(proto.hasDesiredBytesPerChunk()); assertEquals(true, proto.getEnableAutoGrouping()); - assertEquals(1000, proto.getDesiredBytesPerGroup()); + assertEquals(1000, proto.getDesiredBytesPerChunk()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java index 12aee3b..d722932 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java @@ -40,7 +40,7 @@ public class TestCartesianProductEdgeManager { // partitioned case CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); builder.setIsPartitioned(true) - .addAllSourceVertices(Arrays.asList("v0", "v1")) + .addAllSources(Arrays.asList("v0", "v1")) .addAllNumPartitions(Ints.asList(2,3)); UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())); when(context.getUserPayload()).thenReturn(payload); @@ -51,8 +51,8 @@ public class TestCartesianProductEdgeManager { // unpartitioned case builder.clear(); builder.setIsPartitioned(false) - .addAllSourceVertices(Arrays.asList("v0", "v1")) - .addAllNumTasks(Ints.asList(2,3)); + .addAllSources(Arrays.asList("v0", "v1")) + 
.addAllNumChunks(Ints.asList(2,3)); payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())); when(context.getUserPayload()).thenReturn(payload); when(context.getSourceVertexNumTasks()).thenReturn(2); http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java index 9f6fa09..3ba6aad 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java @@ -28,23 +28,26 @@ import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; public class TestCartesianProductEdgeManagerConfig { @Test(timeout = 5000) - public void testAutoGroupingConfig() throws IOException { + public void testUnpartitionedAutoGroupingConfig() throws IOException { List<String> sourceVertices = new ArrayList<>(); sourceVertices.add("v0"); sourceVertices.add("v1"); - int[] numTasks = new int[] {4, 5}; - int[] numGroups = new int[] {2, 3}; + int[] numChunkPerSrc = new int[] {2, 3}; + int numGroup = 3, chunkIdOffset = 0; CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); - builder.setIsPartitioned(false).addAllNumTasks(Ints.asList(numTasks)) - .addAllSourceVertices(sourceVertices).addAllNumGroups(Ints.asList(numGroups)); + builder.setIsPartitioned(false).addAllNumChunks(Ints.asList(numChunkPerSrc)) + 
.addAllSources(sourceVertices).setNumChunk(numGroup).setChunkIdOffset(chunkIdOffset); UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())); CartesianProductEdgeManagerConfig config = CartesianProductEdgeManagerConfig.fromUserPayload(payload); - assertArrayEquals(numGroups, config.getNumGroups()); + assertArrayEquals(numChunkPerSrc, config.numChunksPerSrc); + assertEquals(numGroup, config.numChunk); + assertEquals(chunkIdOffset, config.chunkIdOffset); } }
