Repository: orc
Updated Branches:
  refs/heads/orc-310 [created] c6a58c670

ORC-310
better error handling for codec


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/d9727cb3
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/d9727cb3
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/d9727cb3

Branch: refs/heads/orc-310
Commit: d9727cb36c9b1e9f1c045e29d928401a680062cb
Parents: 411e633
Author: sergey <[email protected]>
Authored: Wed Feb 28 16:00:51 2018 -0800
Committer: sergey <[email protected]>
Committed: Wed Feb 28 16:00:51 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/orc/impl/OrcCodecPool.java  | 15 +++++-
 .../src/java/org/apache/orc/impl/OrcTail.java   |  4 +-
 .../org/apache/orc/impl/PhysicalFsWriter.java   |  7 ++-
 .../java/org/apache/orc/impl/ReaderImpl.java    | 12 +++--
 .../org/apache/orc/impl/RecordReaderImpl.java   |  6 ++-
 .../org/apache/orc/impl/RecordReaderUtils.java  | 54 +++++++++++++++-----
 .../java/org/apache/orc/impl/WriterImpl.java    |  6 ++-
 7 files changed, 80 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java 
b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
index 56b9896..34f0bd4 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
@@ -18,10 +18,8 @@
 package org.apache.orc.impl;
 
 import java.util.concurrent.ConcurrentHashMap;
-
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.slf4j.Logger;
@@ -62,6 +60,19 @@ public final class OrcCodecPool {
     return codec;
   }
 
+  public static void returnCodecSafely(
+      CompressionKind kind, CompressionCodec codec, boolean hasError) {
+    try {
+      if (!hasError) {
+        returnCodec(kind, codec);
+      } else {
+        codec.close();
+      }
+    } catch (Exception ex) {
+      LOG.error("Ignoring codec cleanup error", ex);
+    }
+  }
+
   public static void returnCodec(CompressionKind kind, CompressionCodec codec) 
{
     if (codec == null) {
       return;

http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcTail.java 
b/java/core/src/java/org/apache/orc/impl/OrcTail.java
index 3c78874..0136511 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcTail.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcTail.java
@@ -107,11 +107,13 @@ public final class OrcTail {
     if (serializedTail == null) return null;
     if (metadata == null) {
       CompressionCodec codec = OrcCodecPool.getCodec(getCompressionKind());
+      boolean isCodecError = true;
       try {
         metadata = extractMetadata(serializedTail, 0,
             (int) fileTail.getPostscript().getMetadataLength(), codec, 
getCompressionBufferSize());
+        isCodecError = false;
       } finally {
-        OrcCodecPool.returnCodec(getCompressionKind(), codec);
+        OrcCodecPool.returnCodecSafely(getCompressionKind(), codec, 
isCodecError);
       }
       // clear does not clear the contents but sets position to 0 and limit = 
capacity
       serializedTail.clear();

http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java 
b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 918fae8..5d7f75a 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -225,7 +225,12 @@ public class PhysicalFsWriter implements PhysicalWriter {
 
   @Override
   public void close() throws IOException {
-    OrcCodecPool.returnCodec(compress, codec);
+    // We don't use the codec directly but do hand it out via 
getCompressionCodec;
+    // that is used in tests, for boolean checks, and in StreamFactory. Some 
of the changes that
+    // would get rid of this pattern require cross-project interface changes, 
so just return the
+    // codec for now. If the codec is broken, reset will usually throw, so 
this is still the
+    // correct thing to do.
+    OrcCodecPool.returnCodecSafely(compress, codec, false);
     rawWriter.close();
   }
 

http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java 
b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 0daa0ed..94d693b 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -463,6 +463,7 @@ public class ReaderImpl implements Reader {
     CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name());
     OrcProto.FileTail.Builder fileTailBuilder;
     CompressionCodec codec = OrcCodecPool.getCodec(kind);
+    boolean isCodecError = true;
     try {
       OrcProto.Footer footer = extractFooter(buffer,
         (int) (buffer.position() + ps.getMetadataLength()),
@@ -472,8 +473,9 @@ public class ReaderImpl implements Reader {
         .setPostscript(ps)
         .setFooter(footer)
         .setFileLength(fileLength);
+      isCodecError = false;
     } finally {
-      OrcCodecPool.returnCodec(kind, codec);
+      OrcCodecPool.returnCodecSafely(kind, codec, isCodecError);
     }
     // clear does not clear the contents but sets position to 0 and limit = 
capacity
     buffer.clear();
@@ -593,10 +595,12 @@ public class ReaderImpl implements Reader {
       buffer.reset();
       OrcProto.Footer footer;
       CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
+      boolean isCodecError = true;
       try {
         footer = extractFooter(footerBuffer, 0, footerSize, codec, bufferSize);
+        isCodecError = false;
       } finally {
-        OrcCodecPool.returnCodec(compressionKind, codec);
+        OrcCodecPool.returnCodecSafely(compressionKind, codec, isCodecError);
       }
       fileTailBuilder.setFooter(footer);
     }
@@ -782,10 +786,12 @@ public class ReaderImpl implements Reader {
   public List<StripeStatistics> getStripeStatistics() throws IOException {
     if (metadata == null) {
       CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
+      boolean isCodecError = true;
       try {
         metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, 
codec, bufferSize);
+        isCodecError = false;
       } finally {
-        OrcCodecPool.returnCodec(compressionKind, codec);
+        OrcCodecPool.returnCodecSafely(compressionKind, codec, isCodecError);
       }
     }
     if (stripeStats == null) {

http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java 
b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 0dacc70..d7722d1 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -17,8 +17,9 @@
  */
 package org.apache.orc.impl;
 
-import org.apache.orc.CompressionKind;
+import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.orc.CompressionKind;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Date;
@@ -29,7 +30,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
-
 import org.apache.orc.OrcFile;
 import org.apache.orc.util.BloomFilter;
 import org.apache.orc.util.BloomFilterIO;
@@ -1353,6 +1353,8 @@ public class RecordReaderImpl implements RecordReader {
     return result;
   }
 
+  // TODO: remove this
+  @VisibleForTesting
   public CompressionCodec getCompressionCodec() {
     return dataReader.getCompressionCodec();
   }

http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java 
b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 1e2d0f1..b486571 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -149,6 +149,7 @@ public class RecordReaderUtils {
     private final Path path;
     private final boolean useZeroCopy;
     private final CompressionCodec codec;
+    private boolean hasCodecError = false;
     private final int bufferSize;
     private final int typeCount;
     private CompressionKind compressionKind;
@@ -172,6 +173,7 @@ public class RecordReaderUtils {
     public void open() throws IOException {
       this.file = fs.open(path);
       if (useZeroCopy) {
+        // ZCR only uses codec for boolean checks.
         zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
       } else {
         zcr = null;
@@ -228,11 +230,18 @@ public class RecordReaderUtils {
                 ByteBuffer bb = range.getData().duplicate();
                 bb.position((int) (offset - range.getOffset()));
                 bb.limit((int) (bb.position() + stream.getLength()));
-                indexes[column] = OrcProto.RowIndex.parseFrom(
-                    InStream.createCodedInputStream("index",
-                        ReaderImpl.singleton(new BufferChunk(bb, 0)),
-                        stream.getLength(),
-                    codec, bufferSize));
+                boolean isOk = false;
+                try {
+                  indexes[column] = OrcProto.RowIndex.parseFrom(
+                      InStream.createCodedInputStream("index",
+                          ReaderImpl.singleton(new BufferChunk(bb, 0)),
+                          stream.getLength(), codec, bufferSize));
+                  isOk = true;
+                } finally {
+                  if (!isOk) {
+                    hasCodecError = true;
+                  }
+                }
               }
               break;
             case BLOOM_FILTER:
@@ -241,10 +250,18 @@ public class RecordReaderUtils {
                 ByteBuffer bb = range.getData().duplicate();
                 bb.position((int) (offset - range.getOffset()));
                 bb.limit((int) (bb.position() + stream.getLength()));
-                bloomFilterIndices[column] = 
OrcProto.BloomFilterIndex.parseFrom
-                    (InStream.createCodedInputStream("bloom_filter",
-                        ReaderImpl.singleton(new BufferChunk(bb, 0)),
-                    stream.getLength(), codec, bufferSize));
+                boolean isOk = false;
+                try {
+                  bloomFilterIndices[column] = 
OrcProto.BloomFilterIndex.parseFrom
+                      (InStream.createCodedInputStream("bloom_filter",
+                          ReaderImpl.singleton(new BufferChunk(bb, 0)),
+                      stream.getLength(), codec, bufferSize));
+                  isOk = true;
+                } finally {
+                  if (!isOk) {
+                    hasCodecError = true;
+                  }
+                }
               }
               break;
             default:
@@ -267,9 +284,18 @@ public class RecordReaderUtils {
       // read the footer
       ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
       file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), 
tailLength);
-      return 
OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
-          ReaderImpl.singleton(new BufferChunk(tailBuf, 0)),
-          tailLength, codec, bufferSize));
+      boolean isOk = false;
+      try {
+        OrcProto.StripeFooter result = OrcProto.StripeFooter.parseFrom(
+            InStream.createCodedInputStream("footer", ReaderImpl.singleton(
+                new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize));
+        isOk = true;
+        return result;
+      } finally {
+        if (!isOk) {
+          hasCodecError = true;
+        }
+      }
     }
 
     @Override
@@ -281,7 +307,7 @@ public class RecordReaderUtils {
     @Override
     public void close() throws IOException {
       if (codec != null) {
-        OrcCodecPool.returnCodec(compressionKind, codec);
+        OrcCodecPool.returnCodecSafely(compressionKind, codec, hasCodecError);
       }
       if (pool != null) {
         pool.clear();
@@ -315,6 +341,8 @@ public class RecordReaderUtils {
 
     @Override
     public CompressionCodec getCompressionCodec() {
+      // Note: see comments in PhysicalFsWriter; we should probably get rid of 
this usage
+      //       pattern to make error handling for codec pool more robust.
       return codec;
     }
   }

http://git-wip-us.apache.org/repos/asf/orc/blob/d9727cb3/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java 
b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 90b410c..343f565 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.orc.impl;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -26,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 import java.util.TreeMap;
-
 import io.airlift.compress.lz4.Lz4Compressor;
 import io.airlift.compress.lz4.Lz4Decompressor;
 import io.airlift.compress.lzo.LzoCompressor;
@@ -50,7 +51,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-
 import com.google.protobuf.ByteString;
 
 /**
@@ -645,6 +645,8 @@ public class WriterImpl implements Writer, 
MemoryManager.Callback {
     return ReaderImpl.deserializeStats(builder.getStatisticsList());
   }
 
+  // TODO: remove this
+  @VisibleForTesting
   public CompressionCodec getCompressionCodec() {
     return physicalWriter.getCompressionCodec();
   }

Reply via email to