This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0b451b3 HUDI-140 : GCS: Log File Reading not working due to difference in seek() behavior for EOF
0b451b3 is described below
commit 0b451b3a58cabe25c0cecd3fd8847a8597e2313a
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Mon Jul 15 17:29:34 2019 -0700
HUDI-140 : GCS: Log File Reading not working due to difference in seek() behavior for EOF
---
.../common/table/log/HoodieLogFileReader.java | 7 ++++++-
.../common/table/log/block/HoodieLogBlock.java | 24 +++++++++++++++++++---
.../java/com/uber/hoodie/common/util/FSUtils.java | 12 +++++++++++
3 files changed, 39 insertions(+), 4 deletions(-)
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
index 70afe22..21ffc60 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
@@ -27,6 +27,7 @@ import
com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import
com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import
com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.CorruptedLogFileException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
@@ -234,7 +235,11 @@ class HoodieLogFileReader implements
HoodieLogFormat.Reader {
private boolean isBlockCorrupt(int blocksize) throws IOException {
long currentPos = inputStream.getPos();
try {
- inputStream.seek(currentPos + blocksize);
+ if (FSUtils.isGCSInputStream(inputStream)) {
+ inputStream.seek(currentPos + blocksize - 1);
+ } else {
+ inputStream.seek(currentPos + blocksize);
+ }
} catch (EOFException e) {
// this is corrupt
// This seek is required because contract of seek() is different for
naked DFSInputStream vs BufferedFSInputStream
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java
index 011c83b..ebd978a 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java
@@ -21,6 +21,7 @@ package com.uber.hoodie.common.table.log.block;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
+import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.ByteArrayOutputStream;
@@ -233,7 +234,7 @@ public abstract class HoodieLogBlock {
inputStream.readFully(content, 0, contentLength);
} else {
// Seek to the end of the content block
- inputStream.seek(inputStream.getPos() + contentLength);
+ safeSeek(inputStream, inputStream.getPos() + contentLength);
}
return content;
}
@@ -245,9 +246,9 @@ public abstract class HoodieLogBlock {
try {
content = Optional.of(new byte[(int)
this.getBlockContentLocation().get().getBlockSize()]);
-
inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile());
+ safeSeek(inputStream,
this.getBlockContentLocation().get().getContentPositionInLogFile());
inputStream.readFully(content.get(), 0, content.get().length);
- inputStream.seek(this.getBlockContentLocation().get().getBlockEndPos());
+ safeSeek(inputStream,
this.getBlockContentLocation().get().getBlockEndPos());
} catch (IOException e) {
try {
// TODO : fs.open() and return inputstream again, need to pass FS
configuration
@@ -268,4 +269,21 @@ public abstract class HoodieLogBlock {
content = Optional.empty();
}
+  /**
+   * Seeks {@code inputStream} to {@code pos}, tolerating the GCS input stream's EOF behavior.
+   *
+   * <p>HUDI-140: a GCS input stream can throw {@link EOFException} from seek() for an offset that
+   * other Hadoop input streams accept (the end-of-file position). For GCS streams only, this
+   * method then seeks to {@code pos - 1} and consumes that single byte. The stream therefore ends
+   * up positioned exactly at {@code pos}, as requested, rather than one byte short.
+   *
+   * @param inputStream Input Stream to reposition
+   * @param pos Position to seek to
+   * @throws EOFException if {@code pos} cannot be reached. This covers non-GCS streams,
+   *     {@code pos <= 0}, and offsets past the last byte of a GCS stream.
+   * @throws IOException if an underlying seek or read fails
+   */
+  private static void safeSeek(FSDataInputStream inputStream, long pos) throws IOException {
+    try {
+      inputStream.seek(pos);
+    } catch (EOFException e) {
+      if (pos <= 0 || !FSUtils.isGCSInputStream(inputStream)) {
+        throw e;
+      }
+      // Land just before the target, then read the last byte so getPos() == pos afterwards.
+      inputStream.seek(pos - 1);
+      if (inputStream.read() < 0) {
+        // Nothing at pos - 1 either: pos is genuinely beyond the end of the stream.
+        throw e;
+      }
+    }
+  }
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
index d9069a1..0d67d03 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
@@ -45,6 +45,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -552,4 +553,15 @@ public class FSUtils {
return ((partitionPath == null) || (partitionPath.isEmpty())) ? basePath :
new Path(basePath, partitionPath);
}
+
+  /**
+   * Returns whether {@code inputStream} is backed by the GCS connector's input stream.
+   *
+   * <p>This is due to HUDI-140: GCS has a different behavior for detecting EOF during seek().
+   * The check compares fully-qualified class names by string. It first inspects the given
+   * stream, then follows {@link FSDataInputStream#getWrappedStream()} through any nested
+   * {@code FSDataInputStream} layers.
+   *
+   * @param inputStream FSDataInputStream to inspect; {@code null} yields {@code false}
+   * @return true if the input stream, or any stream reached by unwrapping FSDataInputStream
+   *     layers, is of type GoogleHadoopFSInputStream
+   */
+  public static boolean isGCSInputStream(FSDataInputStream inputStream) {
+    final String gcsStreamClass = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream";
+    Object current = inputStream;
+    while (current != null) {
+      // Class#getName() never returns null (unlike getCanonicalName()), so this cannot NPE.
+      if (gcsStreamClass.equals(current.getClass().getName())) {
+        return true;
+      }
+      if (!(current instanceof FSDataInputStream)) {
+        return false;
+      }
+      current = ((FSDataInputStream) current).getWrappedStream();
+    }
+    return false;
+  }
}