This is an automated email from the ASF dual-hosted git repository. abstractdog pushed a commit to branch branch-0.10.0 in repository https://gitbox.apache.org/repos/asf/tez.git
commit 40c86627143a51e8da869615f1bf54ed4a22a641 Author: László Bodor <[email protected]> AuthorDate: Tue Oct 6 08:56:57 2020 +0200 TEZ-4234: Compressor can cause IllegalArgumentException in Buffer.limit where limit exceeds capacity (László Bodor reviewed by Rajesh Balamohan, Jonathan Turner Eagles) Signed-off-by: Laszlo Bodor <[email protected]> --- .../hadoop/TestConfigTranslationMRToTez.java | 1 - .../tez/runtime/library/common/ConfigUtils.java | 23 ---- .../runtime/library/common/TezRuntimeUtils.java | 18 ++- .../common/shuffle/orderedgrouped/Shuffle.java | 15 +-- .../library/common/sort/impl/ExternalSorter.java | 29 +---- .../runtime/library/common/sort/impl/IFile.java | 25 +--- .../writers/BaseUnorderedPartitionedKVWriter.java | 17 ++- .../runtime/library/input/UnorderedKVInput.java | 14 +-- .../tez/runtime/library/utils/CodecUtils.java | 127 +++++++++++++++++++++ .../library/common/shuffle/TestShuffleUtils.java | 8 +- .../library/common/sort/impl/TestIFile.java | 59 +++++++++- 11 files changed, 217 insertions(+), 119 deletions(-) diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java index deab64f..df68c8d 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java @@ -70,6 +70,5 @@ public class TestConfigTranslationMRToTez { assertEquals(LongWritable.class.getName(), ConfigUtils .getIntermediateInputValueClass(confVertex1).getName()); assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex1)); - assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex1)); } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java index 76d3dff..f83fdc9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java @@ -56,24 +56,6 @@ public class ConfigUtils { } return codecClass; } - - public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass( - Configuration conf, Class<DefaultCodec> defaultValue) { - Class<? extends CompressionCodec> codecClass = defaultValue; - String name = conf - .get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC); - if (name != null) { - try { - codecClass = conf.getClassByName(name).asSubclass( - CompressionCodec.class); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Compression codec " + name - + " was not found.", e); - } - } - return codecClass; - } - // TODO Move defaults over to a constants file. @@ -82,11 +64,6 @@ public class ConfigUtils { TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false); } - public static boolean isIntermediateInputCompressed(Configuration conf) { - return conf.getBoolean( - TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false); - } - public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) { Class<V> retv = (Class<V>) conf.getClass( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, null, diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index 8be8fa2..daeafbc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -265,16 +265,22 @@ public class TezRuntimeUtils { } public static String getBufferSizeProperty(CompressionCodec codec) { - switch (codec.getClass().getSimpleName().toString()) { - case "DefaultCodec": + return getBufferSizeProperty(codec.getClass().getName()); + } + + public static String getBufferSizeProperty(String className) { + switch (className) { + case "org.apache.hadoop.io.compress.DefaultCodec": return "io.file.buffer.size"; - case "SnappyCodec": + case "org.apache.hadoop.io.compress.SnappyCodec": return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY; - case "ZStandardCodec": + case "org.apache.hadoop.io.compress.ZStandardCodec": return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY; - case "LzoCodec": + case "org.apache.hadoop.io.compress.LzoCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY; + case "com.hadoop.compression.lzo.LzoCodec": return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY; - case "Lz4Codec": + case "org.apache.hadoop.io.compress.Lz4Codec": return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY; default: return null; diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index 38f079a..db5ef73 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -39,8 +39,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.GuavaShim; import org.apache.tez.common.TezRuntimeFrameworkConfigs; @@ -51,12 +49,11 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -import org.apache.tez.runtime.library.common.ConfigUtils; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.combine.Combiner; import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException; - +import org.apache.tez.runtime.library.utils.CodecUtils; import org.apache.tez.common.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -114,16 +111,8 @@ public class Shuffle implements ExceptionReporter { this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); + this.codec = CodecUtils.getCodec(conf); - if (ConfigUtils.isIntermediateInputCompressed(conf)) { - Class<? extends CompressionCodec> codecClass = - ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class); - codec = ReflectionUtils.newInstance(codecClass, conf); - // Work around needed for HADOOP-12191. Avoids the native initialization synchronization race - codec.getDecompressorType(); - } else { - codec = null; - } this.ifileReadAhead = conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index 194e899..3ff74f7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -42,8 +42,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.serializer.Serializer; import org.apache.hadoop.util.IndexedSorter; import org.apache.hadoop.util.Progressable; @@ -63,7 +61,7 @@ import org.apache.tez.runtime.library.common.serializer.SerializationContext; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput; - +import org.apache.tez.runtime.library.utils.CodecUtils; import org.apache.tez.common.Preconditions; @SuppressWarnings({"rawtypes"}) @@ -224,30 +222,7 @@ public abstract class ExternalSorter { numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT); // compression - if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) { - Class<? extends CompressionCodec> codecClass = - ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class); - codec = ReflectionUtils.newInstance(codecClass, this.conf); - - if (codec != null) { - Class<? extends Compressor> compressorType = null; - Throwable cause = null; - try { - compressorType = codec.getCompressorType(); - } catch (RuntimeException e) { - cause = e; - } - if (compressorType == null) { - String errMsg = - String.format("Unable to get CompressorType for codec (%s). This is most" + - " likely due to missing native libraries for the codec.", - conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC)); - throw new IOException(errMsg, cause); - } - } - } else { - codec = null; - } + this.codec = CodecUtils.getCodec(conf); this.ifileReadAhead = this.conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index 6aa44e2..1b2aeff 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -30,20 +30,18 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.io.BoundedByteArrayOutputStream; -import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.runtime.library.utils.BufferUtils; +import org.apache.tez.runtime.library.utils.CodecUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.CodecPool; @@ -823,7 +821,8 @@ public class IFile { decompressor = CodecPool.getDecompressor(codec); if (decompressor != null) { decompressor.reset(); - in = getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor, compressedLength); + in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor, + compressedLength); } else { LOG.warn("Could not obtain decompressor from CodecPool"); in = checksumIn; @@ -859,24 +858,6 @@ public class IFile { } } - private static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec, - IFileInputStream checksumIn, Decompressor decompressor, int compressedLength) - throws IOException { - String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec); - - if (bufferSizeProp != null) { - Configurable configurableCodec = (Configurable) codec; - Configuration conf = configurableCodec.getConf(); - - int bufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE); - LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}", - DEFAULT_BUFFER_SIZE, bufferSizeProp, bufSize); - conf.setInt(bufferSizeProp, bufSize); - } - - return codec.createInputStream(checksumIn, decompressor); - } - /** * Read entire IFile content to disk. * diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java index ecc9e03..adea49f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java @@ -29,10 +29,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.io.serializer.Serializer; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.runtime.api.Event; @@ -43,6 +41,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput; +import org.apache.tez.runtime.library.utils.CodecUtils; @SuppressWarnings("rawtypes") public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter { @@ -141,16 +140,14 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter { additionalSpillBytesReadCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ); numAdditionalSpillsCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT); dataViaEventSize = outputContext.getCounters().findCounter(TaskCounter.DATA_BYTES_VIA_EVENT); - + // compression - if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) { - Class<? extends CompressionCodec> codecClass = - ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class); - codec = ReflectionUtils.newInstance(codecClass, this.conf); - } else { - codec = null; + try { + this.codec = CodecUtils.getCodec(conf); + } catch (IOException e) { + throw new RuntimeException(e); } - + this.ifileReadAhead = this.conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index 1db7869..c67c405 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -35,8 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; @@ -46,14 +44,13 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.library.api.KeyValueReader; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -import org.apache.tez.runtime.library.common.ConfigUtils; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; import org.apache.tez.runtime.library.common.readers.UnorderedKVReader; import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandlerImpl; import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager; import org.apache.tez.runtime.library.common.shuffle.impl.SimpleFetchedInputAllocator; - +import org.apache.tez.runtime.library.utils.CodecUtils; import org.apache.tez.common.Preconditions; /** @@ -114,14 +111,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { if (!isStarted.get()) { ////// Initial configuration memoryUpdateCallbackHandler.validateUpdateReceived(); - CompressionCodec codec; - if (ConfigUtils.isIntermediateInputCompressed(conf)) { - Class<? extends CompressionCodec> codecClass = ConfigUtils - .getIntermediateInputCompressorClass(conf, DefaultCodec.class); - codec = ReflectionUtils.newInstance(codecClass, conf); - } else { - codec = null; - } + CompressionCodec codec = CodecUtils.getCodec(conf); boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); boolean ifileReadAhead = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java new file mode 100644 index 0000000..99d22c5 --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.utils; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.ConfigUtils; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.apache.tez.runtime.library.common.sort.impl.IFile; +import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class CodecUtils { + + private static final Logger LOG = LoggerFactory.getLogger(IFile.class); + private static final int DEFAULT_BUFFER_SIZE = 128 * 1024; + + private CodecUtils() { + } + + public static CompressionCodec getCodec(Configuration conf) throws IOException { + if (ConfigUtils.shouldCompressIntermediateOutput(conf)) { + Class<? extends CompressionCodec> codecClass = + ConfigUtils.getIntermediateOutputCompressorClass(conf, DefaultCodec.class); + CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); + + if (codec != null) { + Class<? extends Compressor> compressorType = null; + Throwable cause = null; + try { + compressorType = codec.getCompressorType(); + } catch (RuntimeException e) { + cause = e; + } + if (compressorType == null) { + String errMsg = String.format( + "Unable to get CompressorType for codec (%s). This is most" + + " likely due to missing native libraries for the codec.", + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC)); + throw new IOException(errMsg, cause); + } + } + return codec; + } else { + return null; + } + } + + public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec, + IFileInputStream checksumIn, Decompressor decompressor, int compressedLength) + throws IOException { + String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec); + Configurable configurableCodec = (Configurable) codec; + int originalSize = configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE); + + CompressionInputStream in = null; + + if (bufferSizeProp != null) { + Configuration conf = configurableCodec.getConf(); + int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE); + LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}", + DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize); + + synchronized (codec) { + conf.setInt(bufferSizeProp, newBufSize); + + in = codec.createInputStream(checksumIn, decompressor); + /* + * We would better reset the original buffer size into the codec. Basically the buffer size + * is used at 2 places. + * + * 1. It can tell the inputstream/outputstream buffersize (which is created by + * codec.createInputStream/codec.createOutputStream). This is something which might and + * should be optimized in config, as inputstreams instantiate and use their own buffer and + * won't reuse buffers from previous streams (TEZ-4135). + * + * 2. The same buffersize is used when a codec creates a new Compressor/Decompressor. The + * fundamental difference is that Compressor/Decompressor instances are expensive and reused + * by hadoop's CodecPool. Here is a hidden mismatch, which can happen when a codec is + * created with a small buffersize config. Once it creates a Compressor/Decompressor + * instance from its config field, the reused Compressor/Decompressor instance will be + * reused later, even when application handles large amount of data. This way we can end up + * in large stream buffers + small compressor/decompressor buffers, which can be suboptimal, + * moreover, it can lead to strange errors, when a compressed output exceeds the size of the + * buffer (TEZ-4234). + * + * An interesting outcome is that - as the codec buffersize config affects both + * compressor(output) and decompressor(input) paths - an altered codec config can cause the + * issues above for Compressor instances as well, even when we tried to leverage from + * smaller buffer size only on decompression paths. + */ + configurableCodec.getConf().setInt(bufferSizeProp, originalSize); + } + } else { + in = codec.createInputStream(checksumIn, decompressor); + } + + return in; + } +} \ No newline at end of file diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 520dec7..446801a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -290,6 +290,7 @@ public class TestShuffleUtils { .thenThrow(new InternalError(codecErrorMsg)); Decompressor mockDecoder = mock(Decompressor.class); CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class); + when(((ConfigurableCodecForTest) mockCodec).getConf()).thenReturn(mock(Configuration.class)); when(mockCodec.createDecompressor()).thenReturn(mockDecoder); when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class))) .thenReturn(mockCodecStream); @@ -312,6 +313,7 @@ public class TestShuffleUtils { .thenThrow(new IllegalArgumentException(codecErrorMsg)); Decompressor mockDecoder = mock(Decompressor.class); CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class); + when(((ConfigurableCodecForTest) mockCodec).getConf()).thenReturn(mock(Configuration.class)); when(mockCodec.createDecompressor()).thenReturn(mockDecoder); when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class))) .thenReturn(mockCodecStream); @@ -327,7 +329,8 @@ public class TestShuffleUtils { CompressionInputStream mockCodecStream1 = mock(CompressionInputStream.class); when(mockCodecStream1.read(any(byte[].class), anyInt(), anyInt())) .thenThrow(new SocketTimeoutException(codecErrorMsg)); - CompressionCodec mockCodec1 = mock(CompressionCodec.class); + CompressionCodec mockCodec1 = mock(ConfigurableCodecForTest.class); + when(((ConfigurableCodecForTest) mockCodec1).getConf()).thenReturn(mock(Configuration.class)); when(mockCodec1.createDecompressor()).thenReturn(mockDecoder); when(mockCodec1.createInputStream(any(InputStream.class), any(Decompressor.class))) .thenReturn(mockCodecStream1); @@ -342,7 +345,8 @@ public class TestShuffleUtils { CompressionInputStream mockCodecStream2 = mock(CompressionInputStream.class); when(mockCodecStream2.read(any(byte[].class), anyInt(), anyInt())) .thenThrow(new InternalError(codecErrorMsg)); - CompressionCodec mockCodec2 = mock(CompressionCodec.class); + CompressionCodec mockCodec2 = mock(ConfigurableCodecForTest.class); + when(((ConfigurableCodecForTest) mockCodec2).getConf()).thenReturn(mock(Configuration.class)); when(mockCodec2.createDecompressor()).thenReturn(mockDecoder); when(mockCodec2.createInputStream(any(InputStream.class), any(Decompressor.class))) .thenReturn(mockCodecStream2); diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java index c74496e..bf35955 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java @@ -50,9 +50,11 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.lz4.Lz4Compressor; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.hadoop.util.NativeCodeLoader; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; @@ -66,6 +68,7 @@ import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair; import org.apache.tez.runtime.library.utils.BufferUtils; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -729,13 +732,16 @@ public class TestIFile { @Test public void testInMemoryBufferSize() throws IOException { + Configurable configurableCodec = (Configurable) codec; + int originalCodecBufferSize = + configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1); + // for smaller amount of data, codec buffer should be sized according to compressed data length List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100)); Writer writer = writeTestFile(false, false, data, codec); readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec); - Configurable configurableCodec = (Configurable) codec; - Assert.assertEquals(writer.getCompressedLength(), + Assert.assertEquals(originalCodecBufferSize, // original size is repaired configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0)); // buffer size cannot grow infinitely with compressed data size @@ -743,10 +749,57 @@ public class TestIFile { writer = writeTestFile(false, false, data, codec); readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec); - Assert.assertEquals(128*1024, + Assert.assertEquals(originalCodecBufferSize, // original size is repaired configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0)); } + @Test(expected = IllegalArgumentException.class) + public void testSmallDataCompression() throws IOException { + Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded()); + + tryWriteFileWithBufferSize(17, "org.apache.hadoop.io.compress.Lz4Codec"); + tryWriteFileWithBufferSize(32, "org.apache.hadoop.io.compress.Lz4Codec"); + } + + private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName) + throws IOException { + Configuration conf = new Configuration(); + + System.out.println("trying with buffer size: " + bufferSize); + conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize)); + CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); + CompressionCodec codecToTest = + codecFactory.getCodecByClassName(codecClassName); + List<KVPair> data = KVDataGen.generateTestDataOfKeySize(false, 1, 0); + writeTestFile(false, false, data, codecToTest); + } + + @Test(expected = IllegalArgumentException.class) + public void testLz4CompressedDataIsLargerThanOriginal() throws IOException { + Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded()); + + // this one succeeds + byte[] buf = new byte[32]; + initBufWithNumbers(buf, 24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48); + Lz4Compressor comp = new Lz4Compressor(32, false); + comp.setInput(buf, 0, 32); + comp.compress(buf, 0, 32); + + // adding 1 more element makes that fail + buf = new byte[32]; + initBufWithNumbers(buf, 24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48, + 50); + comp = new Lz4Compressor(32, false); + comp.setInput(buf, 0, 32); + comp.compress(buf, 0, 32); + } + + private void initBufWithNumbers(byte[] buf, int... args) { + for (int i = 0; i < args.length; i++) { + buf[i] = (byte) args[i]; + } + } + /** * Test different options (RLE, repeat keys, compression) on reader/writer *
