Repository: tez Updated Branches: refs/heads/master 9cf25d142 -> da4098b9d
TEZ-3163. Reuse and tune Inflaters and Deflaters to speed DME processing (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/da4098b9 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/da4098b9 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/da4098b9 Branch: refs/heads/master Commit: da4098b9d6f72e6d4aacc1623622a0875408d2ba Parents: 9cf25d1 Author: Jonathan Eagles <[email protected]> Authored: Wed Sep 21 10:54:47 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Wed Sep 21 10:54:47 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/TezCommonUtils.java | 38 +++++++++++++++++--- .../apache/tez/dag/api/DagTypeConverters.java | 5 +-- .../tez/dag/api/TestDagTypeConverters.java | 2 +- .../apache/tez/dag/history/utils/DAGUtils.java | 16 +++++---- .../vertexmanager/ShuffleVertexManagerBase.java | 6 +++- .../library/common/shuffle/ShuffleUtils.java | 21 ++++++----- .../impl/ShuffleInputEventHandlerImpl.java | 5 ++- .../ShuffleInputEventHandlerOrderedGrouped.java | 6 ++-- .../common/sort/impl/PipelinedSorter.java | 8 +++-- .../common/sort/impl/dflt/DefaultSorter.java | 6 +++- .../writers/UnorderedPartitionedKVWriter.java | 11 +++--- .../output/OrderedPartitionedKVOutput.java | 8 +++-- .../library/output/UnorderedKVOutput.java | 3 +- .../output/UnorderedPartitionedKVOutput.java | 3 +- .../common/shuffle/TestShuffleUtils.java | 6 ++-- 16 files changed, 103 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c7f540b..3a55ec7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3163. Reuse and tune Inflaters and Deflaters to speed DME processing TEZ-3434. Add unit tests for flushing of recovery events. TEZ-3317. Speculative execution starts too early due to 0 progress. TEZ-3404. Move blocking call for YARN Timeline domain creation from client side to AM. http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java index e4cf028..afdce39 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.StringTokenizer; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; import java.util.zip.InflaterInputStream; import org.apache.commons.io.IOUtils; @@ -345,13 +346,35 @@ public class TezCommonUtils { } } + private static final boolean NO_WRAP = true; + + @Private + public static Deflater newBestCompressionDeflater() { + return new Deflater(Deflater.BEST_COMPRESSION, NO_WRAP); + } + + @Private + public static Deflater newBestSpeedDeflater() { + return new Deflater(Deflater.BEST_SPEED, NO_WRAP); + } + + @Private + public static Inflater newInflater() { + return new Inflater(NO_WRAP); + } + @Private public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException { + return compressByteArrayToByteString(inBytes, newBestCompressionDeflater()); + } + + @Private + public static ByteString compressByteArrayToByteString(byte[] inBytes, Deflater deflater) throws IOException { + deflater.reset(); ByteString.Output os = ByteString.newOutput(); DeflaterOutputStream compressOs = null; try { - compressOs = new DeflaterOutputStream(os, new Deflater( - Deflater.BEST_COMPRESSION)); + compressOs = new DeflaterOutputStream(os, deflater); compressOs.write(inBytes); compressOs.finish(); ByteString byteString = os.toByteString(); @@ -365,9 +388,14 @@ public class TezCommonUtils { @Private public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException { - InflaterInputStream in = new InflaterInputStream(byteString.newInput()); - byte[] bytes = IOUtils.toByteArray(in); - return bytes; + return decompressByteStringToByteArray(byteString, newInflater()); + } + + @Private + public static byte[] decompressByteStringToByteArray(ByteString byteString, Inflater inflater) throws IOException { + inflater.reset(); + return IOUtils.toByteArray(new InflaterInputStream(byteString.newInput(), inflater)); + } public static String getCredentialsInfo(Credentials credentials, String identifier) { http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index cefe026..c5d9c0b 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.zip.Inflater; import java.util.Map.Entry; import javax.annotation.Nullable; @@ -369,12 +370,12 @@ public class DagTypeConverters { return builder.build(); } - public static String getHistoryTextFromProto(TezEntityDescriptorProto proto) { + public static String getHistoryTextFromProto(TezEntityDescriptorProto proto, Inflater inflater) { if (!proto.hasHistoryText()) { return null; } try { - return new String(TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText()), + return new String(TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText(), inflater), "UTF-8"); } catch (IOException e) { throw new TezUncheckedException(e); http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java index dc04f2d..265fce9 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java @@ -72,7 +72,7 @@ public class TestDagTypeConverters { Assert.assertNull(inputDescriptor.getHistoryText()); // Check history text value - String actualHistoryText = DagTypeConverters.getHistoryTextFromProto(proto); + String actualHistoryText = DagTypeConverters.getHistoryTextFromProto(proto, TezCommonUtils.newInflater()); Assert.assertEquals(historytext, actualHistoryText); } http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index d8d2407..dce9e52 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -28,9 +28,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; +import java.util.zip.Inflater; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.ATSConstants; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.VersionInfo; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; @@ -186,7 +188,7 @@ public class DAGUtils { } public static Map<String,Object> convertDAGPlanToATSMap(DAGPlan dagPlan) throws IOException { - + final Inflater inflater = TezCommonUtils.newInflater(); final String VERSION_KEY = "version"; final int version = 2; Map<String,Object> dagMap = new LinkedHashMap<String, Object>(); @@ -208,7 +210,7 @@ public class DAGUtils { if (vertexPlan.getProcessorDescriptor().hasHistoryText()) { vertexMap.put(USER_PAYLOAD_AS_TEXT, DagTypeConverters.getHistoryTextFromProto( - vertexPlan.getProcessorDescriptor())); + vertexPlan.getProcessorDescriptor(), inflater)); } } @@ -232,7 +234,7 @@ public class DAGUtils { if (input.getIODescriptor().hasHistoryText()) { inputMap.put(USER_PAYLOAD_AS_TEXT, DagTypeConverters.getHistoryTextFromProto( - input.getIODescriptor())); + input.getIODescriptor(), inflater)); } inputsList.add(inputMap); } @@ -250,7 +252,7 @@ public class DAGUtils { if (output.getIODescriptor().hasHistoryText()) { outputMap.put(USER_PAYLOAD_AS_TEXT, DagTypeConverters.getHistoryTextFromProto( - output.getIODescriptor())); + output.getIODescriptor(), inflater)); } outputsList.add(outputMap); } @@ -282,12 +284,12 @@ public class DAGUtils { if (edgePlan.getEdgeSource().hasHistoryText()) { edgeMap.put(OUTPUT_USER_PAYLOAD_AS_TEXT, DagTypeConverters.getHistoryTextFromProto( - edgePlan.getEdgeSource())); + edgePlan.getEdgeSource(), inflater)); } if (edgePlan.getEdgeDestination().hasHistoryText()) { edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT, DagTypeConverters.getHistoryTextFromProto( - edgePlan.getEdgeDestination())); + edgePlan.getEdgeDestination(), inflater)); } // TEZ-2286 this is missing edgemanager descriptor for custom edge edgesList.add(edgeMap); } @@ -319,7 +321,7 @@ public class DAGUtils { if (edgeMergedInputInfo.getMergedInput().hasHistoryText()) { edgeMergedInput.put(USER_PAYLOAD_AS_TEXT, DagTypeConverters.getHistoryTextFromProto( - edgeMergedInputInfo.getMergedInput())); + edgeMergedInputInfo.getMergedInput(), inflater)); } } edgeMergedInputs.add(edgeMergedInput); http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java index 9b88cfd..dc6cd3b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java @@ -66,6 +66,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.Inflater; /** * Starts scheduling tasks when number of completed source tasks crosses @@ -104,6 +105,8 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin { long[] stats; //approximate amount of data to be fetched Configuration conf; ShuffleVertexManagerBaseConfig config; + // requires synchronized access + final Inflater inflater; /** * Used when automatic parallelism is enabled @@ -198,6 +201,7 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin { public ShuffleVertexManagerBase(VertexManagerPluginContext context) { super(context); + inflater = TezCommonUtils.newInflater(); } @Override @@ -336,7 +340,7 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin { RoaringBitmap partitionStats = new RoaringBitmap(); ByteString compressedPartitionStats = proto.getPartitionStats(); byte[] rawData = TezCommonUtils.decompressByteStringToByteArray( - compressedPartitionStats); + compressedPartitionStats, inflater); NonSyncByteArrayInputStream bin = new NonSyncByteArrayInputStream(rawData); partitionStats.deserialize(new DataInputStream(bin)); http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index d74e447..aa07233 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -29,6 +29,7 @@ import java.text.DecimalFormat; import java.util.BitSet; import java.util.Collection; import java.util.List; +import java.util.zip.Deflater; import javax.annotation.Nullable; import javax.crypto.SecretKey; @@ -278,12 +279,13 @@ public class ShuffleUtils { * @param finalMergeEnabled * @param isLastEvent * @param pathComponent + * @param deflater * @return ByteBuffer * @throws IOException */ static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails, int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context, - int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent) + int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Deflater deflater) throws IOException { DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto .newBuilder(); @@ -302,7 +304,7 @@ public class ShuffleUtils { if (emptyPartitions > 0) { ByteString emptyPartitionsBytesString = TezCommonUtils.compressByteArrayToByteString( - TezUtilsInternal.toByteArray(emptyPartitionDetails)); + TezUtilsInternal.toByteArray(emptyPartitionDetails), deflater); payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString); LOG.info("EmptyPartition bitsetSize=" + emptyPartitionDetails.cardinality() + ", numOutputs=" + numPhysicalOutputs + ", emptyPartitions=" + emptyPartitions @@ -339,13 +341,14 @@ public class ShuffleUtils { * @param context * @param generateVmEvent whether to generate a vm event or not * @param isCompositeEvent whether to generate a CompositeDataMovementEvent or a DataMovementEvent + * @param deflater * @throws IOException */ public static void generateEventsForNonStartedOutput(List<Event> eventList, int numPhysicalOutputs, OutputContext context, boolean generateVmEvent, - boolean isCompositeEvent) throws + boolean isCompositeEvent, Deflater deflater) throws IOException { DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto .newBuilder(); @@ -369,7 +372,7 @@ public class ShuffleUtils { emptyPartitionDetails.set(0, numPhysicalOutputs, true); ByteString emptyPartitionsBytesString = TezCommonUtils.compressByteArrayToByteString( - TezUtilsInternal.toByteArray(emptyPartitionDetails)); + TezUtilsInternal.toByteArray(emptyPartitionDetails), deflater); payloadBuilder.setEmptyPartitions(emptyPartitionsBytesString); payloadBuilder.setRunDuration(0); DataMovementEventPayloadProto payloadProto = payloadBuilder.build(); @@ -403,7 +406,7 @@ public class ShuffleUtils { public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled, boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord, int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent, - @Nullable long[] partitionStats, boolean reportDetailedPartitionStats) + @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Deflater deflater) throws IOException { Preconditions.checkArgument(eventList != null, "EventList can't be null"); @@ -421,11 +424,11 @@ public class ShuffleUtils { ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs, spillRecord, context, spillId, - finalMergeEnabled, isLastEvent, pathComponent); + finalMergeEnabled, isLastEvent, pathComponent, deflater); if (finalMergeEnabled || isLastEvent) { VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats, - reportDetailedPartitionStats); + reportDetailedPartitionStats, deflater); eventList.add(vmEvent); } @@ -435,7 +438,7 @@ public class ShuffleUtils { } public static VertexManagerEvent generateVMEvent(OutputContext context, - long[] sizePerPartition, boolean reportDetailedPartitionStats) + long[] sizePerPartition, boolean reportDetailedPartitionStats, Deflater deflater) throws IOException { ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder(); @@ -459,7 +462,7 @@ public class ShuffleUtils { DataOutputBuffer dout = new DataOutputBuffer(); stats.serialize(dout); ByteString partitionStatsBytes = - TezCommonUtils.compressByteArrayToByteString(dout.getData()); + TezCommonUtils.compressByteArrayToByteString(dout.getData(), deflater); vmBuilder.setPartitionStats(partitionStatsBytes); } } http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index adc3432..7d9eacf 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.BitSet; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.Inflater; import com.google.protobuf.ByteString; @@ -59,6 +60,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { private final int ifileReadAheadLength; private final boolean useSharedInputs; private final InputContext inputContext; + private final Inflater inflater; private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); private final AtomicInteger numDmeEvents = new AtomicInteger(0); @@ -78,6 +80,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { // this currently relies on a user to enable the flag // expand on idea based on vertex parallelism and num inputs this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0); + this.inflater = TezCommonUtils.newInflater(); } @Override @@ -131,7 +134,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { if (shufflePayload.hasEmptyPartitions()) { byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload - .getEmptyPartitions()); + .getEmptyPartitions(), inflater); BitSet emptyPartionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); if (emptyPartionsBitSet.get(srcIndex)) { InputAttemptIdentifier srcAttemptIdentifier = http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index 7991485..f6f6da1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.BitSet; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.Inflater; import com.google.protobuf.ByteString; import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; @@ -47,7 +48,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private final ShuffleScheduler scheduler; private final InputContext inputContext; - + private final Inflater inflater; private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); private final AtomicInteger numDmeEvents = new AtomicInteger(0); @@ -58,6 +59,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl ShuffleScheduler scheduler) { this.inputContext = inputContext; this.scheduler = scheduler; + this.inflater = TezCommonUtils.newInflater(); } @Override @@ -110,7 +112,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl if (shufflePayload.hasEmptyPartitions()) { try { - byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions()); + byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater); BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); if (emptyPartitionsBitSet.get(partitionId)) { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 609e9ff..9b3aadb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.zip.Deflater; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -52,6 +53,7 @@ import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.IndexedSorter; import org.apache.hadoop.util.Progress; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; @@ -113,6 +115,7 @@ public class PipelinedSorter extends ExternalSorter { private int bufferIndex = -1; private final int MIN_BLOCK_SIZE; private final boolean lazyAllocateMem; + private final Deflater deflater; // TODO Set additional countesr - total bytes written, spills etc. @@ -224,6 +227,7 @@ public class PipelinedSorter extends ExternalSorter { valSerializer.open(span.out); keySerializer.open(span.out); minSpillsForCombine = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3); + deflater = TezCommonUtils.newBestCompressionDeflater(); } ByteBuffer allocateSpace() { @@ -350,7 +354,7 @@ public class PipelinedSorter extends ExternalSorter { ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, - reportDetailedPartitionStats()); + reportDetailedPartitionStats(), deflater); outputContext.sendEvents(events); LOG.info(outputContext.getDestinationVertexName() + ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); @@ -673,7 +677,7 @@ public class PipelinedSorter extends ExternalSorter { ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, i, indexCacheList.get(i), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, - reportDetailedPartitionStats()); + reportDetailedPartitionStats(), deflater); LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); } outputContext.sendEvents(events); http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 873d8e1..b5c3071 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.zip.Deflater; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -41,6 +42,7 @@ import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.Progress; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.io.NonSyncDataOutputStream; import org.apache.tez.runtime.api.Event; @@ -112,6 +114,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab final BlockingBuffer bb = new BlockingBuffer(); volatile boolean spillThreadRunning = false; final SpillThread spillThread = new SpillThread(); + private final Deflater deflater; final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>(); @@ -127,6 +130,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException { super(outputContext, conf, numOutputs, initialMemoryAvailable); + deflater = TezCommonUtils.newBestCompressionDeflater(); // sanity checks final float spillper = this.conf.getFloat( TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT, @@ -1133,7 +1137,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index); ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent, - partitionStats, reportDetailedPartitionStats()); + partitionStats, reportDetailedPartitionStats(), deflater); LOG.info(outputContext.getDestinationVertexName() + ": " + "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index); http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index eff29a5..0f38a29 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.zip.Deflater; import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; @@ -116,6 +117,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // uncompressed size for each partition private final long[] sizePerPartition; private volatile long spilledSize = 0; + private final Deflater deflater; /** * Represents final number of records written (spills are not counted) @@ -158,6 +160,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit super(outputContext, conf, numOutputs); Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes"); + this.deflater = TezCommonUtils.newBestCompressionDeflater(); this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()); //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in // this case. Add it later if needed. @@ -594,7 +597,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private Event generateVMEvent() throws IOException { return ShuffleUtils.generateVMEvent(outputContext, this.sizePerPartition, - this.reportDetailedPartitionStats()); + this.reportDetailedPartitionStats(), deflater); } private Event generateDMEvent() throws IOException { @@ -614,7 +617,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit if (emptyPartitions.cardinality() != 0) { // Empty partitions exist ByteString emptyPartitionsByteString = - TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions)); + TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(emptyPartitions), deflater); payloadBuilder.setEmptyPartitions(emptyPartitionsByteString); } @@ -658,7 +661,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit List<Event> eventList = Lists.newLinkedList(); eventList.add(ShuffleUtils.generateVMEvent(outputContext, reportPartitionStats() ? new long[numPartitions] : null, - reportDetailedPartitionStats())); + reportDetailedPartitionStats(), deflater)); //Send final event with all empty partitions and null path component. BitSet emptyPartitions = new BitSet(numPartitions); emptyPartitions.flip(0, numPartitions); @@ -979,7 +982,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber); if (isFinalUpdate) { eventList.add(ShuffleUtils.generateVMEvent(outputContext, - sizePerPartition, reportDetailedPartitionStats())); + sizePerPartition, reportDetailedPartitionStats(), deflater)); } Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate, pathComponent, emptyPartitions); http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 9a3d778..5f6a304 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Locale; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.Deflater; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.TezConfiguration; @@ -69,6 +71,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { private long startTime; private long endTime; private final AtomicBoolean isStarted = new AtomicBoolean(false); + private final Deflater deflater; @VisibleForTesting boolean pipelinedShuffle; @@ -78,6 +81,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { public OrderedPartitionedKVOutput(OutputContext outputContext, int numPhysicalOutputs) { super(outputContext, numPhysicalOutputs); + deflater = TezCommonUtils.newBestCompressionDeflater(); } @Override @@ -200,14 +204,14 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent, getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf), getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(), - sorter.getPartitionStats(), sorter.reportDetailedPartitionStats()); + sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), deflater); } return eventList; } private List<Event> generateEmptyEvents() throws IOException { List<Event> eventList = Lists.newLinkedList(); - ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true); + ShuffleUtils.generateEventsForNonStartedOutput(eventList, getNumPhysicalOutputs(), getContext(), true, true, deflater); return eventList; } http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 4f74f7d..cc7b27c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.TezConfiguration; @@ -133,7 +134,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { returnEvents = new LinkedList<Event>(); ShuffleUtils .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(), - false, false); + false, false, TezCommonUtils.newBestCompressionDeflater()); } // This works for non-started outputs since new counters will be created with an initial value of 0 http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index c4b3b22..3d16181 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.TezConfiguration; @@ -110,7 +111,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { returnEvents = new LinkedList<Event>(); ShuffleUtils .generateEventsForNonStartedOutput(returnEvents, getNumPhysicalOutputs(), getContext(), - false, true); + false, true, TezCommonUtils.newBestCompressionDeflater()); } // This works for non-started outputs since new counters will be created with an initial value of 0 http://git-wip-us.apache.org/repos/asf/tez/blob/da4098b9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 4233f5d..496468b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -163,7 +163,7 @@ public class TestShuffleUtils { String pathComponent = "/attempt_x_y_0/file.out"; ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false); + physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater()); Assert.assertTrue(events.size() == 1); Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent); @@ -202,7 +202,7 @@ public class TestShuffleUtils { //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false); + physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater()); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); @@ -243,7 +243,7 @@ public class TestShuffleUtils { //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false); + physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater()); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
