Repository: tez
Updated Branches:
  refs/heads/master a7f93ae1d -> 5b0f5a0c4
TEZ-3762. When final merge is disabled in unordered case, it should create index file instead of relying on cache (zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5b0f5a0c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5b0f5a0c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5b0f5a0c Branch: refs/heads/master Commit: 5b0f5a0c415869c60da45715e2a67adf0405b778 Parents: a7f93ae Author: Zhiyuan Yang <[email protected]> Authored: Mon Jun 26 15:21:46 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Mon Jun 26 15:21:46 2017 -0700 ---------------------------------------------------------------------- .../library/common/writers/UnorderedPartitionedKVWriter.java | 3 ++- .../apache/tez/runtime/library/output/UnorderedKVOutput.java | 1 + .../runtime/library/output/UnorderedPartitionedKVOutput.java | 1 + .../common/writers/TestUnorderedPartitionedKVWriter.java | 5 +++++ .../library/conf/TestUnorderedPartitionedKVOutputConfig.java | 5 ++--- 5 files changed, 11 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5b0f5a0c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 6bdb9e4..70c577c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -286,6 +286,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit + ", sizePerBuffer=" + sizePerBuffer + ", skipBuffers=" + skipBuffers + ", pipelinedShuffle=" + pipelinedShuffle + + ", isFinalMergeEnabled=" + isFinalMergeEnabled + ", numPartitions=" + numPartitions + ", reportPartitionStats=" + reportPartitionStats); } @@ -917,7 +918,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit Path outputFilePath = null; Path indexFilePath = null; - if (!pipelinedShuffle) { + if (!pipelinedShuffle && isFinalMergeEnabled) { if (isFinalSpill) { outputFilePath = outputFileHandler.getOutputFileForWrite(spillSize); indexFilePath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate); http://git-wip-us.apache.org/repos/asf/tez/blob/5b0f5a0c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 51521e4..c987024 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -169,6 +169,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX); confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); 
http://git-wip-us.apache.org/repos/asf/tez/blob/5b0f5a0c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index e83f1e9..94312f7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -142,6 +142,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX); confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); http://git-wip-us.apache.org/repos/asf/tez/blob/5b0f5a0c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index bbe0992..71bd240 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -458,6 +458,11 @@ public class TestUnorderedPartitionedKVWriter { outputLargeRecordsCounter.getValue()); if (pipeliningEnabled || !isFinalMergeEnabled) { + // verify spill data files and index file exist + for (int i = 0; i < kvWriter.numSpills.get(); i++) { + assertTrue(localFs.exists(kvWriter.outputFileHandler.getSpillFileForWrite(i, 0))); + assertTrue(localFs.exists(kvWriter.outputFileHandler.getSpillIndexFileForWrite(i, 0))); + } return; } http://git-wip-us.apache.org/repos/asf/tez/blob/5b0f5a0c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java index a9e955e..5e49b51 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java @@ -85,7 +85,7 @@ public class TestUnorderedPartitionedKVOutputConfig { .setAdditionalConfiguration(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, "true") .setAdditionalConfiguration(TezRuntimeConfiguration - .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, "true") + .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, "false") .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, String.valueOf(false)) .setAdditionalConfiguration(additionalConf) @@ -103,9 +103,8 @@ public class TestUnorderedPartitionedKVOutputConfig { // Verify programmatic API usage assertEquals(true, conf.getBoolean(TezRuntimeConfiguration 
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, false)); - //unorderedpartitioned writer ignores this value. assertEquals(false, conf.getBoolean(TezRuntimeConfiguration - .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false)); + .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true)); assertEquals(1111, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 0)); assertEquals("KEY", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, "")); assertEquals("VALUE", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, ""));
