/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tez.runtime.library.cartesianproduct;

import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import org.apache.tez.runtime.library.utils.Grouper;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.CUSTOM;
import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto;

/**
 * In fair cartesian product case, we have one destination task for each source chunk combination.
 * A source is a source vertex or a source vertex group. A chunk is a part of source output. The
 * mapping from source chunk to destination task id is done by {@link CartesianProductCombination}.
 *
 * This requires source output to be partitioned with a round robin partitioner, even if data is
 * unpartitioned intrinsically. By doing this, we can achieve arbitrary parallelism.
 *
 * It tries to distribute work evenly by having each task perform a similar number of cartesian
 * product operations. To achieve this, it estimates #record from each source and total #ops.
 *
 * The parallelism is decided based on estimated total #ops and two configurations, max allowed
 * parallelism and min-ops-per-worker. The max parallelism will be tried first and used if the
 * resulting #ops-per-worker is no less than min-ops-per-worker. Otherwise, parallelism will be
 * total #ops divided by min-ops-per-worker (rounded up).
 *
 * To reduce shuffle overhead, we try to group output from the same task first. A chunk from a
 * source vertex contains continuous physical output from a task or its neighboring tasks.
 *
 * Vertex group is supported. Chunk i of a source group contains chunk i of every vertex in this
 * group.
 */
class FairCartesianProductVertexManager extends CartesianProductVertexManagerReal {
  /**
   * a cartesian product source.
   * Chunk i of a source contains chunk i of every vertex in this source
   */
  static class Source {
    // list of source vertices of this source
    List<SrcVertex> srcVertices = new ArrayList<>();
    // position of this source in all sources
    int position;
    // name of the vertex group; left null when the source is a single vertex
    String name;

    // total number of chunks in this source
    // each vertex in this source has same numChunk
    int numChunk;
    // estimated total number of output records, assigned from estimateNumRecord() while
    // reconfiguring
    long numRecord;

    /**
     * Renders this source and all of its vertices for logging.
     *
     * @param afterReconfigure forwarded to {@link SrcVertex#toString(boolean)} to pick which
     *                         per-vertex statistics are printed
     */
    public String toString(boolean afterReconfigure) {
      StringBuilder sb = new StringBuilder();
      sb.append("Source at position ").append(position);
      if (name != null) {
        sb.append(", name ").append(name);
      }
      sb.append(", num chunk ").append(numChunk);
      sb.append(": {");
      // explicit separator handling keeps the output well formed even with no vertices
      String separator = "";
      for (SrcVertex srcV : srcVertices) {
        sb.append(separator);
        sb.append("[").append(srcV.toString(afterReconfigure)).append("]");
        separator = ", ";
      }
      sb.append("}");
      return sb.toString();
    }

    // estimate total number of output record from all vertices in this group
    public long estimateNumRecord() {
      long estimation = 0;
      for (SrcVertex srcV : srcVertices) {
        estimation += srcV.estimateNumRecord();
      }
      return estimation;
    }

    private boolean isChunkCompleted(int chunkId) {
      // a chunk is completed only if its corresponding chunk in each vertex is completed
      for (SrcVertex srcV : srcVertices) {
        if (!srcV.isChunkCompleted(chunkId)) {
          return false;
        }
      }
      return true;
    }

    // sum of the task counts of all vertices in this source
    public int getNumTask() {
      int numTask = 0;
      for (SrcVertex srcV : srcVertices) {
        numTask += srcV.numTask;
      }
      return numTask;
    }

    // vertex with the largest numRecord (first one wins on ties); null if there is no vertex
    public SrcVertex getSrcVertexWithMostOutput() {
      SrcVertex srcVWithMaxOutput = null;
      for (SrcVertex srcV : srcVertices) {
        if (srcVWithMaxOutput == null || srcV.numRecord > srcVWithMaxOutput.numRecord) {
          srcVWithMaxOutput = srcV;
        }
      }
      return srcVWithMaxOutput;
    }
  }

  /**
   * a cartesian product source vertex
   */
  class SrcVertex {
    // which source this vertex belongs to
    Source source;
    // vertex name
    String name;
    // number of tasks, read from the context once the vertex reaches CONFIGURED
    int numTask;

    // ids of tasks reported as completed
    RoaringBitmap taskCompleted = new RoaringBitmap();
    // ids of tasks whose vertex manager event has been accumulated into numRecord
    RoaringBitmap taskWithVMEvent = new RoaringBitmap();
    // sum of the record counts carried by the vertex manager events received so far
    long numRecord;

    /**
     * Renders this vertex for logging.
     *
     * @param afterReconfigure if true print the extrapolated record count and the chunk count of
     *                         the owning source, otherwise print the raw statistics gathered so far
     */
    public String toString(boolean afterReconfigure) {
      StringBuilder sb = new StringBuilder();
      sb.append("vertex ").append(name).append(", ");
      if (afterReconfigure) {
        // numRecord only holds acknowledged records, so report the extrapolated value here
        sb.append("estimated # output records ").append(estimateNumRecord()).append(", ");
        sb.append("# chunks ").append(source.numChunk);
      } else {
        sb.append(numTask).append(" tasks, ");
        sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, ");
        sb.append("numRecord ").append(numRecord);
      }
      return sb.toString();
    }

    /**
     * Extrapolates the acknowledged record count to all tasks of this vertex, assuming the tasks
     * that have not reported yet produce the same average number of records.
     *
     * @return 0 if no vertex manager event was received, otherwise
     *         numRecord * numTask / #reporting tasks (integer division)
     */
    public long estimateNumRecord() {
      if (taskWithVMEvent.isEmpty()) {
        return 0;
      } else {
        return numRecord * numTask / taskWithVMEvent.getCardinality();
      }
    }

    /**
     * Tells whether every task contributing physical output to the given chunk has completed.
     * Re-initializes the shared grouper of the enclosing manager.
     */
    public boolean isChunkCompleted(int chunkId) {
      // items are the numTask * numPartitions physical outputs of this vertex, so dividing an
      // item id by numPartitions yields the id of the task that produced it
      grouper.init(numTask * numPartitions, source.numChunk);
      int firstRelevantTask = grouper.getFirstItemInGroup(chunkId) / numPartitions;
      int lastRelevantTask = grouper.getLastItemInGroup(chunkId) / numPartitions;
      for (int relevantTask = firstRelevantTask; relevantTask <= lastRelevantTask; relevantTask++) {
        if (!taskCompleted.contains(relevantTask)) {
          return false;
        }
      }
      return true;
    }
  }

  private static final Logger LOG =
    org.slf4j.LoggerFactory.getLogger(FairCartesianProductVertexManager.class);

  private CartesianProductConfigProto config;
  private List<String> sourceList;
  private Map<String, Source> sourcesByName = new HashMap<>();
  private Map<String, SrcVertex> srcVerticesByName = new HashMap<>();
  private boolean enableGrouping;
  private int maxParallelism;
  // number of physical outputs (partitions) of every source task
  private int numPartitions;
  private long minOpsPerWorker;

  private long minNumRecordForEstimation;
  private boolean vertexReconfigured = false;
  private boolean vertexStarted = false;
  private boolean vertexStartSchedule = false;
  private int numCPSrcNotInConfigureState = 0;
  private int numBroadcastSrcNotInRunningState = 0;
  private Queue<TaskAttemptIdentifier> completedSrcTaskToProcess = new LinkedList<>();
  private RoaringBitmap scheduledTasks = new RoaringBitmap();

  private int parallelism;

  /* auto reduce related */
  // num of chunks of source at the corresponding position in source list
  private int[] numChunksPerSrc;
  private Grouper grouper = new Grouper();

  public FairCartesianProductVertexManager(VertexManagerPluginContext context) {
    super(context);
  }

  /**
   * Reads the configuration (falling back to the defaults of
   * {@link CartesianProductVertexManager}), registers for state updates of every input vertex,
   * builds the source / source vertex bookkeeping and announces a planned reconfiguration.
   */
  @Override
  public void initialize(CartesianProductConfigProto config) throws Exception {
    this.config = config;
    maxParallelism = config.hasMaxParallelism() ? config.getMaxParallelism()
      : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM_DEFAULT;
    enableGrouping = config.hasEnableGrouping() ? config.getEnableGrouping()
      : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING_DEFAULT;
    minOpsPerWorker = config.hasMinOpsPerWorker() ? config.getMinOpsPerWorker()
      : CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER_DEFAULT;
    sourceList = config.getSourcesList();
    if (config.hasNumPartitionsForFairCase()) {
      numPartitions = config.getNumPartitionsForFairCase();
    } else {
      // default: the n-th root of max parallelism, n being the number of sources
      numPartitions = (int) Math.pow(maxParallelism, 1.0 / sourceList.size());
    }

    for (Map.Entry<String, EdgeProperty> e : getContext().getInputVertexEdgeProperties().entrySet()) {
      if (e.getValue().getDataMovementType() == CUSTOM
        && e.getValue().getEdgeManagerDescriptor().getClassName()
        .equals(CartesianProductEdgeManager.class.getName())) {
        // cartesian product source: wait for CONFIGURED to learn its number of tasks
        srcVerticesByName.put(e.getKey(), new SrcVertex());
        srcVerticesByName.get(e.getKey()).name = e.getKey();
        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.CONFIGURED));
        numCPSrcNotInConfigureState++;
      } else {
        // any other input: scheduling waits until it is RUNNING
        getContext().registerForVertexStateUpdates(e.getKey(), EnumSet.of(VertexState.RUNNING));
        numBroadcastSrcNotInRunningState++;
      }
    }

    Map<String, List<String>> srcGroups = getContext().getInputVertexGroups();
    for (int i = 0; i < sourceList.size(); i++) {
      String srcName = sourceList.get(i);
      Source source = new Source();
      source.position = i;
      if (srcGroups.containsKey(srcName)) {
        source.name = srcName;
        for (String srcVName : srcGroups.get(srcName)) {
          source.srcVertices.add(srcVerticesByName.get(srcVName));
          srcVerticesByName.get(srcVName).source = source;
        }
      } else {
        source.srcVertices.add(srcVerticesByName.get(srcName));
        srcVerticesByName.get(srcName).source = source;
      }
      sourcesByName.put(srcName, source);
    }

    minNumRecordForEstimation =
      (long) Math.pow(minOpsPerWorker * maxParallelism, 1.0 / sourceList.size());

    numChunksPerSrc = new int[sourcesByName.size()];
    getContext().vertexReconfigurationPlanned();
  }

  @Override
  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions)
    throws Exception {
    vertexStarted = true;
    if (completions != null) {
      for (TaskAttemptIdentifier attempt : completions) {
        addCompletedSrcTaskToProcess(attempt);
      }
    }
    tryScheduleTasks();
  }

  @Override
  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws IOException {
    String vertex = stateUpdate.getVertexName();
    VertexState state = stateUpdate.getVertexState();

    // CONFIGURED is only registered for cartesian product sources, RUNNING only for the others
    if (state == VertexState.CONFIGURED) {
      srcVerticesByName.get(vertex).numTask = getContext().getVertexNumTasks(vertex);
      numCPSrcNotInConfigureState--;
    } else if (state == VertexState.RUNNING) {
      numBroadcastSrcNotInRunningState--;
    }
    tryScheduleTasks();
  }

  @Override
  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception {
    addCompletedSrcTaskToProcess(attempt);
    tryScheduleTasks();
  }

  // records the completion of a cartesian product source task once; completions of other
  // vertices and repeated completions of the same task are ignored
  private void addCompletedSrcTaskToProcess(TaskAttemptIdentifier attempt) {
    int taskId = attempt.getTaskIdentifier().getIdentifier();
    String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
    SrcVertex srcV = srcVerticesByName.get(vertex);
    if (srcV != null && !srcV.taskCompleted.contains(taskId)) {
      srcV.taskCompleted.add(taskId);
      completedSrcTaskToProcess.add(attempt);
    }
  }

  // scheduling may start once this vertex is reconfigured and started and every
  // non cartesian product input is running
  private boolean tryStartSchedule() {
    vertexStartSchedule =
      (vertexReconfigured && vertexStarted && numBroadcastSrcNotInRunningState == 0);
    return vertexStartSchedule;
  }

  @Override
  public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
    throws IOException {
    /* vmEvent after reconfigure doesn't matter */
    if (vertexReconfigured) {
      return;
    }

    if (vmEvent.getUserPayload() != null) {
      String srcVertex =
        vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getVertexIdentifier().getName();
      SrcVertex srcV = srcVerticesByName.get(srcVertex);

      // vmEvent from non-cp vertex doesn't matter
      if (srcV == null) {
        return;
      }

      VertexManagerEventPayloadProto proto =
        VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vmEvent.getUserPayload()));
      srcV.numRecord += proto.getNumRecord();
      srcV.taskWithVMEvent.add(
        vmEvent.getProducerAttemptIdentifier().getTaskIdentifier().getIdentifier());
    }

    tryScheduleTasks();
  }

  // finishes reconfiguration with no destination task at all
  private void reconfigureWithZeroTask() {
    parallelism = 0;
    getContext().reconfigureVertex(0, null, null);
    vertexReconfigured = true;
    getContext().doneReconfiguringVertex();
  }

  /**
   * Reconfigures this vertex if enough statistics are available: estimates the number of records
   * per source, derives the parallelism and the number of chunks per source, and pushes the
   * resulting config to the cartesian product edges.
   *
   * @return true if the vertex was reconfigured (possibly with zero tasks), false if a
   *         prerequisite is still missing
   */
  private boolean tryReconfigure() throws IOException {
    if (numCPSrcNotInConfigureState > 0) {
      return false;
    }

    // a source without any task makes the whole product empty
    for (Source src : sourcesByName.values()) {
      if (src.getNumTask() == 0) {
        reconfigureWithZeroTask();
        return true;
      }
    }

    if (config.hasGroupingFraction() && config.getGroupingFraction() > 0) {
      // every src vertex must complete a certain number of task before we do estimation
      for (SrcVertex srcV : srcVerticesByName.values()) {
        if (srcV.taskCompleted.getCardinality() < srcV.numTask
          && (srcV.numTask * config.getGroupingFraction() > srcV.taskCompleted.getCardinality()
          || srcV.numRecord == 0)) {
          return false;
        }
      }
    } else {
      // every src vertex must generate enough output records before we do estimation
      // or all its tasks already finish but we cannot get enough result for estimation
      for (SrcVertex srcV : srcVerticesByName.values()) {
        if (srcV.numRecord < minNumRecordForEstimation
          && srcV.taskWithVMEvent.getCardinality() < srcV.numTask) {
          return false;
        }
      }
    }

    LOG.info("Start reconfigure, max parallelism: " + maxParallelism
      + ", min-ops-per-worker: " + minOpsPerWorker);
    for (Source src : sourcesByName.values()) {
      LOG.info(src.toString(false));
    }

    long totalOps = 1;
    for (Source src : sourcesByName.values()) {
      src.numRecord = src.estimateNumRecord();
      if (src.numRecord == 0) {
        reconfigureWithZeroTask();
        return true;
      }

      try {
        totalOps = LongMath.checkedMultiply(totalOps, src.numRecord);
      } catch (ArithmeticException e) {
        // saturate instead of overflowing; max parallelism is picked below in that case
        totalOps = Long.MAX_VALUE;
      }
    }

    // determine initial parallelism
    if (totalOps / minOpsPerWorker >= maxParallelism) {
      parallelism = maxParallelism;
    } else {
      parallelism = (int) ((totalOps + minOpsPerWorker - 1) / minOpsPerWorker);
    }

    // determine num chunk for each source by weighted factorization of initial parallelism
    // final parallelism will be product of all #chunk
    double k = Math.log10(parallelism);
    for (Source src : sourcesByName.values()) {
      k -= Math.log10(src.numRecord);
    }
    k = Math.pow(10, k / sourcesByName.size());

    parallelism = 1;
    for (Source src : sourcesByName.values()) {
      if (enableGrouping) {
        // at least one chunk, at most one chunk per physical output of the biggest vertex
        src.numChunk = Math.min(src.getSrcVertexWithMostOutput().numTask * numPartitions,
          Math.max(1, (int) (src.numRecord * k)));
      } else {
        src.numChunk = src.getSrcVertexWithMostOutput().numTask;
      }
      parallelism *= src.numChunk;
    }

    LOG.info("After reconfigure, ");
    for (Source src : sourcesByName.values()) {
      LOG.info(src.toString(true));
    }
    LOG.info("Final parallelism: " + parallelism);

    for (int i = 0; i < numChunksPerSrc.length; i++) {
      numChunksPerSrc[i] = sourcesByName.get(sourceList.get(i)).numChunk;
    }

    CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(config);
    builder.addAllNumChunks(Ints.asList(this.numChunksPerSrc));

    // only custom edges are handed back to reconfigureVertex
    // NOTE(review): relies on the map returned by the context being mutable - confirm
    Map<String, EdgeProperty> edgeProperties = getContext().getInputVertexEdgeProperties();
    Iterator<Map.Entry<String, EdgeProperty>> iter = edgeProperties.entrySet().iterator();
    while (iter.hasNext()) {
      Map.Entry<String, EdgeProperty> e = iter.next();
      if (e.getValue().getDataMovementType() != CUSTOM) {
        iter.remove();
      }
    }

    // send out vertex group info for computing physical input id of destination task
    for (Source src : sourcesByName.values()) {
      builder.clearNumTaskPerVertexInGroup();
      for (int i = 0; i < src.srcVertices.size(); i++) {
        SrcVertex srcV = src.srcVertices.get(i);
        builder.setPositionInGroup(i);
        // the payload is built before this vertex's own task count is appended, so it lists
        // the task counts of the vertices preceding it in the group
        edgeProperties.get(srcV.name).getEdgeManagerDescriptor()
          .setUserPayload(UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())));
        builder.addNumTaskPerVertexInGroup(srcV.numTask);
      }
    }
    getContext().reconfigureVertex(parallelism, null, edgeProperties);
    vertexReconfigured = true;
    getContext().doneReconfiguringVertex();
    return true;
  }

  // reconfigures and starts scheduling when possible, then drains the pending completions
  private void tryScheduleTasks() throws IOException {
    if (!vertexReconfigured && !tryReconfigure()) {
      return;
    }
    if (!vertexStartSchedule && !tryStartSchedule()) {
      return;
    }

    while (!completedSrcTaskToProcess.isEmpty()) {
      scheduleTasksDependOnCompletion(completedSrcTaskToProcess.poll());
    }
  }

  /**
   * Schedules every not yet scheduled destination task that consumes a chunk fed by the given
   * completed source task and whose chunks from all sources are completed.
   */
  private void scheduleTasksDependOnCompletion(TaskAttemptIdentifier attempt) {
    if (parallelism == 0) {
      return;
    }

    int taskId = attempt.getTaskIdentifier().getIdentifier();
    String vertex = attempt.getTaskIdentifier().getVertexIdentifier().getName();
    SrcVertex srcV = srcVerticesByName.get(vertex);
    Source src = srcV.source;

    List<ScheduleTaskRequest> requests = new ArrayList<>();
    CartesianProductCombination combination =
      new CartesianProductCombination(numChunksPerSrc, src.position);

    // the task owns the physical outputs [taskId * numPartitions, (taskId + 1) * numPartitions);
    // same item space as SrcVertex#isChunkCompleted. The chunk range is computed before the loop
    // because isChunkCompleted re-initializes the shared grouper.
    grouper.init(srcV.numTask * numPartitions, src.numChunk);
    int firstRelevantChunk = grouper.getGroupId(taskId * numPartitions);
    int lastRelevantChunk = grouper.getGroupId(taskId * numPartitions + numPartitions - 1);
    for (int chunkId = firstRelevantChunk; chunkId <= lastRelevantChunk; chunkId++) {
      combination.firstTaskWithFixedChunk(chunkId);
      do {
        List<Integer> list = combination.getCombination();

        if (scheduledTasks.contains(combination.getTaskId())) {
          continue;
        }

        // a task is ready for schedule only if all its src chunk has been completed
        boolean readyToSchedule = src.isChunkCompleted(list.get(src.position));
        for (int srcId = 0; readyToSchedule && srcId < list.size(); srcId++) {
          if (srcId != src.position) {
            readyToSchedule =
              sourcesByName.get(sourceList.get(srcId)).isChunkCompleted(list.get(srcId));
          }
        }

        if (readyToSchedule) {
          requests.add(ScheduleTaskRequest.create(combination.getTaskId(), null));
          scheduledTasks.add(combination.getTaskId());
        }
      } while (combination.nextTaskWithFixedChunk());
    }

    if (!requests.isEmpty()) {
      getContext().scheduleTasks(requests);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/RoundRobinPartitioner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/RoundRobinPartitioner.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/RoundRobinPartitioner.java new file mode 100644 index 0000000..3c3e41a --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/RoundRobinPartitioner.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.runtime.library.partitioner; + +import org.apache.tez.runtime.library.api.Partitioner; + +public class RoundRobinPartitioner implements Partitioner { + private int x = 0; + + @Override + public int getPartition(Object key, Object value, int numPartitions) { + x = x % numPartitions; + return (x++) % numPartitions; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java index 73a8c87..b99f3d4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/Grouper.java @@ -20,70 +20,70 @@ package org.apache.tez.runtime.library.utils; import com.google.common.base.Preconditions; /** - * This grouper group specified number of tasks into specified number of groups. + * This grouper group specified number of items into specified number of groups. * - * If numTask%numGroup is zero, every group has numTask/numGroup tasks. - * Otherwise, every group will get numTask/numGroup tasks first, and remaining tasks will be - * distributed in last numTask-numTask%numGroup*numGroup groups (one task for each group). - * For example, if we group 8 tasks into 3 groups, each group get {2, 3, 3} tasks. + * If numItem%numGroup is zero, every group has numItem/numGroup items. + * Otherwise, every group will get numItem/numGroup items first, and remaining items will be + * distributed in last numItem-numItem%numGroup*numGroup groups (one item for each group). + * For example, if we group 8 items into 3 groups, each group get {2, 3, 3} items. 
*/ public class Grouper { private int numGroup; - private int numTask; + private int numItem; private int numGroup1; - private int taskPerGroup1; + private int itemPerGroup1; private int numGroup2; - private int taskPerGroup2; + private int itemPerGroup2; - public Grouper init(int numTask, int numGroup) { + public Grouper init(int numItem, int numGroup) { Preconditions.checkArgument(numGroup > 0, "Number of groups is " + numGroup + ". Should be positive"); - Preconditions.checkArgument(numTask > 0, - "Number of tasks is " + numTask + ". Should be positive"); - Preconditions.checkArgument(numTask >= numGroup, - "Num of groups + " + numGroup + " shouldn't be more than number of tasks " + numTask); - this.numTask = numTask; + Preconditions.checkArgument(numItem > 0, + "Number of items is " + numItem + ". Should be positive"); + Preconditions.checkArgument(numItem >= numGroup, + "Num of groups + " + numGroup + " shouldn't be more than number of items " + numItem); + this.numItem = numItem; this.numGroup = numGroup; - this.taskPerGroup1 = numTask / numGroup; - this.taskPerGroup2 = taskPerGroup1 + 1; - this.numGroup2 = numTask % numGroup; + this.itemPerGroup1 = numItem / numGroup; + this.itemPerGroup2 = itemPerGroup1 + 1; + this.numGroup2 = numItem % numGroup; this.numGroup1 = numGroup - numGroup2; return this; } - public int getFirstTaskInGroup(int groupId) { + public int getFirstItemInGroup(int groupId) { Preconditions.checkArgument(0 <= groupId && groupId < numGroup, "Invalid groupId " + groupId); if (groupId < numGroup1) { - return groupId * taskPerGroup1; + return groupId * itemPerGroup1; } else { - return groupId * taskPerGroup1 + (groupId - numGroup1); + return groupId * itemPerGroup1 + (groupId - numGroup1); } } - public int getNumTasksInGroup(int groupId) { + public int getNumItemsInGroup(int groupId) { Preconditions.checkArgument(0 <= groupId && groupId < numGroup, "Invalid groupId" + groupId); - return groupId < numGroup1 ? 
taskPerGroup1 : taskPerGroup2; + return groupId < numGroup1 ? itemPerGroup1 : itemPerGroup2; } - public int getLastTaskInGroup(int groupId) { + public int getLastItemInGroup(int groupId) { Preconditions.checkArgument(0 <= groupId && groupId < numGroup, "Invalid groupId" + groupId); - return getFirstTaskInGroup(groupId) + getNumTasksInGroup(groupId) - 1; + return getFirstItemInGroup(groupId) + getNumItemsInGroup(groupId) - 1; } - public int getGroupId(int taskId) { - Preconditions.checkArgument(0 <= taskId && taskId < numTask, "Invalid taskId" + taskId); - if (taskId < taskPerGroup1 * numGroup1) { - return taskId/taskPerGroup1; + public int getGroupId(int itemId) { + Preconditions.checkArgument(0 <= itemId && itemId < numItem, "Invalid itemId" + itemId); + if (itemId < itemPerGroup1 * numGroup1) { + return itemId/ itemPerGroup1; } else { - return numGroup1 + (taskId - taskPerGroup1 * numGroup1) / taskPerGroup2; + return numGroup1 + (itemId - itemPerGroup1 * numGroup1) / itemPerGroup2; } } - public boolean isInGroup(int taskId, int groupId) { + public boolean isInGroup(int itemId, int groupId) { Preconditions.checkArgument(0 <= groupId && groupId < numGroup, "Invalid groupId" + groupId); - Preconditions.checkArgument(0 <= taskId && taskId < numTask, "Invalid taskId" + taskId); - return getFirstTaskInGroup(groupId) <= taskId - && taskId < getFirstTaskInGroup(groupId) + getNumTasksInGroup(groupId); + Preconditions.checkArgument(0 <= itemId && itemId < numItem, "Invalid itemId" + itemId); + return getFirstItemInGroup(groupId) <= itemId + && itemId < getFirstItemInGroup(groupId) + getNumItemsInGroup(groupId); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto index cb503ea..ae65502 
100644 --- a/tez-runtime-library/src/main/proto/CartesianProductPayload.proto +++ b/tez-runtime-library/src/main/proto/CartesianProductPayload.proto @@ -27,9 +27,12 @@ message CartesianProductConfigProto { optional bytes filterUserPayload = 5; optional float minFraction = 6; optional float maxFraction = 7; - optional bool enableAutoGrouping = 8; - optional int64 desiredBytesPerChunk = 9; + optional int32 maxParallelism = 8; + optional int64 minOpsPerWorker = 9; repeated int32 numChunks = 10; - optional int32 numChunk = 11; - optional int32 chunkIdOffset = 12; + repeated int32 numTaskPerVertexInGroup = 11; + optional int32 positionInGroup = 12; + optional int32 numPartitionsForFairCase = 13; + optional bool enableGrouping = 14; + optional float groupingFraction = 15; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java index 3755ac8..4193aec 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductCombination.java @@ -30,7 +30,7 @@ import static org.junit.Assert.assertTrue; public class TestCartesianProductCombination { private void verifyCombination(CartesianProductCombination combination, int[] result, int taskId) { assertArrayEquals(result, Ints.toArray(combination.getCombination())); - assertEquals(taskId, combination.getChunkId()); + assertEquals(taskId, combination.getTaskId()); } private void 
testCombinationTwoWayVertex0() { http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java index 4857749..3b4200a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductConfig.java @@ -36,7 +36,6 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; public class TestCartesianProductConfig { private TezConfiguration conf; @@ -66,7 +65,7 @@ public class TestCartesianProductConfig { } @Test(timeout = 5000) - public void testSerializationUnpartitioned() throws Exception { + public void testSerializationFair() throws Exception { List<String> sourceVertices = new ArrayList<>(); sourceVertices.add("v1"); sourceVertices.add("v2"); @@ -77,7 +76,7 @@ public class TestCartesianProductConfig { CartesianProductConfig parsedConfig = CartesianProductConfig.fromUserPayload(payload); assertConfigEquals(config, parsedConfig); - // unpartitioned config should have null in numPartitions fields + // fair cartesian product config should have null in numPartitions fields try { config = new CartesianProductConfig(false, new int[]{}, new String[]{"v0","v1"},null); config.checkNumPartitions(); @@ -113,24 +112,34 @@ public class TestCartesianProductConfig { } @Test(timeout = 5000) - public void testAutoGroupingConfig() { + 
public void testFairCartesianProductConfig() { List<String> sourceVertices = new ArrayList<>(); sourceVertices.add("v0"); sourceVertices.add("v1"); CartesianProductConfig config = new CartesianProductConfig(sourceVertices); - // auto grouping conf not set + // conf not set CartesianProductConfigProto proto = config.toProto(conf); - assertFalse(proto.hasEnableAutoGrouping()); - assertFalse(proto.hasDesiredBytesPerChunk()); + assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM_DEFAULT, + proto.getMaxParallelism()); + assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER_DEFAULT, + proto.getMinOpsPerWorker()); + assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING_DEFAULT, + proto.getEnableGrouping()); + assertFalse(proto.hasNumPartitionsForFairCase()); + assertFalse(proto.hasGroupingFraction()); - // auto groupinig conf not set - conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true); - conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000); + // conf set + conf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM, 1000); + conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER, 1000000); + conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING, false); + conf.setFloat(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_GROUPING_FRACTION, 0.75f); + conf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS, 25); proto = config.toProto(conf); - assertTrue(proto.hasEnableAutoGrouping()); - assertTrue(proto.hasDesiredBytesPerChunk()); - assertEquals(true, proto.getEnableAutoGrouping()); - assertEquals(1000, proto.getDesiredBytesPerChunk()); + assertEquals(1000, proto.getMaxParallelism()); + assertEquals(1000000, proto.getMinOpsPerWorker()); + assertFalse(proto.getEnableGrouping()); + assertEquals(0.75f, 
proto.getGroupingFraction(), 0.01); + assertEquals(25, proto.getNumPartitionsForFairCase()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java index d722932..58f460f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManager.java @@ -41,7 +41,8 @@ public class TestCartesianProductEdgeManager { CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); builder.setIsPartitioned(true) .addAllSources(Arrays.asList("v0", "v1")) - .addAllNumPartitions(Ints.asList(2,3)); + .addAllNumPartitions(Ints.asList(2,3)) + .setMaxParallelism(100).setMinOpsPerWorker(1); UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())); when(context.getUserPayload()).thenReturn(payload); edgeManager.initialize(); @@ -52,12 +53,13 @@ public class TestCartesianProductEdgeManager { builder.clear(); builder.setIsPartitioned(false) .addAllSources(Arrays.asList("v0", "v1")) - .addAllNumChunks(Ints.asList(2,3)); + .addAllNumChunks(Ints.asList(2,3)) + .setMaxParallelism(100).setMinOpsPerWorker(1); payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())); when(context.getUserPayload()).thenReturn(payload); when(context.getSourceVertexNumTasks()).thenReturn(2); edgeManager.initialize(); assertTrue(edgeManager.getEdgeManagerReal() - instanceof 
CartesianProductEdgeManagerUnpartitioned); + instanceof FairCartesianProductEdgeManager); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java deleted file mode 100644 index 3ba6aad..0000000 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerConfig.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tez.runtime.library.cartesianproduct; - -import com.google.common.primitives.Ints; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; -import org.junit.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -public class TestCartesianProductEdgeManagerConfig { - @Test(timeout = 5000) - public void testUnpartitionedAutoGroupingConfig() throws IOException { - List<String> sourceVertices = new ArrayList<>(); - sourceVertices.add("v0"); - sourceVertices.add("v1"); - int[] numChunkPerSrc = new int[] {2, 3}; - int numGroup = 3, chunkIdOffset = 0; - - CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); - builder.setIsPartitioned(false).addAllNumChunks(Ints.asList(numChunkPerSrc)) - .addAllSources(sourceVertices).setNumChunk(numGroup).setChunkIdOffset(chunkIdOffset); - UserPayload payload = UserPayload.create(ByteBuffer.wrap(builder.build().toByteArray())); - - CartesianProductEdgeManagerConfig config = - CartesianProductEdgeManagerConfig.fromUserPayload(payload); - assertArrayEquals(numChunkPerSrc, config.numChunksPerSrc); - assertEquals(numGroup, config.numChunk); - assertEquals(chunkIdOffset, config.chunkIdOffset); - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java 
index b586de6..462760f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java @@ -17,6 +17,7 @@ */ package org.apache.tez.runtime.library.cartesianproduct; +import com.google.protobuf.ByteString; import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; @@ -27,6 +28,7 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.Map; +import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -50,19 +52,22 @@ public class TestCartesianProductEdgeManagerPartitioned { */ @Test(timeout = 5000) public void testTwoWay() throws Exception { - CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true, - new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, null); + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(true).addSources("v0").addSources("v1") + .addNumPartitions(3).addNumPartitions(4); when(mockContext.getDestinationVertexNumTasks()).thenReturn(12); - testTwoWayV0(emConfig); - testTwoWayV1(emConfig); + CartesianProductConfigProto config = builder.build(); + testTwoWayV0(config); + testTwoWayV1(config); } - private void testTwoWayV0(CartesianProductEdgeManagerConfig config) throws Exception { + private void testTwoWayV0(CartesianProductConfigProto config) throws Exception { when(mockContext.getSourceVertexName()).thenReturn("v0"); when(mockContext.getSourceVertexNumTasks()).thenReturn(2); 
edgeManager.initialize(config); - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + CompositeEventRouteMetadata compositeRoutingData = + edgeManager.routeCompositeDataMovementEventToDestination(1, 1); assertNotNull(compositeRoutingData); assertEquals(1, compositeRoutingData.getCount()); assertEquals(0, compositeRoutingData.getSource()); @@ -88,12 +93,13 @@ public class TestCartesianProductEdgeManagerPartitioned { assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2)); } - private void testTwoWayV1(CartesianProductEdgeManagerConfig config) throws Exception { + private void testTwoWayV1(CartesianProductConfigProto config) throws Exception { when(mockContext.getSourceVertexName()).thenReturn("v1"); when(mockContext.getSourceVertexNumTasks()).thenReturn(3); edgeManager.initialize(config); - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + CompositeEventRouteMetadata compositeRoutingData = + edgeManager.routeCompositeDataMovementEventToDestination(1, 1); assertNotNull(compositeRoutingData); assertEquals(1, compositeRoutingData.getCount()); assertEquals(1, compositeRoutingData.getSource()); @@ -138,25 +144,25 @@ public class TestCartesianProductEdgeManagerPartitioned { */ @Test(timeout = 5000) public void testTwoWayWithFilter() throws Exception { - ByteBuffer buffer = ByteBuffer.allocate(2); - buffer.putChar('>'); + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + ByteBuffer buffer = ByteBuffer.allocate(2).putChar('>'); buffer.flip(); - CartesianProductFilterDescriptor filterDescriptor = - new CartesianProductFilterDescriptor(TestFilter.class.getName()) - .setUserPayload(UserPayload.create(buffer)); - CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true, - new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, filterDescriptor); + 
builder.setIsPartitioned(true).addSources("v0").addSources("v1") + .addNumPartitions(3).addNumPartitions(4).setFilterClassName(TestFilter.class.getName()) + .setFilterUserPayload(ByteString.copyFrom(buffer)); + CartesianProductConfigProto config = builder.build(); when(mockContext.getDestinationVertexNumTasks()).thenReturn(3); - testTwoWayV0WithFilter(emConfig); - testTwoWayV1WithFilter(emConfig); + testTwoWayV0WithFilter(config); + testTwoWayV1WithFilter(config); } - private void testTwoWayV0WithFilter(CartesianProductEdgeManagerConfig config) throws Exception { + private void testTwoWayV0WithFilter(CartesianProductConfigProto config) throws Exception { when(mockContext.getSourceVertexName()).thenReturn("v0"); when(mockContext.getSourceVertexNumTasks()).thenReturn(2); edgeManager.initialize(config); - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + CompositeEventRouteMetadata compositeRoutingData = + edgeManager.routeCompositeDataMovementEventToDestination(1, 1); assertNotNull(compositeRoutingData); assertEquals(1, compositeRoutingData.getCount()); assertEquals(2, compositeRoutingData.getSource()); @@ -174,12 +180,13 @@ public class TestCartesianProductEdgeManagerPartitioned { assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2)); } - private void testTwoWayV1WithFilter(CartesianProductEdgeManagerConfig config) throws Exception { + private void testTwoWayV1WithFilter(CartesianProductConfigProto config) throws Exception { when(mockContext.getSourceVertexName()).thenReturn("v1"); when(mockContext.getSourceVertexNumTasks()).thenReturn(3); edgeManager.initialize(config); - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + CompositeEventRouteMetadata compositeRoutingData = + edgeManager.routeCompositeDataMovementEventToDestination(1, 1); assertNotNull(compositeRoutingData); assertEquals(1, compositeRoutingData.getCount()); 
assertEquals(0, compositeRoutingData.getSource()); @@ -204,21 +211,25 @@ public class TestCartesianProductEdgeManagerPartitioned { */ @Test(timeout = 5000) public void testThreeWay() throws Exception { - CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true, - new String[]{"v0","v1","v2"}, new int[]{4,3,2}, null, 0, 0, null); + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(true).addSources("v0").addSources("v1").addSources("v2") + .addNumPartitions(4).addNumPartitions(3).addNumPartitions(2); + CartesianProductConfigProto config = builder.build(); + when(mockContext.getDestinationVertexNumTasks()).thenReturn(24); - testThreeWayV0(emConfig); - testThreeWayV1(emConfig); - testThreeWayV2(emConfig); + testThreeWayV0(config); + testThreeWayV1(config); + testThreeWayV2(config); } - private void testThreeWayV0(CartesianProductEdgeManagerConfig config) throws Exception { + private void testThreeWayV0(CartesianProductConfigProto config) throws Exception { when(mockContext.getSourceVertexName()).thenReturn("v0"); when(mockContext.getSourceVertexNumTasks()).thenReturn(2); edgeManager.initialize(config); - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + CompositeEventRouteMetadata compositeRoutingData = + edgeManager.routeCompositeDataMovementEventToDestination(1, 1); assertNotNull(compositeRoutingData); assertEquals(1, compositeRoutingData.getCount()); assertEquals(0, compositeRoutingData.getSource()); @@ -236,12 +247,13 @@ public class TestCartesianProductEdgeManagerPartitioned { assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(2)); } - private void testThreeWayV1(CartesianProductEdgeManagerConfig config) throws Exception { + private void testThreeWayV1(CartesianProductConfigProto config) throws Exception { when(mockContext.getSourceVertexName()).thenReturn("v1"); 
when(mockContext.getSourceVertexNumTasks()).thenReturn(3); edgeManager.initialize(config); - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + CompositeEventRouteMetadata compositeRoutingData = + edgeManager.routeCompositeDataMovementEventToDestination(1, 1); assertNotNull(compositeRoutingData); assertEquals(1, compositeRoutingData.getCount()); assertEquals(0, compositeRoutingData.getSource()); @@ -259,12 +271,13 @@ public class TestCartesianProductEdgeManagerPartitioned { assertEquals(3, edgeManager.getNumSourceTaskPhysicalOutputs(2)); } - private void testThreeWayV2(CartesianProductEdgeManagerConfig config) throws Exception { + private void testThreeWayV2(CartesianProductConfigProto config) throws Exception { when(mockContext.getSourceVertexName()).thenReturn("v2"); when(mockContext.getSourceVertexNumTasks()).thenReturn(4); edgeManager.initialize(config); - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); + CompositeEventRouteMetadata compositeRoutingData = + edgeManager.routeCompositeDataMovementEventToDestination(1, 1); assertNotNull(compositeRoutingData); assertEquals(1, compositeRoutingData.getCount()); assertEquals(1, compositeRoutingData.getSource()); http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java deleted file mode 100644 index 1ce9c8b..0000000 --- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java +++ /dev/null @@ -1,288 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tez.runtime.library.cartesianproduct; - -import org.apache.tez.dag.api.EdgeManagerPluginContext; -import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; -import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForDest; -import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForInputError; -import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForRouting; -import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForSrc; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import 
static org.junit.Assert.assertNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestCartesianProductEdgeManagerUnpartitioned { - private EdgeManagerPluginContext mockContext; - private CartesianProductEdgeManagerUnpartitioned edgeManager; - - @Before - public void setup() { - mockContext = mock(EdgeManagerPluginContext.class); - edgeManager = new CartesianProductEdgeManagerUnpartitioned(mockContext); - } - - static class TestData { - int srcId, destId, inputId; - Object expected; - - public TestData(int srcId, int destId, int inputId, Object expected) { - this.srcId = srcId; - this.destId = destId; - this.inputId = inputId; - this.expected = expected; - } - - public static TestData dataForRouting(int srcId, int destId, Object expected) { - return new TestData(srcId, destId, -1, expected); - } - - public static TestData dataForInputError(int destId, int inputId, Object expected) { - return new TestData(-1, destId, inputId, expected); - } - - public static TestData dataForSrc(int srcId, Object expected) { - return new TestData(srcId, -1, -1, expected); - } - - public static TestData dataForDest(int destId, Object expected) { - return new TestData(-1, destId, -1, expected); - } - } - - private void testEdgeManager(CartesianProductEdgeManagerConfig conf, String vName, int numTask, - String groupName, TestData cDMEInvalid, TestData cDMEValid, - TestData srcFailInvalid, TestData srcFailValid, - TestData inputError, TestData numDestInput, - TestData numSrcOutputTest, TestData numConsumerTest) - throws Exception { - when(mockContext.getSourceVertexName()).thenReturn(vName); - when(mockContext.getSourceVertexNumTasks()).thenReturn(numTask); - when(mockContext.getVertexGroupName()).thenReturn(groupName); - edgeManager.initialize(conf); - - CompositeEventRouteMetadata cDME = - edgeManager.routeCompositeDataMovementEventToDestination(cDMEInvalid.srcId, - cDMEInvalid.destId); - assertNull(cDME); - - cDME = 
edgeManager.routeCompositeDataMovementEventToDestination(cDMEValid.srcId, - cDMEValid.destId); - assertNotNull(cDME); - CompositeEventRouteMetadata expectedCDME = (CompositeEventRouteMetadata)(cDMEValid.expected); - assertEquals(expectedCDME.getCount(), cDME.getCount()); - assertEquals(expectedCDME.getTarget(), cDME.getTarget()); - assertEquals(expectedCDME.getSource(), cDME.getSource()); - - EventRouteMetadata dme = - edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailInvalid.srcId, - srcFailInvalid.destId); - assertNull(dme); - - dme = edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailValid.srcId, - srcFailValid.destId); - assertNotNull(dme); - EventRouteMetadata expectedDME = (EventRouteMetadata)(srcFailValid.expected); - assertEquals(expectedDME.getNumEvents(), dme.getNumEvents()); - assertArrayEquals(expectedDME.getTargetIndices(), dme.getTargetIndices()); - - assertEquals(inputError.expected, - edgeManager.routeInputErrorEventToSource(inputError.destId, inputError.inputId)); - - assertEquals(numDestInput.expected, - edgeManager.getNumDestinationTaskPhysicalInputs(numDestInput.destId)); - assertEquals(numSrcOutputTest.expected, - edgeManager.getNumSourceTaskPhysicalOutputs(numSrcOutputTest.srcId)); - assertEquals(numConsumerTest.expected, - edgeManager.getNumDestinationConsumerTasks(numConsumerTest.srcId)); - } - - /** - * Vertex v0 has 2 tasks - * Vertex v1 has 3 tasks - */ - @Test(timeout = 5000) - public void testTwoWayAllVertex() throws Exception { - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null, - new int[]{2,3}, 2, 0, null), "v0", 2, null, - dataForRouting(1, 1, null), dataForRouting(1, 3, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(1, 1, null), dataForRouting(1, 3, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","v1"}, null, - new int[]{2,3}, 3, 0, null), "v1", 3, null, - dataForRouting(1, 2, null), dataForRouting(1, 1, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(1, 2, null), dataForRouting(1, 1, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(1,0,1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 2)); - } - - /** - * Vertex v0 has 2 tasks - * Vertex v1 has 3 tasks - * Vertex v2 has 4 tasks - */ - @Test(timeout = 5000) - public void testThreeWayAllVertex() throws Exception { - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"}, - null, new int[]{2,3,4}, 2, 0, null), "v0", 2, null, - dataForRouting(1, 1, null), dataForRouting(1, 12, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(1, 1, null), dataForRouting(1, 12, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 12)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"}, - null, new int[]{2,3,4}, 3, 0, null), "v1", 3, null, - dataForRouting(1, 1, null), dataForRouting(1, 16, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(1, 1, null), dataForRouting(1, 16, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 8)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"}, - null, new int[]{2,3,4}, 4, 0, null), "v2", 4, null, - dataForRouting(1, 0, null), dataForRouting(1, 13, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(1, 0, null), dataForRouting(1, 13, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 6)); - } - - @Test(timeout = 5000) - public void testZeroSrcTask() { - CartesianProductEdgeManagerConfig emConfig = - new CartesianProductEdgeManagerConfig(false, new String[]{"v0", "v1"}, 
null, - new int[]{2,0}, 0,0, null); - testZeroSrcTaskV0(emConfig); - testZeroSrcTaskV1(emConfig); - } - - private void testZeroSrcTaskV0(CartesianProductEdgeManagerConfig config) { - when(mockContext.getSourceVertexName()).thenReturn("v0"); - when(mockContext.getSourceVertexNumTasks()).thenReturn(2); - edgeManager.initialize(config); - - assertEquals(0, edgeManager.getNumDestinationConsumerTasks(0)); - assertEquals(0, edgeManager.getNumDestinationConsumerTasks(1)); - } - - private void testZeroSrcTaskV1(CartesianProductEdgeManagerConfig config) { - when(mockContext.getSourceVertexName()).thenReturn("v1"); - when(mockContext.getSourceVertexNumTasks()).thenReturn(0); - edgeManager.initialize(config); - } - - /** - * Vertex v0 has 10 tasks 2 groups - * Vertex v1 has 30 tasks 3 group - */ - @Test(timeout = 5000) - public void testTwoWayAllVertexAutoGrouping() throws Exception { - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, - null, new int[]{2,3}, 2, 0, null), "v0", 10, null, - dataForRouting(6, 1, null), dataForRouting(1, 0, CompositeEventRouteMetadata.create(1, 1, 0)), - dataForRouting(6, 1, null), dataForRouting(1, 0, EventRouteMetadata.create(1, new int[]{1})), - dataForInputError(1, 1, 1), dataForDest(1, 5), dataForSrc(1, 1), dataForSrc(1, 3)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, - null, new int[]{2,3}, 3, 0, null), "v1", 30, null, - dataForRouting(6, 1, null), dataForRouting(11, 1, CompositeEventRouteMetadata.create(1, 1, 0)), - dataForRouting(6, 1, null), dataForRouting(11, 1, EventRouteMetadata.create(1, new int[]{1})), - dataForInputError(1, 1, 11), dataForDest(1, 10), dataForSrc(1, 1), dataForSrc(1, 2)); - } - - /** - * v0 with group g0 {v1, v2} - * Vertex v0 has 2 tasks - * Vertex v1 has 1 tasks - * Vertex v2 has 2 tasks - */ - @Test(timeout = 5000) - public void testTwoWayVertexWithVertexGroup() throws Exception { - testEdgeManager(new 
CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, - null, new int[]{2,3}, 2, 0, null), "v0", 2, null, - dataForRouting(1, 1, null), dataForRouting(1, 3, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(1, 1, null), dataForRouting(1, 3, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, - null, new int[]{2,3}, 1, 0, null), "v1", 1, "g0", - dataForRouting(0, 1, null), dataForRouting(0, 3, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(0, 1, null), dataForRouting(0, 3, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(3, 0, 0), dataForDest(0, 1), dataForSrc(0, 1), dataForSrc(0, 2)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, - null, new int[]{2,3}, 2, 1, null), "v2", 2, "g0", - dataForRouting(1, 1, null), dataForRouting(0, 1, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(1, 1, null), dataForRouting(0, 1, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 2)); - } - - /** - * group g0 {v1, v2} with group g1 {v3, v4} - * - * Vertex v0 has 1 tasks - * Vertex v1 has 2 tasks - * Vertex v2 has 3 tasks - * Vertex v3 has 4 tasks - */ - @Test(timeout = 5000) - public void testTwoWayAllVertexGroup() throws Exception { - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"}, - null, new int[]{3,7}, 1, 0, null), "v0", 1, "g0", - dataForRouting(0, 7, null), dataForRouting(0, 1, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(0, 7, null), dataForRouting(0, 1, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(0, 1), dataForSrc(0, 7)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"}, - 
null, new int[]{3,7}, 2, 1, null), "v1", 2, "g0", - dataForRouting(0, 1, null), dataForRouting(1, 15, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(0, 1, null), dataForRouting(1, 15, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(7, 0, 0), dataForDest(7, 1), dataForSrc(1, 1), dataForSrc(1, 7)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"}, - null, new int[]{3,7}, 3, 0, null), "v2", 3, "g1", - dataForRouting(1, 0, null), dataForRouting(1, 1, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(1, 0, null), dataForRouting(1, 1, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"}, - null, new int[]{3,7}, 4, 3, null), "v3", 4, "g1", - dataForRouting(0, 1, null), dataForRouting(1, 4, CompositeEventRouteMetadata.create(1, 0, 0)), - dataForRouting(0, 1, null), dataForRouting(1, 4, EventRouteMetadata.create(1, new int[]{0})), - dataForInputError(4, 0, 1), dataForDest(4, 1), dataForSrc(1, 1), dataForSrc(1, 3)); - } - - - /** - * v0 with group g0 {v1, v2} - * Vertex v0 has 10 tasks, 2 groups - * Vertex v1 has 10 tasks, 1 group - * Vertex v2 has 20 tasks, 2 groups - */ - @Test(timeout = 5000) - public void testTwoWayWithVertexGroupAutoGrouping() throws Exception { - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, - null, new int[]{2,3}, 2, 0, null), "v0", 10, null, - dataForRouting(0, 4, null), dataForRouting(2, 1, CompositeEventRouteMetadata.create(1, 2, 0)), - dataForRouting(0, 4, null), dataForRouting(2, 1, EventRouteMetadata.create(1, new int[]{2})), - dataForInputError(1, 3, 3), dataForDest(1, 5), dataForSrc(1, 1), dataForSrc(1, 3)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, - null, new int[]{2,3}, 1, 0, null), "v1", 10, "g0", - 
dataForRouting(1, 1, null), dataForRouting(2, 3, CompositeEventRouteMetadata.create(1, 2, 0)), - dataForRouting(1, 1, null), dataForRouting(2, 3, EventRouteMetadata.create(1, new int[]{2})), - dataForInputError(3, 1, 1), dataForDest(0, 10), dataForSrc(1, 1), dataForSrc(1, 2)); - testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, - null, new int[]{2,3}, 2, 1, null), "v2", 20, "g0", - dataForRouting(11, 1, null), dataForRouting(12, 2, CompositeEventRouteMetadata.create(1, 2, 0)), - dataForRouting(11, 1, null), dataForRouting(12, 2, EventRouteMetadata.create(1, new int[]{2})), - dataForInputError(2, 2, 12), dataForDest(1, 10), dataForSrc(1, 1), dataForSrc(1, 2)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java index 5144e69..5846c8b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java @@ -97,7 +97,7 @@ public class TestCartesianProductVertexManager { config = new CartesianProductConfig(sourceVertices); vertexManager.initialize(); assertTrue(vertexManager.getVertexManagerReal() - instanceof CartesianProductVertexManagerUnpartitioned); + instanceof FairCartesianProductVertexManager); } @Test(timeout = 5000) 
http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java deleted file mode 100644 index 5c6ffa7..0000000 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tez.runtime.library.cartesianproduct; - -import org.apache.tez.dag.api.TezConfiguration; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -public class TestCartesianProductVertexManagerConfig { - @Test(timeout = 5000) - public void testAutoGroupingConfig() throws IOException { - List<String> sourceVertices = new ArrayList<>(); - sourceVertices.add("v0"); - sourceVertices.add("v1"); - CartesianProductConfig config = new CartesianProductConfig(sourceVertices); - TezConfiguration conf = new TezConfiguration(); - - // auto group not set in proto - CartesianProductVertexManagerConfig vmConf = - CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf)); - assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT, - vmConf.enableAutoGrouping); - assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT, - vmConf.desiredBytesPerChunk); - - // auto group set in proto - conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true); - conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000); - vmConf = CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf)); - assertEquals(true, vmConf.enableAutoGrouping); - assertEquals(1000, vmConf.desiredBytesPerChunk); - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java index 36c0325..1012a36 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerPartitioned.java @@ -32,6 +32,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -66,14 +67,13 @@ public class TestCartesianProductVertexManagerPartitioned { @Before public void setup() throws TezReflectionException { - setupWithConfig( - new CartesianProductVertexManagerConfig(true, new String[]{"v0","v1"}, new int[] {2, 2}, - CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MIN_FRACTION_DEFAULT, - CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_SLOW_START_MAX_FRACTION_DEFAULT, - false, 0, null)); + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(true).addSources("v0").addSources("v1") + .addNumPartitions(2).addNumPartitions(2); + setupWithConfig(builder.build()); } - private void setupWithConfig(CartesianProductVertexManagerConfig config) + private void setupWithConfig(CartesianProductConfigProto config) throws TezReflectionException { MockitoAnnotations.initMocks(this); context = mock(VertexManagerPluginContext.class); @@ -102,7 +102,7 @@ public class TestCartesianProductVertexManagerPartitioned { } } - private void testReconfigureVertexHelper(CartesianProductVertexManagerConfig config, + private void 
testReconfigureVertexHelper(CartesianProductConfigProto config, int parallelism) throws Exception { setupWithConfig(config); @@ -117,12 +117,12 @@ public class TestCartesianProductVertexManagerPartitioned { @Test(timeout = 5000) public void testReconfigureVertex() throws Exception { - testReconfigureVertexHelper( - new CartesianProductVertexManagerConfig(true, new String[]{"v0", "v1"}, new int[] {5, 5}, 0, - 0, false, 0, new CartesianProductFilterDescriptor(TestFilter.class.getName())), 10); - testReconfigureVertexHelper( - new CartesianProductVertexManagerConfig(true, new String[]{"v0", "v1"}, new int[] {5, 5}, 0, - 0, false, 0, null), 25); + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(true).addSources("v0").addSources("v1") + .addNumPartitions(5).addNumPartitions(5).setFilterClassName(TestFilter.class.getName()); + testReconfigureVertexHelper(builder.build(), 10); + builder.clearFilterClassName(); + testReconfigureVertexHelper(builder.build(), 25); } @Test(timeout = 5000)
