This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 66e11e5cabb [HUDI-6452] Add MOR snapshot reader to integrate with 
query engines without using Hadoop APIs (#9066)
66e11e5cabb is described below

commit 66e11e5cabb85c43f7f3dffe25432a5edaec9d7d
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Jul 5 17:11:20 2023 +0530

    [HUDI-6452] Add MOR snapshot reader to integrate with query engines without 
using Hadoop APIs (#9066)
    
    Add a new `HoodieMergeOnReadSnapshotReader` that implements
    `Iterator<HoodieRecord>`. It merges the base Parquet data with
    Avro data in log files.
---
 .../table/log/AbstractHoodieLogRecordReader.java   |   2 +-
 .../table/log/HoodieMergedLogRecordScanner.java    |   2 +-
 .../table/log/HoodieUnMergedLogRecordScanner.java  |   2 +-
 .../realtime/AbstractRealtimeRecordReader.java     |  14 +-
 .../realtime/HoodieMergeOnReadSnapshotReader.java  | 216 +++++++++++++++++++++
 .../HoodieMergeOnReadTableInputFormat.java         |  19 +-
 .../hadoop/realtime/HoodieRealtimeFileSplit.java   |  14 +-
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  39 ++--
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  11 +-
 .../TestHoodieMergeOnReadSnapshotReader.java       | 176 +++++++++++++++++
 10 files changed, 454 insertions(+), 41 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 1e5bef103f4..78e701a07d4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -641,7 +641,7 @@ public abstract class AbstractHoodieLogRecordReader {
    *
    * @param hoodieRecord Hoodie Record to process
    */
-  protected abstract <T> void processNextRecord(HoodieRecord<T> hoodieRecord) 
throws Exception;
+  public abstract <T> void processNextRecord(HoodieRecord<T> hoodieRecord) 
throws Exception;
 
   /**
    * Process next deleted record.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index c150ba9a3e6..7b98b4cc35e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -236,7 +236,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
   }
 
   @Override
-  protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws 
IOException {
+  public <T> void processNextRecord(HoodieRecord<T> newRecord) throws 
IOException {
     String key = newRecord.getRecordKey();
     HoodieRecord<T> prevRecord = records.get(key);
     if (prevRecord != null) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 51044c0814c..f62ec0febd5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -69,7 +69,7 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
   }
 
   @Override
-  protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws 
Exception {
+  public <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws 
Exception {
     // NOTE: Record have to be cloned here to make sure if it holds low-level 
engine-specific
     //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
     //       it since these records will be put into queue of 
BoundedInMemoryExecutor.
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 85d4a92311b..04a05a1d6f0 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -53,6 +53,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
+
 /**
  * Record Reader implementation to merge fresh avro data with base parquet 
data, to support real time queries.
  */
@@ -67,10 +69,10 @@ public abstract class AbstractRealtimeRecordReader {
   private Schema readerSchema;
   private Schema writerSchema;
   private Schema hiveSchema;
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
   protected SchemaEvolutionContext schemaEvolutionContext;
   // support merge operation
-  protected boolean supportPayload = true;
+  protected boolean supportPayload;
   // handle hive type to avro record
   protected HiveAvroSerializer serializer;
   private boolean supportTimestamp;
@@ -149,11 +151,11 @@ public abstract class AbstractRealtimeRecordReader {
         partitionFields.length() > 0 ? 
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
             : new ArrayList<>();
     writerSchema = 
HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, 
partitioningFields);
-    List<String> projectionFields = 
HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
-        jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), 
partitioningFields);
+    List<String> projectionFields = 
HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
 EMPTY_STRING),
+        jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, 
EMPTY_STRING), partitioningFields);
 
     Map<String, Field> schemaFieldsMap = 
HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
-    hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap, 
jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS));
+    hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap, 
jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS, EMPTY_STRING));
     // TODO(vc): In the future, the reader schema should be updated based on 
log files & be able
     // to null out fields not present before
 
@@ -166,7 +168,7 @@ public abstract class AbstractRealtimeRecordReader {
   }
 
   public Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, 
Field> schemaFieldsMap, String hiveColumnString) {
-    String[] hiveColumns = hiveColumnString.split(",");
+    String[] hiveColumns = hiveColumnString.isEmpty() ? new String[0] : 
hiveColumnString.split(",");
     LOG.info("Hive Columns : " + hiveColumnString);
     List<Field> hiveSchemaFields = new ArrayList<>();
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
new file mode 100644
index 00000000000..1cc8bf91b25
--- /dev/null
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP;
+import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED;
+import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE;
+import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH;
+import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
+import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP;
+import static 
org.apache.hudi.hadoop.config.HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP;
+import static 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getBaseFileReader;
+import static 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;
+import static 
org.apache.hudi.internal.schema.InternalSchema.getEmptyInternalSchema;
+
+/**
+ * An implementation of {@link AbstractRealtimeRecordReader} that reads from 
base parquet files and log files,
+ * and merges the records on the fly. It differs from {@link 
HoodieRealtimeRecordReader} in that it does not
+ * implement Hadoop's RecordReader interface, and instead implements Iterator 
interface that returns an iterator
+ * of {@link HoodieRecord}s which are {@link HoodieAvroIndexedRecord}s. This 
can be used by query engines like
+ * Trino that do not use Hadoop's RecordReader interface. However, the engine 
must support reading from iterators
+ * and also support Avro (de)serialization.
+ */
+public class HoodieMergeOnReadSnapshotReader extends 
AbstractRealtimeRecordReader implements Iterator<HoodieRecord>, AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMergeOnReadSnapshotReader.class);
+
+  private final String tableBasePath;
+  private final List<HoodieLogFile> logFilePaths;
+  private final String latestInstantTime;
+  private final Schema readerSchema;
+  private final JobConf jobConf;
+  private final HoodieMergedLogRecordScanner logRecordScanner;
+  private final HoodieFileReader baseFileReader;
+  private final Map<String, HoodieRecord> logRecordsByKey;
+  private final Iterator<HoodieRecord> recordsIterator;
+  private final ExternalSpillableMap<String, HoodieRecord> mergedRecordsByKey;
+
+  /**
+   * In order to instantiate this record reader, one needs to provide 
following parameters.
+   * An example usage is demonstrated in TestHoodieMergeOnReadSnapshotReader.
+   *
+   * @param tableBasePath     Base path of the Hudi table
+   * @param baseFilePath      Path of the base file as of the latest instant 
time for the split being processed
+   * @param logFilePaths      Paths of the log files as of the latest file 
slices pertaining to file group id of the base file
+   * @param latestInstantTime Latest instant time
+   * @param readerSchema      Schema of the reader
+   * @param jobConf           Any job configuration
+   * @param start             Start offset
+   * @param length            Length of the split
+   */
+  public HoodieMergeOnReadSnapshotReader(String tableBasePath,
+                                         String baseFilePath,
+                                         List<HoodieLogFile> logFilePaths,
+                                         String latestInstantTime,
+                                         Schema readerSchema,
+                                         JobConf jobConf,
+                                         long start,
+                                         long length) throws IOException {
+    super(getRealtimeSplit(tableBasePath, baseFilePath, logFilePaths, 
latestInstantTime, start, length, new String[0]), jobConf);
+    this.tableBasePath = tableBasePath;
+    this.logFilePaths = logFilePaths;
+    this.latestInstantTime = latestInstantTime;
+    this.readerSchema = readerSchema;
+    this.jobConf = jobConf;
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    this.logRecordScanner = getMergedLogRecordScanner();
+    LOG.debug("Time taken to scan log records: {}", timer.endTimer());
+    this.baseFileReader = getBaseFileReader(new Path(baseFilePath), jobConf);
+    this.logRecordsByKey = logRecordScanner.getRecords();
+    Set<String> logRecordKeys = new HashSet<>(this.logRecordsByKey.keySet());
+    this.mergedRecordsByKey = new ExternalSpillableMap<>(
+        getMaxCompactionMemoryInBytes(jobConf),
+        jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, 
DEFAULT_SPILLABLE_MAP_BASE_PATH),
+        new DefaultSizeEstimator(),
+        new HoodieRecordSizeEstimator(readerSchema),
+        jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), 
SPILLABLE_DISK_MAP_TYPE.defaultValue()),
+        jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), 
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));
+    try (ClosableIterator<String> baseFileIterator = 
baseFileReader.getRecordKeyIterator()) {
+      timer.startTimer();
+      while (baseFileIterator.hasNext()) {
+        String key = baseFileIterator.next();
+        if (logRecordKeys.contains(key)) {
+          logRecordKeys.remove(key);
+          Option<HoodieAvroIndexedRecord> mergedRecord = 
buildGenericRecordWithCustomPayload(logRecordsByKey.get(key));
+          if (mergedRecord.isPresent()) {
+            HoodieRecord hoodieRecord = mergedRecord.get().copy();
+            mergedRecordsByKey.put(key, hoodieRecord);
+          }
+        }
+      }
+    }
+    LOG.debug("Time taken to merge base file and log file records: {}", 
timer.endTimer());
+    this.recordsIterator = mergedRecordsByKey.values().iterator();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return recordsIterator.hasNext();
+  }
+
+  @Override
+  public HoodieRecord next() {
+    return recordsIterator.next();
+  }
+
+  public Map<String, HoodieRecord> getRecordsByKey() {
+    return mergedRecordsByKey;
+  }
+
+  public Iterator<HoodieRecord> getRecordsIterator() {
+    return recordsIterator;
+  }
+
+  public Map<String, HoodieRecord> getLogRecordsByKey() {
+    return logRecordsByKey;
+  }
+
+  private static HoodieRealtimeFileSplit getRealtimeSplit(String 
tableBasePath, String baseFilePath,
+                                                          List<HoodieLogFile> 
logFilePaths,
+                                                          String 
latestInstantTime,
+                                                          long start, long 
length, String[] hosts) {
+    HoodieRealtimePath realtimePath = new HoodieRealtimePath(
+        new Path(baseFilePath).getParent(),
+        baseFilePath,
+        tableBasePath,
+        logFilePaths,
+        latestInstantTime,
+        false, // TODO: Fix this to support incremental queries
+        Option.empty());
+    return HoodieInputFormatUtils.createRealtimeFileSplit(realtimePath, start, 
length, hosts);
+  }
+
+  private HoodieMergedLogRecordScanner getMergedLogRecordScanner() {
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
+        .withBasePath(tableBasePath)
+        .withLogFilePaths(logFilePaths.stream().map(logFile -> 
logFile.getPath().toString()).collect(Collectors.toList()))
+        .withReaderSchema(readerSchema)
+        .withLatestInstantTime(latestInstantTime)
+        .withMaxMemorySizeInBytes(getMaxCompactionMemoryInBytes(jobConf))
+        
.withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReverseReader(false)
+        .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, 
DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, 
DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withDiskMapType(jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), 
SPILLABLE_DISK_MAP_TYPE.defaultValue()))
+        
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+        
.withOptimizedLogBlocksScan(jobConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN,
 false))
+        
.withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(getEmptyInternalSchema()))
+        .build();
+  }
+
+  private Option<HoodieAvroIndexedRecord> 
buildGenericRecordWithCustomPayload(HoodieRecord record) throws IOException {
+    if (usesCustomPayload) {
+      return record.toIndexedRecord(getWriterSchema(), payloadProps);
+    } else {
+      return record.toIndexedRecord(readerSchema, payloadProps);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (baseFileReader != null) {
+      baseFileReader.close();
+    }
+    if (logRecordScanner != null) {
+      logRecordScanner.close();
+    }
+  }
+}
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 51ff0c4c8f6..a5f24954c09 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -65,6 +65,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.createRealtimeFileSplit;
 
 /**
  * Base implementation of the Hive's {@link FileInputFormat} allowing for 
reading of Hudi's
@@ -304,20 +305,12 @@ public class HoodieMergeOnReadTableInputFormat extends 
HoodieCopyOnWriteTableInp
         .collect(Collectors.toList());
   }
 
-  private static HoodieRealtimeFileSplit 
createRealtimeFileSplit(HoodieRealtimePath path, long start, long length, 
String[] hosts) {
-    try {
-      return new HoodieRealtimeFileSplit(new FileSplit(path, start, length, 
hosts), path);
-    } catch (IOException e) {
-      throw new HoodieIOException(String.format("Failed to create instance of 
%s", HoodieRealtimeFileSplit.class.getName()), e);
-    }
-  }
-
   private static HoodieRealtimeBootstrapBaseFileSplit 
createRealtimeBootstrapBaseFileSplit(BootstrapBaseFileSplit split,
-                                                                               
           String basePath,
-                                                                               
           List<HoodieLogFile> logFiles,
-                                                                               
           String maxInstantTime,
-                                                                               
           boolean belongsToIncrementalQuery,
-                                                                               
           Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+                                                                               
            String basePath,
+                                                                               
            List<HoodieLogFile> logFiles,
+                                                                               
            String maxInstantTime,
+                                                                               
            boolean belongsToIncrementalQuery,
+                                                                               
            Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
     try {
       String[] hosts = split.getLocationInfo() != null ? 
Arrays.stream(split.getLocationInfo())
           .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0];
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
index a424f021c2d..b45f91084c7 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
@@ -18,10 +18,11 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.hadoop.mapred.FileSplit;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -34,9 +35,9 @@ import java.util.List;
  *   <li>Split corresponding to the base file</li>
  *   <li>List of {@link HoodieLogFile} that holds the delta to be merged (upon 
reading)</li>
  * </ol>
- *
+ * <p>
  * This split is correspondent to a single file-slice in the Hudi terminology.
- *
+ * <p>
  * NOTE: If you're adding fields here you need to make sure that you 
appropriately de-/serialize them
  *       in {@link #readFromInput(DataInput)} and {@link 
#writeToOutput(DataOutput)}
  */
@@ -63,11 +64,10 @@ public class HoodieRealtimeFileSplit extends FileSplit 
implements RealtimeSplit
    */
   private Option<HoodieVirtualKeyInfo> virtualKeyInfo = Option.empty();
 
-  public HoodieRealtimeFileSplit() {}
+  public HoodieRealtimeFileSplit() {
+  }
 
-  public HoodieRealtimeFileSplit(FileSplit baseSplit,
-                                 HoodieRealtimePath path)
-      throws IOException {
+  public HoodieRealtimeFileSplit(FileSplit baseSplit, HoodieRealtimePath path) 
throws IOException {
     this(baseSplit,
         path.getBasePath(),
         path.getDeltaLogFiles(),
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 70e8f49ca51..c3984c5d171 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -41,6 +41,8 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
 import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimePath;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.slf4j.Logger;
@@ -187,16 +190,17 @@ public class HoodieInputFormatUtils {
   /**
    * Filter any specific instants that we do not want to process.
    * example timeline:
-   *
+   * <p>
    * t0 -> create bucket1.parquet
    * t1 -> create and append updates bucket1.log
    * t2 -> request compaction
    * t3 -> create bucket2.parquet
-   *
+   * <p>
    * if compaction at t2 takes a long time, incremental readers on RO tables 
can move to t3 and would skip updates in t1
-   *
+   * <p>
    * To workaround this problem, we want to stop returning data belonging to 
commits > t2.
    * After compaction is complete, incremental reader would see updates in t2, 
t3, so on.
+   *
    * @param timeline
    * @return
    */
@@ -220,6 +224,7 @@ public class HoodieInputFormatUtils {
 
   /**
    * Extract partitions touched by the commitsToCheck.
+   *
    * @param commitsToCheck
    * @param tableMetaClient
    * @param timeline
@@ -228,9 +233,9 @@ public class HoodieInputFormatUtils {
    * @throws IOException
    */
   public static Option<String> getAffectedPartitions(List<HoodieInstant> 
commitsToCheck,
-                                      HoodieTableMetaClient tableMetaClient,
-                                      HoodieTimeline timeline,
-                                      List<Path> inputPaths) throws 
IOException {
+                                                     HoodieTableMetaClient 
tableMetaClient,
+                                                     HoodieTimeline timeline,
+                                                     List<Path> inputPaths) 
throws IOException {
     Set<String> partitionsToList = new HashSet<>();
     for (HoodieInstant commit : commitsToCheck) {
       HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
@@ -274,6 +279,7 @@ public class HoodieInputFormatUtils {
 
   /**
    * Extract HoodieTimeline based on HoodieTableMetaClient.
+   *
    * @param job
    * @param tableMetaClient
    * @return
@@ -298,6 +304,7 @@ public class HoodieInputFormatUtils {
 
   /**
    * Get commits for incremental query from Hive map reduce configuration.
+   *
    * @param job
    * @param tableName
    * @param timeline
@@ -325,6 +332,7 @@ public class HoodieInputFormatUtils {
 
   /**
    * Extract HoodieTableMetaClient by partition path.
+   *
    * @param conf       The hadoop conf
    * @param partitions The partitions
    * @return partition path to table meta client mapping
@@ -371,7 +379,7 @@ public class HoodieInputFormatUtils {
   public static FileStatus getFileStatus(HoodieBaseFile baseFile) throws 
IOException {
     if (baseFile.getBootstrapBaseFile().isPresent()) {
       if (baseFile.getFileStatus() instanceof LocatedFileStatus) {
-        return new 
LocatedFileStatusWithBootstrapBaseFile((LocatedFileStatus)baseFile.getFileStatus(),
+        return new LocatedFileStatusWithBootstrapBaseFile((LocatedFileStatus) 
baseFile.getFileStatus(),
             baseFile.getBootstrapBaseFile().get().getFileStatus());
       } else {
         return new FileStatusWithBootstrapBaseFile(baseFile.getFileStatus(),
@@ -383,6 +391,7 @@ public class HoodieInputFormatUtils {
 
   /**
    * Filter a list of FileStatus based on commitsToCheck for incremental view.
+   *
    * @param job
    * @param tableMetaClient
    * @param timeline
@@ -391,7 +400,7 @@ public class HoodieInputFormatUtils {
    * @return
    */
   public static List<FileStatus> filterIncrementalFileStatus(Job job, 
HoodieTableMetaClient tableMetaClient,
-      HoodieTimeline timeline, FileStatus[] fileStatuses, List<HoodieInstant> 
commitsToCheck) throws IOException {
+                                                             HoodieTimeline 
timeline, FileStatus[] fileStatuses, List<HoodieInstant> commitsToCheck) throws 
IOException {
     TableFileSystemView.BaseFileOnlyView roView = new 
HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses);
     List<String> commitsList = 
commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
     List<HoodieBaseFile> filteredFiles = 
roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList());
@@ -452,7 +461,7 @@ public class HoodieInputFormatUtils {
     for (Path path : snapshotPaths) {
       // Find meta client associated with the input path
       metaClientList.stream().filter(metaClient -> 
path.toString().contains(metaClient.getBasePath()))
-              .forEach(metaClient -> grouped.get(metaClient).add(path));
+          .forEach(metaClient -> grouped.get(metaClient).add(path));
     }
     return grouped;
   }
@@ -467,6 +476,7 @@ public class HoodieInputFormatUtils {
    * Checks the file status for a race condition which can set the file size 
to 0. 1. HiveInputFormat does
    * super.listStatus() and gets back a FileStatus[] 2. Then it creates the 
HoodieTableMetaClient for the paths listed.
    * 3. Generation of splits looks at FileStatus size to create splits, which 
skips this file
+   *
    * @param conf
    * @param dataFile
    * @return
@@ -493,14 +503,13 @@ public class HoodieInputFormatUtils {
    *
    * @param basePath     The table base path
    * @param metadataList The metadata list to read the data from
-   *
    * @return the affected file status array
    */
   public static FileStatus[] listAffectedFilesForCommits(Configuration 
hadoopConf, Path basePath, List<HoodieCommitMetadata> metadataList) {
     // TODO: Use HoodieMetaTable to extract affected file directly.
     HashMap<String, FileStatus> fullPathToFileStatus = new HashMap<>();
     // Iterate through the given commits.
-    for (HoodieCommitMetadata metadata: metadataList) {
+    for (HoodieCommitMetadata metadata : metadataList) {
       fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(hadoopConf, 
basePath.toString()));
     }
     return fullPathToFileStatus.values().toArray(new FileStatus[0]);
@@ -518,4 +527,12 @@ public class HoodieInputFormatUtils {
         .flatMap(Collection::stream)
         .collect(Collectors.toSet());
   }
+
+  public static HoodieRealtimeFileSplit 
createRealtimeFileSplit(HoodieRealtimePath path, long start, long length, 
String[] hosts) {
+    try {
+      return new HoodieRealtimeFileSplit(new FileSplit(path, start, length, 
hosts), path);
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Failed to create instance of 
%s", HoodieRealtimeFileSplit.class.getName()), e);
+    }
+  }
 }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 6d010e12cdd..a6d1cf66acb 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,9 +18,12 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.JsonProperties;
@@ -30,6 +33,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -46,6 +50,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -158,7 +163,7 @@ public class HoodieRealtimeRecordReaderUtils {
       case STRING:
         return new Text(value.toString());
       case BYTES:
-        return new BytesWritable(((ByteBuffer)value).array());
+        return new BytesWritable(((ByteBuffer) value).array());
       case INT:
         if (schema.getLogicalType() != null && 
schema.getLogicalType().getName().equals("date")) {
           return HoodieHiveUtils.getDateWriteable((Integer) value);
@@ -297,6 +302,10 @@ public class HoodieRealtimeRecordReaderUtils {
     return appendNullSchemaFields(schema, fieldsToAdd);
   }
 
+  public static HoodieFileReader getBaseFileReader(Path path, JobConf conf) 
throws IOException {
+    return 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(conf,
 path);
+  }
+
   private static Schema appendNullSchemaFields(Schema schema, List<String> 
newFieldNames) {
     List<Schema.Field> newFields = new ArrayList<>();
     for (String newField : newFieldNames) {
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
new file mode 100644
index 00000000000..b37b4170a0c
--- /dev/null
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.fs.FSUtils.getFs;
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static 
org.apache.hudi.hadoop.testutils.InputFormatTestUtil.writeDataBlockToLogFile;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieMergeOnReadSnapshotReader {
+
+  private static final int TOTAL_RECORDS = 100;
+  private static final String FILE_ID = "fileid0";
+  private JobConf baseJobConf;
+  private FileSystem fs;
+  private Configuration hadoopConf;
+
+  @TempDir
+  public java.nio.file.Path basePath;
+
+  @BeforeEach
+  public void setUp() {
+    hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    hadoopConf.set("fs.defaultFS", "file:///");
+    hadoopConf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
+    baseJobConf = new JobConf(hadoopConf);
+    baseJobConf.set(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, 
String.valueOf(1024 * 1024));
+    fs = getFs(basePath.toUri().toString(), baseJobConf);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(new Path(basePath.toString()), true);
+      fs.close();
+    }
+  }
+
+  @Test
+  public void testSnapshotReader() throws Exception {
+    testReaderInternal(false, 
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
+  }
+
+  @Test
+  public void testSnapshotReaderPartitioned() throws Exception {
+    testReaderInternal(true, 
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
+  }
+
+  private void testReaderInternal(boolean partitioned, 
HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
+    // initial commit
+    Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, basePath.toString(), 
HoodieTableType.MERGE_ON_READ);
+    String baseInstant = "100";
+    File partitionDir = partitioned ? 
InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, TOTAL_RECORDS, 
baseInstant,
+        HoodieTableType.MERGE_ON_READ)
+        : InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, 
schema, 1, TOTAL_RECORDS, baseInstant,
+        HoodieTableType.MERGE_ON_READ);
+
+    HoodieCommitMetadata commitMetadata = 
CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), 
Option.empty(), WriteOperationType.UPSERT,
+        schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION);
+    FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant, 
commitMetadata);
+    // Add the paths
+    FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
+
+    List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
+    logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
+    String baseFilePath = partitionDir + "/" + FILE_ID + "_1-0-1_" + 
baseInstant + ".parquet";
+    String partitionPath = partitioned ? getRelativePartitionPath(new 
Path(basePath.toString()), new Path(partitionDir.getAbsolutePath())) : 
"default";
+    FileSlice fileSlice = new FileSlice(
+        new HoodieFileGroupId(partitionPath, FILE_ID),
+        baseInstant,
+        new HoodieBaseFile(fs.getFileStatus(new Path(baseFilePath))),
+        new ArrayList<>());
+    logVersionsWithAction.forEach(logVersionWithAction -> {
+      try {
+        // update files or generate new log file
+        int logVersion = logVersionWithAction.getRight();
+        String action = logVersionWithAction.getKey();
+        int baseInstantTs = Integer.parseInt(baseInstant);
+        String instantTime = String.valueOf(baseInstantTs + logVersion);
+        String latestInstant =
+            action.equals(HoodieTimeline.ROLLBACK_ACTION) ? 
String.valueOf(baseInstantTs + logVersion - 2)
+                : instantTime;
+
+        HoodieLogFormat.Writer writer = writeDataBlockToLogFile(
+            partitionDir,
+            fs,
+            schema,
+            FILE_ID,
+            baseInstant,
+            latestInstant,
+            120,
+            0,
+            logVersion,
+            logBlockType);
+        long size = writer.getCurrentSize();
+        writer.close();
+        assertTrue(size > 0, "block - size should be > 0");
+        FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime, 
commitMetadata);
+        fileSlice.addLogFile(writer.getLogFile());
+
+        HoodieMergeOnReadSnapshotReader snapshotReader = new 
HoodieMergeOnReadSnapshotReader(
+            basePath.toString(),
+            fileSlice.getBaseFile().get().getPath(),
+            fileSlice.getLogFiles().collect(Collectors.toList()),
+            latestInstant,
+            schema,
+            baseJobConf,
+            0,
+            size);
+        Map<String, HoodieRecord> records = snapshotReader.getRecordsByKey();
+        assertEquals(TOTAL_RECORDS, records.size());
+        snapshotReader.close();
+      } catch (Exception ioe) {
+        throw new HoodieException(ioe.getMessage(), ioe);
+      }
+    });
+  }
+}


Reply via email to