This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e0ca6dd0d52 [HUDI-7652] Add new `HoodieMergeKey` API to support simple
and composite keys (#11077)
e0ca6dd0d52 is described below
commit e0ca6dd0d52c4171d5b4ee83cbc7ef684cc471dc
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri May 17 14:57:29 2024 +0530
[HUDI-7652] Add new `HoodieMergeKey` API to support simple and composite
keys (#11077)
Introduce a new abstract class `BaseHoodieMergedLogRecordScanner`, which
extends `AbstractHoodieLogRecordReader`. The new abstract class
holds the `records` map as an `ExternalSpillableMap<K, HoodieRecord>` and
exposes the
`public abstract Map<K, HoodieRecord> getRecords()` API. The existing
`HoodieMergedLogRecordScanner` now derives from the new abstract class
(instead of `AbstractHoodieLogRecordReader`) and uses String keys.
---
.../hudi/client/TestJavaHoodieBackedMetadata.java | 2 +-
.../functional/TestHoodieBackedMetadata.java | 2 +-
.../functional/TestHoodieBackedTableMetadata.java | 2 +-
.../common/model/HoodieMetadataRecordMerger.java | 43 ++++
.../hudi/common/model/HoodieRecordMerger.java | 8 +
.../table/log/AbstractHoodieLogRecordReader.java | 25 ++
.../log/BaseHoodieMergedLogRecordScanner.java | 260 ++++++++++++++++++++
.../table/log/HoodieMergedLogRecordScanner.java | 265 +++-----------------
.../log/HoodieMetadataMergedLogRecordScanner.java | 270 +++++++++++++++++++++
.../hudi/metadata/HoodieBackedTableMetadata.java | 2 +-
.../metadata/HoodieMetadataLogRecordReader.java | 41 +++-
.../hudi/metadata/HoodieTableMetadataUtil.java | 3 +-
.../model/TestHoodieMetadataRecordMerger.java | 65 +++++
.../apache/hudi/functional/TestMORDataSource.scala | 2 +-
14 files changed, 740 insertions(+), 250 deletions(-)
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index ae17a34da3e..c2b85bd70b0 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -946,7 +946,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
if (enableMetaFields) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
- HoodieMetadataLogRecordReader logRecordReader =
HoodieMetadataLogRecordReader.newBuilder()
+ HoodieMetadataLogRecordReader logRecordReader =
HoodieMetadataLogRecordReader.newBuilder(FILES.getPartitionPath())
.withStorage(metadataMetaClient.getStorage())
.withBasePath(metadataMetaClient.getBasePath())
.withLogFilePaths(logFilePaths)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 700b9f1cd24..4da78d84980 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1413,7 +1413,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
if (enableMetaFields) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
- HoodieMetadataLogRecordReader logRecordReader =
HoodieMetadataLogRecordReader.newBuilder()
+ HoodieMetadataLogRecordReader logRecordReader =
HoodieMetadataLogRecordReader.newBuilder(FILES.getPartitionPath())
.withStorage(metadataMetaClient.getStorage())
.withBasePath(metadataMetaClient.getBasePath())
.withLogFilePaths(logFilePaths)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index d3dcf94f641..cec201ee754 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -491,7 +491,7 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
*/
private void verifyMetadataMergedRecords(HoodieTableMetaClient
metadataMetaClient, List<String> logFilePaths, String latestCommitTimestamp) {
Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
- HoodieMetadataLogRecordReader logRecordReader =
HoodieMetadataLogRecordReader.newBuilder()
+ HoodieMetadataLogRecordReader logRecordReader =
HoodieMetadataLogRecordReader.newBuilder(FILES.getPartitionPath())
.withStorage(metadataMetaClient.getStorage())
.withBasePath(metadataMetaClient.getBasePath())
.withLogFilePaths(logFilePaths)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
new file mode 100644
index 00000000000..c118cd803bc
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Record merger that accumulates metadata records.
+ */
+public class HoodieMetadataRecordMerger extends
HoodiePreCombineAvroRecordMerger {
+
+ public static final HoodieMetadataRecordMerger INSTANCE = new
HoodieMetadataRecordMerger();
+
+ @Override
+ public List<Pair<HoodieRecord, Schema>> fullOuterMerge(HoodieRecord older,
Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props)
throws IOException {
+ // TODO: Implement this method for secondary keys. Currently, it just
mimics the superclass.
+ return Collections.singletonList(super.merge(older, oldSchema, newer,
newSchema, props).get());
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index cdd41242ef1..62cde38a351 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
/**
* HoodieMerge defines how to merge two records. It is a stateless component.
@@ -126,6 +127,13 @@ public interface HoodieRecordMerger extends Serializable {
return true;
}
+ /**
+ * Merges two records with the same key in full outer merge fashion i.e. all
fields from both records are included.
+ */
+ default List<Pair<HoodieRecord, Schema>> fullOuterMerge(HoodieRecord older,
Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props)
throws IOException {
+ throw new UnsupportedOperationException("Partial merging logic is not
implemented.");
+ }
+
default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
ArrayList<String> requiredFields = new ArrayList<>();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index c545640e3eb..77f91a5b540 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -864,6 +865,30 @@ public abstract class AbstractHoodieLogRecordReader {
throw new UnsupportedOperationException();
}
+ public Builder withKeyFieldOverride(String keyFieldOverride) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Builder withForceFullScan(boolean forceFullScan) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Builder withDiskMapType(ExternalSpillableMap.DiskMapType
diskMapType) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Builder withBitCaskDiskMapCompressionEnabled(boolean
bitCaskDiskMapCompressionEnabled) {
+ throw new UnsupportedOperationException();
+ }
+
public Builder withTableMetaClient(HoodieTableMetaClient
hoodieTableMetaClient) {
throw new UnsupportedOperationException();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieMergedLogRecordScanner.java
new file mode 100644
index 00000000000..17a6284d20e
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieMergedLogRecordScanner.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.SpillableMapUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public abstract class BaseHoodieMergedLogRecordScanner<K extends Serializable>
extends AbstractHoodieLogRecordReader
+ implements Iterable<HoodieRecord>, Closeable {
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseHoodieMergedLogRecordScanner.class);
+ // A timer for calculating elapsed time in millis
+ public final HoodieTimer timer = HoodieTimer.create();
+ // Map of compacted/merged records
+ protected final ExternalSpillableMap<K, HoodieRecord> records;
+ // Set of already scanned prefixes allowing us to avoid scanning same
prefixes again
+ private final Set<String> scannedPrefixes;
+ // count of merged records in log
+ private long numMergedRecordsInLog;
+ private final long maxMemorySizeInBytes;
+ // Stores the total time taken to perform reading and merging of log blocks
+ private long totalTimeTakenToReadAndMergeBlocks;
+
+ @SuppressWarnings("unchecked")
+ protected BaseHoodieMergedLogRecordScanner(HoodieStorage storage, String
basePath, List<String> logFilePaths, Schema readerSchema,
+ String latestInstantTime, Long
maxMemorySizeInBytes,
+ boolean reverseReader, int
bufferSize, String spillableMapBasePath,
+ Option<InstantRange> instantRange,
+ ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean
isBitCaskDiskMapCompressionEnabled,
+ boolean withOperationField,
boolean forceFullScan,
+ Option<String> partitionName,
+ InternalSchema internalSchema,
+ Option<String> keyFieldOverride,
+ boolean
enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
+ Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
+ super(storage, basePath, logFilePaths, readerSchema, latestInstantTime,
reverseReader, bufferSize,
+ instantRange, withOperationField, forceFullScan, partitionName,
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
+ hoodieTableMetaClientOption);
+ try {
+ this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+ // Store merged records for all versions for this log file, set the
in-memory footprint to maxInMemoryMapSize
+ this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes,
spillableMapBasePath, new DefaultSizeEstimator(),
+ new HoodieRecordSizeEstimator(readerSchema), diskMapType,
isBitCaskDiskMapCompressionEnabled);
+ this.scannedPrefixes = new HashSet<>();
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException when creating
ExternalSpillableMap at " + spillableMapBasePath, e);
+ }
+
+ if (forceFullScan) {
+ performScan();
+ }
+ }
+
+ /**
+ * Scans delta-log files processing blocks
+ */
+ public final void scan() {
+ scan(false);
+ }
+
+ public final void scan(boolean skipProcessingBlocks) {
+ if (forceFullScan) {
+ // NOTE: When full-scan is enforced, scanning is invoked upfront (during
initialization)
+ return;
+ }
+
+ scanInternal(Option.empty(), skipProcessingBlocks);
+ }
+
+ /**
+ * Provides incremental scanning capability where only provided keys will be
looked
+ * up in the delta-log files, scanned and subsequently materialized into the
internal
+ * cache
+ *
+ * @param keys to be looked up
+ */
+ public void scanByFullKeys(List<String> keys) {
+ // We can skip scanning in case reader is in full-scan mode, in which case
all blocks
+ // are processed upfront (no additional scanning is necessary)
+ if (forceFullScan) {
+ return; // no-op
+ }
+
+ List<String> missingKeys = keys.stream()
+ .filter(key -> !records.containsKey(key))
+ .collect(Collectors.toList());
+
+ if (missingKeys.isEmpty()) {
+ // All the required records are already fetched, no-op
+ return;
+ }
+
+ scanInternal(Option.of(KeySpec.fullKeySpec(missingKeys)), false);
+ }
+
+ /**
+ * Provides incremental scanning capability where only keys matching
provided key-prefixes
+ * will be looked up in the delta-log files, scanned and subsequently
materialized into
+ * the internal cache
+ *
+ * @param keyPrefixes to be looked up
+ */
+ public void scanByKeyPrefixes(List<String> keyPrefixes) {
+ // We can skip scanning in case reader is in full-scan mode, in which case
all blocks
+ // are processed upfront (no additional scanning is necessary)
+ if (forceFullScan) {
+ return;
+ }
+
+ List<String> missingKeyPrefixes = keyPrefixes.stream()
+ .filter(keyPrefix ->
+ // NOTE: We can skip scanning the prefixes that have already
+ // been covered by the previous scans
+ scannedPrefixes.stream().noneMatch(keyPrefix::startsWith))
+ .collect(Collectors.toList());
+
+ if (missingKeyPrefixes.isEmpty()) {
+ // All the required records are already fetched, no-op
+ return;
+ }
+
+ // NOTE: When looking up by key-prefixes unfortunately we can't
short-circuit
+ // and will have to scan every time as we can't know (based on just
+ // the records cached) whether particular prefix was scanned or just
records
+ // matching the prefix looked up (by [[scanByFullKeys]] API)
+ scanInternal(Option.of(KeySpec.prefixKeySpec(missingKeyPrefixes)), false);
+ scannedPrefixes.addAll(missingKeyPrefixes);
+ }
+
+ private void performScan() {
+ // Do the scan and merge
+ timer.startTimer();
+
+ scanInternal(Option.empty(), false);
+
+ this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
+ this.numMergedRecordsInLog = records.size();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Number of log files scanned => {}", logFilePaths.size());
+ LOG.info("MaxMemoryInBytes allowed for compaction => {}",
maxMemorySizeInBytes);
+ LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap =>
{}", records.getInMemoryMapNumEntries());
+ LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap
=> {}", records.getCurrentInMemoryMapSize());
+ LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap =>
{}", records.getDiskBasedMapNumEntries());
+ LOG.info("Size of file spilled to disk => {}",
records.getSizeOfFileOnDiskInBytes());
+ }
+ }
+
+ @Override
+ public Iterator<HoodieRecord> iterator() {
+ return records.iterator();
+ }
+
+ @Override
+ protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
+ String key = deleteRecord.getRecordKey();
+ HoodieRecord oldRecord = records.get(key);
+ if (oldRecord != null) {
+ // Merge and store the merged record. The ordering val is taken to
decide whether the same key record
+ // should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
+ // For same ordering values, uses the natural order(arrival time
semantics).
+
+ Comparable curOrderingVal =
oldRecord.getOrderingValue(this.readerSchema,
this.hoodieTableMetaClient.getTableConfig().getProps());
+ Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
+ // Checks the ordering value does not equal to 0
+ // because we use 0 as the default value which means natural order
+ boolean choosePrev = !deleteOrderingVal.equals(0)
+ && ReflectionUtils.isSameClass(curOrderingVal, deleteOrderingVal)
+ && curOrderingVal.compareTo(deleteOrderingVal) > 0;
+ if (choosePrev) {
+ // The DELETE message is obsolete if the old message has greater
orderingVal.
+ return;
+ }
+ }
+ // Put the DELETE record
+ if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
+ records.put((K) key, SpillableMapUtils.generateEmptyPayload(key,
+ deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(),
getPayloadClassFQN()));
+ } else {
+ HoodieEmptyRecord record = new HoodieEmptyRecord<>(new HoodieKey(key,
deleteRecord.getPartitionPath()), null, deleteRecord.getOrderingValue(),
recordType);
+ records.put((K) key, record);
+ }
+ }
+
+ public abstract Map<K, HoodieRecord> getRecords();
+
+ public HoodieRecord.HoodieRecordType getRecordType() {
+ return recordMerger.getRecordType();
+ }
+
+ public long getNumMergedRecordsInLog() {
+ return numMergedRecordsInLog;
+ }
+
+ protected static <T> HoodieRecord getLatestHoodieRecord(HoodieRecord<T>
newRecord, HoodieRecord<T> combinedRecord, String key) {
+ HoodieRecord latestHoodieRecord =
+ combinedRecord.newInstance(new HoodieKey(key,
newRecord.getPartitionPath()), newRecord.getOperation());
+
+ latestHoodieRecord.unseal();
+ latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
+ latestHoodieRecord.seal();
+ return latestHoodieRecord;
+ }
+
+ public long getTotalTimeTakenToReadAndMergeBlocks() {
+ return totalTimeTakenToReadAndMergeBlocks;
+ }
+
+ @Override
+ public void close() {
+ if (records != null) {
+ records.close();
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index d29ee7bd46b..8679e2455ef 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -18,48 +18,33 @@
package org.apache.hudi.common.table.log;
-import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieEmptyRecord;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.DefaultSizeEstimator;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordUtils;
-import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.SpillableMapUtils;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
-import java.io.Closeable;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
import java.util.stream.Collectors;
+import static java.util.Objects.requireNonNull;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static
org.apache.hudi.common.table.cdc.HoodieCDCUtils.CDC_LOGFILE_SUFFIX;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
/**
* Scans through all the blocks in a list of HoodieLogFile and builds up a
compacted/merged list of records which will
@@ -74,172 +59,31 @@ import static
org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
* This results in two I/O passes over the log file.
*/
@NotThreadSafe
-public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
- implements Iterable<HoodieRecord>, Closeable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class);
- // A timer for calculating elapsed time in millis
- public final HoodieTimer timer = HoodieTimer.create();
- // Map of compacted/merged records
- private final ExternalSpillableMap<String, HoodieRecord> records;
- // Set of already scanned prefixes allowing us to avoid scanning same
prefixes again
- private final Set<String> scannedPrefixes;
- // count of merged records in log
- private long numMergedRecordsInLog;
- private final long maxMemorySizeInBytes;
- // Stores the total time taken to perform reading and merging of log blocks
- private long totalTimeTakenToReadAndMergeBlocks;
+public class HoodieMergedLogRecordScanner extends
BaseHoodieMergedLogRecordScanner<String> {
@SuppressWarnings("unchecked")
- private HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath,
List<String> logFilePaths, Schema readerSchema,
- String latestInstantTime, Long
maxMemorySizeInBytes,
- boolean reverseReader, int bufferSize,
String spillableMapBasePath,
- Option<InstantRange> instantRange,
- ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isBitCaskDiskMapCompressionEnabled,
- boolean withOperationField, boolean
forceFullScan,
- Option<String> partitionName,
- InternalSchema internalSchema,
- Option<String> keyFieldOverride,
- boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
- Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
- super(storage, basePath, logFilePaths, readerSchema, latestInstantTime,
reverseReader, bufferSize,
- instantRange, withOperationField, forceFullScan, partitionName,
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
- hoodieTableMetaClientOption);
- try {
- this.maxMemorySizeInBytes = maxMemorySizeInBytes;
- // Store merged records for all versions for this log file, set the
in-memory footprint to maxInMemoryMapSize
- this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes,
spillableMapBasePath, new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(readerSchema), diskMapType,
isBitCaskDiskMapCompressionEnabled);
- this.scannedPrefixes = new HashSet<>();
- } catch (IOException e) {
- throw new HoodieIOException("IOException when creating
ExternalSpillableMap at " + spillableMapBasePath, e);
- }
-
- if (forceFullScan) {
- performScan();
- }
- }
-
- /**
- * Scans delta-log files processing blocks
- */
- public final void scan() {
- scan(false);
- }
-
- public final void scan(boolean skipProcessingBlocks) {
- if (forceFullScan) {
- // NOTE: When full-scan is enforced, scanning is invoked upfront (during
initialization)
- return;
- }
-
- scanInternal(Option.empty(), skipProcessingBlocks);
- }
-
- /**
- * Provides incremental scanning capability where only provided keys will be
looked
- * up in the delta-log files, scanned and subsequently materialized into the
internal
- * cache
- *
- * @param keys to be looked up
- */
- public void scanByFullKeys(List<String> keys) {
- // We can skip scanning in case reader is in full-scan mode, in which case
all blocks
- // are processed upfront (no additional scanning is necessary)
- if (forceFullScan) {
- return; // no-op
- }
-
- List<String> missingKeys = keys.stream()
- .filter(key -> !records.containsKey(key))
- .collect(Collectors.toList());
-
- if (missingKeys.isEmpty()) {
- // All the required records are already fetched, no-op
- return;
- }
-
- scanInternal(Option.of(KeySpec.fullKeySpec(missingKeys)), false);
- }
-
- /**
- * Provides incremental scanning capability where only keys matching
provided key-prefixes
- * will be looked up in the delta-log files, scanned and subsequently
materialized into
- * the internal cache
- *
- * @param keyPrefixes to be looked up
- */
- public void scanByKeyPrefixes(List<String> keyPrefixes) {
- // We can skip scanning in case reader is in full-scan mode, in which case
all blocks
- // are processed upfront (no additional scanning is necessary)
- if (forceFullScan) {
- return;
- }
-
- List<String> missingKeyPrefixes = keyPrefixes.stream()
- .filter(keyPrefix ->
- // NOTE: We can skip scanning the prefixes that have already
- // been covered by the previous scans
- scannedPrefixes.stream().noneMatch(keyPrefix::startsWith))
- .collect(Collectors.toList());
-
- if (missingKeyPrefixes.isEmpty()) {
- // All the required records are already fetched, no-op
- return;
- }
-
- // NOTE: When looking up by key-prefixes unfortunately we can't
short-circuit
- // and will have to scan every time as we can't know (based on just
- // the records cached) whether particular prefix was scanned or just
records
- // matching the prefix looked up (by [[scanByFullKeys]] API)
- scanInternal(Option.of(KeySpec.prefixKeySpec(missingKeyPrefixes)), false);
- scannedPrefixes.addAll(missingKeyPrefixes);
- }
-
- private void performScan() {
- // Do the scan and merge
- timer.startTimer();
-
- scanInternal(Option.empty(), false);
-
- this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
- this.numMergedRecordsInLog = records.size();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Number of log files scanned => {}", logFilePaths.size());
- LOG.info("MaxMemoryInBytes allowed for compaction => {}",
maxMemorySizeInBytes);
- LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap =>
{}", records.getInMemoryMapNumEntries());
- LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap
=> {}", records.getCurrentInMemoryMapSize());
- LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap =>
{}", records.getDiskBasedMapNumEntries());
- LOG.info("Size of file spilled to disk => {}",
records.getSizeOfFileOnDiskInBytes());
- }
+ protected HoodieMergedLogRecordScanner(HoodieStorage storage, String
basePath, List<String> logFilePaths, Schema readerSchema,
+ String latestInstantTime, Long
maxMemorySizeInBytes,
+ boolean reverseReader, int
bufferSize, String spillableMapBasePath,
+ Option<InstantRange> instantRange,
+ ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean
isBitCaskDiskMapCompressionEnabled,
+ boolean withOperationField, boolean
forceFullScan,
+ Option<String> partitionName,
+ InternalSchema internalSchema,
+ Option<String> keyFieldOverride,
+ boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
+ Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
+ super(storage, basePath, logFilePaths, readerSchema, latestInstantTime,
maxMemorySizeInBytes, reverseReader, bufferSize,
+ spillableMapBasePath, instantRange, diskMapType,
isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
+ partitionName, internalSchema, keyFieldOverride,
enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption);
}
@Override
- public Iterator<HoodieRecord> iterator() {
- return records.iterator();
- }
-
public Map<String, HoodieRecord> getRecords() {
return records;
}
- public HoodieRecordType getRecordType() {
- return recordMerger.getRecordType();
- }
-
- public long getNumMergedRecordsInLog() {
- return numMergedRecordsInLog;
- }
-
- /**
- * Returns the builder for {@code HoodieMergedLogRecordScanner}.
- */
- public static HoodieMergedLogRecordScanner.Builder newBuilder() {
- return new Builder();
- }
-
@Override
public <T> void processNextRecord(HoodieRecord<T> newRecord) throws
IOException {
String key = newRecord.getRecordKey();
@@ -250,12 +94,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
// If pre-combine returns existing record, no need to update it
if (combinedRecord.getData() != prevRecord.getData()) {
- HoodieRecord latestHoodieRecord =
- combinedRecord.newInstance(new HoodieKey(key,
newRecord.getPartitionPath()), newRecord.getOperation());
-
- latestHoodieRecord.unseal();
- latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
- latestHoodieRecord.seal();
+ HoodieRecord latestHoodieRecord = getLatestHoodieRecord(newRecord,
combinedRecord, key);
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer
we get a clean copy of
@@ -271,50 +110,15 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
}
}
- @Override
- protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
- String key = deleteRecord.getRecordKey();
- HoodieRecord oldRecord = records.get(key);
- if (oldRecord != null) {
- // Merge and store the merged record. The ordering val is taken to
decide whether the same key record
- // should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
- // For same ordering values, uses the natural order(arrival time
semantics).
-
- Comparable curOrderingVal =
oldRecord.getOrderingValue(this.readerSchema,
this.hoodieTableMetaClient.getTableConfig().getProps());
- Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
- // Checks the ordering value does not equal to 0
- // because we use 0 as the default value which means natural order
- boolean choosePrev = !deleteOrderingVal.equals(0)
- && ReflectionUtils.isSameClass(curOrderingVal, deleteOrderingVal)
- && curOrderingVal.compareTo(deleteOrderingVal) > 0;
- if (choosePrev) {
- // The DELETE message is obsolete if the old message has greater
orderingVal.
- return;
- }
- }
- // Put the DELETE record
- if (recordType == HoodieRecordType.AVRO) {
- records.put(key, SpillableMapUtils.generateEmptyPayload(key,
- deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(),
getPayloadClassFQN()));
- } else {
- HoodieEmptyRecord record = new HoodieEmptyRecord<>(new HoodieKey(key,
deleteRecord.getPartitionPath()), null, deleteRecord.getOrderingValue(),
recordType);
- records.put(key, record);
- }
- }
-
- public long getTotalTimeTakenToReadAndMergeBlocks() {
- return totalTimeTakenToReadAndMergeBlocks;
- }
-
- @Override
- public void close() {
- if (records != null) {
- records.close();
- }
+ /**
+ * Returns the builder for {@code HoodieMergedLogRecordScanner}.
+ */
+ public static HoodieMergedLogRecordScanner.Builder newBuilder() {
+ return new HoodieMergedLogRecordScanner.Builder();
}
/**
- * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
+ * Builder used to build {@code HoodieMergedLogRecordScanner}.
*/
public static class Builder extends AbstractHoodieLogRecordReader.Builder {
private HoodieStorage storage;
@@ -328,8 +132,8 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
// specific configurations
private Long maxMemorySizeInBytes;
private String spillableMapBasePath;
- private ExternalSpillableMap.DiskMapType diskMapType =
HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
- private boolean isBitCaskDiskMapCompressionEnabled =
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
+ private ExternalSpillableMap.DiskMapType diskMapType =
SPILLABLE_DISK_MAP_TYPE.defaultValue();
+ private boolean isBitCaskDiskMapCompressionEnabled =
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
// incremental filtering
private Option<InstantRange> instantRange = Option.empty();
private String partitionName;
@@ -357,7 +161,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
@Override
public Builder withLogFilePaths(List<String> logFilePaths) {
this.logFilePaths = logFilePaths.stream()
- .filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+ .filter(p -> !p.endsWith(CDC_LOGFILE_SUFFIX))
.collect(Collectors.toList());
return this;
}
@@ -441,8 +245,8 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
return this;
}
- public Builder withKeyFiledOverride(String keyFieldOverride) {
- this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
+ public Builder withKeyFieldOverride(String keyFieldOverride) {
+ this.keyFieldOverride = requireNonNull(keyFieldOverride);
return this;
}
@@ -463,7 +267,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
this.partitionName = getRelativePartitionPath(
new StoragePath(basePath), new
StoragePath(this.logFilePaths.get(0)).getParent());
}
- ValidationUtils.checkArgument(recordMerger != null);
+ checkArgument(recordMerger != null);
return new HoodieMergedLogRecordScanner(storage, basePath, logFilePaths,
readerSchema,
latestInstantTime, maxMemorySizeInBytes, reverseReader,
@@ -474,4 +278,3 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
}
}
}
-
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMetadataMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMetadataMergedLogRecordScanner.java
new file mode 100644
index 00000000000..c4a52cd6d86
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMetadataMergedLogRecordScanner.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieMetadataRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static
org.apache.hudi.common.table.cdc.HoodieCDCUtils.CDC_LOGFILE_SUFFIX;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+/**
+ * Merged log record scanner for metadata table using {@link
HoodieMetadataRecordMerger}.
+ */
+public class HoodieMetadataMergedLogRecordScanner extends
BaseHoodieMergedLogRecordScanner<String> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieMetadataMergedLogRecordScanner.class);
+
+ private HoodieMetadataMergedLogRecordScanner(HoodieStorage storage, String
basePath, List<String> logFilePaths, Schema readerSchema,
+ String latestInstantTime, Long
maxMemorySizeInBytes,
+ boolean reverseReader, int
bufferSize, String spillableMapBasePath,
+ Option<InstantRange>
instantRange,
+
ExternalSpillableMap.DiskMapType diskMapType,
+ boolean
isBitCaskDiskMapCompressionEnabled,
+ boolean withOperationField,
boolean forceFullScan,
+ Option<String> partitionName,
+ InternalSchema internalSchema,
+ Option<String> keyFieldOverride,
+ boolean
enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
+ Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
+ super(storage, basePath, logFilePaths, readerSchema, latestInstantTime,
maxMemorySizeInBytes, reverseReader, bufferSize, spillableMapBasePath,
instantRange, diskMapType,
+ isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan,
recordMerger,
+ hoodieTableMetaClientOption);
+ }
+
+ @Override
+ public Map<String, HoodieRecord> getRecords() {
+ return records;
+ }
+
+ @Override
+ public <T> void processNextRecord(HoodieRecord<T> newRecord) throws
IOException {
+ // Merge the new record with the existing record in the map
+ HoodieRecord<T> oldRecord = (HoodieRecord<T>)
records.get(newRecord.getRecordKey());
+ if (oldRecord != null) {
+ LOG.debug("Merging new record with existing record in the map. Key: {}",
newRecord.getRecordKey());
+ recordMerger.fullOuterMerge(oldRecord, readerSchema, newRecord,
readerSchema, this.getPayloadProps()).forEach(
+ mergedRecord -> {
+ HoodieRecord<T> combinedRecord = mergedRecord.getLeft();
+ if (combinedRecord.getData() != oldRecord.getData()) {
+ HoodieRecord latestHoodieRecord =
getLatestHoodieRecord(newRecord, combinedRecord, newRecord.getRecordKey());
+ records.put(newRecord.getRecordKey(), latestHoodieRecord.copy());
+ }
+ });
+ } else {
+ // Put the record as is
+ // NOTE: Records have to be cloned here to make sure that if they hold
low-level engine-specific
+ // payloads pointing into a shared, mutable (underlying) buffer we
get a clean copy of
+ // them, since these records will be put into the records map.
+ records.put(newRecord.getRecordKey(), newRecord.copy());
+ }
+ }
+
+ /**
+ * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
+ */
+ public static HoodieMetadataMergedLogRecordScanner.Builder newBuilder() {
+ return new HoodieMetadataMergedLogRecordScanner.Builder();
+ }
+
+ /**
+ * Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
+ */
+ public static class Builder extends AbstractHoodieLogRecordReader.Builder {
+ private HoodieStorage storage;
+ private String basePath;
+ private List<String> logFilePaths;
+ private Schema readerSchema;
+ private InternalSchema internalSchema =
InternalSchema.getEmptyInternalSchema();
+ private String latestInstantTime;
+ private boolean reverseReader;
+ private int bufferSize;
+ // specific configurations
+ private Long maxMemorySizeInBytes;
+ private String spillableMapBasePath;
+ private ExternalSpillableMap.DiskMapType diskMapType =
SPILLABLE_DISK_MAP_TYPE.defaultValue();
+ private boolean isBitCaskDiskMapCompressionEnabled =
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
+ // incremental filtering
+ private Option<InstantRange> instantRange = Option.empty();
+ private String partitionName;
+ // operation field default false
+ private boolean withOperationField = false;
+ private String keyFieldOverride;
+ // By default, we're doing a full-scan
+ private boolean forceFullScan = true;
+ private boolean enableOptimizedLogBlocksScan = false;
+ private HoodieRecordMerger recordMerger =
HoodieMetadataRecordMerger.INSTANCE;
+ protected HoodieTableMetaClient hoodieTableMetaClient;
+
+ @Override
+ public Builder withStorage(HoodieStorage storage) {
+ this.storage = storage;
+ return this;
+ }
+
+ @Override
+ public Builder withBasePath(String basePath) {
+ this.basePath = basePath;
+ return this;
+ }
+
+ @Override
+ public Builder withLogFilePaths(List<String> logFilePaths) {
+ this.logFilePaths = logFilePaths.stream()
+ .filter(p -> !p.endsWith(CDC_LOGFILE_SUFFIX))
+ .collect(Collectors.toList());
+ return this;
+ }
+
+ @Override
+ public Builder withReaderSchema(Schema schema) {
+ this.readerSchema = schema;
+ return this;
+ }
+
+ @Override
+ public Builder withLatestInstantTime(String latestInstantTime) {
+ this.latestInstantTime = latestInstantTime;
+ return this;
+ }
+
+ @Override
+ public Builder withReverseReader(boolean reverseReader) {
+ this.reverseReader = reverseReader;
+ return this;
+ }
+
+ @Override
+ public Builder withBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ return this;
+ }
+
+ @Override
+ public Builder withInstantRange(Option<InstantRange> instantRange) {
+ this.instantRange = instantRange;
+ return this;
+ }
+
+ public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+ this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+ return this;
+ }
+
+ public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+ this.spillableMapBasePath = spillableMapBasePath;
+ return this;
+ }
+
+ public Builder withDiskMapType(ExternalSpillableMap.DiskMapType
diskMapType) {
+ this.diskMapType = diskMapType;
+ return this;
+ }
+
+ public Builder withBitCaskDiskMapCompressionEnabled(boolean
isBitCaskDiskMapCompressionEnabled) {
+ this.isBitCaskDiskMapCompressionEnabled =
isBitCaskDiskMapCompressionEnabled;
+ return this;
+ }
+
+ @Override
+ public Builder withInternalSchema(InternalSchema internalSchema) {
+ this.internalSchema = internalSchema;
+ return this;
+ }
+
+ public Builder withOperationField(boolean withOperationField) {
+ this.withOperationField = withOperationField;
+ return this;
+ }
+
+ @Override
+ public Builder withPartition(String partitionName) {
+ this.partitionName = partitionName;
+ return this;
+ }
+
+ @Override
+ public Builder withOptimizedLogBlocksScan(boolean
enableOptimizedLogBlocksScan) {
+ this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
+ return this;
+ }
+
+ @Override
+ public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
+ this.recordMerger = recordMerger;
+ return this;
+ }
+
+ public Builder withKeyFieldOverride(String keyFieldOverride) {
+ this.keyFieldOverride = requireNonNull(keyFieldOverride);
+ return this;
+ }
+
+ public Builder withForceFullScan(boolean forceFullScan) {
+ this.forceFullScan = forceFullScan;
+ return this;
+ }
+
+ @Override
+ public Builder withTableMetaClient(HoodieTableMetaClient
hoodieTableMetaClient) {
+ this.hoodieTableMetaClient = hoodieTableMetaClient;
+ return this;
+ }
+
+ @Override
+ public HoodieMetadataMergedLogRecordScanner build() {
+ if (this.partitionName == null &&
CollectionUtils.nonEmpty(this.logFilePaths)) {
+ this.partitionName = getRelativePartitionPath(
+ new StoragePath(basePath), new
StoragePath(this.logFilePaths.get(0)).getParent());
+ }
+ checkArgument(recordMerger != null);
+ checkArgument(nonEmpty(partitionName), "Partition name is required for
HoodieMetadataMergedLogRecordScanner.");
+
+ return new HoodieMetadataMergedLogRecordScanner(storage, basePath,
logFilePaths, readerSchema,
+ latestInstantTime, maxMemorySizeInBytes, reverseReader,
+ bufferSize, spillableMapBasePath, instantRange,
+ diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField,
forceFullScan,
+ Option.of(partitionName), internalSchema,
Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
+ Option.ofNullable(hoodieTableMetaClient));
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 59605679c5d..78a3ea48e38 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -641,7 +641,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// Load the schema
Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- HoodieMetadataLogRecordReader logRecordScanner =
HoodieMetadataLogRecordReader.newBuilder()
+ HoodieMetadataLogRecordReader logRecordScanner =
HoodieMetadataLogRecordReader.newBuilder(partitionName)
.withStorage(metadataMetaClient.getStorage())
.withBasePath(metadataBasePath)
.withLogFilePaths(sortedLogFilePaths)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index 6705a75625e..17b7688133e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -20,7 +20,10 @@ package org.apache.hudi.metadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader;
+import org.apache.hudi.common.table.log.BaseHoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieMetadataMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -40,6 +43,8 @@ import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
+
/**
* Metadata log-block records reading implementation, internally relying on
* {@link HoodieMergedLogRecordScanner} to merge corresponding Metadata
Table's delta log-blocks
@@ -48,17 +53,17 @@ import java.util.stream.Collectors;
@ThreadSafe
public class HoodieMetadataLogRecordReader implements Closeable {
- private final HoodieMergedLogRecordScanner logRecordScanner;
+ private final BaseHoodieMergedLogRecordScanner<String> logRecordScanner;
- private HoodieMetadataLogRecordReader(HoodieMergedLogRecordScanner
logRecordScanner) {
+ private HoodieMetadataLogRecordReader(BaseHoodieMergedLogRecordScanner
logRecordScanner) {
this.logRecordScanner = logRecordScanner;
}
/**
* Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
*/
- public static HoodieMetadataLogRecordReader.Builder newBuilder() {
- return new HoodieMetadataLogRecordReader.Builder();
+ public static HoodieMetadataLogRecordReader.Builder newBuilder(String
partitionName) {
+ return new HoodieMetadataLogRecordReader.Builder(partitionName);
}
@SuppressWarnings("unchecked")
@@ -150,14 +155,20 @@ public class HoodieMetadataLogRecordReader implements
Closeable {
* Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
*/
public static class Builder {
- private final HoodieMergedLogRecordScanner.Builder scannerBuilder =
- new HoodieMergedLogRecordScanner.Builder()
- .withKeyFiledOverride(HoodieMetadataPayload.KEY_FIELD_NAME)
- // NOTE: Merging of Metadata Table's records is currently handled
using {@code HoodiePreCombineAvroRecordMerger}
- // for compatibility purposes; In the future it {@code
HoodieMetadataPayload} semantic
- // will be migrated to its own custom instance of {@code
RecordMerger}
- .withReverseReader(false)
- .withOperationField(false);
+ private final AbstractHoodieLogRecordReader.Builder scannerBuilder;
+ private final String partitionName;
+
+ public Builder(String partitionName) {
+ this.partitionName = partitionName;
+ this.scannerBuilder = shouldUseMetadataMergedLogRecordScanner() ?
HoodieMetadataMergedLogRecordScanner.newBuilder() :
HoodieMergedLogRecordScanner.newBuilder();
+ scannerBuilder
+ .withKeyFieldOverride(HoodieMetadataPayload.KEY_FIELD_NAME)
+ // NOTE: Merging of Metadata Table's records is currently handled
using {@code HoodiePreCombineAvroRecordMerger}
+ // for compatibility purposes; in the future the {@code
HoodieMetadataPayload} semantics
+ // will be migrated to its own custom instance of {@code
RecordMerger}
+ .withReverseReader(false)
+ .withOperationField(false);
+ }
public Builder withStorage(HoodieStorage storage) {
scannerBuilder.withStorage(storage);
@@ -238,7 +249,11 @@ public class HoodieMetadataLogRecordReader implements
Closeable {
}
public HoodieMetadataLogRecordReader build() {
- return new HoodieMetadataLogRecordReader(scannerBuilder.build());
+ return new
HoodieMetadataLogRecordReader((BaseHoodieMergedLogRecordScanner)
scannerBuilder.build());
+ }
+
+ private boolean shouldUseMetadataMergedLogRecordScanner() {
+ return PARTITION_NAME_SECONDARY_INDEX.equals(partitionName);
}
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 6441f85c532..a274d07b027 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -156,6 +156,7 @@ public class HoodieTableMetadataUtil {
public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
public static final String PARTITION_NAME_RECORD_INDEX = "record_index";
public static final String PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX =
"func_index_";
+ public static final String PARTITION_NAME_SECONDARY_INDEX =
"secondary_index";
private HoodieTableMetadataUtil() {
}
@@ -1278,7 +1279,7 @@ public class HoodieTableMetadataUtil {
* it could subsequently be used in column stats
*
* NOTE: This method has to stay compatible with the semantic of
- * {@link ParquetUtils#readColumnStatsFromMetadata} as they are used in
tandem
+ * {@link FileFormatUtils#readColumnStatsFromMetadata} as they are used
in tandem
*/
private static Comparable<?> coerceToComparable(Schema schema, Object val) {
if (val == null) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
new file mode 100644
index 00000000000..b63eb9e1531
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link HoodieMetadataRecordMerger}.
+ */
+public class TestHoodieMetadataRecordMerger {
+
+ private HoodieTestDataGenerator dataGen;
+
+ @BeforeEach
+ public void setUp() {
+ dataGen = new HoodieTestDataGenerator();
+ }
+
+ @AfterEach
+ public void cleanUp() {
+ if (dataGen != null) {
+ dataGen = null;
+ }
+ }
+
+ @Test
+ public void testFullOuterMerge() throws IOException {
+ List<HoodieRecord> newRecordList = dataGen.generateInserts("000", 1);
+ List<HoodieRecord> updateRecordList = dataGen.generateUpdates("0001",
newRecordList);
+ HoodieMetadataRecordMerger recordMerger = new HoodieMetadataRecordMerger();
+ List<Pair<HoodieRecord, Schema>> mergedRecords =
recordMerger.fullOuterMerge(newRecordList.get(0), AVRO_SCHEMA,
updateRecordList.get(0), AVRO_SCHEMA, new TypedProperties());
+ assertEquals(1, mergedRecords.size());
+ assertEquals(updateRecordList.get(0), mergedRecords.get(0).getLeft());
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 0d4bd3501e3..516ce425d1e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -614,7 +614,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
- def testPreCombineFiledForReadMOR(recordType: HoodieRecordType): Unit = {
+ def testPreCombineFieldForReadMOR(recordType: HoodieRecordType): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
writeData((1, "a0", 10, 100, false), writeOpts)