update
Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/c6a58c67 Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/c6a58c67 Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/c6a58c67 Branch: refs/heads/orc-310 Commit: c6a58c67051b136246030153e798980200b47f1a Parents: 06c93b3 Author: sergey <[email protected]> Authored: Thu Mar 8 17:13:27 2018 -0800 Committer: sergey <[email protected]> Committed: Thu Mar 8 17:13:27 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/orc/impl/OrcCodecPool.java | 3 +++ .../org/apache/orc/impl/PhysicalFsWriter.java | 8 +++++--- .../org/apache/orc/impl/RecordReaderUtils.java | 21 ++++++++++++-------- 3 files changed, 21 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/c6a58c67/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java index 4ae43a2..8269316 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java +++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java @@ -69,6 +69,9 @@ public final class OrcCodecPool { */ public static void returnCodecSafely( CompressionKind kind, CompressionCodec codec, boolean observedError) { + if (codec == null) { + return; + } try { if (!observedError) { returnCodec(kind, codec); http://git-wip-us.apache.org/repos/asf/orc/blob/c6a58c67/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index 5d7f75a..3f8e4e2 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -46,7 +46,7 @@ public class PhysicalFsWriter implements PhysicalWriter { private static final int HDFS_BUFFER_SIZE = 256 * 1024; private static final HadoopShims shims = HadoopShimsFactory.get(); - private final FSDataOutputStream rawWriter; + private FSDataOutputStream rawWriter; // the compressed metadata information outStream private OutStream writer = null; // a protobuf outStream around streamFactory @@ -58,7 +58,7 @@ public class PhysicalFsWriter implements PhysicalWriter { private final double paddingTolerance; private final long defaultStripeSize; private final CompressionKind compress; - private final CompressionCodec codec; + private CompressionCodec codec; private final boolean addBlockPadding; // the streams that make up the current stripe @@ -228,10 +228,12 @@ public class PhysicalFsWriter implements PhysicalWriter { // We don't use the codec directly but do give it out codec in getCompressionCodec; // that is used in tests, for boolean checks, and in StreamFactory. Some of the changes that // would get rid of this pattern require cross-project interface changes, so just return the - // codec for now. If the codec is broken, reset will usually throw, so this is still the\ + // codec for now. If the codec is broken, reset will usually throw, so this is still the // correct thing to do. OrcCodecPool.returnCodecSafely(compress, codec, false); + codec = null; rawWriter.close(); + rawWriter = null; } @Override http://git-wip-us.apache.org/repos/asf/orc/blob/c6a58c67/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java index b486571..9d9e31a 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java @@ -143,12 +143,12 @@ public class RecordReaderUtils { private static class DefaultDataReader implements DataReader { private FSDataInputStream file = null; - private final ByteBufferAllocatorPool pool; + private ByteBufferAllocatorPool pool; private HadoopShims.ZeroCopyReaderShim zcr = null; private final FileSystem fs; private final Path path; private final boolean useZeroCopy; - private final CompressionCodec codec; + private CompressionCodec codec; private boolean hasCodecError = false; private final int bufferSize; private final int typeCount; @@ -162,11 +162,6 @@ public class RecordReaderUtils { this.codec = OrcCodecPool.getCodec(compressionKind); this.bufferSize = properties.getBufferSize(); this.typeCount = properties.getTypeCount(); - if (useZeroCopy) { - this.pool = new ByteBufferAllocatorPool(); - } else { - this.pool = null; - } } @Override @@ -174,6 +169,7 @@ public class RecordReaderUtils { this.file = fs.open(path); if (useZeroCopy) { // ZCR only uses codec for boolean checks. + pool = new ByteBufferAllocatorPool(); zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); } else { zcr = null; @@ -308,6 +304,7 @@ public class RecordReaderUtils { public void close() throws IOException { if (codec != null) { OrcCodecPool.returnCodecSafely(compressionKind, codec, hasCodecError); + codec = null; } if (pool != null) { pool.clear(); @@ -316,6 +313,7 @@ public class RecordReaderUtils { try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) { if (file != null) { file.close(); + file = null; } } } @@ -332,8 +330,15 @@ public class RecordReaderUtils { @Override public DataReader clone() { + if (this.file != null) { + throw new UnsupportedOperationException( + "Cannot clone a DataReader that is already opened"); + } try { - return (DataReader) super.clone(); + DefaultDataReader clone = (DefaultDataReader) super.clone(); + // Make sure we don't share the same codec between two readers. + clone.codec = OrcCodecPool.getCodec(clone.compressionKind); + return clone; } catch (CloneNotSupportedException e) { throw new UnsupportedOperationException("uncloneable", e); }
