This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b451b3  HUDI-140 : GCS: Log File Reading not working due to 
difference in seek() behavior for EOF
0b451b3 is described below

commit 0b451b3a58cabe25c0cecd3fd8847a8597e2313a
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Mon Jul 15 17:29:34 2019 -0700

    HUDI-140 : GCS: Log File Reading not working due to difference in seek() 
behavior for EOF
---
 .../common/table/log/HoodieLogFileReader.java      |  7 ++++++-
 .../common/table/log/block/HoodieLogBlock.java     | 24 +++++++++++++++++++---
 .../java/com/uber/hoodie/common/util/FSUtils.java  | 12 +++++++++++
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
index 70afe22..21ffc60 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
@@ -27,6 +27,7 @@ import 
com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
 import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
 import 
com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import 
com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.exception.CorruptedLogFileException;
 import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.exception.HoodieNotSupportedException;
@@ -234,7 +235,11 @@ class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   private boolean isBlockCorrupt(int blocksize) throws IOException {
     long currentPos = inputStream.getPos();
     try {
-      inputStream.seek(currentPos + blocksize);
+      if (FSUtils.isGCSInputStream(inputStream)) {
+        inputStream.seek(currentPos + blocksize - 1);
+      } else {
+        inputStream.seek(currentPos + blocksize);
+      }
     } catch (EOFException e) {
       // this is corrupt
       // This seek is required because contract of seek() is different for 
naked DFSInputStream vs BufferedFSInputStream
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java
index 011c83b..ebd978a 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java
@@ -21,6 +21,7 @@ package com.uber.hoodie.common.table.log.block;
 import com.google.common.collect.Maps;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
+import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
 import java.io.ByteArrayOutputStream;
@@ -233,7 +234,7 @@ public abstract class HoodieLogBlock {
       inputStream.readFully(content, 0, contentLength);
     } else {
       // Seek to the end of the content block
-      inputStream.seek(inputStream.getPos() + contentLength);
+      safeSeek(inputStream, inputStream.getPos() + contentLength);
     }
     return content;
   }
@@ -245,9 +246,9 @@ public abstract class HoodieLogBlock {
 
     try {
       content = Optional.of(new byte[(int) 
this.getBlockContentLocation().get().getBlockSize()]);
-      
inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile());
+      safeSeek(inputStream, 
this.getBlockContentLocation().get().getContentPositionInLogFile());
       inputStream.readFully(content.get(), 0, content.get().length);
-      inputStream.seek(this.getBlockContentLocation().get().getBlockEndPos());
+      safeSeek(inputStream, 
this.getBlockContentLocation().get().getBlockEndPos());
     } catch (IOException e) {
       try {
         // TODO : fs.open() and return inputstream again, need to pass FS 
configuration
@@ -268,4 +269,21 @@ public abstract class HoodieLogBlock {
     content = Optional.empty();
   }
 
+  /**
+   * Seeks to pos; if that raises EOFException on a GCS stream, seeks to (pos - 1) instead
+   * @param inputStream Input Stream
+   * @param pos  Position to seek
+   * @throws IOException if the seek fails (an EOFException is rethrown for non-GCS streams)
+   */
+  private static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException {
+    try {
+      inputStream.seek(pos);
+    } catch (EOFException eof) {
+      if (!FSUtils.isGCSInputStream(inputStream)) {
+        throw eof;
+      }
+      // GCS stream rejected the requested position with EOF: retry one byte earlier
+      inputStream.seek(pos - 1);
+    }
+  }
 }
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
index d9069a1..0d67d03 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
@@ -45,6 +45,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -552,4 +553,15 @@ public class FSUtils {
     return ((partitionPath == null) || (partitionPath.isEmpty())) ? basePath :
         new Path(basePath, partitionPath);
   }
+
+  /**
+   * Detects GCS streams, whose seek() treats EOF differently (HUDI-140); null-safe for anonymous classes.
+   * @param inputStream FSDataInputStream
+   * @return true if the inputstream or the wrapped one is of type GoogleHadoopFSInputStream
+   */
+  public static boolean isGCSInputStream(FSDataInputStream inputStream) {
+    final String gcsClass = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream";
+    return gcsClass.equals(inputStream.getClass().getCanonicalName())
+        || gcsClass.equals(inputStream.getWrappedStream().getClass().getCanonicalName());
+  }
 }

Reply via email to