Repository: orc Updated Branches: refs/heads/orc-310 [created] c6a58c670
ORC-310 better error handling for codec Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/d9727cb3 Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/d9727cb3 Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/d9727cb3 Branch: refs/heads/orc-310 Commit: d9727cb36c9b1e9f1c045e29d928401a680062cb Parents: 411e633 Author: sergey <[email protected]> Authored: Wed Feb 28 16:00:51 2018 -0800 Committer: sergey <[email protected]> Committed: Wed Feb 28 16:00:51 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/orc/impl/OrcCodecPool.java | 15 +++++- .../src/java/org/apache/orc/impl/OrcTail.java | 4 +- .../org/apache/orc/impl/PhysicalFsWriter.java | 7 ++- .../java/org/apache/orc/impl/ReaderImpl.java | 12 +++-- .../org/apache/orc/impl/RecordReaderImpl.java | 6 ++- .../org/apache/orc/impl/RecordReaderUtils.java | 54 +++++++++++++++----- .../java/org/apache/orc/impl/WriterImpl.java | 6 ++- 7 files changed, 80 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java index 56b9896..34f0bd4 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java +++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java @@ -18,10 +18,8 @@ package org.apache.orc.impl; import java.util.concurrent.ConcurrentHashMap; - import java.util.ArrayList; import java.util.List; - import org.apache.orc.CompressionCodec; import org.apache.orc.CompressionKind; import org.slf4j.Logger; @@ -62,6 +60,19 @@ public final class OrcCodecPool { return codec; } + public static void returnCodecSafely( + CompressionKind kind, 
CompressionCodec codec, boolean hasError) { + try { + if (!hasError) { + returnCodec(kind, codec); + } else { + codec.close(); + } + } catch (Exception ex) { + LOG.error("Ignoring codec cleanup error", ex); + } + } + public static void returnCodec(CompressionKind kind, CompressionCodec codec) { if (codec == null) { return; http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/OrcTail.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/OrcTail.java b/java/core/src/java/org/apache/orc/impl/OrcTail.java index 3c78874..0136511 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcTail.java +++ b/java/core/src/java/org/apache/orc/impl/OrcTail.java @@ -107,11 +107,13 @@ public final class OrcTail { if (serializedTail == null) return null; if (metadata == null) { CompressionCodec codec = OrcCodecPool.getCodec(getCompressionKind()); + boolean isCodecError = true; try { metadata = extractMetadata(serializedTail, 0, (int) fileTail.getPostscript().getMetadataLength(), codec, getCompressionBufferSize()); + isCodecError = false; } finally { - OrcCodecPool.returnCodec(getCompressionKind(), codec); + OrcCodecPool.returnCodecSafely(getCompressionKind(), codec, isCodecError); } // clear does not clear the contents but sets position to 0 and limit = capacity serializedTail.clear(); http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index 918fae8..5d7f75a 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -225,7 +225,12 @@ public class PhysicalFsWriter implements PhysicalWriter { @Override public void 
close() throws IOException { - OrcCodecPool.returnCodec(compress, codec); + // We don't use the codec directly but do give it out in getCompressionCodec; + // that is used in tests, for boolean checks, and in StreamFactory. Some of the changes that + // would get rid of this pattern require cross-project interface changes, so just return the + // codec for now. If the codec is broken, reset will usually throw, so this is still the + // correct thing to do. + OrcCodecPool.returnCodecSafely(compress, codec, false); rawWriter.close(); } http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java index 0daa0ed..94d693b 100644 --- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -463,6 +463,7 @@ public class ReaderImpl implements Reader { CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name()); OrcProto.FileTail.Builder fileTailBuilder; CompressionCodec codec = OrcCodecPool.getCodec(kind); + boolean isCodecError = true; try { OrcProto.Footer footer = extractFooter(buffer, (int) (buffer.position() + ps.getMetadataLength()), @@ -472,8 +473,9 @@ public class ReaderImpl implements Reader { .setPostscript(ps) .setFooter(footer) .setFileLength(fileLength); + isCodecError = false; } finally { - OrcCodecPool.returnCodec(kind, codec); + OrcCodecPool.returnCodecSafely(kind, codec, isCodecError); } // clear does not clear the contents but sets position to 0 and limit = capacity buffer.clear(); @@ -593,10 +595,12 @@ public class ReaderImpl implements Reader { buffer.reset(); OrcProto.Footer footer; CompressionCodec codec = OrcCodecPool.getCodec(compressionKind); + boolean isCodecError = true; try { footer = extractFooter(footerBuffer, 0, 
footerSize, codec, bufferSize); + isCodecError = false; } finally { - OrcCodecPool.returnCodec(compressionKind, codec); + OrcCodecPool.returnCodecSafely(compressionKind, codec, isCodecError); } fileTailBuilder.setFooter(footer); } @@ -782,10 +786,12 @@ public class ReaderImpl implements Reader { public List<StripeStatistics> getStripeStatistics() throws IOException { if (metadata == null) { CompressionCodec codec = OrcCodecPool.getCodec(compressionKind); + boolean isCodecError = true; try { metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize); + isCodecError = false; } finally { - OrcCodecPool.returnCodec(compressionKind, codec); + OrcCodecPool.returnCodecSafely(compressionKind, codec, isCodecError); } } if (stripeStats == null) { http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java index 0dacc70..d7722d1 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java @@ -17,8 +17,9 @@ */ package org.apache.orc.impl; -import org.apache.orc.CompressionKind; +import com.google.common.annotations.VisibleForTesting; +import org.apache.orc.CompressionKind; import java.io.IOException; import java.math.BigDecimal; import java.sql.Date; @@ -29,7 +30,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TimeZone; - import org.apache.orc.OrcFile; import org.apache.orc.util.BloomFilter; import org.apache.orc.util.BloomFilterIO; @@ -1353,6 +1353,8 @@ public class RecordReaderImpl implements RecordReader { return result; } + // TODO: remove this + @VisibleForTesting public CompressionCodec getCompressionCodec() { return dataReader.getCompressionCodec(); } 
http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java index 1e2d0f1..b486571 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java @@ -149,6 +149,7 @@ public class RecordReaderUtils { private final Path path; private final boolean useZeroCopy; private final CompressionCodec codec; + private boolean hasCodecError = false; private final int bufferSize; private final int typeCount; private CompressionKind compressionKind; @@ -172,6 +173,7 @@ public class RecordReaderUtils { public void open() throws IOException { this.file = fs.open(path); if (useZeroCopy) { + // ZCR only uses codec for boolean checks. zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); } else { zcr = null; @@ -228,11 +230,18 @@ public class RecordReaderUtils { ByteBuffer bb = range.getData().duplicate(); bb.position((int) (offset - range.getOffset())); bb.limit((int) (bb.position() + stream.getLength())); - indexes[column] = OrcProto.RowIndex.parseFrom( - InStream.createCodedInputStream("index", - ReaderImpl.singleton(new BufferChunk(bb, 0)), - stream.getLength(), - codec, bufferSize)); + boolean isOk = false; + try { + indexes[column] = OrcProto.RowIndex.parseFrom( + InStream.createCodedInputStream("index", + ReaderImpl.singleton(new BufferChunk(bb, 0)), + stream.getLength(), codec, bufferSize)); + isOk = true; + } finally { + if (!isOk) { + hasCodecError = true; + } + } } break; case BLOOM_FILTER: @@ -241,10 +250,18 @@ public class RecordReaderUtils { ByteBuffer bb = range.getData().duplicate(); bb.position((int) (offset - range.getOffset())); bb.limit((int) (bb.position() + stream.getLength())); - bloomFilterIndices[column] = 
OrcProto.BloomFilterIndex.parseFrom - (InStream.createCodedInputStream("bloom_filter", - ReaderImpl.singleton(new BufferChunk(bb, 0)), - stream.getLength(), codec, bufferSize)); + boolean isOk = false; + try { + bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom + (InStream.createCodedInputStream("bloom_filter", + ReaderImpl.singleton(new BufferChunk(bb, 0)), + stream.getLength(), codec, bufferSize)); + isOk = true; + } finally { + if (!isOk) { + hasCodecError = true; + } + } } break; default: @@ -267,9 +284,18 @@ public class RecordReaderUtils { // read the footer ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", - ReaderImpl.singleton(new BufferChunk(tailBuf, 0)), - tailLength, codec, bufferSize)); + boolean isOk = false; + try { + OrcProto.StripeFooter result = OrcProto.StripeFooter.parseFrom( + InStream.createCodedInputStream("footer", ReaderImpl.singleton( + new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize)); + isOk = true; + return result; + } finally { + if (!isOk) { + hasCodecError = true; + } + } } @Override @@ -281,7 +307,7 @@ public class RecordReaderUtils { @Override public void close() throws IOException { if (codec != null) { - OrcCodecPool.returnCodec(compressionKind, codec); + OrcCodecPool.returnCodecSafely(compressionKind, codec, hasCodecError); } if (pool != null) { pool.clear(); @@ -315,6 +341,8 @@ public class RecordReaderUtils { @Override public CompressionCodec getCompressionCodec() { + // Note: see comments in PhysicalFsWriter; we should probably get rid of this usage + // pattern to make error handling for codec pool more robust. 
return codec; } } http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/WriterImpl.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java index 90b410c..343f565 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -18,6 +18,8 @@ package org.apache.orc.impl; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -26,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; import java.util.TreeMap; - import io.airlift.compress.lz4.Lz4Compressor; import io.airlift.compress.lz4.Lz4Decompressor; import io.airlift.compress.lzo.LzoCompressor; @@ -50,7 +51,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; - import com.google.protobuf.ByteString; /** @@ -645,6 +645,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback { return ReaderImpl.deserializeStats(builder.getStatisticsList()); } + // TODO: remove this + @VisibleForTesting public CompressionCodec getCompressionCodec() { return physicalWriter.getCompressionCodec(); }
