TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. (Zhiyuan Yang via mingma)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a068b23 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a068b23 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a068b23 Branch: refs/heads/master Commit: 1a068b2391684563bb53a0720848b7673d8dc46c Parents: af82469 Author: Ming Ma <[email protected]> Authored: Tue Sep 6 10:49:50 2016 -0700 Committer: Ming Ma <[email protected]> Committed: Tue Sep 6 10:49:50 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 3 +- .../apache/tez/examples/CartesianProduct.java | 208 ++++++++++++++ tez-runtime-library/findbugs-exclude.xml | 18 ++ tez-runtime-library/pom.xml | 1 + .../CartesianProductCombination.java | 164 +++++++++++ .../CartesianProductConfig.java | 255 +++++++++++++++++ .../CartesianProductEdgeManager.java | 106 +++++++ .../CartesianProductEdgeManagerConfig.java | 64 +++++ .../CartesianProductEdgeManagerPartitioned.java | 124 ++++++++ .../CartesianProductEdgeManagerReal.java | 62 ++++ ...artesianProductEdgeManagerUnpartitioned.java | 98 +++++++ .../CartesianProductFilter.java | 47 +++ .../CartesianProductFilterDescriptor.java | 28 ++ .../CartesianProductVertexManager.java | 139 +++++++++ .../CartesianProductVertexManagerConfig.java | 75 +++++ ...artesianProductVertexManagerPartitioned.java | 176 ++++++++++++ .../CartesianProductVertexManagerReal.java | 50 ++++ ...tesianProductVertexManagerUnpartitioned.java | 178 ++++++++++++ .../main/proto/CartesianProductPayload.proto | 31 ++ .../TestCartesianProductCombination.java | 110 +++++++ .../TestCartesianProductConfig.java | 106 +++++++ .../TestCartesianProductEdgeManager.java | 68 +++++ ...tCartesianProductEdgeManagerPartitioned.java | 284 +++++++++++++++++++ ...artesianProductEdgeManagerUnpartitioned.java | 240 ++++++++++++++++ .../TestCartesianProductVertexManager.java | 67 +++++ 
...artesianProductVertexManagerPartitioned.java | 230 +++++++++++++++ ...tesianProductVertexManagerUnpartitioned.java | 194 +++++++++++++ .../org/apache/tez/test/TestFaultTolerance.java | 74 ++++- 29 files changed, 3198 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b73dd3f..0225db6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. TEZ-3326. Display JVM system properties in AM and task logs. TEZ-3009. Errors that occur during container task acquisition are not logged. TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher. http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index e39315b..e5f3e71 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1705,8 +1705,7 @@ public class TaskAttemptImpl implements TaskAttempt, } int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000); - boolean crossTimeDeadline = readErrorTimespanSec >= - MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC ? 
true : false; + boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC; float failureFraction = ((float) attempt.uniquefailedOutputReports.size()) / outputFailedEvent.getConsumerTaskNumber(); http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java new file mode 100644 index 0000000..9f3d490 --- /dev/null +++ b/tez-examples/src/main/java/org/apache/tez/examples/CartesianProduct.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.examples; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.mapreduce.input.MRInput; +import org.apache.tez.mapreduce.output.MROutput; +import org.apache.tez.mapreduce.processor.SimpleMRProcessor; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.library.api.KeyValueReader; +import org.apache.tez.runtime.library.api.KeyValueWriter; +import org.apache.tez.runtime.library.api.Partitioner; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager; +import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig; +import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; +import org.apache.tez.runtime.library.processor.SimpleProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; 
+ +/** + * This job has three vertices: two Tokenizers and one JoinProcessor. Each Tokenizer handles one + * input directory and generates tokens. CustomPartitioner separates tokens into 2 partitions + * according to the parity of token's first char. Then JoinProcessor does cartesian product of + * partitioned token sets. + */ +public class CartesianProduct extends TezExampleBase { + private static final String INPUT = "Input1"; + private static final String OUTPUT = "Output"; + private static final String VERTEX1 = "Vertex1"; + private static final String VERTEX2 = "Vertex2"; + private static final String VERTEX3 = "Vertex3"; + private static final String PARTITIONED = "-partitioned"; + private static final String UNPARTITIONED = "-unpartitioned"; + private static final Logger LOG = LoggerFactory.getLogger(CartesianProduct.class); + private static final int numPartition = 2; + private static final String[] sourceVertices = new String[] {VERTEX1, VERTEX2}; + + public static class TokenProcessor extends SimpleProcessor { + public TokenProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void run() throws Exception { + Preconditions.checkArgument(getInputs().size() == 1); + Preconditions.checkArgument(getOutputs().size() == 1); + KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader(); + KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX3).getWriter(); + while (kvReader.next()) { + StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString()); + while (itr.hasMoreTokens()) { + kvWriter.write(new Text(itr.nextToken()), new IntWritable(1)); + } + } + } + } + + public static class JoinProcessor extends SimpleMRProcessor { + public JoinProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void run() throws Exception { + KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter(); + KeyValueReader kvReader1 = (KeyValueReader) 
getInputs().get(VERTEX1).getReader(); + KeyValueReader kvReader2 = (KeyValueReader) getInputs().get(VERTEX2).getReader(); + Set<String> rightSet = new HashSet<>(); + + while (kvReader2.next()) { + rightSet.add(kvReader2.getCurrentKey().toString()); + } + + while (kvReader1.next()) { + String left = kvReader1.getCurrentKey().toString(); + for (String right : rightSet) { + kvWriter.write(left, right); + } + } + } + } + + public static class CustomPartitioner implements Partitioner { + @Override + public int getPartition(Object key, Object value, int numPartitions) { + return key.toString().charAt(0) % numPartition; + } + } + + private DAG createDAG(TezConfiguration tezConf, String inputPath1, String inputPath2, + String outputPath, boolean isPartitioned) throws IOException { + Vertex v1 = Vertex.create(VERTEX1, ProcessorDescriptor.create(TokenProcessor.class.getName())); + // turn off groupSplit so that each input file incurs one task + v1.addDataSource(INPUT, + MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath1) + .groupSplits(false).build()); + Vertex v2 = Vertex.create(VERTEX2, ProcessorDescriptor.create(TokenProcessor.class.getName())); + v2.addDataSource(INPUT, + MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath2) + .groupSplits(false).build()); + CartesianProductConfig cartesianProductConfig; + if (isPartitioned) { + Map<String, Integer> vertexPartitionMap = new HashMap<>(); + for (String vertex : sourceVertices) { + vertexPartitionMap.put(vertex, numPartition); + } + cartesianProductConfig = new CartesianProductConfig(vertexPartitionMap); + } else { + cartesianProductConfig = new CartesianProductConfig(Arrays.asList(sourceVertices)); + } + UserPayload userPayload = cartesianProductConfig.toUserPayload(tezConf); + Vertex v3 = Vertex.create(VERTEX3, ProcessorDescriptor.create(JoinProcessor.class.getName())); + v3.addDataSink(OUTPUT, + MROutput.createConfigBuilder(new 
Configuration(tezConf), TextOutputFormat.class, outputPath) + .build()); + v3.setVertexManagerPlugin( + VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName()) + .setUserPayload(userPayload)); + + DAG dag = DAG.create("CrossProduct").addVertex(v1).addVertex(v2).addVertex(v3); + EdgeManagerPluginDescriptor edgeManagerDescriptor = + EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName()); + edgeManagerDescriptor.setUserPayload(userPayload); + EdgeProperty edgeProperty; + if (isPartitioned) { + UnorderedPartitionedKVEdgeConfig edgeConf = + UnorderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName(), + CustomPartitioner.class.getName()).build(); + edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor); + } else { + UnorderedKVEdgeConfig edgeConf = + UnorderedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName()).build(); + edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor); + } + dag.addEdge(Edge.create(v1, v3, edgeProperty)).addEdge(Edge.create(v2, v3, edgeProperty)); + + return dag; + } + + @Override + protected void printUsage() { + System.err.println("Usage: args: ["+PARTITIONED + "|" + UNPARTITIONED + + " <input_dir1> <input_dir2> <output_dir>"); + } + + @Override + protected int validateArgs(String[] otherArgs) { + return (otherArgs.length != 4 || (!otherArgs[0].equals(PARTITIONED) + && !otherArgs[0].equals(UNPARTITIONED))) ? 
-1 : 0; + } + + @Override + protected int runJob(String[] args, TezConfiguration tezConf, + TezClient tezClient) throws Exception { + DAG dag = createDAG(tezConf, args[1], args[2], + args[3], args[0].equals(PARTITIONED)); + return runDag(dag, isCountersLog(), LOG); + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new CartesianProduct(), args); + System.exit(res); + } +} + http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml index 4e15edc..d3b6245 100644 --- a/tez-runtime-library/findbugs-exclude.xml +++ b/tez-runtime-library/findbugs-exclude.xml @@ -123,6 +123,24 @@ </Match> <Match> + <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/> + <Field name="unknownFields"/> + <Bug pattern="SE_BAD_FIELD"/> + </Match> + + <Match> + <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto"/> + <Field name="PARSER"/> + <Bug pattern="MS_SHOULD_BE_FINAL"/> + </Match> + + <Match> + <Class name="org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload$CartesianProductConfigProto$Builder"/> + <Method name="maybeForceBuilderInitialization"/> + <Bug pattern="UCF_USELESS_CONTROL_FLOW"/> + </Match> + + <Match> <Bug pattern="EI_EXPOSE_REP"/> <Or> <Class name="org.apache.tez.runtime.library.common.sort.impl.ExteralSorter" /> http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/pom.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml index 9831e50..b676933 100644 --- a/tez-runtime-library/pom.xml +++ b/tez-runtime-library/pom.xml @@ -129,6 +129,7 @@ 
<directory>${basedir}/src/main/proto</directory> <includes> <include>ShufflePayloads.proto</include> + <include>CartesianProductPayload.proto</include> </includes> </source> <output>${project.build.directory}/generated-sources/java</output> http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java new file mode 100644 index 0000000..a46993d --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductCombination.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.cartesianproduct; + +import com.google.common.base.Preconditions; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Represent the combination of source partitions or tasks. 
+ * + * For example, if we have two source vertices and each generates two partitions, we will have 2*2=4 + * destination tasks. The mapping from source partition/task to destination task is: + * {@code <0, 0> -> 0, <0, 1> -> 1, <1, 0> -> 2, <1, 1> -> 3}. + * + * Basically, it stores the source partition/task combination and can compute the corresponding + * destination task. It can also figure out the source combination from a given destination task. + * Task ids are assigned in ascending order of combinations, starting from 0. {@code factor} + * is the helper array used to compute the task id: task id = (combination) dot-product (factor). + * + * You can traverse all combinations with {@link #firstTask} and {@link #nextTask}, + * like {@code <0, 0> -> <0, 1> -> <1, 0> -> <1, 1>}. + * + * Or you can traverse all combinations that have one specific partition fixed with + * {@link #firstTaskWithFixedPartition} and {@link #nextTaskWithFixedPartition}, + * like {@code <0, 1, 0> -> <0, 1, 1> -> <1, 1, 0> -> <1, 1, 1>} (all combinations with the 2nd + * vertex's 2nd partition). + */ +class CartesianProductCombination { + // numPartitions for partitioned case, numTasks for unpartitioned case + private int[] numPartitionOrTask; + // at which position (in source vertices array) our vertex is + private int positionId = -1; + // The i-th element Ci represents partition/task Ci of source vertex i.
+ private final Integer[] combination; + // the weight of each vertex when computing the task id + private final Integer[] factor; + + public CartesianProductCombination(int[] numPartitionOrTask) { + this.numPartitionOrTask = Arrays.copyOf(numPartitionOrTask, numPartitionOrTask.length); + combination = new Integer[numPartitionOrTask.length]; + factor = new Integer[numPartitionOrTask.length]; + factor[factor.length-1] = 1; + for (int i = combination.length-2; i >= 0; i--) { + factor[i] = factor[i+1]*numPartitionOrTask[i+1]; + } + } + + public CartesianProductCombination(int[] numPartitionOrTask, int positionId) { + this(numPartitionOrTask); + this.positionId = positionId; + } + + /** + * @return a read only view of current combination + */ + public List<Integer> getCombination() { + return Collections.unmodifiableList(Arrays.asList(combination)); + } + + /** + * first combination with given partition id in current position + * @param partition + */ + public void firstTaskWithFixedPartition(int partition) { + Preconditions.checkArgument(positionId >= 0 && positionId < combination.length); + Arrays.fill(combination, 0); + combination[positionId] = partition; + } + + /** + * next combination without current partition in current position + * @return false if there is no next combination + */ + public boolean nextTaskWithFixedPartition() { + Preconditions.checkArgument(positionId >= 0 && positionId < combination.length); + int i; + for (i = combination.length-1; i >= 0; i--) { + if (i != positionId && combination[i] != numPartitionOrTask[i]-1) { + break; + } + } + + if (i < 0) { + return false; + } + + combination[i]++; + + for (i++; i < combination.length; i++) { + if (i != positionId) { + combination[i] = 0; + } + } + + return true; + } + + /** + * first combination with given partition id in current position + */ + public void firstTask() { + Arrays.fill(combination, 0); + } + + /** + * next combination without current partition in current position + * @return false 
if there is no next combination + */ + public boolean nextTask() { + int i; + for (i = combination.length-1; i >= 0; i--) { + if (combination[i] != numPartitionOrTask[i]-1) { + break; + } + } + + if (i < 0) { + return false; + } + + combination[i]++; + Arrays.fill(combination, i+1, combination.length, 0); + return true; + } + + /** + * @return corresponding task id for current combination + */ + public int getTaskId() { + int taskId = 0; + for (int i = 0; i < combination.length; i++) { + taskId += combination[i]*factor[i]; + } + return taskId; + } + + public static CartesianProductCombination fromTaskId(int[] numPartitionOrTask, + int taskId) { + CartesianProductCombination result = new CartesianProductCombination(numPartitionOrTask); + for (int i = 0; i < result.combination.length; i++) { + result.combination[i] = taskId/result.factor[i]; + taskId %= result.factor[i]; + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java new file mode 100644 index 0000000..b682182 --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductConfig.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.cartesianproduct; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.UserPayload; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; + +/** + * {@link CartesianProductConfig} is used to configure both + * {@link CartesianProductVertexManager} and {@link CartesianProductEdgeManager}. + * At a minimum, users need to specify the source vertices and, in the partitioned case, the number + * of partitions of each source vertex's output. In the partitioned case, an optional filter can also + * be specified (via {@link CartesianProductFilterDescriptor}). Users may also configure the min/max + * fractions used in slow start.
+ */ +@Evolving +public class CartesianProductConfig { + private final boolean isPartitioned; + private final String[] sourceVertices; + private final int[] numPartitions; + private final CartesianProductFilterDescriptor filterDescriptor; + + /** + * create config for unpartitioned case + * @param sourceVertices list of source vertices names + */ + public CartesianProductConfig(List<String> sourceVertices) { + Preconditions.checkArgument(sourceVertices != null, "source vertices list cannot be null"); + Preconditions.checkArgument(sourceVertices.size() > 1, + "there must be more than 1 source " + "vertices, currently only " + sourceVertices.size()); + + this.isPartitioned = false; + this.sourceVertices = sourceVertices.toArray(new String[sourceVertices.size()]); + this.numPartitions = null; + this.filterDescriptor = null; + } + + /** + * create config for partitioned case without filter + * @param vertexPartitionMap the map from vertex name to its number of partitions + */ + public CartesianProductConfig(Map<String, Integer> vertexPartitionMap) { + this(vertexPartitionMap, null); + } + + /** + * create config for partitioned case with filter + * @param vertexPartitionMap the map from vertex name to its number of partitions + * @param filterDescriptor + */ + public CartesianProductConfig(Map<String, Integer> vertexPartitionMap, + CartesianProductFilterDescriptor filterDescriptor) { + Preconditions.checkArgument(vertexPartitionMap != null, "vertex-partition map cannot be null"); + Preconditions.checkArgument(vertexPartitionMap.size() > 1, + "there must be more than 1 source " + "vertices, currently only " + vertexPartitionMap.size()); + + this.isPartitioned = true; + this.numPartitions = new int[vertexPartitionMap.size()]; + this.sourceVertices = new String[vertexPartitionMap.size()]; + this.filterDescriptor = filterDescriptor; + + int i = 0; + for (Map.Entry<String, Integer> entry : vertexPartitionMap.entrySet()) { + this.sourceVertices[i] = entry.getKey(); + 
this.numPartitions[i] = entry.getValue(); + i++; + } + + checkNumPartitions(); + } + + /** + * create config for partitioned case, with specified source vertices order + * @param numPartitions + * @param sourceVertices + * @param filterDescriptor + */ + @VisibleForTesting + protected CartesianProductConfig(int[] numPartitions, String[] sourceVertices, + CartesianProductFilterDescriptor filterDescriptor) { + Preconditions.checkArgument(numPartitions != null, "partitions count array can't be null"); + Preconditions.checkArgument(sourceVertices != null, "source vertices array can't be null"); + Preconditions.checkArgument(numPartitions.length == sourceVertices.length, + "partitions count array(length: " + numPartitions.length + ") and source vertices array " + + "(length: " + sourceVertices.length + ") cannot have different length"); + Preconditions.checkArgument(sourceVertices.length > 1, + "there must be more than 1 source " + "vertices, currently only " + sourceVertices.length); + + this.isPartitioned = true; + this.numPartitions = numPartitions; + this.sourceVertices = sourceVertices; + this.filterDescriptor = filterDescriptor; + + checkNumPartitions(); + } + + /** + * create config for both cases, used by subclass + */ + protected CartesianProductConfig(boolean isPartitioned, int[] numPartitions, + String[] sourceVertices, + CartesianProductFilterDescriptor filterDescriptor) { + this.isPartitioned = isPartitioned; + this.numPartitions = numPartitions; + this.sourceVertices = sourceVertices; + this.filterDescriptor = filterDescriptor; + } + + @VisibleForTesting + protected void checkNumPartitions() { + if (isPartitioned) { + boolean isUnpartitioned = true; + for (int i = 0; i < numPartitions.length; i++) { + Preconditions.checkArgument(this.numPartitions[i] > 0, + "Vertex " + sourceVertices[i] + "has negative (" + numPartitions[i] + ") partitions"); + isUnpartitioned = isUnpartitioned && numPartitions[i] == 1; + } + Preconditions.checkArgument(!isUnpartitioned, + 
"every source vertex has 1 partition in a partitioned case"); + } else { + Preconditions.checkArgument(this.numPartitions == null, + "partition counts should be null in unpartitioned case"); + } + } + + /** + * @return the array of source vertices names + */ + public List<String> getSourceVertices() { + return Collections.unmodifiableList(Arrays.asList(sourceVertices)); + } + + /** + * @return the array of number of partitions, the order is same as result of + * <method>getSourceVertices</method> + */ + public List<Integer> getNumPartitions() { + if (this.numPartitions == null) { + return null; + } + return Collections.unmodifiableList(Ints.asList(this.numPartitions)); + } + + public boolean getIsPartitioned() { + return isPartitioned; + } + + public CartesianProductFilterDescriptor getFilterDescriptor() { + return this.filterDescriptor; + } + + public UserPayload toUserPayload(TezConfiguration conf) throws IOException { + return UserPayload.create(ByteBuffer.wrap(toProto(conf).toByteArray())); + } + + protected CartesianProductConfigProto toProto(TezConfiguration conf) { + CartesianProductConfigProto.Builder builder = + CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(this.isPartitioned) + .addAllSourceVertices(Arrays.asList(sourceVertices)); + + if (isPartitioned) { + builder.addAllNumPartitions(Ints.asList(numPartitions)); + if (filterDescriptor != null) { + builder.setFilterClassName(filterDescriptor.getClassName()); + UserPayload filterUesrPayload = filterDescriptor.getUserPayload(); + if (filterUesrPayload != null) { + builder.setFilterUserPayload(ByteString.copyFrom(filterUesrPayload.getPayload())); + } + } + } + + builder.setMinFraction( + CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT); + builder.setMaxFraction( + CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT); + + if (conf != null) { + builder.setMinFraction(conf.getFloat( + 
CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION,
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT));
+      builder.setMaxFraction(conf.getFloat(
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION,
+        CartesianProductVertexManager.TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT));
+    }
+    Preconditions.checkArgument(builder.getMinFraction() <= builder.getMaxFraction(),
+      "min fraction(" + builder.getMinFraction() + ") should be less than max fraction(" +
+        builder.getMaxFraction() + ") in cartesian product slow start");
+
+    return builder.build();
+  }
+
+  protected static CartesianProductConfigProto userPayloadToProto(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    Preconditions.checkArgument(payload != null, "UserPayload is null");
+    Preconditions.checkArgument(payload.getPayload() != null, "UserPayload carries null payload");
+    return
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+  }
+
+  protected static CartesianProductConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    return fromProto(userPayloadToProto(payload));
+  }
+
+  protected static CartesianProductConfig fromProto(
+    CartesianProductConfigProto proto) {
+    if (!proto.getIsPartitioned()) {
+      return new CartesianProductConfig(proto.getSourceVerticesList());
+    } else {
+      String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+      proto.getSourceVerticesList().toArray(sourceVertices);
+      CartesianProductFilterDescriptor filterDescriptor = null;
+      if (proto.hasFilterClassName()) {
+        filterDescriptor = new CartesianProductFilterDescriptor(proto.getFilterClassName());
+        if (proto.hasFilterUserPayload()) {
+          filterDescriptor.setUserPayload(
+            UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+        }
+      }
+      return new CartesianProductConfig(Ints.toArray(proto.getNumPartitionsList()),
+        sourceVertices, filterDescriptor);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
new file mode 100644
index 0000000..96cce94
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+import org.apache.tez.dag.api.TezException;
+
+import javax.annotation.Nullable;
+
+/**
+ * This EM wraps a real edge manager implementation object. It chooses whether it's partitioned or
+ * unpartitioned implementation according to the config. All method invocations are actually
+ * redirected to the real implementation.
+ */
+public class CartesianProductEdgeManager extends EdgeManagerPluginOnDemand {
+  private CartesianProductEdgeManagerReal edgeManagerReal;
+
+  public CartesianProductEdgeManager(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() throws Exception {
+    Preconditions.checkArgument(getContext().getUserPayload() != null, "UserPayload is null");
+    CartesianProductEdgeManagerConfig config = CartesianProductEdgeManagerConfig.fromUserPayload(
+      getContext().getUserPayload());
+    // no need to check config because config comes from VM and is already checked by VM
+    edgeManagerReal = config.getIsPartitioned()
+      ? new CartesianProductEdgeManagerPartitioned(getContext())
+      : new CartesianProductEdgeManagerUnpartitioned(getContext());
+    edgeManagerReal.initialize(config);
+  }
+
+  @VisibleForTesting
+  protected CartesianProductEdgeManagerReal getEdgeManagerReal() {
+    return this.edgeManagerReal;
+  }
+
+  @Override
+  public void prepareForRouting() throws Exception {
+    edgeManagerReal.prepareForRouting();
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return edgeManagerReal.routeInputErrorEventToSource(destTaskId, failedInputId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId,
+                                                                int srcOutputId,
+                                                                int destTaskId)
+    throws Exception {
+    return edgeManagerReal.routeDataMovementEventToDestination(srcTaskId, srcOutputId, destTaskId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return edgeManagerReal.routeCompositeDataMovementEventToDestination(srcTaskId, destTaskId);
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return edgeManagerReal.routeInputSourceTaskFailedEventToDestination(srcTaskId, destTaskId);
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return edgeManagerReal.getNumDestinationTaskPhysicalInputs(destTaskId);
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return edgeManagerReal.getNumSourceTaskPhysicalOutputs(srcTaskId);
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return edgeManagerReal.getNumDestinationConsumerTasks(sourceTaskIndex);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
new file mode 100644
index 0000000..d48a0bb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;
+
+class CartesianProductEdgeManagerConfig extends CartesianProductConfig {
+  private final int[] numTasks;
+
+  protected CartesianProductEdgeManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                              int[] numPartitions, int[] numTasks,
+                                              CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    this.numTasks = numTasks;
+  }
+
+  public int[] getNumTasks() {
+    return this.numTasks;
+  }
+
+  public static CartesianProductEdgeManagerConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    int[] numPartitions =
+      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+    CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
+      ? new CartesianProductFilterDescriptor(proto.getFilterClassName()) : null;
+    if (filterDescriptor != null && proto.hasFilterUserPayload()) {
+      filterDescriptor.setUserPayload(
+        UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+    } // a filter payload without a filter class name is ignored instead of causing an NPE
+    int[] numTasks =
+      proto.getNumTasksCount() == 0 ? null : Ints.toArray(proto.getNumTasksList());
+    return new CartesianProductEdgeManagerConfig(isPartitioned, sourceVertices, numPartitions,
+      numTasks, filterDescriptor);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
new file mode 100644
index 0000000..644d5af
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerPartitioned.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+import org.apache.tez.dag.api.UserPayload;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class CartesianProductEdgeManagerPartitioned extends CartesianProductEdgeManagerReal {
+  private int positionId; // index of this edge's source vertex in config.getSourceVertices()
+  private CartesianProductFilter filter;
+  private int[] taskIdMapping; // real dest task id -> ideal (unfiltered) combination task id
+  private CartesianProductEdgeManagerConfig config;
+  private int[] numPartitions;
+
+  public CartesianProductEdgeManagerPartitioned(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize(CartesianProductEdgeManagerConfig config) throws Exception {
+    this.config = config;
+    this.numPartitions = Ints.toArray(config.getNumPartitions());
+    positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+    CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor();
+    if (filterDescriptor != null) { // filter built reflectively via its (UserPayload) constructor
+      filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(),
+        new Class[] {UserPayload.class}, new UserPayload[] {filterDescriptor.getUserPayload()});
+    }
+    generateTaskIdMapping();
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return failedInputId; // physical inputs are indexed by source task id
+  }
+
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+                                                                int destTaskId) throws Exception {
+    int partition = CartesianProductCombination.fromTaskId(numPartitions,
+      getIdealTaskId(destTaskId)).getCombination().get(positionId);
+    return srcOutputId != partition ? null : // only the partition this dest task consumes
+      EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    int partition = CartesianProductCombination.fromTaskId(numPartitions,
+      getIdealTaskId(destTaskId)).getCombination().get(positionId);
+    return EventRouteMetadata.create(1, new int[]{srcTaskId}, new int[]{partition});
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    return EventRouteMetadata.create(1, new int[]{srcTaskId});
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return getContext().getSourceVertexNumTasks(); // one physical input per source task
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return numPartitions[positionId]; // one physical output per partition of this source
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return getContext().getDestinationVertexNumTasks(); // every dest task reads every src task
+  }
+
+  private void generateTaskIdMapping() {
+    List<Integer> idealTaskId = new ArrayList<>();
+    Map<String, Integer> vertexPartitionMap = new HashMap<>();
+    CartesianProductCombination combination =
+      new CartesianProductCombination(numPartitions);
+    combination.firstTask();
+    List<String> sourceVertices = config.getSourceVertices();
+    do { // walk combinations via firstTask()/nextTask(); keep those the filter accepts
+      for (int i = 0; i < sourceVertices.size(); i++) {
+        vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+      }
+      if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
+        idealTaskId.add(combination.getTaskId());
+      }
+    } while (combination.nextTask());
+    this.taskIdMapping = Ints.toArray(idealTaskId);
+  }
+
+  private int getIdealTaskId(int realTaskId) {
+    return taskIdMapping[realTaskId];
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
new file mode 100644
index 0000000..705db05
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerReal.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata;
+
+/**
+ * Base class of the partitioned and unpartitioned CartesianProductEdgeManager implementations
+ */
+abstract class CartesianProductEdgeManagerReal {
+  private final EdgeManagerPluginContext context;
+
+  public CartesianProductEdgeManagerReal(EdgeManagerPluginContext context) {
+    this.context = context;
+  }
+
+  public EdgeManagerPluginContext getContext() {
+    return this.context;
+  }
+
+  public abstract void initialize(CartesianProductEdgeManagerConfig config) throws Exception;
+
+  public void prepareForRouting() throws Exception {} // no-op unless a subclass overrides it
+
+  public abstract int routeInputErrorEventToSource(int destTaskId, int failedInputId)
+    throws Exception;
+
+  public abstract EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId,
+                                                                         int srcOutputId,
+                                                                         int destTaskId)
+    throws Exception;
+
+  public abstract EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                                  int destTaskId)
+    throws Exception;
+
+  public abstract EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                                  int destTaskId)
+    throws Exception;
+
+  public abstract int getNumDestinationTaskPhysicalInputs(int destTaskId);
+
+  public abstract int getNumSourceTaskPhysicalOutputs(int srcTaskId);
+
+  public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
new file mode 100644
index 0000000..cea4142
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductEdgeManagerUnpartitioned.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+import static org.apache.tez.dag.api.EdgeManagerPluginOnDemand.*;
+
+class CartesianProductEdgeManagerUnpartitioned extends CartesianProductEdgeManagerReal {
+  private int positionId; // index of this edge's source vertex in config.getSourceVertices()
+  private int[] numTasks;
+  private int numDestinationConsumerTasks; // product of all numTasks except this source's own
+
+  public CartesianProductEdgeManagerUnpartitioned(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  public void initialize(CartesianProductEdgeManagerConfig config) {
+    positionId = config.getSourceVertices().indexOf(getContext().getSourceVertexName());
+    this.numTasks = config.getNumTasks();
+
+    if (numTasks != null && numTasks[positionId] != 0) { // otherwise the count stays 0
+      numDestinationConsumerTasks = 1;
+      for (int numTask : numTasks) {
+        numDestinationConsumerTasks *= numTask;
+      }
+      numDestinationConsumerTasks /= numTasks[positionId]; // divide out this source's own count
+    }
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(int destTaskId, int failedInputId) throws Exception {
+    return // the task of this source that the dest task's combination selects
+      CartesianProductCombination.fromTaskId(numTasks, destTaskId).getCombination().get(positionId);
+  }
+
+  @Override
+  public EventRouteMetadata routeDataMovementEventToDestination(int srcTaskId, int srcOutputId,
+                                                                int destTaskId) throws Exception {
+    int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+      .getCombination().get(positionId);
+    return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}) : null;
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeCompositeDataMovementEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+      .getCombination().get(positionId);
+    return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}, new int[]{0}) : null;
+  }
+
+  @Nullable
+  @Override
+  public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(int srcTaskId,
+                                                                         int destTaskId)
+    throws Exception {
+    int index = CartesianProductCombination.fromTaskId(numTasks, destTaskId)
+      .getCombination().get(positionId);
+    return index == srcTaskId ? EventRouteMetadata.create(1, new int[]{0}) : null;
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destTaskId) {
+    return 1; // each dest task reads exactly one task of this source, at input index 0
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int srcTaskId) {
+    return 1; // unpartitioned: a single output per source task
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return numDestinationConsumerTasks;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
new file mode 100644
index 0000000..5b6456e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.UserPayload;
+
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Users can extend this base class and override {@link #isValidCombination} to implement a
+ * custom filter; subclasses are created reflectively through a (UserPayload) constructor.
+ */
+@Evolving
+public abstract class CartesianProductFilter {
+  private UserPayload userPayload;
+
+  public CartesianProductFilter(UserPayload payload) {
+    this.userPayload = payload;
+  }
+
+  /**
+   * @param vertexPartitionMap the map from vertex name to partition id
+   * @return whether this combination of partitions is valid
+   */
+  public abstract boolean isValidCombination(Map<String, Integer> vertexPartitionMap);
+
+  public UserPayload getUserPayload() {
+    return userPayload;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
new file mode 100644
index 0000000..bc81755
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductFilterDescriptor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software
Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.EntityDescriptor;
+
+public class CartesianProductFilterDescriptor // CartesianProductFilter class name + payload
+  extends EntityDescriptor<CartesianProductFilterDescriptor> {
+
+  public CartesianProductFilterDescriptor(String filterClassName) {
+    super(filterClassName);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
new file mode 100644
index 0000000..659d3b7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This VM wraps a real vertex manager implementation object. It chooses whether it's partitioned
+ * or unpartitioned implementation according to the config. All method invocations are actually
+ * redirected to the real implementation.
+ */
+public class CartesianProductVertexManager extends VertexManagerPlugin {
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION =
+    "tez.cartesian-product.min-src-fraction";
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT = 0.25f;
+  public static final String TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION =
+    "tez.cartesian-product.max-src-fraction"; // must differ from the min-fraction key
+  public static final float TEZ_CAERESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT = 0.75f;
+
+  private CartesianProductVertexManagerReal vertexManagerReal = null;
+
+  public CartesianProductVertexManager(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() throws Exception {
+    CartesianProductVertexManagerConfig config =
+      CartesianProductVertexManagerConfig.fromUserPayload(getContext().getUserPayload());
+    // check whether DAG and config are consistent
+    Map<String, EdgeProperty> edgePropertyMap = getContext().getInputVertexEdgeProperties();
+    Set<String> sourceVerticesDAG = edgePropertyMap.keySet();
+    Set<String> sourceVerticesConfig = new HashSet<>();
+    sourceVerticesConfig.addAll(config.getSourceVertices());
+
+    for (Map.Entry<String, EdgeProperty> entry : edgePropertyMap.entrySet()) {
+      if (entry.getValue().getEdgeManagerDescriptor() != null // null for non-custom edges
+        && entry.getValue().getEdgeManagerDescriptor().getClassName()
+          .equals(CartesianProductEdgeManager.class.getName())) {
+        Preconditions.checkArgument(sourceVerticesConfig.contains(entry.getKey()),
+          entry.getKey() + " has CartesianProductEdgeManager but isn't in " +
+            "CartesianProductVertexManagerConfig");
+      } else {
+        Preconditions.checkArgument(!sourceVerticesConfig.contains(entry.getKey()),
+          entry.getKey() + " has no CartesianProductEdgeManager but is in " +
+            "CartesianProductVertexManagerConfig");
+      }
+    }
+
+    for (String vertex : sourceVerticesConfig) {
+      Preconditions.checkArgument(sourceVerticesDAG.contains(vertex),
+        vertex + " is in CartesianProductVertexManagerConfig but not a source vertex in DAG");
+      Preconditions.checkArgument(
+        edgePropertyMap.get(vertex).getEdgeManagerDescriptor().getClassName()
+          .equals(CartesianProductEdgeManager.class.getName()),
+        vertex + " is in CartesianProductVertexManagerConfig and a source vertex, but has no " +
+          "CartesianProductEdgeManager");
+    }
+
+    vertexManagerReal = config.getIsPartitioned()
+      ? new CartesianProductVertexManagerPartitioned(getContext())
+      : new CartesianProductVertexManagerUnpartitioned(getContext());
+    vertexManagerReal.initialize(config);
+  }
+
+  @VisibleForTesting
+  protected CartesianProductVertexManagerReal getVertexManagerReal() {
+    return this.vertexManagerReal;
+  }
+
+  /**
+   * Delegates to the real vertex manager; reserved for locality based optimization in future
+   * @param vmEvent
+   * @throws Exception
+   */
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
+    vertexManagerReal.onVertexManagerEventReceived(vmEvent);
+  }
+
+  /**
+   * Currently direct input to cartesian product vertex is not supported
+   * @param inputName
+   * @param inputDescriptor
+   * @param events
+   * @throws Exception
+   */
+  @Override
+  public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+                                      List<Event> events) throws Exception {
+    throw new TezException("Direct input to cartesian product vertex is not supported yet");
+  }
+
+  @Override
+  public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception {
+    vertexManagerReal.onVertexStarted(completions);
+  }
+
+  @Override
+  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    vertexManagerReal.onVertexStateUpdated(stateUpdate);
+  }
+
+  @Override
+  public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    vertexManagerReal.onSourceTaskCompleted(attempt);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
new file mode 100644
index 0000000..b324524
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tez.dag.api.UserPayload;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*;
+
+class CartesianProductVertexManagerConfig extends CartesianProductConfig {
+  private final float minFraction;
+  private final float maxFraction;
+
+  public CartesianProductVertexManagerConfig(boolean isPartitioned, String[] sourceVertices,
+                                             int[] numPartitions,
+                                             float minFraction, float maxFraction,
+                                             CartesianProductFilterDescriptor filterDescriptor) {
+    super(isPartitioned, numPartitions, sourceVertices, filterDescriptor);
+    Preconditions.checkArgument(minFraction <= maxFraction,
+      "min fraction(" + minFraction + ") should be less than max fraction(" +
+        maxFraction + ") in cartesian product slow start");
+    this.minFraction = minFraction;
+    this.maxFraction = maxFraction;
+  }
+
+  public float getMinFraction() {
+    return minFraction;
+  }
+
+  public float getMaxFraction() {
+    return maxFraction;
+  }
+
+  public static CartesianProductVertexManagerConfig fromUserPayload(UserPayload payload)
+    throws InvalidProtocolBufferException {
+    CartesianProductConfigProto proto =
+      CartesianProductConfigProto.parseFrom(ByteString.copyFrom(payload.getPayload()));
+
+    boolean isPartitioned = proto.getIsPartitioned();
+    String[] sourceVertices = new String[proto.getSourceVerticesList().size()];
+    proto.getSourceVerticesList().toArray(sourceVertices);
+    int[] numPartitions =
+      proto.getNumPartitionsCount() == 0 ? null : Ints.toArray(proto.getNumPartitionsList());
+    CartesianProductFilterDescriptor filterDescriptor = proto.hasFilterClassName()
+      ? new CartesianProductFilterDescriptor(proto.getFilterClassName()) : null;
+    if (filterDescriptor != null && proto.hasFilterUserPayload()) {
+      filterDescriptor.setUserPayload(
+        UserPayload.create(ByteBuffer.wrap(proto.getFilterUserPayload().toByteArray())));
+    } // a filter payload without a filter class name is ignored instead of causing an NPE
+    float minFraction = proto.getMinFraction();
+    float maxFraction = proto.getMaxFraction();
+    return new CartesianProductVertexManagerConfig(isPartitioned, sourceVertices, numPartitions,
+      minFraction, maxFraction, filterDescriptor);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
new file mode 100644
index 0000000..af2abae
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerPartitioned.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Vertex manager for the partitioned cartesian product.
+ *
+ * <p>The destination parallelism is the number of source partition combinations accepted by
+ * the optional filter (all combinations when there is no filter). Tasks are scheduled in
+ * ascending id order: none while the fraction of completed source tasks is below the min
+ * fraction, a linearly growing share between min and max fraction, and all tasks once the
+ * max fraction is reached.
+ */
+class CartesianProductVertexManagerPartitioned extends CartesianProductVertexManagerReal {
+  private CartesianProductVertexManagerConfig config;
+  private List<String> sourceVertices;
+  // number of destination tasks, computed once by reconfigureVertex()
+  private int parallelism = 0;
+  private boolean vertexStarted = false;
+  private boolean vertexReconfigured = false;
+  private int numSourceVertexConfigured = 0;
+  private CartesianProductFilter filter;
+  // per source vertex, the ids of the source tasks already counted as completed
+  private Map<String, BitSet> sourceTaskCompleted = new HashMap<>();
+  private int numFinishedSrcTasks = 0;
+  private int totalNumSrcTasks = 0;
+  private int lastScheduledTaskId = -1;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CartesianProductVertexManagerPartitioned.class);
+
+  public CartesianProductVertexManagerPartitioned(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Instantiates the optional filter, prepares completion tracking for every source vertex,
+   * registers for their CONFIGURED state updates and declares that this vertex will be
+   * reconfigured.
+   *
+   * @throws TezReflectionException if the filter class cannot be instantiated
+   */
+  @Override
+  public void initialize(CartesianProductVertexManagerConfig config) throws
+      TezReflectionException {
+    this.config = config;
+    this.sourceVertices = config.getSourceVertices();
+    CartesianProductFilterDescriptor filterDescriptor = config.getFilterDescriptor();
+    if (filterDescriptor != null) {
+      try {
+        filter = ReflectionUtils.createClazzInstance(filterDescriptor.getClassName(),
+            new Class[]{UserPayload.class}, new UserPayload[]{filterDescriptor.getUserPayload()});
+      } catch (TezReflectionException e) {
+        // keep the cause in the log instead of only a bare message
+        LOG.error("Creating filter failed", e);
+        throw e;
+      }
+    }
+    for (String sourceVertex : sourceVertices) {
+      sourceTaskCompleted.put(sourceVertex, new BitSet());
+    }
+    for (String vertex : sourceVertices) {
+      getContext().registerForVertexStateUpdates(vertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+    getContext().vertexReconfigurationPlanned();
+  }
+
+  /**
+   * Computes the final parallelism by enumerating every partition combination of the source
+   * vertices and counting those accepted by the filter, then reconfigures this vertex with it.
+   */
+  private void reconfigureVertex() throws IOException {
+    Map<String, Integer> vertexPartitionMap = new HashMap<>();
+
+    CartesianProductCombination combination =
+        new CartesianProductCombination(Ints.toArray(config.getNumPartitions()));
+    combination.firstTask();
+    do {
+      for (int i = 0; i < sourceVertices.size(); i++) {
+        vertexPartitionMap.put(sourceVertices.get(i), combination.getCombination().get(i));
+      }
+      if (filter == null || filter.isValidCombination(vertexPartitionMap)) {
+        parallelism++;
+      }
+    } while (combination.nextTask());
+    // no need to reconfigure EM because EM already has all necessary information via config object
+    getContext().reconfigureVertex(parallelism, null, null);
+    vertexReconfigured = true;
+    getContext().doneReconfiguringVertex();
+  }
+
+  /**
+   * Marks the vertex as started, replays the given source completions and tries to schedule.
+   */
+  @Override
+  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
+      throws Exception {
+    vertexStarted = true;
+    if (completions != null) {
+      for (TaskAttemptIdentifier attempt : completions) {
+        onSourceTaskCompleted(attempt);
+      }
+    }
+    // try schedule because there may be no more vertex state update and source completions
+    tryScheduleTask();
+  }
+
+  /**
+   * Handles a source vertex reaching CONFIGURED. The first such update triggers the
+   * reconfiguration of this vertex; every update adds the source's task count to the total.
+   */
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException {
+    Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED);
+    if (!vertexReconfigured) {
+      reconfigureVertex();
+    }
+    numSourceVertexConfigured++;
+    totalNumSrcTasks += getContext().getVertexNumTasks(stateUpdate.getVertexName());
+    // try schedule because there may be no more vertex start and source completions
+    tryScheduleTask();
+  }
+
+  /**
+   * Records a completed source task; a task id already recorded for its vertex is not counted
+   * again.
+   */
+  @Override
+  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
+    int taskId = attempt.getTaskIdentifier().getIdentifier();
+    String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
+    BitSet bitSet = this.sourceTaskCompleted.get(vertex);
+    if (!bitSet.get(taskId)) {
+      bitSet.set(taskId);
+      numFinishedSrcTasks++;
+      tryScheduleTask();
+    }
+  }
+
+  /**
+   * Schedules tasks in ascending id order. Slow start has same behavior as ShuffleVertexManager.
+   * Nothing happens until the vertex has started and all source vertices are configured.
+   */
+  private void tryScheduleTask() {
+    if (!vertexStarted
+        || numSourceVertexConfigured != sourceVertices.size()) {
+      return;
+    }
+    float minFraction = config.getMinFraction();
+    float maxFraction = config.getMaxFraction();
+    // with no source tasks there is nothing to wait for; avoids a 0/0 NaN fraction
+    float percentFinishedSrcTask =
+        totalNumSrcTasks == 0 ? 1f : numFinishedSrcTasks * 1f / totalNumSrcTasks;
+    int numTaskToSchedule;
+    if (percentFinishedSrcTask >= maxFraction) {
+      // Checked first: when minFraction == maxFraction the interpolation below would compute
+      // 0f/0f = NaN, and (int) NaN == 0, so nothing would ever be scheduled.
+      numTaskToSchedule = parallelism;
+    } else if (percentFinishedSrcTask < minFraction) {
+      numTaskToSchedule = 0;
+    } else {
+      // here minFraction <= percent < maxFraction, so the divisor is positive
+      numTaskToSchedule = (int) ((percentFinishedSrcTask - minFraction)
+          / (maxFraction - minFraction) * parallelism);
+    }
+    // schedule tasks if there are more we can schedule
+    if (numTaskToSchedule - 1 > lastScheduledTaskId) {
+      List<ScheduleTaskRequest> scheduleTaskRequests = new ArrayList<>();
+      for (int i = lastScheduledTaskId + 1; i < numTaskToSchedule; i++) {
+        scheduleTaskRequests.add(ScheduleTaskRequest.create(i, null));
+      }
+      lastScheduledTaskId = numTaskToSchedule - 1;
+      getContext().scheduleTasks(scheduleTaskRequests);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/1a068b23/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java new file mode 100644 index 0000000..84e65ac --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/CartesianProductVertexManagerReal.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+package org.apache.tez.runtime.library.cartesianproduct;
+
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import java.util.List;
+
+/**
+ * Shared parent of the concrete cartesian product vertex manager implementations.
+ *
+ * <p>Keeps the plugin context handed in at construction and declares the callbacks each
+ * implementation must provide. Vertex manager events are ignored unless a subclass
+ * overrides {@link #onVertexManagerEventReceived(VertexManagerEvent)}.
+ */
+abstract class CartesianProductVertexManagerReal {
+  /** Plugin context supplied by the owner of this implementation; never reassigned. */
+  private final VertexManagerPluginContext context;
+
+  public CartesianProductVertexManagerReal(VertexManagerPluginContext ctx) {
+    context = ctx;
+  }
+
+  /** Returns the plugin context passed to the constructor. */
+  public final VertexManagerPluginContext getContext() {
+    return context;
+  }
+
+  /** Sets the implementation up from the deserialized vertex manager config. */
+  public abstract void initialize(CartesianProductVertexManagerConfig config) throws Exception;
+
+  /** No-op by default; subclasses that care about vertex manager events override it. */
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
+
+  /** Called with the source task completions known when the vertex starts (may be null). */
+  public abstract void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception;
+
+  /** Called for each source vertex state update the implementation registered for. */
+  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
+
+  /** Called when a source task attempt completes. */
+  public abstract void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception;
+}
