This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new ff2f6ab TEZ-4199. MergeManager::finalMerge should make use of compression
ff2f6ab is described below
commit ff2f6ab29bf3e7f14e35f636882a22af128088ee
Author: Mustafa Iman <[email protected]>
AuthorDate: Fri Jul 17 11:34:46 2020 -0500
TEZ-4199. MergeManager::finalMerge should make use of compression
Signed-off-by: Jonathan Eagles <[email protected]>
(cherry picked from commit 8acfb03335e10a983e51974107520a34581f1187)
---
.../library/common/sort/impl/TezMerger.java | 2 +-
.../orderedgrouped/DummyCompressionCodec.java | 131 +++++++++++++++++++++
.../shuffle/orderedgrouped/TestMergeManager.java | 89 +++++++++++++-
3 files changed, 217 insertions(+), 5 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 0e18ead..3e04e74 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -536,7 +536,7 @@ public class TezMerger {
List<Segment> segments, RawComparator comparator,
Progressable reporter, boolean sortSegments, CompressionCodec codec,
boolean considerFinalMergeForProgress) {
- this(conf, fs, segments, comparator, reporter, sortSegments, null,
+ this(conf, fs, segments, comparator, reporter, sortSegments, codec,
considerFinalMergeForProgress, true);
}
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
new file mode 100644
index 0000000..962a9e0
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * A dummy codec. It passes everything to underlying stream
+ */
+public class DummyCompressionCodec implements CompressionCodec {
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+ return new DummyCompressionOutputStream(out);
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException {
+ return new DummyCompressionOutputStream(out);
+ }
+
+ @Override
+ public Class<? extends Compressor> getCompressorType() {
+ return Compressor.class;
+ }
+
+ @Override
+ public Compressor createCompressor() {
+ return mock(Compressor.class);
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream in) throws IOException {
+ return new DummyCompressionInputStream(in);
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException {
+ return new DummyCompressionInputStream(in);
+ }
+
+ @Override
+ public Class<? extends Decompressor> getDecompressorType() {
+ return Decompressor.class;
+ }
+
+ @Override
+ public Decompressor createDecompressor() {
+ return mock(Decompressor.class);
+ }
+
+ @Override
+ public String getDefaultExtension() {
+ return null;
+ }
+
+ class DummyCompressionOutputStream extends CompressionOutputStream {
+
+ protected DummyCompressionOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void finish() throws IOException {
+ //no-op
+ }
+
+ @Override
+ public void resetState() throws IOException {
+ //no-op
+ }
+ }
+
+ class DummyCompressionInputStream extends CompressionInputStream {
+
+ protected DummyCompressionInputStream(InputStream in) throws IOException {
+ super(in);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return in.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+
+ @Override
+ public void resetState() throws IOException {
+ //no-op
+ }
+ }
+}
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 92d9125..9cffcc7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -38,8 +37,7 @@ import java.util.UUID;
import com.google.common.collect.Sets;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -257,6 +255,65 @@ public class TestMergeManager {
assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
}
+ @Test
+ public void testDiskMergeWithCodec() throws Throwable {
+ Configuration conf = new TezConfiguration(defaultConf);
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 3);
+
+ Path localDir = new Path(workDir, "local");
+ localFs.mkdirs(localDir);
+
+ conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+ LocalDirAllocator localDirAllocator =
+ new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+ InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+ // Create a mock compressor. We will check if it is used.
+ CompressionCodec dummyCodec = spy(new DummyCompressionCodec());
+
+ MergeManager mergeManager =
+ new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+ mock(ExceptionReporter.class), 2000, dummyCodec, false, -1);
+ mergeManager.configureAndStart();
+
+ assertEquals(0, mergeManager.getUsedMemory());
+ assertEquals(0, mergeManager.getCommitMemory());
+
+ InputAttemptIdentifier inputAttemptIdentifier1 = new InputAttemptIdentifier(0, 0);
+ InputAttemptIdentifier inputAttemptIdentifier2 = new InputAttemptIdentifier(1, 0);
+ InputAttemptIdentifier inputAttemptIdentifier3 = new InputAttemptIdentifier(2, 0);
+ InputAttemptIdentifier inputAttemptIdentifier4 = new InputAttemptIdentifier(3, 0);
+ byte[] data1 = generateDataBySizeAndGetBytes(conf, 500, inputAttemptIdentifier1);
+ byte[] data2 = generateDataBySizeAndGetBytes(conf, 500, inputAttemptIdentifier2);
+ byte[] data3 = generateDataBySizeAndGetBytes(conf, 500, inputAttemptIdentifier3);
+ byte[] data4 = generateDataBySizeAndGetBytes(conf, 500, inputAttemptIdentifier3);
+
+ MapOutput mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+ MapOutput mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0);
+ MapOutput mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0);
+ MapOutput mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0);
+
+ mo1.getDisk().write(data1);
+ mo1.getDisk().flush();
+ mo2.getDisk().write(data2);
+ mo2.getDisk().flush();
+ mo3.getDisk().write(data3);
+ mo3.getDisk().flush();
+ mo4.getDisk().write(data4);
+ mo4.getDisk().flush();
+
+ mo1.commit();
+ mo2.commit();
+ mo3.commit();
+ mo4.commit();
+
+ mergeManager.close(true);
+ verify(dummyCodec, atLeastOnce()).createOutputStream(any(), any());
+ }
+
@Test(timeout = 60000l)
public void testIntermediateMemoryMerge() throws Throwable {
Configuration conf = new TezConfiguration(defaultConf);
@@ -592,7 +649,31 @@ public class TestMergeManager {
return data;
}
- private byte[] generateData(Configuration conf, int numEntries, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+ private byte[] generateDataBySizeAndGetBytes(Configuration conf, int rawLen,
+ InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+ IFile.Writer writer =
+ new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+ int i = 0;
+ while(true) {
+ writer.append(new IntWritable(i), new IntWritable(i));
+ i++;
+ if (writer.getRawLength() > rawLen) {
+ break;
+ }
+ }
+ writer.close();
+ int compressedLength = (int)writer.getCompressedLength();
+ int rawLength = (int)writer.getRawLength();
+ byte[] data = new byte[rawLength];
+ ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()),
+ rawLength, compressedLength, null, false, 0, LOG, inputAttemptIdentifier);
+ return baos.toByteArray();
+ }
+
+ private byte[] generateData(Configuration conf, int numEntries,
+ InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
IFile.Writer writer =