TEZ-3708. Improve parallelism and auto grouping of unpartitioned cartesian product (zhiyuany)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a55fe80b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a55fe80b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a55fe80b Branch: refs/heads/master Commit: a55fe80bfa7beef646c95ca73a844ffc7cae999c Parents: dec7c1b Author: Zhiyuan Yang <[email protected]> Authored: Thu May 11 15:19:39 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Thu May 11 15:19:39 2017 -0700 ---------------------------------------------------------------------- .../CartesianProductCombination.java | 4 +- .../CartesianProductConfig.java | 45 +- .../CartesianProductEdgeManager.java | 11 +- .../CartesianProductEdgeManagerConfig.java | 67 --- .../CartesianProductEdgeManagerPartitioned.java | 31 +- .../CartesianProductEdgeManagerReal.java | 3 +- ...artesianProductEdgeManagerUnpartitioned.java | 125 ----- .../CartesianProductVertexManager.java | 64 ++- .../CartesianProductVertexManagerConfig.java | 77 --- ...artesianProductVertexManagerPartitioned.java | 38 +- .../CartesianProductVertexManagerReal.java | 3 +- ...tesianProductVertexManagerUnpartitioned.java | 438 --------------- .../FairCartesianProductEdgeManager.java | 174 ++++++ .../FairCartesianProductVertexManager.java | 551 +++++++++++++++++++ .../partitioner/RoundRobinPartitioner.java | 30 + .../tez/runtime/library/utils/Grouper.java | 66 +-- .../main/proto/CartesianProductPayload.proto | 11 +- .../TestCartesianProductCombination.java | 2 +- .../TestCartesianProductConfig.java | 37 +- .../TestCartesianProductEdgeManager.java | 8 +- .../TestCartesianProductEdgeManagerConfig.java | 53 -- ...tCartesianProductEdgeManagerPartitioned.java | 77 +-- ...artesianProductEdgeManagerUnpartitioned.java | 288 ---------- .../TestCartesianProductVertexManager.java | 2 +- ...TestCartesianProductVertexManagerConfig.java | 53 -- ...artesianProductVertexManagerPartitioned.java | 26 +- 
...tesianProductVertexManagerUnpartitioned.java | 460 ---------------- .../TestFairCartesianProductEdgeManager.java | 245 +++++++++ .../TestFairCartesianProductVertexManager.java | 500 +++++++++++++++++ .../library/cartesianproduct/TestGrouper.java | 36 +- .../mapreduce/examples/CartesianProduct.java | 385 +++++++++++++ .../tez/mapreduce/examples/ExampleDriver.java | 2 + .../org/apache/tez/test/TestFaultTolerance.java | 5 +- .../java/org/apache/tez/test/TestOutput.java | 11 + .../java/org/apache/tez/test/TestTezJobs.java | 13 + 35 files changed, 2191 insertions(+), 1750 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java index 97f3eb2..8de8a02 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java @@ -143,9 +143,9 @@ class CartesianProductCombination { } /** - * @return corresponding chunk id for current combination + * @return corresponding task id for current combination */ - public int getChunkId() { + public int getTaskId() { int chunkId = 0; for (int i = 0; i < combination.length; i++) { chunkId += combination[i]*factor[i]; http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java ---------------------------------------------------------------------- 
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java index 12a17cb..7aac1d7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java @@ -48,12 +48,12 @@ public class CartesianProductConfig { private final boolean isPartitioned; private final String[] sources; // numPartition[i] means how many partitions sourceVertices[i] will generate - // (not used in unpartitioned case) + // (not used in fair cartesian product) private final int[] numPartitions; private final CartesianProductFilterDescriptor filterDescriptor; /** - * create config for unpartitioned case + * create config for fair cartesian product * @param sources list of names of source vertices or vertex groups */ public CartesianProductConfig(List<String> sources) { @@ -84,7 +84,7 @@ public class CartesianProductConfig { CartesianProductFilterDescriptor filterDescriptor) { Preconditions.checkArgument(vertexPartitionMap != null, "vertex-partition map cannot be null"); Preconditions.checkArgument(vertexPartitionMap.size() > 1, - "there must be more than 1 source " + "vertices, currently only " + vertexPartitionMap.size()); + "there must be more than 1 source vertices, currently only " + vertexPartitionMap.size()); this.isPartitioned = true; this.numPartitions = new int[vertexPartitionMap.size()]; @@ -151,7 +151,7 @@ public class CartesianProductConfig { "every source has 1 partition in a partitioned case"); } else { Preconditions.checkArgument(this.numPartitions == null, - "partition counts should be null in unpartitioned case"); + "partition counts should be null in fair cartesian product"); } } @@ -202,11 +202,6 @@ public class CartesianProductConfig { 
} } - builder.setMinFraction( - CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT); - builder.setMaxFraction( - CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT); - if (conf != null) { builder.setMinFraction(conf.getFloat( CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MIN_FRACTION, @@ -214,20 +209,36 @@ public class CartesianProductConfig { builder.setMaxFraction(conf.getFloat( CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION, CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT)); - String enableAutoGrouping = - conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING); - if (enableAutoGrouping != null) { - builder.setEnableAutoGrouping(Boolean.parseBoolean(enableAutoGrouping)); + builder.setMaxParallelism(conf.getInt( + CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM, + CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM_DEFAULT)); + builder.setMinOpsPerWorker(conf.getLong( + CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER, + CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER_DEFAULT)); + builder.setEnableGrouping(conf.getBoolean( + CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING, + CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING_DEFAULT)); + if (conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_GROUPING_FRACTION) != null) { + builder.setGroupingFraction(Float.parseFloat( + conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_GROUPING_FRACTION))); + Preconditions.checkArgument(0 < builder.getGroupingFraction() && + builder.getGroupingFraction() <= 1, "grouping fraction should be larger than 0 and less" + + " or equal to 1, current value: " + builder.getGroupingFraction()); } - String desiredBytesPerGroup = - 
conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP); - if (desiredBytesPerGroup != null) { - builder.setDesiredBytesPerChunk(Long.parseLong(desiredBytesPerGroup)); + if (conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS) != null) { + builder.setNumPartitionsForFairCase(Integer.parseInt( + conf.get(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS))); + Preconditions.checkArgument(builder.getNumPartitionsForFairCase() > 0, + "Number of partitions for fair cartesian product should be positive integer"); } } Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(), "min fraction(" + builder.getMinFraction() + ") should be less than max fraction(" + builder.getMaxFraction() + ") in cartesian product slow start"); + Preconditions.checkArgument(builder.getMaxParallelism() > 0, + "max parallelism must be positive, currently is " + builder.getMaxParallelism()); + Preconditions.checkArgument(builder.getMinOpsPerWorker() > 0, + "Min ops per worker must be positive, currently is " + builder.getMinOpsPerWorker()); return builder.build(); } http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java index 1dbe6bf..a406c1b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java @@ -19,14 +19,17 @@ package org.apache.tez.runtime.library.cartesianproduct; import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand; import javax.annotation.Nullable; +import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*; + /** * This EM wrap a real edge manager implementation object. It choose whether it's partitioned or - * unpartitioned implementation according to the config. All method invocations are actually + * fair implementation according to the config. All method invocations are actually * redirected to real implementation. */ public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand { @@ -39,12 +42,12 @@ public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand { @Override public void initialize() throws Exception { Preconditions.checkArgument(getContext().getUserPayload() != null); - CartesianProductEdgeManagerConfig config = CartesianProductEdgeManagerConfig.fromUserPayload( - getContext().getUserPayload()); + CartesianProductConfigProto config = CartesianProductConfigProto.parseFrom( + ByteString.copyFrom(getContext().getUserPayload().getPayload())); // no need to check config because config comes from VM and is already checked by VM edgeManagerReal = config.getIsPartitioned() ? 
new CartesianProductEdgeManagerPartitioned(getContext()) - : new CartesianProductEdgeManagerUnpartitioned(getContext()); + : new FairCartesianProductEdgeManager(getContext()); edgeManagerReal.initialize(config); } http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java deleted file mode 100644 index df0bcfa..0000000 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tez.runtime.library.cartesianproduct; - -import com.google.common.primitives.Ints; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.tez.dag.api.UserPayload; - -import java.nio.ByteBuffer; - -import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; - -class CartesianProductEdgeManagerConfig extends CartesianProductConfig { - final int[] numChunksPerSrc; - final int numChunk; - final int chunkIdOffset; - - protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices, - int[] numPartitions, int[] numChunksPerSrc, int numChunk, - int chunkIdOffset, - CartesianProductFilterDescriptor filterDescriptor) { - super(isPartitioned, numPartitions, sourceVertices, filterDescriptor); - this.numChunksPerSrc = numChunksPerSrc; - this.numChunk = numChunk; - this.chunkIdOffset = chunkIdOffset; - } - - public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload) - throws InvalidProtocolBufferException { - CartesianProductConfigProto proto = - CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload())); - - boolean isPartitioned = proto.getIsPartitioned(); - String[] sources = new String[proto.getSourcesList().size()]; - proto.getSourcesList().toArray(sources); - int[] numPartitions = - proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList()); - CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName() - ? new CartesianProductFilterDescriptor(proto.getFilterClassName()) : null; - if (proto.hasFilterUserPayload()) { - filterDescriptor.setUserPayload( - UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray()))); - } - int[] humChunksPerSrc = - proto.getNumChunksCount() == 0 ? 
null : Ints.toArray(proto.getNumChunksList()); - int numChunk = proto.getNumChunk(); - int chunkIdOffset = proto.getChunkIdOffset(); - return new CartesianProductEdgeManagerConfig(isPartitioned, sources, numPartitions, - humChunksPerSrc, numChunk, chunkIdOffset, filterDescriptor); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java index 5ece5cf..5f2910a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java @@ -22,34 +22,44 @@ import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; +import org.apache.tez.dag.api.TezReflectionException; import org.apache.tez.dag.api.UserPayload; import javax.annotation.Nullable; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*; + class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManagerReal { private int positionId; private CartesianProductFilter filter; private int[] taskIdMapping; - private CartesianProductEdgeManagerConfig 
config; private int[] numPartitions; + private List<String> sources; public CartesianProductEdgeManagerPartitioned(EdgeManagerPluginContext context) { super(context); } @Override - public void initialize(CartesianProductEdgeManagerConfig config) throws Exception { - this.config = config; - this.numPartitions = Ints.toArray(config.getNumPartitions()); - positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName()); - CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor(); - if (filterDescriptor != null) { - filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(), - new Class[] {UserPayload.class}, new UserPayload[] {filterDescriptor.getUserPayload()}); + public void initialize(CartesianProductConfigProto config) throws Exception { + this.numPartitions = Ints.toArray(config.getNumPartitionsList()); + this.sources = config.getSourcesList(); + this.positionId = sources.indexOf(getContext().getSourceVertexName()); + + if (config.hasFilterClassName()) { + UserPayload userPayload = config.hasFilterUserPayload() + ? 
UserPayload.create(ByteBuffer.wrap(config.getFilterUserPayload().toByteArray())) : null; + try { + filter = ReflectionUtils.createClazzInstance(config.getFilterClassName(), + new Class[]{UserPayload.class}, new UserPayload[]{userPayload}); + } catch (TezReflectionException e) { + throw e; + } } generateTaskIdMapping(); } @@ -107,13 +117,12 @@ class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManager CartesianProductCombination combination = new CartesianProductCombination(numPartitions); combination.firstTask(); - List<String> sources = config.getSourceVertices(); do { for (int i = 0; i < sources.size(); i++) { vertexPartitionMap.put(sources.get(i), combination.getCombination().get(i)); } if (filter == null || filter.isValidCombination(vertexPartitionMap)) { - idealTaskId.add(combination.getChunkId()); + idealTaskId.add(combination.getTaskId()); } } while (combination.nextTask()); this.taskIdMapping = Ints.toArray(idealTaskId); http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java index f22035b..0b91ec2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java @@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.cartesianproduct; import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; import 
org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; /** * base class of cartesian product edge manager implementation @@ -35,7 +36,7 @@ abstract class CartesianProductEdgeManagerReal { return this.context; } - public abstract void initialize(CartesianProductEdgeManagerConfig config) throws Exception; + public abstract void initialize(CartesianProductConfigProto config) throws Exception; public void prepareForRouting() throws Exception {} http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java deleted file mode 100644 index 80d7dc1..0000000 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tez.runtime.library.cartesianproduct; - -import org.apache.tez.dag.api.EdgeManagerPluginContext; -import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; -import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; -import org.apache.tez.runtime.library.utils.Grouper; - -import javax.annotation.Nullable; - - -class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal { - private int positionId; - private int numChunk; - private int chunkIdOffset; - private int[] numChunkPerSrc; - private int numDestinationConsumerTasks; - private Grouper grouper = new Grouper(); - - public CartesianProductEdgeManagerUnpartitioned(EdgeManagerPluginContext context) { - super(context); - } - - public void initialize(CartesianProductEdgeManagerConfig config) { - String groupName = getContext().getVertexGroupName(); - String srcName = groupName != null ? 
groupName : getContext().getSourceVertexName(); - this.positionId = config.getSourceVertices().indexOf(srcName); - this.numChunkPerSrc = config.numChunksPerSrc; - this.numChunk = config.numChunk; - this.chunkIdOffset = config.chunkIdOffset; - - if (numChunk != 0) { - grouper.init(getContext().getSourceVertexNumTasks(), numChunk); - numDestinationConsumerTasks = 1; - for (int numGroup : numChunkPerSrc) { - numDestinationConsumerTasks *= numGroup; - } - numDestinationConsumerTasks /= numChunkPerSrc[positionId]; - } - } - - @Override - public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception { - return failedInputId + grouper.getFirstTaskInGroup( - CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) - - chunkIdOffset); - } - - @Override - public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId, - int destTaskId) throws Exception { - int chunkId = - CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) - - chunkIdOffset; - if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) { - int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId); - return EventRouteMetadata.create(1, new int[] {idx}); - } - return null; - } - - @Nullable - @Override - public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId, - int destTaskId) - throws Exception { - int chunkId = - CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) - - chunkIdOffset; - if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) { - int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId); - return CompositeEventRouteMetadata.create(1, idx, 0); - } - return null; - } - - @Nullable - @Override - public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId, - int destTaskId) - throws Exception 
{ - int chunkId = - CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) - - chunkIdOffset; - if (0 <= chunkId && chunkId < numChunk && grouper.isInGroup(srcTaskId, chunkId)) { - int idx = srcTaskId - grouper.getFirstTaskInGroup(chunkId); - return EventRouteMetadata.create(1, new int[] {idx}); - } - return null; - } - - @Override - public int getNumDestinationTaskPhysicalInputs(int destTaskId) { - int chunkId = - CartesianProductCombination.fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionId) - - chunkIdOffset; - return 0 <= chunkId && chunkId < numChunk ? grouper.getNumTasksInGroup(chunkId) : 0; - } - - @Override - public int getNumSourceTaskPhysicalOutputs(int srcTaskId) { - return 1; - } - - @Override - public int getNumDestinationConsumerTasks(int sourceTaskIndex) { - return numDestinationConsumerTasks; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java index 857f11e..ff22593 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.cartesianproduct; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import 
org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; @@ -39,10 +40,11 @@ import java.util.Set; import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.BROADCAST; import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM; +import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*; /** * This VM wrap a real vertex manager implementation object. It choose whether it's partitioned or - * unpartitioned implementation according to the config. All method invocations are actually + * fair implementation according to the config. All method invocations are actually * redirected to real implementation. * * Predefined parallelism isn't allowed for cartesian product vertex. Parallellism has to be @@ -51,9 +53,9 @@ import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM; * If a vertex use this vertex, its input edges must be either cartesian product edge or broadcast * edge. * - * Sources can be either vertices or vertex groups (only in unpartitioned case). + * Sources can be either vertices or vertex groups (only in fair cartesian product). * - * Slow start only works in partitioned case. Auto grouping only works in unpartitioned case. + * Slow start only works in partitioned case. */ public class CartesianProductVertexManager extends VertexManagerPlugin { /** @@ -72,22 +74,46 @@ public class CartesianProductVertexManager extends VertexManagerPlugin { public static final float TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT = 0.75f; /** - * Enables automatic grouping. It groups source tasks of each cartesian product source vertex - * so that every group generates similar output size. And parallelism can be reduced because - * destination tasks handle combinations of per group output instead of per task output. This is - * only available for unpartitioned case for now, and it's useful for scenarios where there are - * many source tasks generate small outputs. 
+ * Num partitions as int value, for fair cartesian product only. + * Set this if auto determined num partition is not large enough */ - public static final String TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING = - "tez.cartesian-product.enable-auto-grouping"; - public static final boolean TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT = false; + public static final String TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS = + "tez.cartesian-product.num-partitions"; /** - * The number of output bytes we want from each group. + * Whether to disable grouping in fair cartesian product + * If this is set to true, it's best to set "tez.cartesian-product.num-partitions" to 1 to avoid + * unnecessary overhead caused by multiple partitions. */ - public static final String TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP = - "tez.cartesian-product.desired-input-per-src"; - public static final long TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT = 32 * 1024 * 1024; + public static final String TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING = + "tez.cartesian-product.disable-grouping"; + public static final boolean TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING_DEFAULT = true; + + /** + * If every source vertex has this percents of tasks completed and generate some output, + * we can begin auto grouping. + * + * Positive float value, max 1. + * If not set, auto grouping will begin once every source vertex generate enough output + */ + public static final String TEZ_CARTESIAN_PRODUCT_GROUPING_FRACTION = + "tez.cartesian-product.grouping-fraction"; + + /** + * Max parallelism, for fair cartesian product only. + * This is used to avoid get too many tasks. The value must be positive. + */ + public static final String TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM = + "tez.cartesian-product.max-parallelism"; + public static final int TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM_DEFAULT = 1000; + + /** + * Min cartesian product operations per worker, for fair cartesian product only. 
+ * This is used to avoid a task gets too small workload. The value must be positive. + */ + public static final String TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER = + "tez.cartesian-product.min-ops-per-worker"; + public static final long TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER_DEFAULT = 1000000; private CartesianProductVertexManagerReal vertexManagerReal = null; @@ -99,12 +125,12 @@ public class CartesianProductVertexManager extends VertexManagerPlugin { @Override public void initialize() throws Exception { - CartesianProductVertexManagerConfig config = - CartesianProductVertexManagerConfig.fromUserPayload(getContext().getUserPayload()); + CartesianProductConfigProto config = CartesianProductConfigProto.parseFrom( + ByteString.copyFrom(getContext().getUserPayload().getPayload())); // check whether DAG and config are is consistent Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties(); Set<String> sourceVerticesDAG = edgePropertyMap.keySet(); - Set<String> sourceVerticesConfig = new HashSet<>(config.getSourceVertices()); + Set<String> sourceVerticesConfig = new HashSet<>(config.getSourcesList()); Map<String, List<String>> vertexGroups = getContext().getInputVertexGroups(); Map<String, String> vertexToGroup = new HashMap<>(); @@ -159,7 +185,7 @@ public class CartesianProductVertexManager extends VertexManagerPlugin { vertexManagerReal = config.getIsPartitioned() ? 
new CartesianProductVertexManagerPartitioned(getContext()) - : new CartesianProductVertexManagerUnpartitioned(getContext()); + : new FairCartesianProductVertexManager(getContext()); vertexManagerReal.initialize(config); } http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java deleted file mode 100644 index e082ec3..0000000 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tez.runtime.library.cartesianproduct; - -import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.tez.dag.api.UserPayload; - -import java.nio.ByteBuffer; - -import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*; - -class CartesianProductVertexManagerConfig extends CartesianProductConfig { - final float minFraction; - final float maxFraction; - final boolean enableAutoGrouping; - final long desiredBytesPerChunk; - - public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sources, - int[] numPartitions, - float minFraction, float maxFraction, - boolean enableAutoGrouping, long desiredBytesPerChunk, - CartesianProductFilterDescriptor filterDescriptor) { - super(isPartitioned, numPartitions, sources, filterDescriptor); - Preconditions.checkArgument(minFraction <= maxFraction, - "min fraction(" + minFraction + ") should be less than max fraction(" + - maxFraction + ") in cartesian product slow start"); - this.minFraction = minFraction; - this.maxFraction = maxFraction; - this.enableAutoGrouping = enableAutoGrouping; - this.desiredBytesPerChunk = desiredBytesPerChunk; - } - - public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload) - throws InvalidProtocolBufferException { - CartesianProductConfigProto proto = - CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload())); - - boolean isPartitioned = proto.getIsPartitioned(); - String[] sources = new String[proto.getSourcesList().size()]; - proto.getSourcesList().toArray(sources); - int[] numPartitions = - proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList()); - CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName() - ? 
new CartesianProductFilterDescriptor(proto.getFilterClassName()) : null; - if (proto.hasFilterUserPayload()) { - filterDescriptor.setUserPayload( - UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray()))); - } - float minFraction = proto.getMinFraction(); - float maxFraction = proto.getMaxFraction(); - - boolean enableAutoGrouping = proto.hasEnableAutoGrouping() ? proto.getEnableAutoGrouping() - : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT; - long desiredBytesPerGroup = proto.hasDesiredBytesPerChunk() ? proto.getDesiredBytesPerChunk() - : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT; - return new CartesianProductVertexManagerConfig(isPartitioned, sources, numPartitions, - minFraction, maxFraction, enableAutoGrouping, desiredBytesPerGroup, filterDescriptor); - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java index ddff37d..e4aaad6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java @@ -27,10 +27,12 @@ import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.events.VertexManagerEvent; +import 
org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.BitSet; import java.util.EnumSet; @@ -43,8 +45,9 @@ import java.util.Map; * min fraction and schedules all task when max fraction is reached */ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexManagerReal { - private CartesianProductVertexManagerConfig config; private List<String> sourceVertices; + private int[] numPartitions; + private float minFraction, maxFraction; private int parallelism = 0; private boolean vertexStarted = false; private boolean vertexReconfigured = false; @@ -64,19 +67,26 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan } @Override - public void initialize(CartesianProductVertexManagerConfig config) throws TezReflectionException { - this.config = config; - this.sourceVertices = config.getSourceVertices(); - CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor(); - if (filterDescriptor != null) { + public void initialize(CartesianProductConfigProto config) throws TezReflectionException { + this.sourceVertices = config.getSourcesList(); + this.numPartitions = Ints.toArray(config.getNumPartitionsList()); + this.minFraction = config.hasMinFraction() ? config.getMinFraction() + : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT; + this.maxFraction = config.hasMaxFraction() ? config.getMaxFraction() + : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT; + + if (config.hasFilterClassName()) { + UserPayload userPayload = config.hasFilterUserPayload() + ? 
UserPayload.create(ByteBuffer.wrap(config.getFilterUserPayload().toByteArray())) : null; try { - filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(), - new Class[]{UserPayload.class}, new UserPayload[]{filterDescriptor.getUserPayload()}); + filter = ReflectionUtils.createClazzInstance(config.getFilterClassName(), + new Class[]{UserPayload.class}, new UserPayload[]{userPayload}); } catch (TezReflectionException e) { LOG.error("Creating filter failed"); throw e; } } + for (String sourceVertex : sourceVertices) { sourceTaskCompleted.put(sourceVertex, new BitSet()); } @@ -147,7 +157,7 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan Map<String, Integer> vertexPartitionMap = new HashMap<>(); CartesianProductCombination combination = - new CartesianProductCombination(Ints.toArray(config.getNumPartitions())); + new CartesianProductCombination(numPartitions); combination.firstTask(); do { for (int i = 0; i < sourceVertices.size(); i++) { @@ -174,12 +184,12 @@ class CartesianProductVertexManagerPartitioned extends CartesianProductVertexMan // determine the destination task with largest id to schedule float percentFinishedSrcTask = numFinishedSrcTasks*1f/totalNumSrcTasks; int numTaskToSchedule; - if (percentFinishedSrcTask < config.minFraction) { + if (percentFinishedSrcTask < minFraction) { numTaskToSchedule = 0; - } else if (config.minFraction <= percentFinishedSrcTask && - percentFinishedSrcTask <= config.maxFraction) { - numTaskToSchedule = (int) ((percentFinishedSrcTask-config.minFraction) - /(config.maxFraction-config.minFraction)*parallelism); + } else if (minFraction <= percentFinishedSrcTask && + percentFinishedSrcTask <= maxFraction) { + numTaskToSchedule = (int) ((percentFinishedSrcTask - minFraction) + /(maxFraction - minFraction) * parallelism); } else { numTaskToSchedule = parallelism; } 
http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java index 1a397fd..f28f4a3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java @@ -21,6 +21,7 @@ import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; import java.io.IOException; import java.util.List; @@ -39,7 +40,7 @@ abstract class CartesianProductVertexManagerReal { return this.context; } - public abstract void initialize(CartesianProductVertexManagerConfig config) throws Exception; + public abstract void initialize(CartesianProductConfigProto config) throws Exception; public abstract void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws IOException; http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java deleted file mode 100644 index 46ea76e..0000000 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerUnpartitioned.java +++ /dev/null @@ -1,438 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tez.runtime.library.cartesianproduct; - -import com.google.common.primitives.Ints; -import com.google.protobuf.ByteString; -import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; -import org.apache.tez.dag.api.event.VertexState; -import org.apache.tez.dag.api.event.VertexStateUpdate; -import org.apache.tez.runtime.api.TaskAttemptIdentifier; -import org.apache.tez.runtime.api.events.VertexManagerEvent; -import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; -import org.apache.tez.runtime.library.utils.Grouper; -import org.roaringbitmap.RoaringBitmap; -import org.slf4j.Logger; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; - -import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM; -import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; - -/** - * In unpartitioned case, we have one destination task for each source chunk combination. A source - * is a source vertex or a source vertex group. A chunk is one source task (without auto grouping) - * or a group of source tasks (with auto grouping). A chunk may contains multiple tasks across - * vertices. The mapping from source chunk to destination task id is done by - * {@link <CartesianProductCombination>}. - * - * If auto grouping is enabled, this vertex manager will estimate output size of each source and - * group source tasks of each source in chunk according to desired grouping size configured by user. 
- * - * - */ -class CartesianProductVertexManagerUnpartitioned extends CartesianProductVertexManagerReal { - /** - * a cartesian product source - */ - static class Source { - // list of source vertices of this source - List<SrcVertex> srcVertices = new ArrayList<>(); - // position of this source in all sources - int position; - // name of source vertex or vertex group - String name; - - // total number of chunks in this source - public int getNumChunk() { - int numChunk = 0; - for (SrcVertex srcV : srcVertices) { - numChunk += srcV.numChunk; - } - return numChunk; - } - - // whether this source has any task completed - public boolean hasTaskCompleted() { - for (SrcVertex srcV : srcVertices) { - if (!srcV.taskCompleted.isEmpty()) { - return true; - } - } - return false; - } - - public String toString(boolean afterReconfigure) { - StringBuilder sb = new StringBuilder(); - sb.append("Source at position "); - sb.append(position); - if (name != null) { - sb.append(", "); - sb.append("vertex group "); - sb.append(name); - - } - sb.append(": {"); - for (SrcVertex srcV : srcVertices) { - sb.append("["); - sb.append(srcV.toString(afterReconfigure)); - sb.append("], "); - } - sb.deleteCharAt(sb.length() - 1); - sb.setCharAt(sb.length() - 1, '}'); - return sb.toString(); - } - } - - /** - * a cartesian product source vertex - */ - class SrcVertex { - // which source this vertex belongs to - Source source; - // vertex name - String name; - int numTask; - // num chunks of this source vertex - int numChunk; - // offset of chunk id in vertex group - // we need sequence chunks in the vertex group to make them look like from single vertex - int chunkIdOffset = 0; - RoaringBitmap taskCompleted = new RoaringBitmap(); - RoaringBitmap taskWithVMEvent = new RoaringBitmap(); - long outputBytes; - - public void doGrouping() { - numChunk = numTask; - if (config.enableAutoGrouping) { - outputBytes = outputBytes * numTask / taskWithVMEvent.getCardinality(); - numChunk = Math.min(numChunk, - 
(int) ((outputBytes + config.desiredBytesPerChunk - 1) / config.desiredBytesPerChunk)); - } - } - - public String toString(boolean afterReconfigure) { - StringBuilder sb = new StringBuilder(); - sb.append("vertex ").append(name).append(", "); - if (afterReconfigure) { - sb.append("estimated output ").append(outputBytes).append(" bytes, "); - sb.append(numChunk).append(" chunks"); - } else { - sb.append(numTask).append(" tasks, "); - sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, "); - sb.append("output ").append(outputBytes).append(" bytes"); - } - return sb.toString(); - } - } - - private static final Logger LOG = - org.slf4j.LoggerFactory.getLogger(CartesianProductVertexManagerUnpartitioned.class); - - CartesianProductVertexManagerConfig config; - Map<String, Source> sourcesByName = new HashMap<>(); - Map<String, SrcVertex> srcVerticesByName = new HashMap<>(); - - private boolean vertexReconfigured = false; - private boolean vertexStarted = false; - private boolean vertexStartSchedule = false; - private int numCPSrcNotInConfigureState = 0; - private int numBroadcastSrcNotInRunningState = 0; - private Queue<TaskAttemptIdentifier> completedSrcTaskToProcess = new LinkedList<>(); - private RoaringBitmap scheduledTasks = new RoaringBitmap(); - - /* auto reduce related */ - // num of chunks of source at the corresponding position in source list - private int[] numChunksPerSrc; - private Set<String> vertexSentVME = new HashSet<>(); - private Grouper grouper = new Grouper(); - - public CartesianProductVertexManagerUnpartitioned(VertexManagerPluginContext context) { - super(context); - } - - @Override - public void initialize(CartesianProductVertexManagerConfig config) throws Exception { - for (Map.Entry<String, EdgeProperty> e : getContext().getInputVertexEdgeProperties().entrySet()) { - if (e.getValue().getDataMovementType() == CUSTOM - && e.getValue().getEdgeManagerDescriptor().getClassName() - .equals(CartesianProductEdgeManager.class.getName())) { - 
srcVerticesByName.put(e.getKey(), new SrcVertex()); - srcVerticesByName.get(e.getKey()).name = e.getKey(); - getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.CONFIGURED)); - numCPSrcNotInConfigureState++; - } else { - getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.RUNNING)); - numBroadcastSrcNotInRunningState++; - } - } - - Map<String, List<String>> srcGroups = getContext().getInputVertexGroups(); - for (int i = 0; i < config.getSourceVertices().size(); i++) { - String srcName = config.getSourceVertices().get(i); - Source source = new Source(); - source.position = i; - if (srcGroups.containsKey(srcName)) { - source.name = srcName; - for (String srcVName : srcGroups.get(srcName)) { - source.srcVertices.add(srcVerticesByName.get(srcVName)); - srcVerticesByName.get(srcVName).source = source; - } - } else { - source.srcVertices.add(srcVerticesByName.get(srcName)); - srcVerticesByName.get(srcName).source = source; - } - sourcesByName.put(srcName, source); - } - - numChunksPerSrc = new int[sourcesByName.size()]; - this.config = config; - getContext().vertexReconfigurationPlanned(); - } - - @Override - public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) - throws Exception { - vertexStarted = true; - if (completions != null) { - for (TaskAttemptIdentifier attempt : completions) { - addCompletedSrcTaskToProcess(attempt); - } - } - tryScheduleTasks(); - } - - @Override - public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException { - String vertex = stateUpdate.getVertexName(); - VertexState state = stateUpdate.getVertexState(); - - if (state == VertexState.CONFIGURED) { - srcVerticesByName.get(vertex).numTask = getContext().getVertexNumTasks(vertex); - numCPSrcNotInConfigureState--; - } else if (state == VertexState.RUNNING) { - numBroadcastSrcNotInRunningState--; - } - tryScheduleTasks(); - } - - @Override - public synchronized void 
onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception { - addCompletedSrcTaskToProcess(attempt); - tryScheduleTasks(); - } - - private void addCompletedSrcTaskToProcess(TaskAttemptIdentifier attempt) { - int taskId = attempt.getTaskIdentifier().getIdentifier(); - String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName(); - SrcVertex srcV = srcVerticesByName.get(vertex); - if (srcV != null && !srcV.taskCompleted.contains(taskId)) { - srcV.taskCompleted.add(taskId); - completedSrcTaskToProcess.add(attempt); - } - } - - private boolean tryStartSchedule() { - if (!vertexReconfigured || !vertexStarted || numBroadcastSrcNotInRunningState > 0) { - return false; - } - - for (Source src : sourcesByName.values()) { - if (!src.hasTaskCompleted()) { - return false; - } - } - vertexStartSchedule = true; - return true; - } - - public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) - throws IOException { - /* vmEvent after reconfigure doesn't matter */ - if (vertexReconfigured) { - return; - } - - if (vmEvent.getUserPayload() != null) { - String srcVertex = - vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getVertexIdentifier().getName(); - SrcVertex srcV = srcVerticesByName.get(srcVertex); - - // vmEvent from non-cp vertex doesn't matter - if (srcV == null) { - return; - } - - VertexManagerEventPayloadProto proto = - VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload())); - srcV.outputBytes += proto.getOutputSize(); - srcV.taskWithVMEvent.add(vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getIdentifier()); - vertexSentVME.add(srcVertex); - } - - tryScheduleTasks(); - } - - private boolean tryReconfigure() throws IOException { - if (numCPSrcNotInConfigureState > 0) { - return false; - } - if (config.enableAutoGrouping) { - if (vertexSentVME.size() != srcVerticesByName.size()) { - return false; - } - // every src v must output at least one chunk size - for 
(SrcVertex srcV : srcVerticesByName.values()) { - if (srcV.outputBytes < config.desiredBytesPerChunk - && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) { - return false; - } - } - } - - LOG.info("Start reconfigure, grouping: " + config.enableAutoGrouping - + ", chunk size: " + config.desiredBytesPerChunk + " bytes."); - for (String srcName : config.getSourceVertices()) { - LOG.info(sourcesByName.get(srcName).toString(false)); - } - - for (Source src : sourcesByName.values()) { - for (int i = 0; i < src.srcVertices.size(); i++) { - src.srcVertices.get(i).doGrouping(); - if (i > 0) { - src.srcVertices.get(i).chunkIdOffset += src.srcVertices.get(i-1).numChunk; - } - } - numChunksPerSrc[src.position] = src.getNumChunk(); - } - - int parallelism = 1; - for (Source src : sourcesByName.values()) { - parallelism *= src.getNumChunk(); - } - - LOG.info("After reconfigure, "); - for (String srcName : config.getSourceVertices()) { - LOG.info(sourcesByName.get(srcName).toString(true)); - } - LOG.info("Final parallelism: " + parallelism); - - CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); - for (int i = 0; i < numChunksPerSrc.length; i++) { - numChunksPerSrc[i] = sourcesByName.get(config.getSourceVertices().get(i)).getNumChunk(); - } - builder.setIsPartitioned(false).addAllSources(config.getSourceVertices()) - .addAllNumChunks(Ints.asList(this.numChunksPerSrc)); - - Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties(); - Iterator<Map.Entry<String,EdgeProperty>> iter = edgeProperties.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<String, EdgeProperty> e = iter.next(); - if (e.getValue().getDataMovementType() != CUSTOM) { - iter.remove(); - } else { - SrcVertex srcV = srcVerticesByName.get(e.getKey()); - builder.setNumChunk(srcV.numChunk).setChunkIdOffset(srcV.chunkIdOffset); - e.getValue().getEdgeManagerDescriptor() - 
.setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray()))); - } - } - getContext().reconfigureVertex(parallelism, null, edgeProperties); - vertexReconfigured = true; - getContext().doneReconfiguringVertex(); - return true; - } - - private void tryScheduleTasks() throws IOException { - if (!vertexReconfigured && !tryReconfigure()) { - return; - } - if (!vertexStartSchedule && !tryStartSchedule()) { - return; - } - - while (!completedSrcTaskToProcess.isEmpty()) { - scheduleTasksDependOnCompletion(completedSrcTaskToProcess.poll()); - } - } - - private void scheduleTasksDependOnCompletion(TaskAttemptIdentifier attempt) { - int taskId = attempt.getTaskIdentifier().getIdentifier(); - String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName(); - SrcVertex srcV = srcVerticesByName.get(vertex); - Source src = srcV.source; - - List<ScheduleTaskRequest> requests = new ArrayList<>(); - CartesianProductCombination combination = - new CartesianProductCombination(numChunksPerSrc, src.position); - grouper.init(srcV.numTask, srcV.numChunk); - combination.firstTaskWithFixedChunk(grouper.getGroupId(taskId) + srcV.chunkIdOffset); - do { - List<Integer> list = combination.getCombination(); - - if (scheduledTasks.contains(combination.getChunkId())) { - continue; - } - boolean readyToSchedule = true; - for (int i = 0; i < list.size(); i++) { - int chunkId = list.get(i); - SrcVertex srcVHasGroup = null; - for (SrcVertex v : sourcesByName.get(config.getSourceVertices().get(i)).srcVertices) { - if (v.chunkIdOffset <= chunkId && chunkId < v.chunkIdOffset + v.numChunk) { - srcVHasGroup = v; - break; - } - } - assert srcVHasGroup != null; - grouper.init(srcVHasGroup.numTask, srcVHasGroup.numChunk); - chunkId -= srcVHasGroup.chunkIdOffset; - for (int j = grouper.getFirstTaskInGroup(chunkId); j <= grouper.getLastTaskInGroup(chunkId); j++) { - if (!srcVHasGroup.taskCompleted.contains(j)) { - readyToSchedule = false; - break; - } - } - if 
(!readyToSchedule) { - break; - } - } - - if (readyToSchedule) { - requests.add(ScheduleTaskRequest.create(combination.getChunkId(), null)); - scheduledTasks.add(combination.getChunkId()); - } - } while (combination.nextTaskWithFixedChunk()); - if (!requests.isEmpty()) { - getContext().scheduleTasks(requests); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductEdgeManager.java new file mode 100644 index 0000000..3085e5e --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductEdgeManager.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.runtime.library.utils.Grouper;
+
+import javax.annotation.Nullable;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductCombination.fromTaskId;
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+
+/**
+ * Edge manager for the fair (unpartitioned) cartesian product.
+ *
+ * Every source task writes {@code numPartition} physical outputs. Output {@code o} of source
+ * task {@code t} is treated as item {@code t * numPartition + o}. The items of this source
+ * vertex are split by a {@link Grouper} into {@code numChunkPerSrc[positionInSrc]} chunks, and a
+ * destination task consumes the chunk of this source selected by decoding its task id with
+ * {@code fromTaskId(numChunkPerSrc, destTaskId)}.
+ *
+ * When the source vertex is part of a vertex group, a destination's input indices for this edge
+ * are shifted by the number of items that earlier vertices of the group place in the same chunk
+ * (see {@link #getItemIdOffset(int)}).
+ */
+class FairCartesianProductEdgeManager extends CartesianProductEdgeManagerReal {
+  private int numPartition; // physical outputs per source task
+  // position of current source in all cartesian product sources
+  private int positionInSrc;
+  // #chunk of each cartesian product source
+  private int[] numChunkPerSrc;
+  // #task of each vertex in vertex group that contains src vertex
+  private int[] numTaskPerSrcVertexInGroup;
+  // position of src vertex in vertex group
+  private int positionInGroup;
+  // # destination tasks that consume same chunk
+  private int numDestConsumerPerChunk;
+  private Grouper grouper = new Grouper(); // groups this vertex's items into this source's chunks
+  private Grouper grouperForComputeOffset = new Grouper(); // scratch grouper for getItemIdOffset
+
+  public FairCartesianProductEdgeManager(EdgeManagerPluginContext context) {
+    super(context);
+  }
+  /**
+   * Reads the cartesian product config for this edge.
+   *
+   * The partition count is {@code getNumPartitionsForFairCase()} when present, otherwise
+   * {@code (int) Math.pow(maxParallelism, 1.0 / #sources)}. The chunk layout (chunk counts,
+   * task counts of the vertices in the group, position in the group) is only read when the
+   * config carries chunk counts ("after reconfiguration"). Until then {@code numChunkPerSrc} is
+   * null, so the routing methods below are presumably only invoked after that second
+   * initialization. Confirm against the vertex manager.
+   *
+   * NOTE(review): Math.pow with a fractional exponent can land just below an exact root
+   * (Math.pow(1000, 1.0 / 3) evaluates to 9.999..., which casts to 9). The vertex manager and
+   * partitioner presumably derive the same value, so any fix must be applied to all of them
+   * together.
+   */
+  @Override
+  public void initialize(CartesianProductConfigProto config) {
+    String groupName = getContext().getVertexGroupName();
+    String srcName = groupName != null ? groupName : getContext().getSourceVertexName(); // grouped vertex is looked up by group name
+    this.positionInSrc = config.getSourcesList().indexOf(srcName);
+
+    if (config.hasNumPartitionsForFairCase()) {
+      this.numPartition = config.getNumPartitionsForFairCase();
+    } else {
+      this.numPartition = (int) Math.pow(config.getMaxParallelism(), 1.0 / config.getSourcesCount());
+    }
+
+    if (config.getNumChunksCount() > 0) {
+      // initialize after reconfiguration
+      this.numChunkPerSrc = Ints.toArray(config.getNumChunksList());
+      grouper.init(getContext().getSourceVertexNumTasks() * numPartition,
+        numChunkPerSrc[positionInSrc]); // all items of this vertex, split into this source's chunks
+      this.numTaskPerSrcVertexInGroup = Ints.toArray(config.getNumTaskPerVertexInGroupList());
+      this.positionInGroup = config.getPositionInGroup();
+
+      numDestConsumerPerChunk = 1;
+      for (int numChunk : numChunkPerSrc) {
+        numDestConsumerPerChunk *= numChunk;
+      }
+      numDestConsumerPerChunk /= numChunkPerSrc[positionInSrc]; // product of the other sources' chunk counts
+    }
+  }
+  /**
+   * Maps a failed physical input of {@code destTaskId} back to the source task that produced it.
+   * The input index is converted back to this vertex's item id (undoing the group offset and the
+   * chunk start), and dividing the item id by {@code numPartition} yields the source task index.
+   */
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    int chunkId = fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionInSrc);
+    int itemId = failedInputId - getItemIdOffset(chunkId) + grouper.getFirstItemInGroup(chunkId);
+    return itemId / numPartition;
+  }
+  /**
+   * Routes output {@code srcOutputId} of source task {@code srcTaskId}, i.e. item
+   * {@code srcTaskId * numPartition + srcOutputId}, to {@code destTaskId} when that item belongs
+   * to the destination's chunk of this source. The target index is the item's position within
+   * the chunk plus {@link #getItemIdOffset(int)}.
+   *
+   * @return single-target metadata, or null when the item is not in the destination's chunk
+   */
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+      int destTaskId) throws Exception {
+    int itemId = srcTaskId * numPartition + srcOutputId;
+    int chunkId = fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionInSrc);
+    if (grouper.isInGroup(itemId, chunkId)) {
+      int idx = itemId - grouper.getFirstItemInGroup(chunkId) + getItemIdOffset(chunkId);
+      return EventRouteMetadata.create(1, new int[] {idx});
+    }
+    return null;
+  }
+  /**
+   * Routes all outputs of {@code srcTaskId} to {@code destTaskId} at once. The source task's item
+   * range {@code [srcTaskId * numPartition, srcTaskId * numPartition + numPartition - 1]} is
+   * intersected with the item range of the destination's chunk. On overlap the metadata carries
+   * the overlap size, the target index of the first overlapping item (its offset within the
+   * chunk plus {@link #getItemIdOffset(int)}) and that item's source output index.
+   *
+   * @return composite metadata, or null when the two ranges are disjoint
+   */
+  @Nullable
+  @Override
+  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                                int destTaskId)
+    throws Exception {
+    int chunkId = fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionInSrc);
+    int firstItemInChunk = grouper.getFirstItemInGroup(chunkId);
+    int lastItemInChunk = grouper.getLastItemInGroup(chunkId);
+    int firstItemInSrcTask = srcTaskId * numPartition;
+    int lastItemInSrcTask = firstItemInSrcTask + numPartition - 1;
+    if (!(lastItemInChunk < firstItemInSrcTask || firstItemInChunk > lastItemInSrcTask)) { // ranges overlap
+      int firstItem = Math.max(firstItemInChunk, firstItemInSrcTask);
+      int lastItem = Math.min(lastItemInChunk, lastItemInSrcTask);
+      return CompositeEventRouteMetadata.create(lastItem - firstItem + 1,
+        firstItem - firstItemInChunk + getItemIdOffset(chunkId), firstItem - firstItemInSrcTask);
+    }
+    return null;
+  }
+
+  /**
+   * #item from vertices before source vertex in the same vertex group
+   *
+   * For each vertex that precedes this one in its vertex group, the vertex's
+   * {@code numTaskPerSrcVertexInGroup[i] * numPartition} items are grouped into this source's
+   * chunk count, and the number of them that fall into {@code chunkId} is added up. The shared
+   * {@code grouperForComputeOffset} field is re-initialized on every iteration. The result is 0
+   * when {@code positionInGroup} is 0.
+   * @param chunkId chunk of this source for which preceding items are counted
+   * @return number of items that earlier vertices of the group contribute to {@code chunkId}
+   */
+  private int getItemIdOffset(int chunkId) {
+    int offset = 0;
+    for (int i = 0; i < positionInGroup; i++) {
+      grouperForComputeOffset.init(numTaskPerSrcVertexInGroup[i] * numPartition,
+        numChunkPerSrc[positionInSrc]);
+      offset += grouperForComputeOffset.getNumItemsInGroup(chunkId);
+    }
+    return offset;
+  }
+  /**
+   * Tells {@code destTaskId} which of its inputs came from the failed source task
+   * {@code srcTaskId}. It uses the same item-range overlap as
+   * {@link #routeCompositeDataMovementEventToDestination(int, int)} and lists one target index per
+   * overlapping item.
+   *
+   * @return metadata with one target index per overlapping item, or null when the ranges are
+   *         disjoint
+   */
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    int chunkId = fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionInSrc);
+    int firstItemInChunk = grouper.getFirstItemInGroup(chunkId);
+    int lastItemInChunk = grouper.getLastItemInGroup(chunkId);
+    int firstItemInSrcTask = srcTaskId * numPartition;
+    int lastItemInSrcTask = firstItemInSrcTask + numPartition - 1;
+    if (!(lastItemInChunk < firstItemInSrcTask || firstItemInChunk > lastItemInSrcTask)) { // ranges overlap
+      int firstItem = Math.max(firstItemInChunk, firstItemInSrcTask);
+      int lastItem = Math.min(lastItemInChunk, lastItemInSrcTask);
+      int[] targetIndices = new int[lastItem - firstItem + 1];
+      for (int i = firstItem; i <= lastItem; i++) { // NOTE(review): getItemIdOffset(chunkId) is loop-invariant but recomputed per item
+        targetIndices[i - firstItem] = i - firstItemInChunk + getItemIdOffset(chunkId);
+      }
+      return EventRouteMetadata.create(targetIndices.length, targetIndices);
+    }
+    return null;
+  }
+  /**
+   * Number of inputs {@code destTaskId} reads over this edge: how many of this vertex's items
+   * fall into the destination's chunk of this source, or 0 if that chunk id is out of range.
+   */
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    int chunkId = fromTaskId(numChunkPerSrc, destTaskId).getCombination().get(positionInSrc);
+    if (0 <= chunkId && chunkId < numChunkPerSrc[positionInSrc]) {
+      return grouper.getNumItemsInGroup(chunkId);
+    }
+    return 0;
+  }
+  /** Every source task produces {@code numPartition} physical outputs. */
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return numPartition;
+  }
+  /**
+   * Number of destination tasks that consume some output of {@code sourceTaskIndex}. This is the
+   * number of chunks its items touch (group id of its last item minus group id of its first
+   * item, plus one) multiplied by the number of destination tasks that read each chunk.
+   */
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    int numChunk = grouper.getGroupId(sourceTaskIndex * numPartition + numPartition - 1)
+      - grouper.getGroupId(sourceTaskIndex * numPartition) + 1;
+    return numDestConsumerPerChunk * numChunk;
+  }
+}
\ No newline at end of file
