Repository: orc Updated Branches: refs/heads/master 3d44366a4 -> 7dfe4a748
ORC-310 Improved error handling of compression codecs when being reset. Also fixes reuse of codecs via DataReader.clone(). Fixes #222 Signed-off-by: Owen O'Malley <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/7dfe4a74 Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/7dfe4a74 Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/7dfe4a74 Branch: refs/heads/master Commit: 7dfe4a7483145be109201f1dbeeffac875ccef1a Parents: 3d44366 Author: sergey <[email protected]> Authored: Wed Feb 28 16:00:51 2018 -0800 Committer: Owen O'Malley <[email protected]> Committed: Thu Mar 15 09:37:24 2018 -0700 ---------------------------------------------------------------------- .../src/java/org/apache/orc/DataReader.java | 7 +++- .../java/org/apache/orc/impl/OrcCodecPool.java | 37 ++++++++++------- .../org/apache/orc/impl/PhysicalFsWriter.java | 10 ++++- .../org/apache/orc/impl/RecordReaderUtils.java | 42 ++++++++++++-------- 4 files changed, 61 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/DataReader.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/DataReader.java b/java/core/src/java/org/apache/orc/DataReader.java index 3155862..ed3acbb 100644 --- a/java/core/src/java/org/apache/orc/DataReader.java +++ b/java/core/src/java/org/apache/orc/DataReader.java @@ -79,7 +79,10 @@ public interface DataReader extends AutoCloseable, Cloneable { @Override public void close() throws IOException; - /** Returns the compression codec used by this datareader. - * @return */ + /** + * Returns the compression codec used by this datareader. + * We should consider removing this from the interface. + * @return the compression codec + */ CompressionCodec getCompressionCodec(); } http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java index 56b9896..24ee618 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java +++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java @@ -18,10 +18,8 @@ package org.apache.orc.impl; import java.util.concurrent.ConcurrentHashMap; - import java.util.ArrayList; import java.util.List; - import org.apache.orc.CompressionCodec; import org.apache.orc.CompressionKind; import org.slf4j.Logger; @@ -62,26 +60,35 @@ public final class OrcCodecPool { return codec; } + /** + * Returns the codec to the pool or closes it, suppressing exceptions. + * @param kind Compression kind. + * @param codec Codec. + */ public static void returnCodec(CompressionKind kind, CompressionCodec codec) { if (codec == null) { return; } assert kind != CompressionKind.NONE; - codec.reset(); - List<CompressionCodec> list = POOL.get(kind); - if (list == null) { - List<CompressionCodec> newList = new ArrayList<>(); - List<CompressionCodec> oldList = POOL.putIfAbsent(kind, newList); - list = (oldList == null) ? newList : oldList; - } - synchronized (list) { - if (list.size() < MAX_PER_KIND) { - list.add(codec); - return; + try { + codec.reset(); + List<CompressionCodec> list = POOL.get(kind); + if (list == null) { + List<CompressionCodec> newList = new ArrayList<>(); + List<CompressionCodec> oldList = POOL.putIfAbsent(kind, newList); + list = (oldList == null) ? newList : oldList; + } + synchronized (list) { + if (list.size() < MAX_PER_KIND) { + list.add(codec); + return; + } } + // We didn't add the codec to the list. + codec.close(); + } catch (Exception ex) { + LOG.error("Ignoring codec cleanup error", ex); } - // We didn't add the codec to the list. - codec.close(); } public static int getPoolSize(CompressionKind kind) { http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index 918fae8..38ca40e 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -46,7 +46,7 @@ public class PhysicalFsWriter implements PhysicalWriter { private static final int HDFS_BUFFER_SIZE = 256 * 1024; private static final HadoopShims shims = HadoopShimsFactory.get(); - private final FSDataOutputStream rawWriter; + private FSDataOutputStream rawWriter; // the compressed metadata information outStream private OutStream writer = null; // a protobuf outStream around streamFactory @@ -58,7 +58,7 @@ public class PhysicalFsWriter implements PhysicalWriter { private final double paddingTolerance; private final long defaultStripeSize; private final CompressionKind compress; - private final CompressionCodec codec; + private CompressionCodec codec; private final boolean addBlockPadding; // the streams that make up the current stripe @@ -225,8 +225,14 @@ public class PhysicalFsWriter implements PhysicalWriter { @Override public void close() throws IOException { + // We don't use the codec directly but do give it out codec in getCompressionCodec; + // that is used in tests, for boolean checks, and in StreamFactory. Some of the changes that + // would get rid of this pattern require cross-project interface changes, so just return the + // codec for now. OrcCodecPool.returnCodec(compress, codec); + codec = null; rawWriter.close(); + rawWriter = null; } @Override http://git-wip-us.apache.org/repos/asf/orc/blob/7dfe4a74/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java index 1e2d0f1..705e768 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,13 +17,15 @@ */ package org.apache.orc.impl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; - import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -35,7 +37,6 @@ import org.apache.orc.CompressionKind; import org.apache.orc.DataReader; import org.apache.orc.OrcFile; import org.apache.orc.OrcProto; - import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; @@ -44,6 +45,7 @@ import org.apache.orc.TypeDescription; */ public class RecordReaderUtils { private static final HadoopShims SHIMS = HadoopShimsFactory.get(); + private static final Logger LOG = LoggerFactory.getLogger(RecordReaderUtils.class); static boolean hadBadBloomFilters(TypeDescription.Category category, OrcFile.WriterVersion version) { @@ -143,12 +145,12 @@ public class RecordReaderUtils { private static class DefaultDataReader implements DataReader { private FSDataInputStream file = null; - private final ByteBufferAllocatorPool pool; + private ByteBufferAllocatorPool pool; private HadoopShims.ZeroCopyReaderShim zcr = null; private final FileSystem fs; private final Path path; private final boolean useZeroCopy; - private final CompressionCodec codec; + private CompressionCodec codec; private final int bufferSize; private final int typeCount; private CompressionKind compressionKind; @@ -161,17 +163,14 @@ public class RecordReaderUtils { this.codec = OrcCodecPool.getCodec(compressionKind); this.bufferSize = properties.getBufferSize(); this.typeCount = properties.getTypeCount(); - if (useZeroCopy) { - this.pool = new ByteBufferAllocatorPool(); - } else { - this.pool = null; - } } @Override public void open() throws IOException { this.file = fs.open(path); if (useZeroCopy) { + // ZCR only uses codec for boolean checks. + pool = new ByteBufferAllocatorPool(); zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); } else { zcr = null; @@ -231,8 +230,7 @@ public class RecordReaderUtils { indexes[column] = OrcProto.RowIndex.parseFrom( InStream.createCodedInputStream("index", ReaderImpl.singleton(new BufferChunk(bb, 0)), - stream.getLength(), - codec, bufferSize)); + stream.getLength(), codec, bufferSize)); } break; case BLOOM_FILTER: @@ -267,9 +265,9 @@ public class RecordReaderUtils { // read the footer ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", - ReaderImpl.singleton(new BufferChunk(tailBuf, 0)), - tailLength, codec, bufferSize)); + return OrcProto.StripeFooter.parseFrom( + InStream.createCodedInputStream("footer", ReaderImpl.singleton( + new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize)); } @Override @@ -282,6 +280,7 @@ public class RecordReaderUtils { public void close() throws IOException { if (codec != null) { OrcCodecPool.returnCodec(compressionKind, codec); + codec = null; } if (pool != null) { pool.clear(); @@ -290,6 +289,7 @@ public class RecordReaderUtils { try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) { if (file != null) { file.close(); + file = null; } } } @@ -306,8 +306,18 @@ public class RecordReaderUtils { @Override public DataReader clone() { + if (this.file != null) { + // We should really throw here, but that will cause failures in Hive. + // While Hive uses clone, just log a warning. + LOG.warn("Cloning an opened DataReader; the stream will be reused and closed twice"); + } try { - return (DataReader) super.clone(); + DefaultDataReader clone = (DefaultDataReader) super.clone(); + if (codec != null) { + // Make sure we don't share the same codec between two readers. + clone.codec = OrcCodecPool.getCodec(clone.compressionKind); + } + return clone; } catch (CloneNotSupportedException e) { throw new UnsupportedOperationException("uncloneable", e); }
