Repository: tez
Updated Branches:
  refs/heads/master 9cf25d142 -> da4098b9d


TEZ-3163. Reuse and tune Inflaters and Deflaters to speed DME processing (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/da4098b9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/da4098b9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/da4098b9

Branch: refs/heads/master
Commit: da4098b9d6f72e6d4aacc1623622a0875408d2ba
Parents: 9cf25d1
Author: Jonathan Eagles <jeag...@yahoo-inc.com>
Authored: Wed Sep 21 10:54:47 2016 -0500
Committer: Jonathan Eagles <jeag...@yahoo-inc.com>
Committed: Wed Sep 21 10:54:47 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/TezCommonUtils.java   | 38 +++++++++++++++++---
 .../apache/tez/dag/api/DagTypeConverters.java   |  5 +--
 .../tez/dag/api/TestDagTypeConverters.java      |  2 +-
 .../apache/tez/dag/history/utils/DAGUtils.java  | 16 +++++----
 .../vertexmanager/ShuffleVertexManagerBase.java |  6 +++-
 .../library/common/shuffle/ShuffleUtils.java    | 21 ++++++-----
 .../impl/ShuffleInputEventHandlerImpl.java      |  5 ++-
 .../ShuffleInputEventHandlerOrderedGrouped.java |  6 ++--
 .../common/sort/impl/PipelinedSorter.java       |  8 +++--
 .../common/sort/impl/dflt/DefaultSorter.java    |  6 +++-
 .../writers/UnorderedPartitionedKVWriter.java   | 11 +++---
 .../output/OrderedPartitionedKVOutput.java      |  8 +++--
 .../library/output/UnorderedKVOutput.java       |  3 +-
 .../output/UnorderedPartitionedKVOutput.java    |  3 +-
 .../common/shuffle/TestShuffleUtils.java        |  6 ++--
 16 files changed, 103 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c7f540b..3a55ec7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3163. Reuse and tune Inflaters and Deflaters to speed DME processing
   TEZ-3434. Add unit tests for flushing of recovery events.
   TEZ-3317. Speculative execution starts too early due to 0 progress.
   TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM.

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index e4cf028..afdce39 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
 
 import org.apache.commons.io.IOUtils;
@@ -345,13 +346,35 @@ public class TezCommonUtils {
     }
   }
 
+  private static final boolean NO_WRAP = true;
+
+  @Private
+  public static Deflater newBestCompressionDeflater() {
+    return new Deflater(Deflater.BEST_COMPRESSION, NO_WRAP);
+  }
+
+  @Private
+  public static Deflater newBestSpeedDeflater() {
+    return new Deflater(Deflater.BEST_SPEED, NO_WRAP);
+  }
+
+  @Private
+  public static Inflater newInflater() {
+    return new Inflater(NO_WRAP);
+  }
+
   @Private
   public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
+    return compressByteArrayToByteString(inBytes, newBestCompressionDeflater());
+  }
+
+  @Private
+  public static ByteString compressByteArrayToByteString(byte[] inBytes, Deflater deflater) throws IOException {
+    deflater.reset();
     ByteString.Output os = ByteString.newOutput();
     DeflaterOutputStream compressOs = null;
     try {
-      compressOs = new DeflaterOutputStream(os, new Deflater(
-          Deflater.BEST_COMPRESSION));
+      compressOs = new DeflaterOutputStream(os, deflater);
       compressOs.write(inBytes);
       compressOs.finish();
       ByteString byteString = os.toByteString();
@@ -365,9 +388,14 @@ public class TezCommonUtils {
 
   @Private
   public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
-    InflaterInputStream in = new InflaterInputStream(byteString.newInput());
-    byte[] bytes = IOUtils.toByteArray(in);
-    return bytes;
+    return decompressByteStringToByteArray(byteString, newInflater());
+  }
+
+  @Private
+  public static byte[] decompressByteStringToByteArray(ByteString byteString, Inflater inflater) throws IOException {
+    inflater.reset();
+    return IOUtils.toByteArray(new InflaterInputStream(byteString.newInput(), inflater));
+
   }
 
   public static String getCredentialsInfo(Credentials credentials, String identifier) {

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index cefe026..c5d9c0b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.zip.Inflater;
 import java.util.Map.Entry;
 
 import javax.annotation.Nullable;
@@ -369,12 +370,12 @@ public class DagTypeConverters {
     return builder.build();
   }
 
-  public static String getHistoryTextFromProto(TezEntityDescriptorProto proto) {
+  public static String getHistoryTextFromProto(TezEntityDescriptorProto proto, Inflater inflater) {
     if (!proto.hasHistoryText()) {
       return null;
     }
     try {
-      return new String(TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText()),
+      return new String(TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText(), inflater),
           "UTF-8");
     } catch (IOException e) {
       throw new TezUncheckedException(e);

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index dc04f2d..265fce9 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -72,7 +72,7 @@ public class TestDagTypeConverters {
     Assert.assertNull(inputDescriptor.getHistoryText());
 
     // Check history text value
-    String actualHistoryText = DagTypeConverters.getHistoryTextFromProto(proto);
+    String actualHistoryText = DagTypeConverters.getHistoryTextFromProto(proto, TezCommonUtils.newInflater());
     Assert.assertEquals(historytext, actualHistoryText);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index d8d2407..dce9e52 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -28,9 +28,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.zip.Inflater;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.VersionInfo;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
@@ -186,7 +188,7 @@ public class DAGUtils {
   }
 
   public static Map<String,Object> convertDAGPlanToATSMap(DAGPlan dagPlan) throws IOException {
-
+    final Inflater inflater = TezCommonUtils.newInflater();
     final String VERSION_KEY = "version";
     final int version = 2;
     Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
@@ -208,7 +210,7 @@ public class DAGUtils {
         if (vertexPlan.getProcessorDescriptor().hasHistoryText()) {
           vertexMap.put(USER_PAYLOAD_AS_TEXT,
               DagTypeConverters.getHistoryTextFromProto(
-                  vertexPlan.getProcessorDescriptor()));
+                  vertexPlan.getProcessorDescriptor(), inflater));
         }
       }
 
@@ -232,7 +234,7 @@ public class DAGUtils {
         if (input.getIODescriptor().hasHistoryText()) {
           inputMap.put(USER_PAYLOAD_AS_TEXT,
               DagTypeConverters.getHistoryTextFromProto(
-                  input.getIODescriptor()));
+                  input.getIODescriptor(), inflater));
         }
         inputsList.add(inputMap);
       }
@@ -250,7 +252,7 @@ public class DAGUtils {
         if (output.getIODescriptor().hasHistoryText()) {
           outputMap.put(USER_PAYLOAD_AS_TEXT,
               DagTypeConverters.getHistoryTextFromProto(
-                  output.getIODescriptor()));
+                  output.getIODescriptor(), inflater));
         }
         outputsList.add(outputMap);
       }
@@ -282,12 +284,12 @@ public class DAGUtils {
       if (edgePlan.getEdgeSource().hasHistoryText()) {
         edgeMap.put(OUTPUT_USER_PAYLOAD_AS_TEXT,
             DagTypeConverters.getHistoryTextFromProto(
-                edgePlan.getEdgeSource()));
+                edgePlan.getEdgeSource(), inflater));
       }
       if (edgePlan.getEdgeDestination().hasHistoryText()) {
         edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT,
             DagTypeConverters.getHistoryTextFromProto(
-                edgePlan.getEdgeDestination()));
+                edgePlan.getEdgeDestination(), inflater));
       } // TEZ-2286 this is missing edgemanager descriptor for custom edge
       edgesList.add(edgeMap);
     }
@@ -319,7 +321,7 @@ public class DAGUtils {
             if (edgeMergedInputInfo.getMergedInput().hasHistoryText()) {
               edgeMergedInput.put(USER_PAYLOAD_AS_TEXT,
                   DagTypeConverters.getHistoryTextFromProto(
-                      edgeMergedInputInfo.getMergedInput()));
+                      edgeMergedInputInfo.getMergedInput(), inflater));
             }
           }
           edgeMergedInputs.add(edgeMergedInput);

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
index 9b88cfd..dc6cd3b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
@@ -66,6 +66,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Inflater;
 
 /**
  * Starts scheduling tasks when number of completed source tasks crosses
@@ -104,6 +105,8 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
   long[] stats; //approximate amount of data to be fetched
   Configuration conf;
   ShuffleVertexManagerBaseConfig config;
+  // requires synchronized access
+  final Inflater inflater;
 
   /**
    * Used when automatic parallelism is enabled
@@ -198,6 +201,7 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
 
   public ShuffleVertexManagerBase(VertexManagerPluginContext context) {
     super(context);
+    inflater = TezCommonUtils.newInflater();
   }
 
   @Override
@@ -336,7 +340,7 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
           RoaringBitmap partitionStats = new RoaringBitmap();
           ByteString compressedPartitionStats = proto.getPartitionStats();
           byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
-              compressedPartitionStats);
+              compressedPartitionStats, inflater);
           NonSyncByteArrayInputStream bin = new NonSyncByteArrayInputStream(rawData);
           partitionStats.deserialize(new DataInputStream(bin));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index d74e447..aa07233 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -29,6 +29,7 @@ import java.text.DecimalFormat;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.List;
+import java.util.zip.Deflater;
 
 import javax.annotation.Nullable;
 import javax.crypto.SecretKey;
@@ -278,12 +279,13 @@ public class ShuffleUtils {
    * @param finalMergeEnabled
    * @param isLastEvent
    * @param pathComponent
+   * @param deflater
    * @return ByteBuffer
    * @throws IOException
    */
   static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails,
       int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext 
context,
-      int spillId, boolean finalMergeEnabled, boolean isLastEvent, String 
pathComponent)
+      int spillId, boolean finalMergeEnabled, boolean isLastEvent, String 
pathComponent, Deflater deflater)
       throws IOException {
     DataMovementEventPayloadProto.Builder payloadBuilder = 
DataMovementEventPayloadProto
         .newBuilder();
@@ -302,7 +304,7 @@ public class ShuffleUtils {
       if (emptyPartitions > 0) {
         ByteString emptyPartitionsBytesString =
             TezCommonUtils.compressByteArrayToByteString(
-                TezUtilsInternal.toByteArray(emptyPartitionDetails));
+                TezUtilsInternal.toByteArray(emptyPartitionDetails), deflater);
         payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
         LOG.info("EmptyPartition bitsetSize=" + 
emptyPartitionDetails.cardinality() + ", numOutputs="
             + numPhysicalOutputs + ", emptyPartitions=" + emptyPartitions
@@ -339,13 +341,14 @@ public class ShuffleUtils {
    * @param context
    * @param generateVmEvent whether to generate a vm event or not
    * @param isCompositeEvent whether to generate a CompositeDataMovementEvent 
or a DataMovementEvent
+   * @param deflater
    * @throws IOException
    */
   public static void generateEventsForNonStartedOutput(List<Event> eventList,
                                                        int numPhysicalOutputs,
                                                        OutputContext context,
                                                        boolean generateVmEvent,
-                                                       boolean 
isCompositeEvent) throws
+                                                       boolean 
isCompositeEvent, Deflater deflater) throws
       IOException {
     DataMovementEventPayloadProto.Builder payloadBuilder = 
DataMovementEventPayloadProto
         .newBuilder();
@@ -369,7 +372,7 @@ public class ShuffleUtils {
     emptyPartitionDetails.set(0, numPhysicalOutputs, true);
     ByteString emptyPartitionsBytesString =
         TezCommonUtils.compressByteArrayToByteString(
-            TezUtilsInternal.toByteArray(emptyPartitionDetails));
+            TezUtilsInternal.toByteArray(emptyPartitionDetails), deflater);
     payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString);
     payloadBuilder.setRunDuration(0);
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
@@ -403,7 +406,7 @@ public class ShuffleUtils {
   public static void generateEventOnSpill(List<Event> eventList, boolean 
finalMergeEnabled,
       boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord 
spillRecord,
       int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String 
pathComponent,
-      @Nullable long[] partitionStats, boolean reportDetailedPartitionStats)
+      @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, 
Deflater deflater)
       throws IOException {
     Preconditions.checkArgument(eventList != null, "EventList can't be null");
 
@@ -421,11 +424,11 @@ public class ShuffleUtils {
 
     ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, 
numPhysicalOutputs,
         spillRecord, context, spillId,
-        finalMergeEnabled, isLastEvent, pathComponent);
+        finalMergeEnabled, isLastEvent, pathComponent, deflater);
 
     if (finalMergeEnabled || isLastEvent) {
       VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats,
-          reportDetailedPartitionStats);
+          reportDetailedPartitionStats, deflater);
       eventList.add(vmEvent);
     }
 
@@ -435,7 +438,7 @@ public class ShuffleUtils {
   }
 
   public static VertexManagerEvent generateVMEvent(OutputContext context,
-      long[] sizePerPartition, boolean reportDetailedPartitionStats)
+      long[] sizePerPartition, boolean reportDetailedPartitionStats, Deflater 
deflater)
           throws IOException {
     ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
         ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();
@@ -459,7 +462,7 @@ public class ShuffleUtils {
         DataOutputBuffer dout = new DataOutputBuffer();
         stats.serialize(dout);
         ByteString partitionStatsBytes =
-            TezCommonUtils.compressByteArrayToByteString(dout.getData());
+            TezCommonUtils.compressByteArrayToByteString(dout.getData(), 
deflater);
         vmBuilder.setPartitionStats(partitionStatsBytes);
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
index adc3432..7d9eacf 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Inflater;
 
 import com.google.protobuf.ByteString;
 
@@ -59,6 +60,7 @@ public class ShuffleInputEventHandlerImpl implements 
ShuffleEventHandler {
   private final int ifileReadAheadLength;
   private final boolean useSharedInputs;
   private final InputContext inputContext;
+  private final Inflater inflater;
 
   private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
   private final AtomicInteger numDmeEvents = new AtomicInteger(0);
@@ -78,6 +80,7 @@ public class ShuffleInputEventHandlerImpl implements 
ShuffleEventHandler {
     // this currently relies on a user to enable the flag
     // expand on idea based on vertex parallelism and num inputs
     this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0);
+    this.inflater = TezCommonUtils.newInflater();
   }
 
   @Override
@@ -131,7 +134,7 @@ public class ShuffleInputEventHandlerImpl implements 
ShuffleEventHandler {
 
     if (shufflePayload.hasEmptyPartitions()) {
       byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(shufflePayload
-          .getEmptyPartitions());
+          .getEmptyPartitions(), inflater);
       BitSet emptyPartionsBitSet = 
TezUtilsInternal.fromByteArray(emptyPartitions);
       if (emptyPartionsBitSet.get(srcIndex)) {
         InputAttemptIdentifier srcAttemptIdentifier =

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 7991485..f6f6da1 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Inflater;
 
 import com.google.protobuf.ByteString;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
@@ -47,7 +48,7 @@ public class ShuffleInputEventHandlerOrderedGrouped 
implements ShuffleEventHandl
 
   private final ShuffleScheduler scheduler;
   private final InputContext inputContext;
-
+  private final Inflater inflater;
 
   private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
   private final AtomicInteger numDmeEvents = new AtomicInteger(0);
@@ -58,6 +59,7 @@ public class ShuffleInputEventHandlerOrderedGrouped 
implements ShuffleEventHandl
                                                 ShuffleScheduler scheduler) {
     this.inputContext = inputContext;
     this.scheduler = scheduler;
+    this.inflater = TezCommonUtils.newInflater();
   }
 
   @Override
@@ -110,7 +112,7 @@ public class ShuffleInputEventHandlerOrderedGrouped 
implements ShuffleEventHandl
 
     if (shufflePayload.hasEmptyPartitions()) {
       try {
-        byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
+        byte[] emptyPartitions = 
TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(),
 inflater);
         BitSet emptyPartitionsBitSet = 
TezUtilsInternal.fromByteArray(emptyPartitions);
         if (emptyPartitionsBitSet.get(partitionId)) {
           if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 609e9ff..9b3aadb 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.zip.Deflater;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -52,6 +53,7 @@ import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -113,6 +115,7 @@ public class PipelinedSorter extends ExternalSorter {
   private int bufferIndex = -1;
   private final int MIN_BLOCK_SIZE;
   private final boolean lazyAllocateMem;
+  private final Deflater deflater;
 
   // TODO Set additional countesr - total bytes written, spills etc.
 
@@ -224,6 +227,7 @@ public class PipelinedSorter extends ExternalSorter {
     valSerializer.open(span.out);
     keySerializer.open(span.out);
     minSpillsForCombine = 
this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
+    deflater = TezCommonUtils.newBestCompressionDeflater();
   }
 
   ByteBuffer allocateSpace() {
@@ -350,7 +354,7 @@ public class PipelinedSorter extends ExternalSorter {
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false,
         outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1),
         partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
-        reportDetailedPartitionStats());
+        reportDetailedPartitionStats(), deflater);
     outputContext.sendEvents(events);
     LOG.info(outputContext.getDestinationVertexName() +
         ": Added spill event for spill (final update=false), spillId=" + 
(numSpills - 1));
@@ -673,7 +677,7 @@ public class PipelinedSorter extends ExternalSorter {
           ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), 
isLastEvent,
               outputContext, i, indexCacheList.get(i), partitions,
               sendEmptyPartitionDetails, pathComponent, partitionStats,
-              reportDetailedPartitionStats());
+              reportDetailedPartitionStats(), deflater);
           LOG.info(outputContext.getDestinationVertexName() + ": Adding spill 
event for spill (final update=" + isLastEvent + "), spillId=" + i);
         }
         outputContext.sendEvents(events);

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 873d8e1..b5c3071 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.zip.Deflater;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -41,6 +42,7 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.io.NonSyncDataOutputStream;
 import org.apache.tez.runtime.api.Event;
@@ -112,6 +114,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
   final BlockingBuffer bb = new BlockingBuffer();
   volatile boolean spillThreadRunning = false;
   final SpillThread spillThread = new SpillThread();
+  private final Deflater deflater;
 
   final ArrayList<TezSpillRecord> indexCacheList =
     new ArrayList<TezSpillRecord>();
@@ -127,6 +130,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
   public DefaultSorter(OutputContext outputContext, Configuration conf, int 
numOutputs,
       long initialMemoryAvailable) throws IOException {
     super(outputContext, conf, numOutputs, initialMemoryAvailable);
+    deflater = TezCommonUtils.newBestCompressionDeflater();
     // sanity checks
     final float spillper = this.conf.getFloat(
         TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT,
@@ -1133,7 +1137,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
     String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index);
     ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), 
isLastEvent,
         outputContext, index, spillRecord, partitions, 
sendEmptyPartitionDetails, pathComponent,
-        partitionStats, reportDetailedPartitionStats());
+        partitionStats, reportDetailedPartitionStats(), deflater);
 
     LOG.info(outputContext.getDestinationVertexName() + ": " +
         "Adding spill event for spill (final update=" + isLastEvent + "), 
spillId=" + index);

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index eff29a5..0f38a29 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.zip.Deflater;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -116,6 +117,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
   // uncompressed size for each partition
   private final long[] sizePerPartition;
   private volatile long spilledSize = 0;
+  private final Deflater deflater;
 
   /**
    * Represents final number of records written (spills are not counted)
@@ -158,6 +160,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
     super(outputContext, conf, numOutputs);
     Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory 
should be >= 0 bytes");
 
+    this.deflater = TezCommonUtils.newBestCompressionDeflater();
     this.destNameTrimmed = 
TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName());
     //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might 
not add much value in
     // this case.  Add it later if needed.
@@ -594,7 +597,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   private Event generateVMEvent() throws IOException {
     return ShuffleUtils.generateVMEvent(outputContext, this.sizePerPartition,
-        this.reportDetailedPartitionStats());
+        this.reportDetailedPartitionStats(), deflater);
   }
 
   private Event generateDMEvent() throws IOException {
@@ -614,7 +617,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     if (emptyPartitions.cardinality() != 0) {
       // Empty partitions exist
       ByteString emptyPartitionsByteString =
-          TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions));
+          TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions), deflater);
       payloadBuilder.setEmptyPartitions(emptyPartitionsByteString);
     }
 
@@ -658,7 +661,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         List<Event> eventList = Lists.newLinkedList();
         eventList.add(ShuffleUtils.generateVMEvent(outputContext,
             reportPartitionStats() ? new long[numPartitions] : null,
-                reportDetailedPartitionStats()));
+                reportDetailedPartitionStats(), deflater));
         //Send final event with all empty partitions and null path component.
         BitSet emptyPartitions = new BitSet(numPartitions);
         emptyPartitions.flip(0, numPartitions);
@@ -979,7 +982,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
       if (isFinalUpdate) {
         eventList.add(ShuffleUtils.generateVMEvent(outputContext,
-            sizePerPartition, reportDetailedPartitionStats()));
+            sizePerPartition, reportDetailedPartitionStats(), deflater));
       }
       Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
           pathComponent, emptyPartitions);

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 9a3d778..5f6a304 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -69,6 +71,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
   private long startTime;
   private long endTime;
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
+  private final Deflater deflater;
 
   @VisibleForTesting
   boolean pipelinedShuffle;
@@ -78,6 +81,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
 
   public OrderedPartitionedKVOutput(OutputContext outputContext, int numPhysicalOutputs) {
     super(outputContext, numPhysicalOutputs);
+    deflater = TezCommonUtils.newBestCompressionDeflater();
   }
 
   @Override
@@ -200,14 +204,14 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
       ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent,
           getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf),
           getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(),
-          sorter.getPartitionStats(), sorter.reportDetailedPartitionStats());
+          sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), deflater);
     }
     return eventList;
   }
 
   private List<Event> generateEmptyEvents() throws IOException {
     List<Event> eventList = Lists.newLinkedList();
-    ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true);
+    ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true, deflater);
     return eventList;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 4f74f7d..cc7b27c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -133,7 +134,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
       returnEvents = new LinkedList<Event>();
       ShuffleUtils
           .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(),
-              false, false);
+              false, false, TezCommonUtils.newBestCompressionDeflater());
     }
 
     // This works for non-started outputs since new counters will be created with an initial value of 0

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index c4b3b22..3d16181 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -110,7 +111,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
       returnEvents = new LinkedList<Event>();
       ShuffleUtils
           .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(),
-              false, true);
+              false, true, TezCommonUtils.newBestCompressionDeflater());
     }
 
     // This works for non-started outputs since new counters will be created with an initial value of 0

http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 4233f5d..496468b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -163,7 +163,7 @@ public class TestShuffleUtils {
     String pathComponent = "/attempt_x_y_0/file.out";
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false);
+            physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater());
 
     Assert.assertTrue(events.size() == 1);
     Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
@@ -202,7 +202,7 @@ public class TestShuffleUtils {
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false);
+            physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater());
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
@@ -243,7 +243,7 @@ public class TestShuffleUtils {
     //normal code path where we do final merge all the time
     ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent,
         outputContext, spillId, new TezSpillRecord(indexFile, conf),
-            physicalOutputs, true, pathComponent, null, false);
+            physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater());
 
     Assert.assertTrue(events.size() == 2); //one for VM
     Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);

Reply via email to