This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f792643657e [HUDI-7341] Support unmerged record read (#10632)
f792643657e is described below
commit f792643657ebba69edd2b2aeeb4a37d15c39beba
Author: Lin Liu <[email protected]>
AuthorDate: Sat Mar 2 12:58:15 2024 -0800
[HUDI-7341] Support unmerged record read (#10632)
---
.../hudi/common/engine/HoodieReaderContext.java | 7 +
.../table/read/HoodieFileGroupRecordBuffer.java | 8 +-
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 2 +-
.../HoodiePositionBasedFileGroupRecordBuffer.java | 2 +-
.../read/HoodieUnmergedFileGroupRecordBuffer.java | 146 +++++++++++++++++++++
.../testutils/reader/HoodieTestReaderContext.java | 9 ++
6 files changed, 170 insertions(+), 4 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 1d81007c375..86a875c9df3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -219,4 +219,11 @@ public abstract class HoodieReaderContext<T> {
+  /**
+   * Returns the position to associate with {@code record}.
+   *
+   * <p>This default implementation ignores {@code record}, {@code schema} and {@code fieldName}
+   * and simply returns the caller-supplied position; engine-specific contexts may override it.
+   *
+   * @return {@code providedPositionIfNeeded}, unchanged
+   */
public long extractRecordPosition(T record, Schema schema, String fieldName,
long providedPositionIfNeeded) {
return providedPositionIfNeeded;
}
+
+  /**
+   * Constructs an engine-specific record that represents a delete, for readers that emit log
+   * records without merging them.
+   *
+   * <p>The default implementation does not support this and fails fast instead of handing a
+   * {@code null} record to the caller; engines whose readers may surface delete records must
+   * override it.
+   *
+   * @param metadata metadata of the deleted record, e.g. its record key, partition path and
+   *                 ordering value
+   * @return the engine-specific delete record
+   * @throws UnsupportedOperationException if this reader context cannot construct delete records
+   */
+  public T constructRawDeleteRecord(Map<String, Object> metadata) {
+    throw new UnsupportedOperationException(
+        getClass().getName() + " does not support constructing raw delete records");
+  }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index ccc001e79c9..d9ba8bcd90e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -34,8 +34,12 @@ import java.util.Map;
public interface HoodieFileGroupRecordBuffer<T> {
+ /** How a file group record buffer combines base file records with log records. */
enum BufferType {
- KEY_BASED,
- POSITION_BASED
+ // Merges records based on their record key.
+ KEY_BASED_MERGE,
+ // Merges records based on their record position.
+ POSITION_BASED_MERGE,
+ // Performs no merging; base file and log records are emitted as they are read.
+ UNMERGED
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index b4e32be8c65..0430a42e863 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -65,7 +65,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
+ /** Reports that this buffer merges records based on their record key. */
@Override
public BufferType getBufferType() {
- return BufferType.KEY_BASED;
+ return BufferType.KEY_BASED_MERGE;
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 4412713928f..50e969343e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -72,7 +72,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
+ /** Reports that this buffer merges records based on their record position. */
@Override
public BufferType getBufferType() {
- return BufferType.POSITION_BASED;
+ return BufferType.POSITION_BASED_MERGE;
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..76aa28308c4
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A file group record buffer that performs no merging: every base file record is returned first,
+ * followed by every log record (data and delete) in the order the log blocks were processed.
+ */
+public class HoodieUnmergedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
+  // Log records are stored in the record map under consecutive indexes: putIndex is the next
+  // slot to fill while reading log blocks, getIndex the next slot to emit.
+  private long putIndex = 0L;
+  private long getIndex = 0L;
+
+  public HoodieUnmergedFileGroupRecordBuffer(
+      HoodieReaderContext<T> readerContext,
+      Schema readerSchema,
+      Schema baseFileSchema,
+      Option<String> partitionNameOverrideOpt,
+      Option<String[]> partitionPathFieldOpt,
+      HoodieRecordMerger recordMerger,
+      TypedProperties payloadProps,
+      long maxMemorySizeInBytes,
+      String spillableMapBasePath,
+      ExternalSpillableMap.DiskMapType diskMapType,
+      boolean isBitCaskDiskMapCompressionEnabled) {
+    super(readerContext, readerSchema, baseFileSchema, partitionNameOverrideOpt, partitionPathFieldOpt,
+        recordMerger, payloadProps, maxMemorySizeInBytes, spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
+  }
+
+  /**
+   * Advances to the next record: base file records first, then buffered log records by index.
+   *
+   * <p>Log records are removed from the record map as they are emitted, so an empty map marks
+   * the end of the log records. Deletes are materialized through
+   * {@link HoodieReaderContext#constructRawDeleteRecord(Map)}.
+   *
+   * @return true if {@code nextRecord} was set, false when both sources are exhausted
+   * @throws HoodieException if no log record is buffered under the expected index
+   */
+  @Override
+  protected boolean doHasNext() throws IOException {
+    ValidationUtils.checkState(baseFileIterator != null, "Base file iterator has not been set yet");
+
+    // Output from base file first.
+    if (baseFileIterator.hasNext()) {
+      nextRecord = baseFileIterator.next();
+      return true;
+    }
+
+    if (records.isEmpty()) {
+      return false;
+    }
+
+    // Output log records by index to preserve the order in which they were read.
+    Pair<Option<T>, Map<String, Object>> nextRecordInfo = records.remove(getIndex++);
+    if (nextRecordInfo == null) {
+      throw new HoodieException("Row index should be continuous!");
+    }
+
+    // An empty record option marks a delete; the engine decides how a delete is represented.
+    Option<T> dataRecordOpt = nextRecordInfo.getLeft();
+    nextRecord = dataRecordOpt.isPresent()
+        ? dataRecordOpt.get()
+        : readerContext.constructRawDeleteRecord(nextRecordInfo.getRight());
+    return true;
+  }
+
+  /**
+   * Returns an iterator over the buffered log records.
+   *
+   * <p>NOTE(review): iteration order is whatever the backing record map provides; this class does
+   * not guarantee insertion order here (only {@link #doHasNext()} emits by index).
+   */
+  @Override
+  public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() {
+    return records.values().iterator();
+  }
+
+  @Override
+  public BufferType getBufferType() {
+    return BufferType.UNMERGED;
+  }
+
+  /**
+   * Buffers every record of a data block under consecutive indexes.
+   *
+   * @throws HoodieException if the block contains partial updates, which cannot be emitted unmerged
+   */
+  @Override
+  public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) {
+    // Reject unsupported blocks before opening the iterator so that it is never leaked.
+    if (dataBlock.containsPartialUpdates()) {
+      throw new HoodieException("Partial update is not supported for unmerged record read");
+    }
+
+    Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, keySpecOpt);
+    Schema recordSchema = recordsIteratorSchemaPair.getRight();
+    try (ClosableIterator<T> recordIterator = recordsIteratorSchemaPair.getLeft()) {
+      while (recordIterator.hasNext()) {
+        T dataRecord = recordIterator.next();
+        Map<String, Object> metadata = readerContext.generateMetadataForRecord(dataRecord, recordSchema);
+        processNextDataRecord(dataRecord, metadata, putIndex++);
+      }
+    }
+  }
+
+  /** Stores the sealed record and its metadata under {@code index}, without any merging. */
+  @Override
+  public void processNextDataRecord(T record, Map<String, Object> metadata, Serializable index) {
+    records.put(index, Pair.of(Option.ofNullable(readerContext.seal(record)), metadata));
+  }
+
+  /** Buffers every delete of a delete block under consecutive indexes. */
+  @Override
+  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
+    Iterator<DeleteRecord> it = Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
+    while (it.hasNext()) {
+      processNextDeletedRecord(it.next(), putIndex++);
+    }
+  }
+
+  /** Stores a delete as an empty record option plus its key, partition path and ordering value. */
+  @Override
+  public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable index) {
+    records.put(index, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
+        deleteRecord.getRecordKey(), deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue())));
+  }
+
+  /**
+   * Checks whether a log record with the given record key has been buffered.
+   *
+   * <p>The record map is keyed by insertion index rather than by record key, so a direct key
+   * lookup can never match; instead the buffered metadata is scanned, which is linear in the
+   * number of buffered log records.
+   */
+  @Override
+  public boolean containsLogRecord(String recordKey) {
+    if (recordKey == null) {
+      return false;
+    }
+    Iterator<Pair<Option<T>, Map<String, Object>>> it = records.values().iterator();
+    while (it.hasNext()) {
+      Map<String, Object> metadata = it.next().getRight();
+      if (metadata != null && recordKey.equals(metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY))) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index bd6efa3886b..3aad5e9a0aa 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.testutils.reader;
+import org.apache.hudi.avro.model.HoodieDeleteRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -208,6 +209,14 @@ public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord>
};
}
+  /**
+   * Materializes a delete as an Avro {@link HoodieDeleteRecord} built from the record key,
+   * partition path and ordering value carried in {@code metadata}.
+   */
+  @Override
+  public IndexedRecord constructRawDeleteRecord(Map<String, Object> metadata) {
+    String recordKey = (String) metadata.get(INTERNAL_META_RECORD_KEY);
+    String partitionPath = (String) metadata.get(INTERNAL_META_PARTITION_PATH);
+    Object orderingValue = metadata.get(INTERNAL_META_ORDERING_FIELD);
+    return new HoodieDeleteRecord(recordKey, partitionPath, orderingValue);
+  }
+
private Object getFieldValueFromIndexedRecord(
IndexedRecord record,
Schema recordSchema,