This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new ff2f6ab  TEZ-4199. MergeManager::finalMerge should make use of compression
ff2f6ab is described below

commit ff2f6ab29bf3e7f14e35f636882a22af128088ee
Author: Mustafa Iman <[email protected]>
AuthorDate: Fri Jul 17 11:34:46 2020 -0500

    TEZ-4199. MergeManager::finalMerge should make use of compression
    
    Signed-off-by: Jonathan Eagles <[email protected]>
    (cherry picked from commit 8acfb03335e10a983e51974107520a34581f1187)
---
 .../library/common/sort/impl/TezMerger.java        |   2 +-
 .../orderedgrouped/DummyCompressionCodec.java      | 131 +++++++++++++++++++++
 .../shuffle/orderedgrouped/TestMergeManager.java   |  89 +++++++++++++-
 3 files changed, 217 insertions(+), 5 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 0e18ead..3e04e74 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -536,7 +536,7 @@ public class TezMerger {
         List<Segment> segments, RawComparator comparator,
         Progressable reporter, boolean sortSegments, CompressionCodec codec,
         boolean considerFinalMergeForProgress) {
-      this(conf, fs, segments, comparator, reporter, sortSegments, null,
+      this(conf, fs, segments, comparator, reporter, sortSegments, codec,
           considerFinalMergeForProgress, true);
     }
 
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
new file mode 100644
index 0000000..962a9e0
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * A dummy codec. It passes everything to underlying stream
+ */
+public class DummyCompressionCodec implements CompressionCodec {
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws 
IOException {
+    return new DummyCompressionOutputStream(out);
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, 
Compressor compressor) throws IOException {
+    return new DummyCompressionOutputStream(out);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return Compressor.class;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return mock(Compressor.class);
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws 
IOException {
+    return new DummyCompressionInputStream(in);
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor 
decompressor) throws IOException {
+    return new DummyCompressionInputStream(in);
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return Decompressor.class;
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return mock(Decompressor.class);
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return null;
+  }
+
+  class DummyCompressionOutputStream extends CompressionOutputStream {
+
+    protected DummyCompressionOutputStream(OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void finish() throws IOException {
+      //no-op
+    }
+
+    @Override
+    public void resetState() throws IOException {
+      //no-op
+    }
+  }
+
+  class DummyCompressionInputStream extends CompressionInputStream {
+
+    protected DummyCompressionInputStream(InputStream in) throws IOException {
+      super(in);
+    }
+
+    @Override
+    public int read() throws IOException {
+      return in.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return in.read(b, off, len);
+    }
+
+    @Override
+    public void resetState() throws IOException {
+      //no-op
+    }
+  }
+}
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 92d9125..9cffcc7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -38,8 +37,7 @@ import java.util.UUID;
 
 import com.google.common.collect.Sets;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -257,6 +255,65 @@ public class TestMergeManager {
     assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
   }
 
+  /**
+   * Commits four map outputs, written through {@code getDisk()}, to a MergeManager configured
+   * with a spied pass-through codec. It then verifies that closing the manager (expected to
+   * trigger the final merge) opened at least one output stream through that codec.
+   */
+  @Test(timeout = 60000l)
+  public void testDiskMergeWithCodec() throws Throwable {
+    Configuration conf = new TezConfiguration(defaultConf);
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());
+    // Merge factor (3) is smaller than the number of segments committed below (4).
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 3);
+
+    Path localDir = new Path(workDir, "local");
+    localFs.mkdirs(localDir);
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());
+
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
+
+    // Spy on a pass-through codec so its use by the merge can be verified after close().
+    CompressionCodec dummyCodec = spy(new DummyCompressionCodec());
+
+    MergeManager mergeManager =
+        new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
+            mock(ExceptionReporter.class), 2000, dummyCodec, false, -1);
+    mergeManager.configureAndStart();
+
+    assertEquals(0, mergeManager.getUsedMemory());
+    assertEquals(0, mergeManager.getCommitMemory());
+
+    // One segment per input. Each segment's data is generated with the same attempt identifier
+    // it is reserved under (the earlier copy-pasted version reused identifier 3 for input 4).
+    int numInputs = 4;
+    MapOutput[] mapOutputs = new MapOutput[numInputs];
+    for (int i = 0; i < numInputs; i++) {
+      InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0);
+      byte[] data = generateDataBySizeAndGetBytes(conf, 500, inputAttemptIdentifier);
+      MapOutput mapOutput =
+          mergeManager.reserve(inputAttemptIdentifier, data.length, data.length, 0);
+      mapOutput.getDisk().write(data);
+      mapOutput.getDisk().flush();
+      mapOutputs[i] = mapOutput;
+    }
+    for (MapOutput mapOutput : mapOutputs) {
+      mapOutput.commit();
+    }
+
+    mergeManager.close(true);
+    verify(dummyCodec, atLeastOnce()).createOutputStream(any(), any());
+  }
+
   @Test(timeout = 60000l)
   public void testIntermediateMemoryMerge() throws Throwable {
     Configuration conf = new TezConfiguration(defaultConf);
@@ -592,7 +649,31 @@ public class TestMergeManager {
     return data;
   }
 
-  private byte[] generateData(Configuration conf, int numEntries, 
InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+  /**
+   * Builds an IFile of IntWritable key/value pairs (i, i) for i = 0, 1, 2, ..., appending until
+   * the writer's raw length exceeds {@code rawLen}. Returns the serialized IFile bytes.
+   *
+   * <p>As a sanity check, the bytes are also decoded once with
+   * {@link ShuffleUtils#shuffleToMemory}. The decoded copy is discarded, not returned.
+   *
+   * @param conf configuration passed to the IFile writer
+   * @param rawLen raw-length threshold; at least one record is always written
+   * @param inputAttemptIdentifier identifier passed through to {@code shuffleToMemory}
+   * @return the bytes produced by the IFile writer
+   */
+  private byte[] generateDataBySizeAndGetBytes(Configuration conf, int rawLen,
+      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+    IFile.Writer writer =
+        new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+    int i = 0;
+    do {
+      writer.append(new IntWritable(i), new IntWritable(i));
+      i++;
+    } while (writer.getRawLength() <= rawLen);
+    writer.close();
+
+    // Copy the buffer once and reuse it for both the sanity-check decode and the return value.
+    byte[] ifileBytes = baos.toByteArray();
+    int compressedLength = (int) writer.getCompressedLength();
+    int rawLength = (int) writer.getRawLength();
+    byte[] decoded = new byte[rawLength];
+    ShuffleUtils.shuffleToMemory(decoded, new ByteArrayInputStream(ifileBytes),
+        rawLength, compressedLength, null, false, 0, LOG, inputAttemptIdentifier);
+    return ifileBytes;
+  }
+
+  private byte[] generateData(Configuration conf, int numEntries,
+                              InputAttemptIdentifier inputAttemptIdentifier) 
throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
     IFile.Writer writer =

Reply via email to