This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 65044d38fbe [HUDI-2118] Skip checking corrupt log blocks for 
transactional write file systems (#6830)
65044d38fbe is described below

commit 65044d38fbefc6af644be02197bb8b1911746202
Author: Bowen Zhu <[email protected]>
AuthorDate: Tue Jan 24 23:52:33 2023 -0800

    [HUDI-2118] Skip checking corrupt log blocks for transactional write file 
systems (#6830)
    
    Skip checking corrupt log blocks for transactional write file systems
    
    Impact
    The benchmark results show corrupted block check could be 100's of msecs 
for larger file sizes.
    This change would boost block read by 100's of ms per block.
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../org/apache/hudi/common/fs/StorageSchemes.java  | 67 +++++++++++++--------
 .../hudi/common/table/log/HoodieLogFileReader.java |  7 +++
 .../common/functional/TestHoodieLogFormat.java     | 70 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 26 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java
index 9b5af8bc648..cb8b4c51181 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java
@@ -25,62 +25,65 @@ import java.util.Arrays;
  */
 public enum StorageSchemes {
   // Local filesystem
-  FILE("file", false),
+  FILE("file", false, false),
   // Hadoop File System
-  HDFS("hdfs", true),
+  HDFS("hdfs", true, false),
   // Baidu Advanced File System
-  AFS("afs", true),
+  AFS("afs", true, null),
   // Mapr File System
-  MAPRFS("maprfs", true),
+  MAPRFS("maprfs", true, null),
   // Apache Ignite FS
-  IGNITE("igfs", true),
+  IGNITE("igfs", true, null),
   // AWS S3
-  S3A("s3a", false), S3("s3", false),
+  S3A("s3a", false, true), S3("s3", false, true),
   // Google Cloud Storage
-  GCS("gs", false),
+  GCS("gs", false, true),
   // Azure WASB
-  WASB("wasb", false), WASBS("wasbs", false),
+  WASB("wasb", false, null), WASBS("wasbs", false, null),
   // Azure ADLS
-  ADL("adl", false),
+  ADL("adl", false, null),
   // Azure ADLS Gen2
-  ABFS("abfs", false), ABFSS("abfss", false),
+  ABFS("abfs", false, null), ABFSS("abfss", false, null),
   // Aliyun OSS
-  OSS("oss", false),
+  OSS("oss", false, null),
   // View FS for federated setups. If federating across cloud stores, then 
append support is false
-  VIEWFS("viewfs", true),
+  VIEWFS("viewfs", true, null),
   //ALLUXIO
-  ALLUXIO("alluxio", false),
+  ALLUXIO("alluxio", false, null),
   // Tencent Cloud Object Storage
-  COSN("cosn", false),
+  COSN("cosn", false, null),
   // Tencent Cloud HDFS
-  CHDFS("ofs", true),
+  CHDFS("ofs", true, null),
   // Tencent Cloud CacheFileSystem
-  GOOSEFS("gfs", false),
+  GOOSEFS("gfs", false, null),
   // Databricks file system
-  DBFS("dbfs", false),
+  DBFS("dbfs", false, null),
   // IBM Cloud Object Storage
-  COS("cos", false),
+  COS("cos", false, null),
   // Huawei Cloud Object Storage
-  OBS("obs", false),
+  OBS("obs", false, null),
   // Kingsoft Standard Storage ks3
-  KS3("ks3", false),
+  KS3("ks3", false, null),
   // JuiceFileSystem
-  JFS("jfs", true),
+  JFS("jfs", true, null),
   // Baidu Object Storage
-  BOS("bos", false),
+  BOS("bos", false, null),
   // Oracle Cloud Infrastructure Object Storage
-  OCI("oci", false),
+  OCI("oci", false, null),
   // Volcengine Object Storage
-  TOS("tos", false),
+  TOS("tos", false, null),
   // Volcengine Cloud HDFS
-  CFS("cfs", true);
+  CFS("cfs", true, null);
 
   private String scheme;
   private boolean supportsAppend;
+  // null for uncertain if write is transactional, please update this for each 
FS
+  private Boolean isWriteTransactional;
 
-  StorageSchemes(String scheme, boolean supportsAppend) {
+  StorageSchemes(String scheme, boolean supportsAppend, Boolean 
isWriteTransactional) {
     this.scheme = scheme;
     this.supportsAppend = supportsAppend;
+    this.isWriteTransactional = isWriteTransactional;
   }
 
   public String getScheme() {
@@ -91,6 +94,10 @@ public enum StorageSchemes {
     return supportsAppend;
   }
 
+  public boolean isWriteTransactional() {
+    return isWriteTransactional != null && isWriteTransactional;
+  }
+
   public static boolean isSchemeSupported(String scheme) {
     return Arrays.stream(values()).anyMatch(s -> s.getScheme().equals(scheme));
   }
@@ -101,4 +108,12 @@ public enum StorageSchemes {
     }
     return Arrays.stream(StorageSchemes.values()).anyMatch(s -> 
s.supportsAppend() && s.scheme.equals(scheme));
   }
+
+  public static boolean isWriteTransactional(String scheme) {
+    if (!isSchemeSupported(scheme)) {
+      throw new IllegalArgumentException("Unsupported scheme :" + scheme);
+    }
+
+    return Arrays.stream(StorageSchemes.values()).anyMatch(s -> 
s.isWriteTransactional() && s.scheme.equals(scheme));
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index adbc0a1e1b3..0542a3b7da0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log;
 import org.apache.hudi.common.fs.BoundedFsDataInputStream;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
+import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.fs.TimedFSDataInputStream;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -72,6 +73,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
   private static final Logger LOG = 
LogManager.getLogger(HoodieLogFileReader.class);
 
+  private final FileSystem fs;
   private final Configuration hadoopConf;
   private final FSDataInputStream inputStream;
   private final HoodieLogFile logFile;
@@ -107,6 +109,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize,
                              boolean readBlockLazily, boolean reverseReader, 
boolean enableRecordLookups,
                              String keyField, InternalSchema internalSchema) 
throws IOException {
+    this.fs = fs;
     this.hadoopConf = fs.getConf();
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the 
provided path
     //       is prefixed with an appropriate scheme given that we're not 
propagating the FS
@@ -282,6 +285,10 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   }
 
   private boolean isBlockCorrupted(int blocksize) throws IOException {
+    if (StorageSchemes.isWriteTransactional(fs.getScheme())) {
+      // skip block corrupt check if writes are transactional. see 
https://issues.apache.org/jira/browse/HUDI-2118
+      return false;
+    }
     long currentPos = inputStream.getPos();
     long blockSizeFromFooter;
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 4c12c5e8aa5..a8828514eeb 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -85,9 +85,11 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -112,6 +114,7 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests hoodie log format {@link HoodieLogFormat}.
@@ -914,6 +917,34 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     reader.close();
   }
 
+  @Test
+  public void testSkipCorruptedCheck() throws Exception {
+    // normal case: if the block is corrupted, we should be able to read back 
a corrupted block
+    Reader reader1 = createCorruptedFile("test-fileid1");
+    HoodieLogBlock block = reader1.next();
+    assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The 
read block should be a corrupt block");
+    reader1.close();
+
+    // as we can't mock a private method or directly test it, we are going 
this route.
+    // So adding a corrupted block which ideally should have been skipped for 
write transactional system. and hence when we call next() on log block reader, 
it will fail.
+    Reader reader2 = createCorruptedFile("test-fileid2");
+    assertTrue(reader2.hasNext(), "We should have corrupted block next");
+
+    // mock the fs to be GCS to skip isBlockCorrupted() check
+    Field f1 = reader2.getClass().getDeclaredField("fs");
+    f1.setAccessible(true);
+    FileSystem spyfs = Mockito.spy(fs);
+    when(spyfs.getScheme()).thenReturn("gs");
+    f1.set(reader2, spyfs);
+
+    // expect an exception for block type since the block is corrupted
+    Exception exception = assertThrows(IllegalArgumentException.class, () -> {
+      reader2.next();
+    });
+    assertTrue(exception.getMessage().contains("Invalid block byte type 
found"));
+    reader2.close();
+  }
+
   @Test
   public void testMissingBlockExceptMagicBytes() throws IOException, 
URISyntaxException, InterruptedException {
     HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
@@ -2648,4 +2679,43 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     copy.sort(Comparator.comparing(r -> ((GenericRecord) 
r).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()));
     return copy;
   }
+
+  private HoodieLogFormat.Reader createCorruptedFile(String fileId) throws 
Exception {
+    // block is corrupted, but check is skipped.
+    Writer writer =
+        
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId(fileId).overBaseCommit("100").withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, 
header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+
+    // Append some arbitrary byte[] to the end of the log (mimics a partially 
written commit)
+    fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
+    FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
+    // create a block with
+    outputStream.write(HoodieLogFormat.MAGIC);
+    // Write out a length that does not conform to the content
+    outputStream.writeLong(473);
+    outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
+    outputStream.writeInt(10000); // an invalid block type
+
+    // Write out a length that does not conform to the content
+    outputStream.writeLong(400);
+    // Write out incomplete content
+    outputStream.write("something-random".getBytes());
+    outputStream.flush();
+    outputStream.close();
+
+    // First round of reads - we should be able to read the first block and 
then EOF
+    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), 
SchemaTestUtil.getSimpleSchema());
+
+    assertTrue(reader.hasNext(), "First block should be available");
+    reader.next();
+
+    return reader;
+  }
 }

Reply via email to