This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f792643657e [HUDI-7341] Support unmerged record read (#10632)
f792643657e is described below

commit f792643657ebba69edd2b2aeeb4a37d15c39beba
Author: Lin Liu <[email protected]>
AuthorDate: Sat Mar 2 12:58:15 2024 -0800

    [HUDI-7341] Support unmerged record read (#10632)
---
 .../hudi/common/engine/HoodieReaderContext.java    |   7 +
 .../table/read/HoodieFileGroupRecordBuffer.java    |   8 +-
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  |   2 +-
 .../HoodiePositionBasedFileGroupRecordBuffer.java  |   2 +-
 .../read/HoodieUnmergedFileGroupRecordBuffer.java  | 146 +++++++++++++++++++++
 .../testutils/reader/HoodieTestReaderContext.java  |   9 ++
 6 files changed, 170 insertions(+), 4 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 1d81007c375..86a875c9df3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -219,4 +219,11 @@ public abstract class HoodieReaderContext<T> {
   public long extractRecordPosition(T record, Schema schema, String fieldName, long providedPositionIfNeeded) {
     return providedPositionIfNeeded;
   }
+
+  /**
+   * Constructs engine specific delete record.
+   */
+  public T constructRawDeleteRecord(Map<String, Object> metadata) {
+    return null;
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index ccc001e79c9..d9ba8bcd90e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -34,8 +34,12 @@ import java.util.Map;
 
 public interface HoodieFileGroupRecordBuffer<T> {
   enum BufferType {
-    KEY_BASED,
-    POSITION_BASED
+    // Merging based on record key.
+    KEY_BASED_MERGE,
+    // Merging based on record position.
+    POSITION_BASED_MERGE,
+    // No Merging at all.
+    UNMERGED
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index b4e32be8c65..0430a42e863 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -65,7 +65,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
 
   @Override
   public BufferType getBufferType() {
-    return BufferType.KEY_BASED;
+    return BufferType.KEY_BASED_MERGE;
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 4412713928f..50e969343e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -72,7 +72,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
 
   @Override
   public BufferType getBufferType() {
-    return BufferType.POSITION_BASED;
+    return BufferType.POSITION_BASED_MERGE;
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..76aa28308c4
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+
+public class HoodieUnmergedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
+  // Used to order the records in the record map.
+  private Long putIndex = 0L;
+  private Long getIndex = 0L;
+
+  public HoodieUnmergedFileGroupRecordBuffer(
+      HoodieReaderContext<T> readerContext,
+      Schema readerSchema,
+      Schema baseFileSchema,
+      Option<String> partitionNameOverrideOpt,
+      Option<String[]> partitionPathFieldOpt,
+      HoodieRecordMerger recordMerger,
+      TypedProperties payloadProps,
+      long maxMemorySizeInBytes,
+      String spillableMapBasePath,
+      ExternalSpillableMap.DiskMapType diskMapType,
+      boolean isBitCaskDiskMapCompressionEnabled) {
+    super(readerContext, readerSchema, baseFileSchema, partitionNameOverrideOpt, partitionPathFieldOpt,
+        recordMerger, payloadProps, maxMemorySizeInBytes, spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
+  }
+
+  @Override
+  protected boolean doHasNext() throws IOException {
+    ValidationUtils.checkState(baseFileIterator != null, "Base file iterator has not been set yet");
+
+    // Output from base file first.
+    if (baseFileIterator.hasNext()) {
+      nextRecord = baseFileIterator.next();
+      return true;
+    }
+
+    // Output records based on the index to preserve the order.
+    if (!records.isEmpty()) {
+      Pair<Option<T>, Map<String, Object>> nextRecordInfo = records.remove(getIndex++);
+
+      if (nextRecordInfo == null) {
+        throw new HoodieException("Row index should be continuous!");
+      }
+
+      if (nextRecordInfo.getLeft().isPresent()) {
+        nextRecord = nextRecordInfo.getKey().get();
+      } else {
+        nextRecord = readerContext.constructRawDeleteRecord(nextRecordInfo.getRight());
+      }
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override
+  public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() {
+    return records.values().iterator();
+  }
+
+  @Override
+  public BufferType getBufferType() {
+    return BufferType.UNMERGED;
+  }
+
+  @Override
+  public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) {
+    Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair =
+        getRecordsIterator(dataBlock, keySpecOpt);
+    if (dataBlock.containsPartialUpdates()) {
+      throw new HoodieException("Partial update is not supported for unmerged record read");
+    }
+
+    try (ClosableIterator<T> recordIterator = recordsIteratorSchemaPair.getLeft()) {
+      while (recordIterator.hasNext()) {
+        T nextRecord = recordIterator.next();
+        Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+            nextRecord, recordsIteratorSchemaPair.getRight());
+        processNextDataRecord(nextRecord, metadata, putIndex++);
+      }
+    }
+  }
+
+  @Override
+  public void processNextDataRecord(T record, Map<String, Object> metadata, Serializable index) {
+    records.put(index, Pair.of(Option.ofNullable(readerContext.seal(record)), metadata));
+  }
+
+  @Override
+  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
+    Iterator<DeleteRecord> it = Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
+    while (it.hasNext()) {
+      DeleteRecord record = it.next();
+      processNextDeletedRecord(record, putIndex++);
+    }
+  }
+
+  @Override
+  public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable index) {
+    records.put(index, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
+        deleteRecord.getRecordKey(), deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue())));
+  }
+
+  @Override
+  public boolean containsLogRecord(String recordKey) {
+    return records.containsKey(recordKey);
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index bd6efa3886b..3aad5e9a0aa 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.testutils.reader;
 
+import org.apache.hudi.avro.model.HoodieDeleteRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -208,6 +209,14 @@ public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord>
     };
   }
 
+  @Override
+  public IndexedRecord constructRawDeleteRecord(Map<String, Object> metadata) {
+    return new HoodieDeleteRecord(
+        (String) metadata.get(INTERNAL_META_RECORD_KEY),
+        (String) metadata.get(INTERNAL_META_PARTITION_PATH),
+        metadata.get(INTERNAL_META_ORDERING_FIELD));
+  }
+
   private Object getFieldValueFromIndexedRecord(
       IndexedRecord record,
       Schema recordSchema,

Reply via email to