Repository: tez Updated Branches: refs/heads/master 51972efec -> 788c1ad7f
TEZ-3737. FairCartesianProductVertexManager used incorrect #partition (zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/788c1ad7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/788c1ad7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/788c1ad7 Branch: refs/heads/master Commit: 788c1ad7f6e7cac72c62f42f1fdabcbc9b97892e Parents: 51972ef Author: Zhiyuan Yang <[email protected]> Authored: Wed May 24 12:33:03 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Wed May 24 12:33:03 2017 -0700 ---------------------------------------------------------------------- .../FairCartesianProductVertexManager.java | 50 ++++++++++---------- .../TestFairCartesianProductVertexManager.java | 12 ++--- 2 files changed, 32 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/788c1ad7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java index a38e20d..86e2080 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/cartesianproduct/FairCartesianProductVertexManager.java @@ -89,7 +89,8 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea // or estimated total number of output record (after reconfiguration) long numRecord; - public String toString(boolean afterReconfigure) { + @Override + public String toString() { StringBuilder sb = 
new StringBuilder(); sb.append("Source at position "); sb.append(position); @@ -99,11 +100,11 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea sb.append(name); } - sb.append("num chunk ").append(numChunk); + sb.append(", num chunk ").append(numChunk); sb.append(": {"); for (SrcVertex srcV : srcVertices) { sb.append("["); - sb.append(srcV.toString(afterReconfigure)); + sb.append(srcV.toString()); sb.append("], "); } sb.deleteCharAt(sb.length() - 1); @@ -165,17 +166,13 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea // or estimated total number of output record (after reconfiguration) long numRecord; - public String toString(boolean afterReconfigure) { + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("vertex ").append(name).append(", "); - if (afterReconfigure) { - sb.append("estimated # output records ").append(numRecord).append(", "); - sb.append("# chunks ").append(source.numChunk); - } else { - sb.append(numTask).append(" tasks, "); - sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, "); - sb.append("numRecord ").append(numRecord); - } + sb.append(numTask).append(" tasks, "); + sb.append(taskWithVMEvent.getCardinality()).append(" VMEvents, "); + sb.append("numRecord ").append(numRecord).append(", "); + sb.append("estimated # output records ").append(estimateNumRecord()); return sb.toString(); } @@ -189,8 +186,8 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea public boolean isChunkCompleted(int chunkId) { grouper.init(numTask * numPartitions, source.numChunk); - int firstRelevantTask = grouper.getFirstItemInGroup(chunkId) / maxParallelism; - int lastRelevantTask = grouper.getLastItemInGroup(chunkId) / maxParallelism; + int firstRelevantTask = grouper.getFirstItemInGroup(chunkId) / numPartitions; + int lastRelevantTask = grouper.getLastItemInGroup(chunkId) / numPartitions; for (int relevantTask = firstRelevantTask; 
relevantTask <= lastRelevantTask; relevantTask++) { if (!taskCompleted.contains(relevantTask)) { return false; @@ -274,6 +271,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea srcVerticesByName.get(srcVName).source = source; } } else { + source.name = srcName; source.srcVertices.add(srcVerticesByName.get(srcName)); srcVerticesByName.get(srcName).source = source; } @@ -292,6 +290,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea throws Exception { vertexStarted = true; if (completions != null) { + LOG.info("OnVertexStarted with " + completions.size() + " completed source task"); for (TaskAttemptIdentifier attempt : completions) { addCompletedSrcTaskToProcess(attempt); } @@ -387,7 +386,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea for (SrcVertex srcV : srcVerticesByName.values()) { if (srcV.taskCompleted.getCardinality() < srcV.numTask && (srcV.numTask * config.getGroupingFraction() > srcV.taskCompleted.getCardinality() - || srcV.numRecord == 0)) { + || srcV.numRecord == 0)) { return false; } } @@ -402,17 +401,19 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea } } - LOG.info("Start reconfigure, " + LOG.info("Start reconfiguring vertex " + getContext().getVertexName() + ", max parallelism: " + maxParallelism - + ", min-ops-per-worker: " + minOpsPerWorker); + + ", min-ops-per-worker: " + minOpsPerWorker + + ", num partition: " + numPartitions); for (Source src : sourcesByName.values()) { - LOG.info(src.toString(false)); + LOG.info(src.toString()); } long totalOps = 1; for (Source src : sourcesByName.values()) { src.numRecord = src.estimateNumRecord(); if (src.numRecord == 0) { + LOG.info("Set parallelism to 0 because source " + src.name + " has 0 output recorc"); reconfigureWithZeroTask(); return true; } @@ -420,6 +421,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea try { totalOps = 
LongMath.checkedMultiply(totalOps, src.numRecord); } catch (ArithmeticException e) { + LOG.info("totalOps exceeds " + Long.MAX_VALUE + ", capping to " + Long.MAX_VALUE); totalOps = Long.MAX_VALUE; } } @@ -430,6 +432,7 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea } else { parallelism = (int) ((totalOps + minOpsPerWorker - 1) / minOpsPerWorker); } + LOG.info("Total ops " + totalOps + ", initial parallelism " + parallelism); // determine num chunk for each source by weighted factorization of initial parallelism // final parallelism will be product of all #chunk @@ -450,11 +453,10 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea parallelism *= src.numChunk; } - LOG.info("After reconfigure, "); + LOG.info("After reconfigure, final parallelism " + parallelism); for (Source src : sourcesByName.values()) { - LOG.info(src.toString(false)); + LOG.info(src.toString()); } - LOG.info("Final parallelism: " + parallelism); for (int i = 0; i < numChunksPerSrc.length; i++) { numChunksPerSrc[i] = sourcesByName.get(sourceList.get(i)).numChunk; @@ -516,9 +518,9 @@ class FairCartesianProductVertexManager extends CartesianProductVertexManagerRea CartesianProductCombination combination = new CartesianProductCombination(numChunksPerSrc, src.position); - grouper.init(srcV.numTask * maxParallelism, src.numChunk); - int firstRelevantChunk = grouper.getGroupId(taskId * maxParallelism); - int lastRelevantChunk = grouper.getGroupId(taskId * maxParallelism + maxParallelism - 1); + grouper.init(srcV.numTask * numPartitions, src.numChunk); + int firstRelevantChunk = grouper.getGroupId(taskId * numPartitions); + int lastRelevantChunk = grouper.getGroupId(taskId * numPartitions + numPartitions - 1); for (int chunkId = firstRelevantChunk; chunkId <= lastRelevantChunk; chunkId++) { combination.firstTaskWithFixedChunk(chunkId); do { 
http://git-wip-us.apache.org/repos/asf/tez/blob/788c1ad7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java index 01d7f0b..d088fd3 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java @@ -80,7 +80,7 @@ public class TestFairCartesianProductVertexManager { /** * v0 and v1 are two cartesian product sources */ - private void setupDAGVertexOnly(int maxParallelism, long minOpsPerWorker, + private void setupDAGVertexOnly(int maxParallelism, long minOpsPerWorker, int numPartition, int srcParallelismMultiplier) throws Exception { when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2)); setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3); @@ -88,7 +88,7 @@ public class TestFairCartesianProductVertexManager { CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); builder.setIsPartitioned(false).addSources("v0").addSources("v1") .setMaxParallelism(maxParallelism).setMinOpsPerWorker(minOpsPerWorker) - .setNumPartitionsForFairCase(maxParallelism); + .setNumPartitionsForFairCase(numPartition); vertexManager.initialize(builder.build()); } @@ -218,7 +218,7 @@ public class TestFairCartesianProductVertexManager { @Test(timeout = 5000) public void testDAGVertexOnlyGroupByMaxParallelism() throws Exception { - setupDAGVertexOnly(30, 1, 1); + setupDAGVertexOnly(30, 1, 30, 1); vertexManager.onVertexStateUpdated(new 
VertexStateUpdate("v0", VertexState.CONFIGURED)); vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); @@ -246,7 +246,7 @@ public class TestFairCartesianProductVertexManager { @Test(timeout = 5000) public void testDAGVertexOnlyGroupByMinOpsPerWorker() throws Exception { - setupDAGVertexOnly(100, 10000, 10); + setupDAGVertexOnly(100, 10000, 10, 10); vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); @@ -366,7 +366,7 @@ public class TestFairCartesianProductVertexManager { @Test(timeout = 5000) public void testOnVertexStart() throws Exception { - setupDAGVertexOnly(6, 1, 1); + setupDAGVertexOnly(6, 1, 6, 1); vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); vertexManager.onVertexManagerEventReceived(getVMEvent(100, "v0", 0)); @@ -467,7 +467,7 @@ public class TestFairCartesianProductVertexManager { @Test(timeout = 5000) public void testZeroSrcOutput() throws Exception { - setupDAGVertexOnly(10, 1, 1); + setupDAGVertexOnly(10, 1, 10, 1); vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v0", 0));
