This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f93192d446ac [HUDI-9558] Make merge handle configurable (#13495)
f93192d446ac is described below
commit f93192d446acebb24b7b9379ded3fadc3f3d1948
Author: Jon Vexler <[email protected]>
AuthorDate: Mon Jul 21 20:43:43 2025 -0400
[HUDI-9558] Make merge handle configurable (#13495)
* [ENG-21953][INTERNAL][1.0-internal] Add abstraction of HoodieMergeHandle
to make its implementations configurable (#1192)
---------
Co-authored-by: Lin Liu <[email protected]>
Co-authored-by: Davis-Zhang-Onehouse
<[email protected]>
Co-authored-by: Davis Zhang <[email protected]>
Co-authored-by: Rajesh Mahindra
<[email protected]>
Co-authored-by: rmahindra123 <[email protected]>
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: danny0405 <[email protected]>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 65 +++
.../hudi/io/FileGroupReaderBasedMergeHandle.java | 5 +-
.../apache/hudi/io/HoodieAbstractMergeHandle.java | 183 +++++++
.../org/apache/hudi/io/HoodieConcatHandle.java | 13 +-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 584 ++-------------------
.../apache/hudi/io/HoodieMergeHandleFactory.java | 167 +++++-
.../hudi/io/HoodieMergeHandleWithChangeLog.java | 2 +-
.../apache/hudi/io/HoodieSortedMergeHandle.java | 9 +-
.../io/HoodieSortedMergeHandleWithChangeLog.java | 4 +-
...ergeHandle.java => HoodieWriteMergeHandle.java} | 176 ++-----
.../src/main/java/org/apache/hudi/io/IOUtils.java | 45 +-
.../java/org/apache/hudi/table/HoodieTable.java | 10 -
.../hudi/table/action/commit/BaseMergeHelper.java | 16 +-
.../table/action/commit/HoodieMergeHelper.java | 4 +-
.../hudi/table/action/compact/HoodieCompactor.java | 7 +-
.../hudi/io/TestHoodieMergeHandleFactory.java | 156 ++++++
.../apache/hudi/io/FlinkMergeAndReplaceHandle.java | 7 +-
.../java/org/apache/hudi/io/FlinkMergeHandle.java | 7 +-
.../hudi/io/FlinkMergeHandleWithChangeLog.java | 2 +
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 13 +-
.../commit/BaseFlinkCommitActionExecutor.java | 15 +-
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 11 +-
.../commit/BaseJavaCommitActionExecutor.java | 13 +-
.../TestHoodieJavaClientOnCopyOnWriteStorage.java | 8 +-
.../client/common/SparkReaderContextFactory.java | 2 +
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 37 +-
.../org/apache/hudi/table/HoodieSparkTable.java | 23 -
.../commit/BaseSparkCommitActionExecutor.java | 28 +-
.../hudi/client/TestUpdateSchemaEvolution.java | 4 +-
...ndle.java => TestHoodieDefaultMergeHandle.java} | 2 +-
.../table/log/BaseHoodieLogRecordReader.java | 6 +-
.../table/log/HoodieMergedLogRecordReader.java | 8 +-
.../hudi/command/payload/ExpressionPayload.scala | 2 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 8 +-
.../java/org/apache/hudi/io/TestMergeHandle.java | 2 +-
35 files changed, 789 insertions(+), 855 deletions(-)
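
In short, the patch introduces three write configs that name a merge handle implementation, plus a fallback flag. A minimal sketch of the resulting knobs as write properties (the org.example class name below is hypothetical; the other values are the defaults added in this patch):

    hoodie.write.merge.handle.class=org.example.MyMergeHandle
    hoodie.write.concat.handle.class=org.apache.hudi.io.HoodieConcatHandle
    hoodie.compact.merge.handle.class=org.apache.hudi.io.FileGroupReaderBasedMergeHandle
    hoodie.write.merge.handle.fallback=true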
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2ce654d6e644..a1df49505c48 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -71,6 +71,9 @@ import org.apache.hudi.estimator.AverageRecordSizeEstimator;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
+import org.apache.hudi.io.HoodieConcatHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -850,6 +853,37 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("1.1.0")
.withDocumentation("Records event time watermark metadata in commit
metadata when enabled");
+ public static final ConfigProperty<String> MERGE_HANDLE_CLASS_NAME = ConfigProperty
+ .key("hoodie.write.merge.handle.class")
+ .defaultValue(HoodieWriteMergeHandle.class.getName())
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("The merge handle class that implements the interface {@link HoodieMergeHandle} to merge the records "
+ + "from a base file with an iterator of incoming records or a map of updates and deletes from log files at a file group level.");
+
+ public static final ConfigProperty<String> CONCAT_HANDLE_CLASS_NAME = ConfigProperty
+ .key("hoodie.write.concat.handle.class")
+ .defaultValue(HoodieConcatHandle.class.getName())
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("The merge handle class used to concatenate the records from a base file with an iterator of incoming records.");
+
+ public static final ConfigProperty<String> COMPACT_MERGE_HANDLE_CLASS_NAME = ConfigProperty
+ .key("hoodie.compact.merge.handle.class")
+ .defaultValue(FileGroupReaderBasedMergeHandle.class.getName())
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("The merge handle class to use for compaction.");
+
+ public static final ConfigProperty<Boolean> MERGE_HANDLE_PERFORM_FALLBACK = ConfigProperty
+ .key("hoodie.write.merge.handle.fallback")
+ .defaultValue(true)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("When using a custom merge handle implementation controlled by the config " + MERGE_HANDLE_CLASS_NAME.key()
+ + " or a custom concat handle implementation controlled by the config " + CONCAT_HANDLE_CLASS_NAME.key()
+ + ", enabling this config falls back to the default implementation if instantiation of the custom implementation fails.");
+
/**
* Config key with boolean value that indicates whether record being written
during MERGE INTO Spark SQL
* operation are already prepped.
@@ -1401,6 +1435,10 @@ public class HoodieWriteConfig extends HoodieConfig {
}
}
+ public boolean isMergeHandleFallbackEnabled() {
+ return getBooleanOrDefault(HoodieWriteConfig.MERGE_HANDLE_PERFORM_FALLBACK);
+ }
+
public boolean isConsistentLogicalTimestampEnabled() {
return getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
}
@@ -1497,6 +1535,18 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(WRITE_STATUS_CLASS_NAME);
}
+ public String getMergeHandleClassName() {
+ return getStringOrDefault(MERGE_HANDLE_CLASS_NAME);
+ }
+
+ public String getConcatHandleClassName() {
+ return getStringOrDefault(CONCAT_HANDLE_CLASS_NAME);
+ }
+
+ public String getCompactionMergeHandleClassName() {
+ return getStringOrDefault(COMPACT_MERGE_HANDLE_CLASS_NAME);
+ }
+
public int getFinalizeWriteParallelism() {
return getInt(FINALIZE_WRITE_PARALLELISM_VALUE);
}
@@ -3399,6 +3449,21 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withMergeHandleClassName(String className) {
+ writeConfig.setValue(MERGE_HANDLE_CLASS_NAME, className);
+ return this;
+ }
+
+ public Builder withConcatHandleClassName(String className) {
+ writeConfig.setValue(CONCAT_HANDLE_CLASS_NAME, className);
+ return this;
+ }
+
+ public Builder withFileGroupReaderMergeHandleClassName(String className) {
+ writeConfig.setValue(COMPACT_MERGE_HANDLE_CLASS_NAME, className);
+ return this;
+ }
+
protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE,
getDefaultMarkersType(engineType));
// Check for mandatory properties
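
The new Builder methods above can be exercised as follows; a minimal sketch, assuming a custom handle class is on the classpath (the table path and the org.example class name are hypothetical):

    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")                            // hypothetical base path
        .withMergeHandleClassName("org.example.MyMergeHandle")  // hypothetical custom class
        .withFileGroupReaderMergeHandleClassName(FileGroupReaderBasedMergeHandle.class.getName())
        .build();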
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
index 0e7223a06f32..9ce794347b79 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
@@ -72,7 +72,7 @@ import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
* the records, and writes the records to a new base file.
*/
@NotThreadSafe
-public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieWriteMergeHandle<T, I, K, O> {
private static final Logger LOG =
LoggerFactory.getLogger(FileGroupReaderBasedMergeHandle.class);
private final HoodieReaderContext<T> readerContext;
@@ -169,7 +169,8 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieMergeHand
* Reads the file slice of a compaction operation using a file group reader,
* by getting an iterator of the records; then writes the records to a new
base file.
*/
- public void write() {
+ @Override
+ public void doMerge() {
boolean usePosition =
config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
Option<InternalSchema> internalSchemaOption =
SerDeHelper.fromJson(config.getInternalSchema());
TypedProperties props = TypedProperties.copy(config.getProps());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAbstractMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAbstractMergeHandle.java
new file mode 100644
index 000000000000..9ff103801042
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAbstractMergeHandle.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Abstract class to implement merging records from a base file with incoming records or records from log blocks
+ * at a file group level.
+ */
+public abstract class HoodieAbstractMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> implements HoodieMergeHandle<T, I, K, O> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieAbstractMergeHandle.class);
+
+ protected Map<String, HoodieRecord<T>> keyToNewRecords;
+ protected StoragePath newFilePath;
+ protected StoragePath oldFilePath;
+ protected Option<BaseKeyGenerator> keyGeneratorOpt;
+ protected HoodieBaseFile baseFileToMerge;
+ protected Option<String[]> partitionFields = Option.empty();
+ protected Object[] partitionValues = new Object[0];
+
+ /**
+ * Used by writer code path, to upsert new records by providing an iterator
to the new records.
+ * @param config Hoodie writer configs.
+ * @param instantTime current instant time.
+ * @param hoodieTable an instance of {@link HoodieTable}
+ * @param partitionPath Partition path of the upsert and insert records.
+ * @param fileId New file id of the target base file.
+ * @param taskContextSupplier Base task context supplier
+ * @param baseFile current base file that needs to be read for the records
in storage.
+ * @param keyGeneratorOpt Optional instance of the {@link
org.apache.hudi.keygen.KeyGenerator} used.
+ */
+ public HoodieAbstractMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
+ HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt, boolean preserveMetadata) {
+ super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, preserveMetadata);
+ this.baseFileToMerge = baseFile;
+ this.keyGeneratorOpt = keyGeneratorOpt;
+ initPartitionMetadataAndFilePaths(fileId, partitionPath);
+ initWriteStatus(fileId, partitionPath);
+ validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
+ }
+
+ /**
+ * Used by the file group reader based merge handle.
+ */
+ protected HoodieAbstractMergeHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
+ String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier, boolean preserveMetadata) {
+ super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, preserveMetadata);
+ }
+
+ @Override
+ public StoragePath getOldFilePath() {
+ return oldFilePath;
+ }
+
+ @Override
+ public IOType getIOType() {
+ return IOType.MERGE;
+ }
+
+ @Override
+ public HoodieBaseFile baseFileForMerge() {
+ return baseFileToMerge;
+ }
+
+ public void setPartitionFields(Option<String[]> partitionFields) {
+ this.partitionFields = partitionFields;
+ }
+
+ public Option<String[]> getPartitionFields() {
+ return this.partitionFields;
+ }
+
+ public void setPartitionValues(Object[] partitionValues) {
+ this.partitionValues = partitionValues;
+ }
+
+ public Object[] getPartitionValues() {
+ return this.partitionValues;
+ }
+
+ /**
+ * Saves the partition metadata and computes the old and new file paths.
+ */
+ private void initPartitionMetadataAndFilePaths(String targetFileId, String
partitionPath) {
+ LOG.info("partitionPath:{}, targetFileId to be merged: {}", partitionPath,
targetFileId);
+ String latestValidFilePath = baseFileToMerge == null ? null :
baseFileToMerge.getFileName();
+ HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(
+ storage,
+ instantTime,
+ new StoragePath(config.getBasePath()),
+ FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
+ hoodieTable.getPartitionMetafileFormat());
+ partitionMetadata.trySave();
+
+ String newFileName = FSUtils.makeBaseFileName(
+ instantTime, writeToken, targetFileId,
hoodieTable.getBaseFileExtension());
+ makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
+
+ LOG.info(
+ "Merging new data into oldPath: {}, as newPath: {}",
+ oldFilePath.toString(), newFilePath.toString());
+
+ }
+
+ private void initWriteStatus(String fileId, String partitionPath) {
+ writeStatus.setStat(new HoodieWriteStat());
+ if (baseFileToMerge != null) {
+ writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
+ // At the moment, we only support SI for overwrite with latest payload.
So, we don't need to embed entire file slice here.
+ // HUDI-8518 will be taken up to fix it for any payload during which we
might require entire file slice to be set here.
+ // AppendHandle already adds all log files from the current file slice to HoodieDeltaWriteStat.
+ writeStatus.getStat().setPrevBaseFile(baseFileToMerge.getFileName());
+ } else {
+ writeStatus.getStat().setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+ }
+ // The file name is the same for all records in this batch.
+ writeStatus.setFileId(fileId);
+ writeStatus.setPartitionPath(partitionPath);
+ writeStatus.getStat().setPartitionPath(partitionPath);
+ writeStatus.getStat().setFileId(fileId);
+ LOG.debug("Initializing Write status with fileId {} partitionPath {}",
fileId, partitionPath);
+ setWriteStatusPath();
+ }
+
+ private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator>
keyGeneratorOpt, boolean populateMetaFields) {
+ ValidationUtils.checkArgument(populateMetaFields ==
!keyGeneratorOpt.isPresent());
+ this.keyGeneratorOpt = keyGeneratorOpt;
+ }
+
+ protected void makeOldAndNewFilePaths(String partitionPath, String
oldFileName, String newFileName) {
+ oldFilePath = makeNewFilePath(partitionPath, oldFileName);
+ newFilePath = makeNewFilePath(partitionPath, newFileName);
+ }
+
+ public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?>
hoodieTable, String partitionPath, String fileId) {
+ Option<HoodieBaseFile> baseFileOp =
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+ if (!baseFileOp.isPresent()) {
+ throw new NoSuchElementException(String.format("FileID %s of partition
path %s does not exist.", fileId, partitionPath));
+ }
+ return baseFileOp.get();
+ }
+
+ protected void setWriteStatusPath() {
+ writeStatus.getStat().setPath(new StoragePath(config.getBasePath()),
newFilePath);
+ }
+}
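
With the abstract base in place, a custom handle for the write path would extend HoodieWriteMergeHandle (the renamed concrete subclass in this diff) and expose the constructor shape that HoodieMergeHandleFactory looks up reflectively. A rough sketch only, assuming HoodieWriteMergeHandle retains the iterator-based constructor and write(HoodieRecord) method of the old HoodieMergeHandle; the org.example class is hypothetical:

    package org.example;

    import java.util.Iterator;

    import org.apache.hudi.common.engine.TaskContextSupplier;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.io.HoodieWriteMergeHandle;
    import org.apache.hudi.keygen.BaseKeyGenerator;
    import org.apache.hudi.table.HoodieTable;

    // Hypothetical custom handle; the constructor mirrors the constructorParamTypes
    // array that HoodieMergeHandleFactory uses for the write path.
    public class MyMergeHandle<T, I, K, O> extends HoodieWriteMergeHandle<T, I, K, O> {

      public MyMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                           Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                           TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
      }

      @Override
      public void write(HoodieRecord<T> oldRecord) {
        // Custom merge behavior would go here; delegating keeps the default semantics.
        super.write(oldRecord);
      }
    }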
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
index c8b93c2ac58f..4b3ba020b3cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java
@@ -19,6 +19,7 @@
package org.apache.hudi.io;
import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -37,10 +38,11 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Map;
/**
* Handle to concatenate new records to old records w/o any merging. If Operation is set to Inserts, and if {{@link HoodieWriteConfig#allowDuplicateInserts()}}
- * is set, this handle will be used instead of {@link HoodieMergeHandle}.
+ * is set, this handle will be used instead of {@link HoodieWriteMergeHandle}.
*
* Simplified Logic:
* For every existing record
@@ -65,7 +67,7 @@ import java.util.Iterator;
* happen and every batch should have new records to be inserted. Above
example is for illustration purposes only.
*/
@NotThreadSafe
-public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+public class HoodieConcatHandle<T, I, K, O> extends HoodieWriteMergeHandle<T, I, K, O> {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieConcatHandle.class);
// a representation of incoming records that tolerates duplicate keys
@@ -78,6 +80,13 @@ public class HoodieConcatHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O
this.recordItr = recordItr;
}
+ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
+ Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+ HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+ super(config, instantTime, hoodieTable, Collections.emptyMap(), partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, Option.empty());
+ this.recordItr = keyToNewRecords.values().iterator();
+ }
+
/**
* Write old record as is w/o merging with incoming record.
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 7b81d728a358..abf02d3bd44e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -7,584 +7,62 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.io;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.model.MetadataValues;
-import org.apache.hudi.common.serialization.DefaultSerializer;
-import org.apache.hudi.common.util.DefaultSizeEstimator;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieCorruptedDataException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileWriter;
-import org.apache.hudi.io.storage.HoodieFileWriterFactory;
-import org.apache.hudi.io.storage.HoodieIOFactory;
-import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
-
-import java.io.Closeable;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Handle to merge incoming records to those in storage.
- * <p>
- * Simplified Logic:
- * For every existing record
- * Check if there is a new record coming in. If yes, merge two records and
write to file
- * else write the record as is
- * For all pending records from incoming batch, write to file.
- *
- * <p>
- * Illustration with simple data.
- * Incoming data:
- * rec1_2, rec4_2, rec5_1, rec6_1
- * Existing data:
- * rec1_1, rec2_1, rec3_1, rec4_1
- * <p>
- * For every existing record, merge w/ incoming if required and write to
storage.
- * => rec1_1 and rec1_2 is merged to write rec1_2 to storage
- * => rec2_1 is written as is
- * => rec3_1 is written as is
- * => rec4_2 and rec4_1 is merged to write rec4_2 to storage
- * Write all pending records from incoming set to storage
- * => rec5_1 and rec6_1
- * <p>
- * Final snapshot in storage
- * rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
- */
-@SuppressWarnings("Duplicates")
-@NotThreadSafe
-public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMergeHandle.class);
-
- protected Map<String, HoodieRecord<T>> keyToNewRecords;
- protected Set<String> writtenRecordKeys;
- protected HoodieFileWriter fileWriter;
-
- protected StoragePath newFilePath;
- protected StoragePath oldFilePath;
- protected long recordsWritten = 0;
- protected long recordsDeleted = 0;
- protected long updatedRecordsWritten = 0;
- protected long insertRecordsWritten = 0;
- protected Option<BaseKeyGenerator> keyGeneratorOpt;
- protected HoodieBaseFile baseFileToMerge;
-
- protected Option<String[]> partitionFields = Option.empty();
- protected Object[] partitionValues = new Object[0];
-
- public HoodieMergeHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
- Iterator<HoodieRecord<T>> recordItr, String
partitionPath, String fileId,
- TaskContextSupplier taskContextSupplier,
Option<BaseKeyGenerator> keyGeneratorOpt) {
- this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId,
taskContextSupplier,
- getLatestBaseFile(hoodieTable, partitionPath, fileId),
keyGeneratorOpt);
- }
-
- public HoodieMergeHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
- Iterator<HoodieRecord<T>> recordItr, String
partitionPath, String fileId,
- TaskContextSupplier taskContextSupplier,
HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
- super(config, instantTime, partitionPath, fileId, hoodieTable,
taskContextSupplier, false);
- init(recordItr);
- init(fileId, partitionPath, baseFile);
- validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
- }
-
- /**
- * Called by compactor code path.
- */
- public HoodieMergeHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
- Map<String, HoodieRecord<T>> keyToNewRecords,
String partitionPath, String fileId,
- HoodieBaseFile dataFileToBeMerged,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator>
keyGeneratorOpt) {
- super(config, instantTime, partitionPath, fileId, hoodieTable,
taskContextSupplier, true);
- this.keyToNewRecords = keyToNewRecords;
- init(fileId, this.partitionPath, dataFileToBeMerged);
- validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
- }
-
- /**
- * Used by `HoodieSparkFileGroupReaderBasedMergeHandle`.
- *
- * @param config Hudi write config
- * @param instantTime Instant time to use
- * @param partitionPath Partition path
- * @param fileId File group ID for the merge handle to operate
on
- * @param hoodieTable {@link HoodieTable} instance
- * @param taskContextSupplier Task context supplier
- */
- public HoodieMergeHandle(HoodieWriteConfig config, String instantTime,
String partitionPath,
- String fileId, HoodieTable<T, I, K, O> hoodieTable,
TaskContextSupplier taskContextSupplier) {
- super(config, instantTime, partitionPath, fileId, hoodieTable,
taskContextSupplier, true);
- }
-
- private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator>
keyGeneratorOpt, boolean populateMetaFields) {
- ValidationUtils.checkArgument(populateMetaFields ==
!keyGeneratorOpt.isPresent());
- this.keyGeneratorOpt = keyGeneratorOpt;
- }
-
- public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?>
hoodieTable, String partitionPath, String fileId) {
- Option<HoodieBaseFile> baseFileOp =
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
- if (!baseFileOp.isPresent()) {
- throw new NoSuchElementException(String.format("FileID %s of partition
path %s does not exist.", fileId, partitionPath));
- }
- return baseFileOp.get();
- }
-
- /**
- * Extract old file path, initialize StorageWriter and WriteStatus.
- */
- private void init(String fileId, String partitionPath, HoodieBaseFile
baseFileToMerge) {
- LOG.info("partitionPath:{}, fileId to be merged:{}", partitionPath,
fileId);
- this.baseFileToMerge = baseFileToMerge;
- this.writtenRecordKeys = new HashSet<>();
- writeStatus.setStat(new HoodieWriteStat());
- try {
- String latestValidFilePath = baseFileToMerge.getFileName();
- writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
- // At the moment, we only support SI for overwrite with latest payload.
So, we don't need to embed entire file slice here.
- // HUDI-8518 will be taken up to fix it for any payload during which we
might require entire file slice to be set here.
- // Already AppendHandle adds all logs file from current file slice to
HoodieDeltaWriteStat.
- writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
-
- HoodiePartitionMetadata partitionMetadata = new
HoodiePartitionMetadata(storage, instantTime,
- new StoragePath(config.getBasePath()),
- FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
- hoodieTable.getPartitionMetafileFormat());
- partitionMetadata.trySave();
-
- String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken,
fileId, hoodieTable.getBaseFileExtension());
- makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
-
- LOG.info("Merging new data into oldPath {}, as newPath {}", oldFilePath,
newFilePath);
- // file name is same for all records, in this bunch
- writeStatus.setFileId(fileId);
- writeStatus.setPartitionPath(partitionPath);
- writeStatus.getStat().setPartitionPath(partitionPath);
- writeStatus.getStat().setFileId(fileId);
- setWriteStatusPath();
-
- // Create Marker file,
- // uses name of `newFilePath` instead of `newFileName`
- // in case the sub-class may roll over the file handle name.
- createMarkerFile(partitionPath, newFilePath.getName());
-
- // Create the writer for writing the new version file
- fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
newFilePath, hoodieTable.getStorage(),
- config, writeSchemaWithMetaFields, taskContextSupplier,
recordMerger.getRecordType());
- } catch (IOException io) {
- LOG.error("Error in update task at commit {}", instantTime, io);
- writeStatus.setGlobalError(io);
- throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle
for FileId: " + fileId + " on commit "
- + instantTime + " on path " +
hoodieTable.getMetaClient().getBasePath(), io);
- }
- }
-
- protected void setWriteStatusPath() {
- writeStatus.getStat().setPath(new StoragePath(config.getBasePath()),
newFilePath);
- }
-
- protected void makeOldAndNewFilePaths(String partitionPath, String
oldFileName, String newFileName) {
- oldFilePath = makeNewFilePath(partitionPath, oldFileName);
- newFilePath = makeNewFilePath(partitionPath, newFileName);
- }
-
- /**
- * Initialize a spillable map for incoming records.
- */
- protected void initializeIncomingRecordsMap() {
- try {
- // Load the new records in a map
- long memoryForMerge =
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
- LOG.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge);
- this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge,
config.getSpillableMapBasePath(),
- new DefaultSizeEstimator<>(), new
HoodieRecordSizeEstimator<>(writeSchema),
- config.getCommonConfig().getSpillableDiskMapType(),
- new DefaultSerializer<>(),
- config.getCommonConfig().isBitCaskDiskMapCompressionEnabled(),
- getClass().getSimpleName());
- } catch (IOException io) {
- throw new HoodieIOException("Cannot instantiate an
ExternalSpillableMap", io);
- }
- }
- /**
- * Whether there is need to update the record location.
- */
- boolean needsUpdateLocation() {
- return true;
- }
-
- /**
- * Load the new incoming records in a map and return partitionPath.
- */
- protected void init(Iterator<HoodieRecord<T>> newRecordsItr) {
- initializeIncomingRecordsMap();
- while (newRecordsItr.hasNext()) {
- HoodieRecord<T> record = newRecordsItr.next();
- // update the new location of the record, so we know where to find it
next
- if (needsUpdateLocation()) {
- record.unseal();
- record.setNewLocation(newRecordLocation);
- record.seal();
- }
- // NOTE: Once Records are added to map (spillable-map), DO NOT change it
as they won't persist
- keyToNewRecords.put(record.getRecordKey(), record);
- }
- if (keyToNewRecords instanceof ExternalSpillableMap) {
- ExternalSpillableMap<String, HoodieRecord<T>> spillableMap =
(ExternalSpillableMap<String, HoodieRecord<T>>) keyToNewRecords;
- LOG.info("Number of entries in MemoryBasedMap => {}, Total size in bytes
of MemoryBasedMap => {}, "
- + "Number of entries in BitCaskDiskMap => {}, Size of file spilled
to disk => {}",
- spillableMap.getInMemoryMapNumEntries(),
spillableMap.getCurrentInMemoryMapSize(),
spillableMap.getDiskBasedMapNumEntries(),
spillableMap.getSizeOfFileOnDiskInBytes());
- }
- }
-
- public boolean isEmptyNewRecords() {
- return keyToNewRecords.isEmpty();
- }
-
- protected boolean writeUpdateRecord(HoodieRecord<T> newRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema
writerSchema) throws IOException {
- boolean isDelete = false;
- if (combineRecordOpt.isPresent()) {
- if (oldRecord.getData() != combineRecordOpt.get().getData()) {
- // the incoming record is chosen
- isDelete = HoodieOperation.isDelete(newRecord.getOperation());
- } else {
- // the incoming record is dropped
- return false;
- }
- updatedRecordsWritten++;
- }
- return writeRecord(newRecord, oldRecord, combineRecordOpt, writerSchema,
config.getPayloadConfig().getProps(), isDelete);
- }
-
- protected void writeInsertRecord(HoodieRecord<T> newRecord) throws
IOException {
- Schema schema = getNewSchema();
- // just skip the ignored record
- if (newRecord.shouldIgnore(schema, config.getProps())) {
- return;
- }
- writeInsertRecord(newRecord, schema, config.getProps());
- }
-
- protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema,
Properties prop) {
- if (writeRecord(newRecord, null, Option.of(newRecord), schema, prop,
HoodieOperation.isDelete(newRecord.getOperation()))) {
- insertRecordsWritten++;
- }
- }
-
- protected boolean writeRecord(HoodieRecord<T> newRecord,
Option<HoodieRecord> combineRecord, Schema schema, Properties prop) throws
IOException {
- return writeRecord(newRecord, null, combineRecord, schema, prop, false);
- }
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieMergeHandle<T, I, K, O> {
/**
- * The function takes the different versions of the record - old record, new
incoming record and combined record
- * created by merging the old record with the new incoming record. It
decides whether the combined record needs to be
- * written to the file and writes the record accordingly.
- *
- * @param newRecord The new incoming record
- * @param oldRecord The value of old record
- * @param combineRecord Record created by merging the old record with the
new incoming record
- * @param schema Record schema
- * @param prop Properties
- * @param isDelete Whether the new record is a delete record
- *
- * @return true if the record was written successfully
+ * Called to read the base file and the incoming records, merge them, and write the final base file.
* @throws IOException
*/
- private boolean writeRecord(HoodieRecord<T> newRecord,
- @Nullable HoodieRecord<T> oldRecord,
- Option<HoodieRecord> combineRecord,
- Schema schema,
- Properties prop,
- boolean isDelete) {
- Option recordMetadata = newRecord.getMetadata();
- if (!partitionPath.equals(newRecord.getPartitionPath())) {
- HoodieUpsertException failureEx = new HoodieUpsertException("mismatched
partition path, record partition: "
- + newRecord.getPartitionPath() + " but trying to insert into
partition: " + partitionPath);
- writeStatus.markFailure(newRecord, failureEx, recordMetadata);
- return false;
- }
- try {
- if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema,
config.getProps()) && !isDelete) {
- // Last-minute check.
- boolean decision = recordMerger.shouldFlush(combineRecord.get(),
schema, config.getProps());
-
- if (decision) {
- // CASE (1): Flush the merged record.
- HoodieKey hoodieKey = newRecord.getKey();
- if (isSecondaryIndexStatsStreamingWritesEnabled) {
- SecondaryIndexStreamingTracker.trackSecondaryIndexStats(hoodieKey,
combineRecord, oldRecord, false, writeStatus,
- writeSchemaWithMetaFields, this::getNewSchema,
secondaryIndexDefns, keyGeneratorOpt, config);
- }
- writeToFile(hoodieKey, combineRecord.get(), schema, prop,
preserveMetadata);
- recordsWritten++;
- } else {
- // CASE (2): A delete operation.
- if (isSecondaryIndexStatsStreamingWritesEnabled) {
-
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(newRecord.getKey(),
combineRecord, oldRecord, true, writeStatus,
- writeSchemaWithMetaFields, this::getNewSchema,
secondaryIndexDefns, keyGeneratorOpt, config);
- }
- recordsDeleted++;
- }
- } else {
- if (isSecondaryIndexStatsStreamingWritesEnabled) {
-
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(newRecord.getKey(),
combineRecord, oldRecord, true, writeStatus,
- writeSchemaWithMetaFields, this::getNewSchema,
secondaryIndexDefns, keyGeneratorOpt, config);
- }
- recordsDeleted++;
- // Clear the new location as the record was deleted
- newRecord.unseal();
- newRecord.clearNewLocation();
- newRecord.seal();
- }
- writeStatus.markSuccess(newRecord, recordMetadata);
- // deflate record payload after recording success. This will help users
access payload as a
- // part of marking
- // record successful.
- newRecord.deflate();
- return true;
- } catch (Exception e) {
- LOG.error("Error writing record {}", newRecord, e);
- writeStatus.markFailure(newRecord, e, recordMetadata);
- }
- return false;
- }
-
- /**
- * Go through an old record. Here if we detect a newer version shows up, we
write the new one to the file.
- */
- public void write(HoodieRecord<T> oldRecord) {
- // Use schema with metadata files no matter whether
'hoodie.populate.meta.fields' is enabled
- // to avoid unnecessary rewrite. Even with metadata table(whereas the
option 'hoodie.populate.meta.fields' is configured as false),
- // the record is deserialized with schema including metadata fields,
- // see HoodieMergeHelper#runMerge for more details.
- Schema oldSchema = writeSchemaWithMetaFields;
- Schema newSchema = getNewSchema();
- boolean copyOldRecord = true;
- String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
- TypedProperties props = config.getPayloadConfig().getProps();
- if (keyToNewRecords.containsKey(key)) {
- // If we have duplicate records that we are updating, then the hoodie
record will be deflated after
- // writing the first record. So make a copy of the record to be merged
- HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
- try {
- Option<Pair<HoodieRecord, Schema>> mergeResult =
recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
- Schema combineRecordSchema =
mergeResult.map(Pair::getRight).orElse(null);
- Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
- if (combinedRecord.isPresent() &&
combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
- // If it is an IGNORE_RECORD, just copy the old record, and do not
update the new record.
- copyOldRecord = true;
- } else if (writeUpdateRecord(newRecord, oldRecord, combinedRecord,
combineRecordSchema)) {
- /*
- * ONLY WHEN 1) we have an update for this key AND 2) We are able to
successfully
- * write the combined new value
- *
- * We no longer need to copy the old record over.
- */
- copyOldRecord = false;
- }
- writtenRecordKeys.add(key);
- } catch (Exception e) {
- throw new HoodieUpsertException("Failed to combine/merge new record
with old value in storage, for new record {"
- + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}",
e);
- }
- }
-
- if (copyOldRecord) {
- try {
- // NOTE: We're enforcing preservation of the record metadata to keep
existing semantic
- writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema,
props, true);
- } catch (IOException | RuntimeException e) {
- String errMsg = String.format("Failed to merge old record into new
file for key %s from old file %s to new file %s with writerSchema %s",
- key, getOldFilePath(), newFilePath,
writeSchemaWithMetaFields.toString(true));
- LOG.debug("Old record is {}", oldRecord);
- throw new HoodieUpsertException(errMsg, e);
- }
- recordsWritten++;
- }
- }
-
- protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema
schema, Properties prop, boolean shouldPreserveRecordMetadata) throws
IOException {
- if (shouldPreserveRecordMetadata) {
- // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly
point to the
- // file holding this record even in cases when overall metadata is
preserved
- HoodieRecord populatedRecord = record.updateMetaField(schema,
HoodieRecord.FILENAME_META_FIELD_ORD, newFilePath.getName());
- fileWriter.write(key.getRecordKey(), populatedRecord,
writeSchemaWithMetaFields);
- } else {
- // rewrite the record to include metadata fields in schema, and the
values will be set later.
- record = record.prependMetaFields(schema, writeSchemaWithMetaFields, new
MetadataValues(), config.getProps());
- fileWriter.writeWithMetadata(key, record, writeSchemaWithMetaFields);
- }
- }
-
- protected void writeIncomingRecords() throws IOException {
- // write out any pending records (this can happen when inserts are turned
into updates)
- Iterator<HoodieRecord<T>> newRecordsItr;
- if (keyToNewRecords instanceof ExternalSpillableMap) {
- newRecordsItr = ((ExternalSpillableMap) keyToNewRecords).iterator(key ->
!writtenRecordKeys.contains(key));
- } else {
- newRecordsItr = keyToNewRecords.entrySet().stream()
- .filter(e -> !writtenRecordKeys.contains(e.getKey()))
- .map(Map.Entry::getValue)
- .iterator();
- }
- while (newRecordsItr.hasNext()) {
- HoodieRecord<T> hoodieRecord = newRecordsItr.next();
- writeInsertRecord(hoodieRecord);
- }
- }
-
- private Schema getNewSchema() {
- return preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
- }
-
- @Override
- public List<WriteStatus> close() {
- try {
- if (isClosed()) {
- // Handle has already been closed
- return Collections.emptyList();
- }
-
- markClosed();
- writeIncomingRecords();
-
- if (keyToNewRecords instanceof Closeable) {
- ((Closeable) keyToNewRecords).close();
- }
-
- keyToNewRecords = null;
- writtenRecordKeys = null;
-
- fileWriter.close();
- fileWriter = null;
-
- long fileSizeInBytes = storage.getPathInfo(newFilePath).getLength();
- HoodieWriteStat stat = writeStatus.getStat();
-
- stat.setTotalWriteBytes(fileSizeInBytes);
- stat.setFileSizeInBytes(fileSizeInBytes);
- stat.setNumWrites(recordsWritten);
- stat.setNumDeletes(recordsDeleted);
- stat.setNumUpdateWrites(updatedRecordsWritten);
- stat.setNumInserts(insertRecordsWritten);
- stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
- RuntimeStats runtimeStats = new RuntimeStats();
- runtimeStats.setTotalUpsertTime(timer.endTimer());
- stat.setRuntimeStats(runtimeStats);
-
- performMergeDataValidationCheck(writeStatus);
-
- LOG.info("MergeHandle for partitionPath {} fileID {}, took {} ms.",
stat.getPartitionPath(),
- stat.getFileId(), runtimeStats.getTotalUpsertTime());
-
- return Collections.singletonList(writeStatus);
- } catch (IOException e) {
- throw new HoodieUpsertException("Failed to close UpdateHandle", e);
- }
- }
-
- public void performMergeDataValidationCheck(WriteStatus writeStatus) {
- if (!config.isMergeDataValidationCheckEnabled() || baseFileToMerge ==
null) {
- return;
- }
-
- long oldNumWrites = 0;
- try (HoodieFileReader reader =
HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
- .getReaderFactory(this.recordMerger.getRecordType())
- .getFileReader(config, oldFilePath)) {
- oldNumWrites = reader.getTotalRecords();
- } catch (IOException e) {
- throw new HoodieUpsertException("Failed to check for merge data
validation", e);
- }
-
- if ((writeStatus.getStat().getNumWrites() +
writeStatus.getStat().getNumDeletes()) < oldNumWrites) {
- throw new HoodieCorruptedDataException(
- String.format("Record write count decreased for file: %s, Partition
Path: %s (%s:%d + %d < %s:%d)",
- writeStatus.getFileId(), writeStatus.getPartitionPath(),
- instantTime, writeStatus.getStat().getNumWrites(),
writeStatus.getStat().getNumDeletes(),
- baseFileToMerge.getCommitTime(), oldNumWrites));
- }
- }
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ void doMerge() throws IOException;
- public Iterator<List<WriteStatus>> getWriteStatusesAsIterator() {
- List<WriteStatus> statuses = getWriteStatuses();
- // TODO(vc): This needs to be revisited
- if (getPartitionPath() == null) {
- LOG.info("Upsert Handle has partition path as null {}, {}",
getOldFilePath(), statuses);
- }
- return Collections.singletonList(statuses).iterator();
- }
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ HoodieBaseFile baseFileForMerge();
- public StoragePath getOldFilePath() {
- return oldFilePath;
- }
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ void setPartitionFields(Option<String[]> partitionFields);
- @Override
- public IOType getIOType() {
- return IOType.MERGE;
- }
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ StoragePath getOldFilePath();
- public HoodieBaseFile baseFileForMerge() {
- return baseFileToMerge;
- }
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ String getPartitionPath();
- public void setPartitionFields(Option<String[]> partitionFields) {
- this.partitionFields = partitionFields;
- }
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ Schema getWriterSchema();
- public Option<String[]> getPartitionFields() {
- return this.partitionFields;
- }
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ void setPartitionValues(Object[] partitionValues);
- public void setPartitionValues(Object[] partitionValues) {
- this.partitionValues = partitionValues;
- }
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ List<WriteStatus> getWriteStatuses();
- public Object[] getPartitionValues() {
- return this.partitionValues;
- }
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ List<WriteStatus> close();
}
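
The extracted interface reduces the caller contract to roughly create, doMerge, close. A sketch of the compaction-side call pattern, assuming the surrounding variables are in scope and using the factory overload added below:

    HoodieMergeHandle<T, I, K, O> mergeHandle = HoodieMergeHandleFactory.create(
        writeConfig, instantTime, hoodieTable, fileSlice, operation,
        taskContextSupplier, readerContext, maxInstantTime, recordType);
    mergeHandle.doMerge();                             // read, merge, and write the new base file
    List<WriteStatus> statuses = mergeHandle.close();  // finalize and collect write statuses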
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
index d049f884b514..9b8ed2811b0a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -18,12 +18,19 @@
package org.apache.hudi.io;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
@@ -32,12 +39,16 @@ import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
+
+import static
org.apache.hudi.config.HoodieWriteConfig.COMPACT_MERGE_HANDLE_CLASS_NAME;
/**
- * Factory class for hoodie merge handle.
+ * Factory class for instantiating the appropriate implementation of {@link HoodieMergeHandle}.
*/
public class HoodieMergeHandleFactory {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieMergeHandleFactory.class);
+
/**
* Creates a merge handle for normal write path.
*/
@@ -51,24 +62,20 @@ public class HoodieMergeHandleFactory {
String fileId,
TaskContextSupplier taskContextSupplier,
Option<BaseKeyGenerator> keyGeneratorOpt) {
- LOG.info("Create update handle for fileId {} and partition path {} at
commit {}", fileId, partitionPath, instantTime);
- if (table.requireSortedRecords()) {
- if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
- return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig,
instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier,
- keyGeneratorOpt);
- } else {
- return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table,
recordItr, partitionPath, fileId, taskContextSupplier,
- keyGeneratorOpt);
- }
- } else if (!WriteOperationType.isChangingRecords(operationType) &&
writeConfig.allowDuplicateInserts()) {
- return new HoodieConcatHandle<>(writeConfig, instantTime, table,
recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
- } else {
- if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
- return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime,
table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
- } else {
- return new HoodieMergeHandle<>(writeConfig, instantTime, table,
recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
- }
- }
+
+ boolean isFallbackEnabled = writeConfig.isMergeHandleFallbackEnabled();
+ Pair<String, String> mergeHandleClasses = getMergeHandleClassesWrite(operationType, writeConfig, table);
+ String logContext = String.format("for fileId %s and partition path %s at commit %s", fileId, partitionPath, instantTime);
+ LOG.info("Create HoodieMergeHandle implementation {} {}", mergeHandleClasses.getLeft(), logContext);
+
+ Class<?>[] constructorParamTypes = new Class<?>[] {
+ HoodieWriteConfig.class, String.class, HoodieTable.class, Iterator.class,
+ String.class, String.class, TaskContextSupplier.class, Option.class
+ };
+
+ return instantiateMergeHandle(
+ isFallbackEnabled, mergeHandleClasses.getLeft(), mergeHandleClasses.getRight(), logContext, constructorParamTypes,
+ writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
}
/**
@@ -84,19 +91,125 @@ public class HoodieMergeHandleFactory {
HoodieBaseFile dataFileToBeMerged,
TaskContextSupplier taskContextSupplier,
Option<BaseKeyGenerator> keyGeneratorOpt) {
- LOG.info("Get updateHandle for fileId {} and partitionPath {} at commit
{}", fileId, partitionPath, instantTime);
+
+ boolean isFallbackEnabled = writeConfig.isMergeHandleFallbackEnabled();
+ Pair<String, String> mergeHandleClasses =
getMergeHandleClassesCompaction(writeConfig, table);
+ String logContext = String.format("for fileId %s and partitionPath %s at
commit %s", fileId, partitionPath, instantTime);
+ LOG.info("Create HoodieMergeHandle implementation {} {}",
mergeHandleClasses.getLeft(), logContext);
+
+ Class<?>[] constructorParamTypes = new Class<?>[] {
+ HoodieWriteConfig.class, String.class, HoodieTable.class, Map.class,
+ String.class, String.class, HoodieBaseFile.class,
TaskContextSupplier.class, Option.class
+ };
+
+ return instantiateMergeHandle(
+ isFallbackEnabled, mergeHandleClasses.getLeft(),
mergeHandleClasses.getRight(), logContext, constructorParamTypes,
+ writeConfig, instantTime, table, keyToNewRecords, partitionPath,
fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ }
+
+ /**
+ * Creates a merge handle for compaction with the file group reader.
+ */
+ public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> hoodieTable,
+ FileSlice fileSlice,
+ CompactionOperation operation,
+ TaskContextSupplier taskContextSupplier,
+ HoodieReaderContext<T> readerContext,
+ String maxInstantTime,
+ HoodieRecord.HoodieRecordType recordType) {
+
+ boolean isFallbackEnabled = config.isMergeHandleFallbackEnabled();
+
+ String mergeHandleClass = config.getCompactionMergeHandleClassName();
+ String logContext = String.format("for fileId %s and partitionPath %s at
commit %s", operation.getFileId(), operation.getPartitionPath(), instantTime);
+ LOG.info("Create HoodieMergeHandle implementation {} {}",
mergeHandleClass, logContext);
+
+ Class<?>[] constructorParamTypes = new Class<?>[] {
+ HoodieWriteConfig.class, String.class, HoodieTable.class,
FileSlice.class, CompactionOperation.class,
+ TaskContextSupplier.class, HoodieReaderContext.class, String.class,
HoodieRecord.HoodieRecordType.class
+ };
+
+ return instantiateMergeHandle(
+ isFallbackEnabled, mergeHandleClass,
COMPACT_MERGE_HANDLE_CLASS_NAME.defaultValue(), logContext,
constructorParamTypes,
+ config, instantTime, hoodieTable, fileSlice, operation,
taskContextSupplier, readerContext, maxInstantTime, recordType);
+ }
+
+ /**
+ * Helper method to instantiate a HoodieMergeHandle via reflection, with an
optional fallback.
+ */
+ private static <T, I, K, O> HoodieMergeHandle<T, I, K, O>
instantiateMergeHandle(
+ boolean isFallbackEnabled,
+ String primaryClass,
+ String fallbackClass,
+ String logContext,
+ Class<?>[] constructorParamTypes,
+ Object... initargs) {
+ try {
+ return (HoodieMergeHandle<T, I, K, O>)
ReflectionUtils.loadClass(primaryClass, constructorParamTypes, initargs);
+ } catch (Throwable e1) {
+ if (isFallbackEnabled && fallbackClass != null &&
!Objects.equals(primaryClass, fallbackClass)) {
+ try {
+ LOG.warn("HoodieMergeHandle implementation {} failed, now creating
fallback implementation {} {}",
+ primaryClass, fallbackClass, logContext);
+ return (HoodieMergeHandle<T, I, K, O>)
ReflectionUtils.loadClass(fallbackClass, constructorParamTypes, initargs);
+ } catch (Throwable e2) {
+ throw new HoodieException("Could not instantiate the fallback
HoodieMergeHandle implementation: " + fallbackClass, e2);
+ }
+ }
+ throw new HoodieException("Could not instantiate the HoodieMergeHandle
implementation: " + primaryClass, e1);
+ }
+ }
+
+ @VisibleForTesting
+ static Pair<String, String> getMergeHandleClassesWrite(WriteOperationType
operationType, HoodieWriteConfig writeConfig, HoodieTable table) {
+ String mergeHandleClass;
+ String fallbackMergeHandleClass = null;
+ // Override with a different implementation of {@link HoodieWriteMergeHandle} if sorting or CDC is enabled.
if (table.requireSortedRecords()) {
- return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table,
keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+ mergeHandleClass =
HoodieSortedMergeHandleWithChangeLog.class.getName();
+ } else {
+ mergeHandleClass = HoodieSortedMergeHandle.class.getName();
+ }
+ } else if (!WriteOperationType.isChangingRecords(operationType) &&
writeConfig.allowDuplicateInserts()) {
+ mergeHandleClass = writeConfig.getConcatHandleClassName();
+ if
(!mergeHandleClass.equals(HoodieWriteConfig.CONCAT_HANDLE_CLASS_NAME.defaultValue()))
{
+ fallbackMergeHandleClass =
HoodieWriteConfig.CONCAT_HANDLE_CLASS_NAME.defaultValue();
+ }
+ } else if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
+ mergeHandleClass = HoodieMergeHandleWithChangeLog.class.getName();
+ } else {
+ mergeHandleClass = writeConfig.getMergeHandleClassName();
+ if
(!mergeHandleClass.equals(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.defaultValue()))
{
+ fallbackMergeHandleClass =
HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.defaultValue();
+ }
+ }
+
+ return Pair.of(mergeHandleClass, fallbackMergeHandleClass);
+ }
+
+ @VisibleForTesting
+ static Pair<String, String>
getMergeHandleClassesCompaction(HoodieWriteConfig writeConfig, HoodieTable
table) {
+ String mergeHandleClass;
+ String fallbackMergeHandleClass = null;
+ // Override with the sorted implementation of {@link HoodieWriteMergeHandle} if sorting is required.
+ if (table.requireSortedRecords()) {
+ mergeHandleClass = HoodieSortedMergeHandle.class.getName();
} else if (table.getMetaClient().getTableConfig().isCDCEnabled() &&
writeConfig.isYieldingPureLogForMor()) {
// IMPORTANT: only index type that yields pure log files need to enable
the cdc log files for compaction,
// index type such as the BLOOM does not need this because it would do
delta merge for inserts and generates log for updates,
// both of these two cases are already handled in HoodieCDCExtractor.
- return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime,
table, keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ mergeHandleClass = HoodieMergeHandleWithChangeLog.class.getName();
} else {
- return new HoodieMergeHandle<>(writeConfig, instantTime, table,
keyToNewRecords, partitionPath, fileId,
- dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ mergeHandleClass = writeConfig.getMergeHandleClassName();
+ if
(!mergeHandleClass.equals(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.defaultValue()))
{
+ fallbackMergeHandleClass =
HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.defaultValue();
+ }
}
+
+ return Pair.of(mergeHandleClass, fallbackMergeHandleClass);
}
-}
+}
\ No newline at end of file
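For context, the factory's fallback logic above reduces to a try-primary-then-fallback reflection pattern: load the configured class, fall back to the default on any failure, and re-throw only when both attempts fail. A minimal, self-contained sketch of the same pattern in plain JDK reflection (the helper name and exception types are illustrative, not Hudi APIs):

    // Try the configured class first; on any failure fall back to the default class.
    static Object loadWithFallback(String primary, String fallback,
                                   Class<?>[] paramTypes, Object... args) {
      try {
        return Class.forName(primary).getConstructor(paramTypes).newInstance(args);
      } catch (Throwable t1) {
        if (fallback != null && !fallback.equals(primary)) {
          try {
            return Class.forName(fallback).getConstructor(paramTypes).newInstance(args);
          } catch (Throwable t2) {
            throw new RuntimeException("Could not instantiate fallback: " + fallback, t2);
          }
        }
        throw new RuntimeException("Could not instantiate: " + primary, t1);
      }
    }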
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 2f1b629096ff..eda78ecc4423 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -45,7 +45,7 @@ import java.util.Map;
/**
 * A merge handle that also writes change logs (CDC) while merging.
*/
-public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends
HoodieMergeHandle<T, I, K, O> {
+public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends
HoodieWriteMergeHandle<T, I, K, O> {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieMergeHandleWithChangeLog.class);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index b5b0bd6e7be5..3327c781f985 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -46,7 +46,7 @@ import java.util.Queue;
* keys in newRecordKeys (sorted in-memory).
*/
@NotThreadSafe
-public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T,
I, K, O> {
+public class HoodieSortedMergeHandle<T, I, K, O> extends
HoodieWriteMergeHandle<T, I, K, O> {
private final Queue<String> newRecordKeysSorted = new PriorityQueue<>();
@@ -61,10 +61,9 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends
HoodieMergeHandle<T, I,
* Called by compactor code path.
*/
public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
- Map<String, HoodieRecord<T>> keyToNewRecordsOrig, String partitionPath,
String fileId,
- HoodieBaseFile dataFileToBeMerged, TaskContextSupplier
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
- super(config, instantTime, hoodieTable, keyToNewRecordsOrig,
partitionPath, fileId, dataFileToBeMerged,
- taskContextSupplier, keyGeneratorOpt);
+ Map<String, HoodieRecord<T>>
keyToNewRecordsOrig, String partitionPath, String fileId,
+ HoodieBaseFile dataFileToBeMerged,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator>
keyGeneratorOpt) {
+ super(config, instantTime, hoodieTable, keyToNewRecordsOrig,
partitionPath, fileId, dataFileToBeMerged, taskContextSupplier,
keyGeneratorOpt);
newRecordKeysSorted.addAll(keyToNewRecords.keySet());
}
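For intuition: the sorted handle keeps the incoming record keys in a PriorityQueue and drains them in key order against the already-sorted base file stream, so inserts land in the right position of the new base file. A toy, self-contained sketch of that merge loop (strings stand in for records; this is not the Hudi implementation):

    import java.util.List;
    import java.util.PriorityQueue;
    import java.util.Queue;

    class SortedMergeSketch {
      public static void main(String[] args) {
        Queue<String> incoming = new PriorityQueue<>(List.of("k2", "k5"));
        for (String existing : List.of("k1", "k2", "k3")) {
          // flush incoming keys that sort before the current existing record
          while (!incoming.isEmpty() && incoming.peek().compareTo(existing) < 0) {
            System.out.println("insert " + incoming.poll());
          }
          if (!incoming.isEmpty() && incoming.peek().equals(existing)) {
            System.out.println("merge  " + incoming.poll()); // same key: merged record wins
          } else {
            System.out.println("keep   " + existing);
          }
        }
        incoming.forEach(k -> System.out.println("insert " + k)); // drain remaining inserts
      }
    }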
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
index 4d16876ff5b4..9ba999974d30 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandleWithChangeLog.java
@@ -48,8 +48,8 @@ public class HoodieSortedMergeHandleWithChangeLog<T, I, K, O>
extends HoodieMerg
* Called by compactor code path.
*/
public HoodieSortedMergeHandleWithChangeLog(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
- Map<String, HoodieRecord<T>> keyToNewRecords,
String partitionPath, String fileId,
- HoodieBaseFile dataFileToBeMerged,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator>
keyGeneratorOpt) {
+ Map<String, HoodieRecord<T>>
keyToNewRecords, String partitionPath, String fileId,
+ HoodieBaseFile
dataFileToBeMerged, TaskContextSupplier taskContextSupplier,
Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath,
fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
similarity index 74%
copy from
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
copy to
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
index 7b81d728a358..8d389d7ba88e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
@@ -21,21 +21,17 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
-import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -47,8 +43,9 @@ import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
import org.apache.avro.Schema;
import org.slf4j.Logger;
@@ -64,12 +61,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
/**
- * Handle to merge incoming records to those in storage.
+ * Handle to merge incoming records with those in storage, row by row.
* <p>
* Simplified Logic:
* For every existing record
@@ -77,13 +73,12 @@ import java.util.Set;
* else write the record as is
* For all pending records from incoming batch, write to file.
*
- * <p>
* Illustration with simple data.
* Incoming data:
* rec1_2, rec4_2, rec5_1, rec6_1
* Existing data:
* rec1_1, rec2_1, rec3_1, rec4_1
- * <p>
+ *
 * For every existing record, merge w/ incoming if required and write to storage.
 *    => rec1_1 and rec1_2 are merged to write rec1_2 to storage
 *    => rec2_1 is written as is
@@ -91,62 +86,58 @@ import java.util.Set;
 *    => rec4_2 and rec4_1 are merged to write rec4_2 to storage
* Write all pending records from incoming set to storage
* => rec5_1 and rec6_1
- * <p>
+ *
* Final snapshot in storage
* rec1_2, rec2_1, rec3_1, rec4_2, rec5_1, rec6_1
+ *
+ * </p>
*/
@SuppressWarnings("Duplicates")
@NotThreadSafe
-public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
+public class HoodieWriteMergeHandle<T, I, K, O> extends HoodieAbstractMergeHandle<T, I, K, O> {

-  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandle.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteMergeHandle.class);
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter fileWriter;
- protected StoragePath newFilePath;
- protected StoragePath oldFilePath;
protected long recordsWritten = 0;
protected long recordsDeleted = 0;
protected long updatedRecordsWritten = 0;
protected long insertRecordsWritten = 0;
- protected Option<BaseKeyGenerator> keyGeneratorOpt;
- protected HoodieBaseFile baseFileToMerge;
-
- protected Option<String[]> partitionFields = Option.empty();
- protected Object[] partitionValues = new Object[0];
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                           Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                                TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
     this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
         getLatestBaseFile(hoodieTable, partitionPath, fileId), keyGeneratorOpt);
   }

-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                           Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
-                           TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, false);
-    init(recordItr);
-    init(fileId, partitionPath, baseFile);
-    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                                TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, baseFile, keyGeneratorOpt, false);
+    populateIncomingRecordsMap(recordItr);
+    initMarkerFileAndFileWriter(fileId, partitionPath);
   }

   /**
    * Called by compactor code path.
    */
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
-                           Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
-                           HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier, true);
+  public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                                HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier,
+                                Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, dataFileToBeMerged, keyGeneratorOpt,
+        // preserveMetadata is disabled by default for MDT but enabled otherwise
+        !HoodieTableMetadata.isMetadataTable(config.getBasePath()));
     this.keyToNewRecords = keyToNewRecords;
-    init(fileId, this.partitionPath, dataFileToBeMerged);
-    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
+    initMarkerFileAndFileWriter(fileId, this.partitionPath);
   }
/**
- * Used by `HoodieSparkFileGroupReaderBasedMergeHandle`.
+ * Used by `FileGroupReaderBasedMergeHandle`.
*
* @param config Hudi write config
* @param instantTime Instant time to use
@@ -155,65 +146,30 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
* @param hoodieTable {@link HoodieTable} instance
* @param taskContextSupplier Task context supplier
*/
- public HoodieMergeHandle(HoodieWriteConfig config, String instantTime,
String partitionPath,
- String fileId, HoodieTable<T, I, K, O> hoodieTable,
TaskContextSupplier taskContextSupplier) {
+ public HoodieWriteMergeHandle(HoodieWriteConfig config, String instantTime,
String partitionPath,
+ String fileId, HoodieTable<T, I, K, O>
hoodieTable, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, partitionPath, fileId, hoodieTable,
taskContextSupplier, true);
}
-  private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
-    ValidationUtils.checkArgument(populateMetaFields == !keyGeneratorOpt.isPresent());
-    this.keyGeneratorOpt = keyGeneratorOpt;
-  }
-
-  public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> hoodieTable, String partitionPath, String fileId) {
-    Option<HoodieBaseFile> baseFileOp = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
-    if (!baseFileOp.isPresent()) {
-      throw new NoSuchElementException(String.format("FileID %s of partition path %s does not exist.", fileId, partitionPath));
-    }
-    return baseFileOp.get();
+  @Override
+  public void doMerge() throws IOException {
+    HoodieMergeHelper.newInstance().runMerge(hoodieTable, this);
   }
/**
- * Extract old file path, initialize StorageWriter and WriteStatus.
+ * Initialize marker file and file writer.
*/
-  private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
-    LOG.info("partitionPath:{}, fileId to be merged:{}", partitionPath, fileId);
-    this.baseFileToMerge = baseFileToMerge;
+  private void initMarkerFileAndFileWriter(String fileId, String partitionPath) {
     this.writtenRecordKeys = new HashSet<>();
-    writeStatus.setStat(new HoodieWriteStat());
     try {
-      String latestValidFilePath = baseFileToMerge.getFileName();
-      writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
-      // At the moment, we only support SI for overwrite with latest payload. So, we don't need to embed entire file slice here.
-      // HUDI-8518 will be taken up to fix it for any payload during which we might require entire file slice to be set here.
-      // Already AppendHandle adds all logs file from current file slice to HoodieDeltaWriteStat.
-      writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
-
-      HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
-          new StoragePath(config.getBasePath()),
-          FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
-          hoodieTable.getPartitionMetafileFormat());
-      partitionMetadata.trySave();
-
-      String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
-      makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
-
-      LOG.info("Merging new data into oldPath {}, as newPath {}", oldFilePath, newFilePath);
-      // file name is same for all records, in this bunch
-      writeStatus.setFileId(fileId);
-      writeStatus.setPartitionPath(partitionPath);
-      writeStatus.getStat().setPartitionPath(partitionPath);
-      writeStatus.getStat().setFileId(fileId);
-      setWriteStatusPath();
-
       // Create Marker file,
       // uses name of `newFilePath` instead of `newFileName`
-      // in case the sub-class may roll over the file handle name.
+      // in case the subclass may roll over the file handle name.
       createMarkerFile(partitionPath, newFilePath.getName());
-
       // Create the writer for writing the new version file
-      fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable.getStorage(),
-          config, writeSchemaWithMetaFields, taskContextSupplier, recordMerger.getRecordType());
+      fileWriter = HoodieFileWriterFactory.getFileWriter(
+          instantTime, newFilePath, hoodieTable.getStorage(),
+          config, writeSchemaWithMetaFields, taskContextSupplier, getRecordType());
     } catch (IOException io) {
       LOG.error("Error in update task at commit {}", instantTime, io);
       writeStatus.setGlobalError(io);
@@ -222,19 +178,14 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
}
}
-  protected void setWriteStatusPath() {
-    writeStatus.getStat().setPath(new StoragePath(config.getBasePath()), newFilePath);
-  }
-
-  protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
-    oldFilePath = makeNewFilePath(partitionPath, oldFileName);
-    newFilePath = makeNewFilePath(partitionPath, newFileName);
+  protected HoodieRecord.HoodieRecordType getRecordType() {
+    return recordMerger.getRecordType();
   }
/**
* Initialize a spillable map for incoming records.
*/
- protected void initializeIncomingRecordsMap() {
+ protected void initIncomingRecordsMap() {
try {
// Load the new records in a map
long memoryForMerge =
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
@@ -260,8 +211,8 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
/**
   * Load the new incoming records into a map.
   */
-  protected void init(Iterator<HoodieRecord<T>> newRecordsItr) {
-    initializeIncomingRecordsMap();
+  protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>> newRecordsItr) {
+    initIncomingRecordsMap();
while (newRecordsItr.hasNext()) {
HoodieRecord<T> record = newRecordsItr.next();
// update the new location of the record, so we know where to find it
next
@@ -309,7 +260,8 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
writeInsertRecord(newRecord, schema, config.getProps());
}
-  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties prop) {
+  protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties prop)
+      throws IOException {
if (writeRecord(newRecord, null, Option.of(newRecord), schema, prop,
HoodieOperation.isDelete(newRecord.getOperation()))) {
insertRecordsWritten++;
}
@@ -549,42 +501,4 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
baseFileToMerge.getCommitTime(), oldNumWrites));
}
}
-
- public Iterator<List<WriteStatus>> getWriteStatusesAsIterator() {
- List<WriteStatus> statuses = getWriteStatuses();
- // TODO(vc): This needs to be revisited
- if (getPartitionPath() == null) {
- LOG.info("Upsert Handle has partition path as null {}, {}",
getOldFilePath(), statuses);
- }
- return Collections.singletonList(statuses).iterator();
- }
-
- public StoragePath getOldFilePath() {
- return oldFilePath;
- }
-
- @Override
- public IOType getIOType() {
- return IOType.MERGE;
- }
-
- public HoodieBaseFile baseFileForMerge() {
- return baseFileToMerge;
- }
-
- public void setPartitionFields(Option<String[]> partitionFields) {
- this.partitionFields = partitionFields;
- }
-
- public Option<String[]> getPartitionFields() {
- return this.partitionFields;
- }
-
- public void setPartitionValues(Object[] partitionValues) {
- this.partitionValues = partitionValues;
- }
-
- public Object[] getPartitionValues() {
- return this.partitionValues;
- }
}
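Since doMerge() is now the overridable entry point (by default it delegates to HoodieMergeHelper.newInstance().runMerge(...)), a downstream implementation only needs to extend HoodieWriteMergeHandle and override that one method. A hypothetical sketch against the constructor shown above (the class name and the timing concern are illustrative, not part of this commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.util.Iterator;

    public class TimedMergeHandle<T, I, K, O> extends HoodieWriteMergeHandle<T, I, K, O> {
      private static final Logger LOG = LoggerFactory.getLogger(TimedMergeHandle.class);

      public TimedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                              Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                              TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
      }

      @Override
      public void doMerge() throws IOException {
        long start = System.currentTimeMillis();
        super.doMerge(); // default path: HoodieMergeHelper.newInstance().runMerge(hoodieTable, this)
        LOG.info("Merge finished in {} ms", System.currentTimeMillis() - start);
      }
    }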
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
index a457cbf5b324..29fe5246b613 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
@@ -18,11 +18,20 @@
package org.apache.hudi.io;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import static
org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
@@ -33,6 +42,8 @@ import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTI
import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
public class IOUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(IOUtils.class);
+
/**
* Dynamic calculation of max memory to use for spillable map. There is
always more than one task
* running on an executor and each task maintains a spillable map.
@@ -81,10 +92,40 @@ public class IOUtils {
}
public static long getMaxMemoryPerCompaction(TaskContextSupplier context,
Map<String, String> options) {
- if (options.containsKey(MAX_MEMORY_FOR_COMPACTION)) {
- return Long.parseLong(options.get(MAX_MEMORY_FOR_COMPACTION));
+ if (options.containsKey(MAX_MEMORY_FOR_COMPACTION.key())) {
+ return Long.parseLong(options.get(MAX_MEMORY_FOR_COMPACTION.key()));
}
String fraction =
options.getOrDefault(MAX_MEMORY_FRACTION_FOR_COMPACTION.key(),
MAX_MEMORY_FRACTION_FOR_COMPACTION.defaultValue());
return getMaxMemoryAllowedForMerge(context, fraction);
}
+
+  /**
+   * Triggers the merge action with the given {@link HoodieMergeHandle}.
+   *
+   * <p>Note: this can be either regular write-path merging or compaction merging,
+   * depending on the implementation of the {@link HoodieMergeHandle}.
+   *
+   * @param mergeHandle The merge handle
+   * @param instantTime The instant time
+   * @param fileId      The file ID
+   *
+   * @return the write status iterator
+   */
+  public static Iterator<List<WriteStatus>> runMerge(HoodieMergeHandle<?, ?, ?, ?> mergeHandle,
+                                                     String instantTime,
+                                                     String fileId) throws IOException {
+    if (mergeHandle.getOldFilePath() == null) {
+      throw new HoodieUpsertException(
+          "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
+    } else {
+      mergeHandle.doMerge();
+    }
+
+    // TODO(vc): This needs to be revisited
+    if (mergeHandle.getPartitionPath() == null) {
+      LOG.info("Upsert Handle has partition path as null " + mergeHandle.getOldFilePath() + ", " + mergeHandle.getWriteStatuses());
+    }
+
+    return Collections.singletonList(mergeHandle.getWriteStatuses()).iterator();
+  }
}
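The getMaxMemoryPerCompaction change above is subtle but real: the options map is keyed by String, so probing it with the ConfigProperty object itself never matched and a user-supplied override was silently ignored. A minimal illustration (the override value is illustrative):

    Map<String, String> options = new HashMap<>();
    options.put(MAX_MEMORY_FOR_COMPACTION.key(), "1073741824"); // 1 GB override, illustrative value

    // Before: options.containsKey(MAX_MEMORY_FOR_COMPACTION)       -> always false (object key in a String map)
    // After:  options.containsKey(MAX_MEMORY_FOR_COMPACTION.key()) -> matches, so the override takes effect
    long maxMemory = Long.parseLong(options.get(MAX_MEMORY_FOR_COMPACTION.key()));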
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 667e2a183a77..29f9ff7b4696 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -77,7 +77,6 @@ import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -86,7 +85,6 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
-import org.apache.hudi.table.action.commit.HoodieMergeHelper;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
@@ -1207,12 +1205,4 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
}
return new HashSet<>(Arrays.asList(partitionFields.get()));
}
-
- public void runMerge(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String
instantTime, String fileId) throws IOException {
- if (upsertHandle.getOldFilePath() == null) {
- throw new HoodieUpsertException("Error in finding the old file path at
commit " + instantTime + " for fileId: " + fileId);
- } else {
- HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
- }
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 138e6a840d4b..b96848ae40fc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -20,7 +20,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.queue.HoodieConsumer;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
@@ -33,30 +33,30 @@ public abstract class BaseMergeHelper {
/**
* Read records from previous version of base file and merge.
* @param table Hoodie Table
- * @param upsertHandle Merge Handle
+ * @param mergeHandle Merge Handle
* @throws IOException in case of error
*/
- public abstract void runMerge(HoodieTable<?, ?, ?, ?> table,
HoodieMergeHandle<?, ?, ?, ?> upsertHandle) throws IOException;
+ public abstract void runMerge(HoodieTable<?, ?, ?, ?> table,
HoodieWriteMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException;
/**
* Consumer that dequeues records from queue and sends to Merge Handle.
*/
protected static class UpdateHandler implements HoodieConsumer<HoodieRecord,
Void> {
- private final HoodieMergeHandle upsertHandle;
+ private final HoodieWriteMergeHandle mergeHandle;
- protected UpdateHandler(HoodieMergeHandle upsertHandle) {
- this.upsertHandle = upsertHandle;
+ protected UpdateHandler(HoodieWriteMergeHandle mergeHandle) {
+ this.mergeHandle = mergeHandle;
}
@Override
public void consume(HoodieRecord record) {
- upsertHandle.write(record);
+ mergeHandle.write(record);
}
@Override
public Void finish() {
- upsertHandle.close();
+ mergeHandle.close();
return null;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index f7b93b59457e..296a178c275f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -35,7 +35,7 @@ import
org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
@@ -74,7 +74,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
@Override
public void runMerge(HoodieTable<?, ?, ?, ?> table,
- HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws
IOException {
+ HoodieWriteMergeHandle<?, ?, ?, ?> mergeHandle) throws
IOException {
HoodieWriteConfig writeConfig = table.getConfig();
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index f3757fe9e3ac..c9ddce099eeb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -44,7 +44,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.FileGroupReaderBasedAppendHandle;
-import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
@@ -157,9 +158,9 @@ public abstract class HoodieCompactor<T, I, K, O>
implements Serializable {
HoodieTable table,
String maxInstantTime,
TaskContextSupplier taskContextSupplier)
throws IOException {
- FileGroupReaderBasedMergeHandle<T, ?, ?, ?> mergeHandle = new
FileGroupReaderBasedMergeHandle<>(writeConfig,
+ HoodieMergeHandle<T, ?, ?, ?> mergeHandle =
HoodieMergeHandleFactory.create(writeConfig,
instantTime, table, getFileSliceFromOperation(operation,
writeConfig.getBasePath()), operation, taskContextSupplier,
hoodieReaderContext, maxInstantTime, getEngineRecordType());
- mergeHandle.write();
+ mergeHandle.doMerge();
return mergeHandle.close();
}
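Compaction now runs through the same lifecycle as the write path: create the handle via the factory, call doMerge(), then close(). Schematically (a sketch with the factory arguments elided, and assuming, as the compactor code above suggests, that close() returns the write statuses):

    HoodieMergeHandle<?, ?, ?, ?> mergeHandle =
        HoodieMergeHandleFactory.create(/* writeConfig, instantTime, table, fileSlice, operation, ... */);
    mergeHandle.doMerge();      // run the merge; write path or compaction depending on the implementation
    return mergeHandle.close(); // finalize the new base file and collect write statuses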
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieMergeHandleFactory.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieMergeHandleFactory.java
new file mode 100644
index 000000000000..ca59cde89c66
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieMergeHandleFactory.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Properties;
+
+import static
org.apache.hudi.config.HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieMergeHandleFactory {
+
+ private static final String CUSTOM_MERGE_HANDLE =
"io.custom.CustomMergeHandle.java";
+ private static final String BASE_PATH = "base_path";
+
+ @Mock
+ private HoodieTable mockHoodieTable;
+ @Mock
+ private HoodieTableConfig mockHoodieTableConfig;
+ @Mock
+ private HoodieTableMetaClient mockMetaClient;
+
+ @BeforeEach
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ when(mockHoodieTable.getMetaClient()).thenReturn(mockMetaClient);
+ when(mockMetaClient.getTableConfig()).thenReturn(mockHoodieTableConfig);
+ }
+
+ @Test
+ public void validateWriterPathFactoryImpl() {
+ // default case
+ Properties properties = new Properties();
+ properties.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(),
"false");
+ Pair mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT,
getWriterConfig(properties), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieWriteMergeHandle.class.getName());
+
+ // sorted case
+ when(mockHoodieTable.requireSortedRecords()).thenReturn(true);
+ when(mockHoodieTableConfig.isCDCEnabled()).thenReturn(true);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT,
getWriterConfig(properties), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieSortedMergeHandleWithChangeLog.class.getName());
+ when(mockHoodieTableConfig.isCDCEnabled()).thenReturn(false);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT,
getWriterConfig(properties), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieSortedMergeHandle.class.getName());
+
+ // non-sorted: no CDC cases
+ when(mockHoodieTable.requireSortedRecords()).thenReturn(false);
+ Properties propsWithDups = new Properties();
+ propsWithDups.setProperty(MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key(),
"true");
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT,
getWriterConfig(propsWithDups), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieConcatHandle.class.getName());
+
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT,
getWriterConfig(propsWithDups), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieWriteMergeHandle.class.getName());
+
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT,
getWriterConfig(properties), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieWriteMergeHandle.class.getName());
+
+ // non-sorted: CDC enabled
+ when(mockHoodieTableConfig.isCDCEnabled()).thenReturn(true);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT,
getWriterConfig(propsWithDups), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieMergeHandleWithChangeLog.class.getName());
+
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT,
getWriterConfig(properties), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieMergeHandleWithChangeLog.class.getName());
+
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT,
getWriterConfig(properties), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieMergeHandleWithChangeLog.class.getName());
+
+ // custom merge handle
+ when(mockHoodieTableConfig.isCDCEnabled()).thenReturn(false);
+ properties.setProperty(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.key(),
CUSTOM_MERGE_HANDLE);
+ properties.setProperty(HoodieWriteConfig.CONCAT_HANDLE_CLASS_NAME.key(),
CUSTOM_MERGE_HANDLE);
+ propsWithDups.setProperty(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.key(),
CUSTOM_MERGE_HANDLE);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT,
getWriterConfig(properties), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses, CUSTOM_MERGE_HANDLE,
HoodieWriteMergeHandle.class.getName());
+
+ when(mockHoodieTable.requireSortedRecords()).thenReturn(true);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.UPSERT,
getWriterConfig(properties), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieSortedMergeHandle.class.getName());
+
+ when(mockHoodieTable.requireSortedRecords()).thenReturn(false);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT,
getWriterConfig(propsWithDups), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieConcatHandle.class.getName());
+
+
propsWithDups.setProperty(HoodieWriteConfig.CONCAT_HANDLE_CLASS_NAME.key(),
CUSTOM_MERGE_HANDLE);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesWrite(WriteOperationType.INSERT,
getWriterConfig(propsWithDups), mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses, CUSTOM_MERGE_HANDLE,
HoodieConcatHandle.class.getName());
+ }
+
+ @Test
+ public void validateCompactionPathFactoryImpl() {
+ // default case
+ Properties properties = new Properties();
+ Pair mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesCompaction(getWriterConfig(properties),
mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieWriteMergeHandle.class.getName());
+
+ // sorted case
+ when(mockHoodieTable.requireSortedRecords()).thenReturn(true);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesCompaction(getWriterConfig(properties),
mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieSortedMergeHandle.class.getName());
+
+ // custom case
+ when(mockHoodieTable.requireSortedRecords()).thenReturn(false);
+ properties.setProperty(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.key(),
CUSTOM_MERGE_HANDLE);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesCompaction(getWriterConfig(properties),
mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses, CUSTOM_MERGE_HANDLE,
HoodieWriteMergeHandle.class.getName());
+
+ when(mockHoodieTable.requireSortedRecords()).thenReturn(true);
+ mergeHandleClasses =
HoodieMergeHandleFactory.getMergeHandleClassesCompaction(getWriterConfig(properties),
mockHoodieTable);
+ validateMergeClasses(mergeHandleClasses,
HoodieSortedMergeHandle.class.getName());
+
+ }
+
+ private void validateMergeClasses(Pair<String, String> mergeHandleClasses,
String expectedMergeHandleClasses) {
+ validateMergeClasses(mergeHandleClasses, expectedMergeHandleClasses, null);
+ }
+
+ private void validateMergeClasses(Pair<String, String> mergeHandleClasses,
String expectedMergeHandleClass, String expectedFallbackClass) {
+ Assertions.assertEquals(expectedMergeHandleClass,
mergeHandleClasses.getLeft());
+ Assertions.assertEquals(expectedFallbackClass,
mergeHandleClasses.getRight());
+ }
+
+ private HoodieWriteConfig getWriterConfig(Properties properties) {
+ return
HoodieWriteConfig.newBuilder().withPath(BASE_PATH).withProperties(properties).build();
+ }
+}
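As the test shows, pointing the factory at a custom implementation is just a write-config property; a minimal sketch reusing the builder calls exercised above (the custom class name and base path are hypothetical):

    Properties props = new Properties();
    // Point the factory at a user-supplied handle; if it cannot be instantiated,
    // instantiateMergeHandle falls back to the default implementation.
    props.setProperty(HoodieWriteConfig.MERGE_HANDLE_CLASS_NAME.key(), "com.example.MyMergeHandle");
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table") // illustrative base path
        .withProperties(props)
        .build();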
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index 126ad3f47477..2888faf0368c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -41,7 +41,7 @@ import java.util.Iterator;
import java.util.List;
 /**
- * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers).
+ * A {@link HoodieWriteMergeHandle} that supports incremental MERGE writes (small data buffers).
 *
 * <P>This handle is needed from the second mini-batch write for a COW data bucket
 * when the data bucket is written using multiple mini-batches.
@@ -51,7 +51,7 @@ import java.util.List;
 * behaves as if the new data buffers were appended to the old file.
*/
public class FlinkMergeAndReplaceHandle<T, I, K, O>
- extends HoodieMergeHandle<T, I, K, O>
+ extends HoodieWriteMergeHandle<T, I, K, O>
implements MiniBatchHandle {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkMergeAndReplaceHandle.class);
@@ -135,7 +135,7 @@ public class FlinkMergeAndReplaceHandle<T, I, K, O>
}
@Override
- protected void initializeIncomingRecordsMap() {
+ protected void initIncomingRecordsMap() {
LOG.info("Initialize on-heap keyToNewRecords for incoming records.");
    // the incoming records are already buffered on heap and the underlying bytes are managed
    // by the memory pool in the Flink write buffer, so there is no need to use ExternalSpillableMap.
@@ -156,6 +156,7 @@ public class FlinkMergeAndReplaceHandle<T, I, K, O>
writeStatus.getStat().setPath(new StoragePath(config.getBasePath()),
oldFilePath);
}
+ @Override
boolean needsUpdateLocation() {
// No need to update location for Flink hoodie records because all the
records are pre-tagged
// with the desired locations.
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 1d8078707e77..ab4862bbb975 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -41,7 +41,7 @@ import java.util.Iterator;
import java.util.List;
 /**
- * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers).
+ * A {@link HoodieWriteMergeHandle} that supports incremental MERGE writes (small data buffers).
 *
 * <p>For a new data buffer, it initializes and sets up the next file path to write to,
 * and closes the file path when the data buffer write finishes. When the next data buffer
@@ -52,7 +52,7 @@ import java.util.List;
* @see FlinkMergeAndReplaceHandle
*/
public class FlinkMergeHandle<T, I, K, O>
- extends HoodieMergeHandle<T, I, K, O>
+ extends HoodieWriteMergeHandle<T, I, K, O>
implements MiniBatchHandle {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkMergeHandle.class);
@@ -155,7 +155,7 @@ public class FlinkMergeHandle<T, I, K, O>
}
@Override
- protected void initializeIncomingRecordsMap() {
+ protected void initIncomingRecordsMap() {
LOG.info("Initialize on-heap keyToNewRecords for incoming records.");
    // the incoming records are already buffered on heap and the underlying bytes are managed
    // by the memory pool in the Flink write buffer, so there is no need to use ExternalSpillableMap.
@@ -187,6 +187,7 @@ public class FlinkMergeHandle<T, I, K, O>
}
}
+ @Override
boolean needsUpdateLocation() {
// No need to update location for Flink hoodie records because all the
records are pre-tagged
// with the desired locations.
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index d574125eb1eb..438c5d563df4 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -65,6 +65,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
+ @Override
protected boolean writeUpdateRecord(HoodieRecord<T> newRecord,
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema
writerSchema)
throws IOException {
// TODO [HUDI-5019] Remove these unnecessary newInstance invocations
@@ -80,6 +81,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
return result;
}
+ @Override
protected void writeInsertRecord(HoodieRecord<T> newRecord) throws
IOException {
Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
// TODO Remove these unnecessary newInstance invocations
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 815baf39d843..aee1da3a8134 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -41,10 +41,11 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -413,14 +414,8 @@ public class HoodieFlinkCopyOnWriteTable<T>
    // always use the avro record merger for legacy compaction since the log scanner does not support rowdata reading yet.
config.setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
// these are updates
- HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime,
partitionPath, fileId, keyToNewRecords, oldDataFile);
- return handleUpdateInternal(upsertHandle, instantTime, fileId);
- }
-
- protected Iterator<List<WriteStatus>>
handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String
instantTime,
- String fileId)
throws IOException {
- runMerge(upsertHandle, instantTime, fileId);
- return upsertHandle.getWriteStatusesAsIterator();
+ HoodieMergeHandle mergeHandle = getUpdateHandle(instantTime,
partitionPath, fileId, keyToNewRecords, oldDataFile);
+ return IOUtils.runMerge(mergeHandle, instantTime, fileId);
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String
partitionPath, String fileId,
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index d156a6e6e1e4..d713ead1d53a 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -32,8 +32,9 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.ExplicitWriteHandleFactory;
import org.apache.hudi.io.HoodieCreateHandle;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -166,7 +167,7 @@ public abstract class BaseFlinkCommitActionExecutor<T>
extends
      // the second batch (batch2) tries to reuse the same bucket
      // and append instead of UPDATE.
return handleInsert(fileIdHint, recordItr);
- } else if (this.writeHandle instanceof HoodieMergeHandle) {
+ } else if (this.writeHandle instanceof HoodieWriteMergeHandle) {
return handleUpdate(partitionPath, fileIdHint, recordItr);
} else {
switch (bucketType) {
@@ -190,19 +191,13 @@ public abstract class BaseFlinkCommitActionExecutor<T>
extends
Iterator<HoodieRecord<T>>
recordItr)
throws IOException {
// This is needed since sometimes some buckets are never picked in
getPartition() and end up with 0 records
- HoodieMergeHandle<?, ?, ?, ?> upsertHandle = (HoodieMergeHandle<?, ?, ?,
?>) this.writeHandle;
+ HoodieWriteMergeHandle<?, ?, ?, ?> upsertHandle =
(HoodieWriteMergeHandle<?, ?, ?, ?>) this.writeHandle;
if (upsertHandle.isEmptyNewRecords() && !recordItr.hasNext()) {
LOG.info("Empty partition with fileId => {}.", fileId);
return Collections.singletonList((List<WriteStatus>)
Collections.EMPTY_LIST).iterator();
}
// these are updates
- return handleUpdateInternal(upsertHandle, fileId);
- }
-
- protected Iterator<List<WriteStatus>>
handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
- throws IOException {
- table.runMerge(upsertHandle, instantTime, fileId);
- return upsertHandle.getWriteStatusesAsIterator();
+ return IOUtils.runMerge(upsertHandle, instantTime, fileId);
}
@Override
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index f5857ca8eb18..873d0f55d338 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -44,6 +44,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -282,14 +283,8 @@ public class HoodieJavaCopyOnWriteTable<T>
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile)
throws IOException {
// these are updates
- HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime,
partitionPath, fileId, keyToNewRecords, oldDataFile);
- return handleUpdateInternal(upsertHandle, instantTime, fileId);
- }
-
- protected Iterator<List<WriteStatus>>
handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String
instantTime,
- String fileId)
throws IOException {
- runMerge(upsertHandle, instantTime, fileId);
- return upsertHandle.getWriteStatusesAsIterator();
+ HoodieMergeHandle mergeHandle = getUpdateHandle(instantTime,
partitionPath, fileId, keyToNewRecords, oldDataFile);
+ return IOUtils.runMerge(mergeHandle, instantTime, fileId);
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String
partitionPath, String fileId,
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 3e9974fc5ebe..092daf2cead2 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.execution.JavaLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.storage.StoragePath;
@@ -241,17 +242,11 @@ public abstract class BaseJavaCommitActionExecutor<T>
extends
return Collections.singletonList((List<WriteStatus>)
Collections.EMPTY_LIST).iterator();
}
// these are updates
- HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId,
recordItr);
- return handleUpdateInternal(upsertHandle, fileId);
+ HoodieMergeHandle<?, ?, ?, ?> mergeHandle = getUpdateHandle(partitionPath,
fileId, recordItr);
+ return IOUtils.runMerge(mergeHandle, instantTime, fileId);
}
- protected Iterator<List<WriteStatus>>
handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
- throws IOException {
- table.runMerge(upsertHandle, instantTime, fileId);
- return upsertHandle.getWriteStatusesAsIterator();
- }
-
- protected HoodieMergeHandle getUpdateHandle(String partitionPath, String
fileId, Iterator<HoodieRecord<T>> recordItr) {
+ protected HoodieMergeHandle<?, ?, ?, ?> getUpdateHandle(String
partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
if (!config.populateMetaFields()) {
try {
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index a8d4e5990f1b..0d2daf9fe34c 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -42,7 +42,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
@@ -261,9 +261,9 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
Pair<String, String> partitionAndBaseFilePaths =
getPartitionAndBaseFilePathsFromLatestCommitMetadata(metaClient);
HoodieBaseFile baseFile = new
HoodieBaseFile(partitionAndBaseFilePaths.getRight());
- HoodieMergeHandle handle = null;
+ HoodieWriteMergeHandle handle = null;
try {
- handle = new HoodieMergeHandle(config, instantTime, table, new
HashMap<>(),
+ handle = new HoodieWriteMergeHandle(config, instantTime, table, new
HashMap<>(),
partitionAndBaseFilePaths.getLeft(),
FSUtils.getFileId(baseFile.getFileName()), baseFile, new
JavaTaskContextSupplier(),
config.populateMetaFields() ? Option.empty() :
Option.of((BaseKeyGenerator)
HoodieAvroKeyGeneratorFactory.createKeyGenerator(config.getProps())));
@@ -281,7 +281,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
config.getProps().setProperty("hoodie.merge.data.validation.enabled",
"true");
HoodieWriteConfig cfg2 =
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
// does the handle need to be closed to clean up the writer it contains?
- handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new
HashMap<>(),
+ handle = new HoodieWriteMergeHandle(cfg2, newInstantTime, table, new
HashMap<>(),
partitionAndBaseFilePaths.getLeft(),
FSUtils.getFileId(baseFile.getFileName()), baseFile, new
JavaTaskContextSupplier(),
config.populateMetaFields() ? Option.empty() :
Option.of((BaseKeyGenerator)
HoodieAvroKeyGeneratorFactory.createKeyGenerator(config.getProps())));
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index f10985ab3694..fd88f4108ba3 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -85,6 +85,8 @@ class SparkReaderContextFactory implements
ReaderContextFactory<InternalRow> {
// Broadcast: Configuration.
Configuration configs = getHadoopConfiguration(jsc.hadoopConfiguration());
schemaEvolutionConfigs.forEach(configs::set);
+ configs.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(),
sqlConf.getConfString(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key()));
+ configs.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT().key(),
sqlConf.getConfString(SQLConf.PARQUET_WRITE_LEGACY_FORMAT().key()));
configurationBroadcast = jsc.broadcast(new
SerializableConfiguration(configs));
// Broadcast: ParquetReader.
// Spark parquet reader has to be instantiated on the driver and broadcast
to the executors
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 4de3cc12ae74..1e7af1c31e6d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -31,6 +31,8 @@ import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.SparkPartitionUtils;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -47,6 +49,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -77,6 +80,7 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
+import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -237,21 +241,32 @@ public class HoodieSparkCopyOnWriteTable<T>
    String instantTime, String partitionPath, String fileId,
    Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
- HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
- return handleUpdateInternal(upsertHandle, instantTime, fileId);
- }
-
- protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String instantTime,
-     String fileId) throws IOException {
- runMerge(upsertHandle, instantTime, fileId);
- return upsertHandle.getWriteStatusesAsIterator();
+ HoodieMergeHandle mergeHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
+ return IOUtils.runMerge(mergeHandle, instantTime, fileId);
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
-     Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
- Option<BaseKeyGenerator> keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
- return HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+     Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+ Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
+ if (!config.populateMetaFields()) {
+   try {
+     keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(TypedProperties.copy(config.getProps())));
+   } catch (Exception e) {
+     throw new HoodieException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) are supported when meta "
+         + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
+   }
+ }
+ HoodieMergeHandle mergeHandle = HoodieMergeHandleFactory.create(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
    dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ if (mergeHandle.getOldFilePath() != null && mergeHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
+   Option<String[]> partitionFields = getMetaClient().getTableConfig().getPartitionFields();
+   Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, mergeHandle.getPartitionPath(),
+       getMetaClient().getTableConfig().getBootstrapBasePath().get(),
+       mergeHandle.getWriterSchema(), (Configuration) getStorageConf().unwrap());
+   mergeHandle.setPartitionFields(partitionFields);
+   mergeHandle.setPartitionValues(partitionValues);
+ }
+ return mergeHandle;
}
@Override
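Condensed view of the new update path in this class, assembled strictly from the hunk above (the names are exactly those in the diff; this is a readability sketch, not a verbatim copy of the method):

    // 1. The factory, not the table, now decides which merge handle
    //    implementation to instantiate (the configurability added by HUDI-9558).
    HoodieMergeHandle mergeHandle = HoodieMergeHandleFactory.create(config, instantTime, this,
        keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
    // 2. Bootstrap tables need partition values resolved against the bootstrap
    //    base path before merging (logic moved here from HoodieSparkTable#runMerge).
    if (mergeHandle.getOldFilePath() != null
        && mergeHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
      mergeHandle.setPartitionFields(getMetaClient().getTableConfig().getPartitionFields());
      // partition values come from SparkPartitionUtils.getPartitionFieldVals(...)
    }
    // 3. The shared helper replaces the per-engine handleUpdateInternal/runMerge pair.
    Iterator<List<WriteStatus>> statuses = IOUtils.runMerge(mergeHandle, instantTime, fileId);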
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 23c88718f183..a9276fd37902 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.SparkPartitionUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -30,16 +29,12 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
-import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkMetadataWriterFactory;
-import org.apache.hudi.table.action.commit.HoodieMergeHelper;
-import org.apache.hadoop.conf.Configuration;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
@@ -138,22 +133,4 @@ public abstract class HoodieSparkTable<T>
final TaskContext taskContext = TaskContext.get();
return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
}
-
- @Override
- public void runMerge(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String instantTime, String fileId) throws IOException {
-   if (upsertHandle.getOldFilePath() == null) {
-     throw new HoodieUpsertException("Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
-   } else {
-     if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
-       Option<String[]> partitionFields = getMetaClient().getTableConfig().getPartitionFields();
-       Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(),
-           getMetaClient().getTableConfig().getBootstrapBasePath().get(),
-           upsertHandle.getWriterSchema(), getStorageConf().unwrapAs(Configuration.class));
-       upsertHandle.setPartitionFields(partitionFields);
-       upsertHandle.setPartitionValues(partitionValues);
-     }
-     HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
-   }
- }
-
}
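The override removed above does not disappear: its guard and status handling move into the shared IOUtils.runMerge helper that the Spark table and executor now import. Below is a hedged reconstruction from the removed code and the new call sites only; the real body in IOUtils.java is not shown in this excerpt and may differ, and the getHoodieTable() accessor is an assumption introduced purely for illustration.

    // Reconstructed sketch, NOT the verbatim helper from the commit.
    public static Iterator<List<WriteStatus>> runMerge(HoodieMergeHandle<?, ?, ?, ?> mergeHandle,
        String instantTime, String fileId) throws IOException {
      if (mergeHandle.getOldFilePath() == null) {
        // same guard the removed HoodieSparkTable#runMerge enforced
        throw new HoodieUpsertException("Error in finding the old file path at commit "
            + instantTime + " for fileId: " + fileId);
      }
      // how the helper reaches the table is not visible in this excerpt; accessor assumed
      HoodieMergeHelper.newInstance().runMerge(mergeHandle.getHoodieTable(), mergeHandle);
      return mergeHandle.getWriteStatusesAsIterator();
    }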
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 8ae910934464..b2119ddf7ce2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -18,6 +18,8 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.client.utils.SparkPartitionUtils;
+import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -45,10 +47,10 @@ import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.HoodieSparkIndexClient;
-import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
@@ -57,6 +59,7 @@ import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -379,19 +382,22 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
}
// these are updates
- HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
- return handleUpdateInternal(upsertHandle, fileId);
- }
-
- protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
-     throws IOException {
- table.runMerge(upsertHandle, instantTime, fileId);
- return upsertHandle.getWriteStatusesAsIterator();
+ HoodieMergeHandle mergeHandle = getUpdateHandle(partitionPath, fileId, recordItr);
+ return IOUtils.runMerge(mergeHandle, instantTime, fileId);
}
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
- return HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
+ HoodieMergeHandle mergeHandle = HoodieMergeHandleFactory.create(operationType, config, instantTime, table, recordItr, partitionPath, fileId,
    taskContextSupplier, keyGeneratorOpt);
+ if (mergeHandle.getOldFilePath() != null && mergeHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) {
+   Option<String[]> partitionFields = table.getMetaClient().getTableConfig().getPartitionFields();
+   Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, mergeHandle.getPartitionPath(),
+       table.getMetaClient().getTableConfig().getBootstrapBasePath().get(),
+       mergeHandle.getWriterSchema(), (Configuration) table.getStorageConf().unwrap());
+   mergeHandle.setPartitionFields(partitionFields);
+   mergeHandle.setPartitionValues(partitionValues);
+ }
+ return mergeHandle;
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 687d92ccd65d..a4168958e6b5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -32,7 +32,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.CreateHandleFactory;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
@@ -132,7 +132,7 @@ public class TestUpdateSchemaEvolution extends HoodieSparkClientTestHarness impl
    List<HoodieRecord> updateRecords, String assertMsg, boolean isAssertThrow, Class expectedExceptionType) {
jsc.parallelize(Arrays.asList(1)).map(x -> {
Executable executable = () -> {
- HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable,
+ HoodieWriteMergeHandle mergeHandle = new HoodieWriteMergeHandle(updateTable.getConfig(), "101", updateTable,
    updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier, Option.empty());
List<GenericRecord> oldRecords = HoodieIOFactory.getIOFactory(updateTable.getStorage())
.getFileFormatUtils(updateTable.getBaseFileFormat())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieDefaultMergeHandle.java
similarity index 99%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieDefaultMergeHandle.java
index 590c32298823..2fab2abf24ec 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieDefaultMergeHandle.java
@@ -68,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
@SuppressWarnings("unchecked")
-public class TestHoodieMergeHandle extends HoodieSparkClientTestHarness {
+public class TestHoodieDefaultMergeHandle extends HoodieSparkClientTestHarness {
@BeforeEach
public void setUp() throws Exception {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index be168c9d73dd..0362a619499f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -133,7 +133,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
private final List<String> validBlockInstants = new ArrayList<>();
// Use scanV2 method.
private final boolean enableOptimizedLogBlocksScan;
- protected FileGroupRecordBuffer<T> recordBuffer;
+ protected HoodieFileGroupRecordBuffer<T> recordBuffer;
// Allows to consider inflight instants while merging log records
protected boolean allowInflightInstants;
// table version for compatibility
@@ -142,7 +142,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage, List<String> logFilePaths,
    boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
    boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride,
-   Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, FileGroupRecordBuffer<T> recordBuffer,
+   Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieFileGroupRecordBuffer<T> recordBuffer,
    boolean allowInflightInstants) {
this.readerContext = readerContext;
this.readerSchema = readerContext.getSchemaHandler() != null ? readerContext.getSchemaHandler().getRequiredSchema() : null;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index f6efc50426d0..36ee213c44f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -22,8 +22,8 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -66,7 +66,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage, List<String> logFilePaths, boolean reverseReader,
    int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan, Option<String> partitionName,
    Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan,
-   FileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) {
+   HoodieFileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) {
super(readerContext, metaClient, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField,
    forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
@@ -174,7 +174,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
private boolean forceFullScan = true;
private boolean enableOptimizedLogBlocksScan = false;
- private FileGroupRecordBuffer<T> recordBuffer;
+ private HoodieFileGroupRecordBuffer<T> recordBuffer;
private boolean allowInflightInstants = false;
private HoodieTableMetaClient metaClient;
@@ -244,7 +244,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
return this;
}
- public Builder<T> withRecordBuffer(FileGroupRecordBuffer<T> recordBuffer) {
+ public Builder<T> withRecordBuffer(HoodieFileGroupRecordBuffer<T> recordBuffer) {
this.recordBuffer = recordBuffer;
return this;
}
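Downstream effect of the rename for builder users, sketched below; only withRecordBuffer(...) is confirmed by the hunk above, while newBuilder(), build(), and the way the buffer instance is obtained are assumptions following Hudi's usual builder pattern:

    // Hedged sketch: call sites only need the new type name.
    HoodieFileGroupRecordBuffer<IndexedRecord> recordBuffer = obtainRecordBuffer(); // hypothetical helper
    HoodieMergedLogRecordReader<IndexedRecord> reader =
        HoodieMergedLogRecordReader.<IndexedRecord>newBuilder() // assumed factory method
            .withRecordBuffer(recordBuffer)
            .build(); // assumed terminal call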
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index ec1da45c8c41..0881a95f6ae2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -156,7 +156,7 @@ class ExpressionPayload(@transient record: GenericRecord,
}
if (resultRecordOpt == null) {
// If there is no condition matched, just filter this record.
- // here we return a IGNORE_RECORD, HoodieMergeHandle will not handle it.
+ // here we return a IGNORE_RECORD, HoodieWriteMergeHandle will not handle it.
HOption.of(HoodieRecord.SENTINEL)
} else {
resultRecordOpt
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index a718a1f20c81..52ef94ab85fa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -78,7 +78,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
@@ -467,9 +467,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath);
- HoodieMergeHandle handle = null;
+ HoodieWriteMergeHandle handle = null;
try {
- handle = new HoodieMergeHandle(config, instantTime, table, new HashMap<>(),
+ handle = new HoodieWriteMergeHandle(config, instantTime, table, new HashMap<>(),
    partitionPath, FSUtils.getFileId(baseFile.getFileName()), baseFile, new SparkTaskContextSupplier(),
    config.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(config.getProps())));
@@ -490,7 +490,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
final String newInstantTime = "006";
config.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
- handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
+ handle = new HoodieWriteMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
    partitionPath, FSUtils.getFileId(baseFile.getFileName()), baseFile, new SparkTaskContextSupplier(),
    config.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(config.getProps())));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
index 4adbb5afa416..4452268fa445 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
@@ -93,7 +93,7 @@ public class TestMergeHandle extends BaseTestHandle {
List<HoodieRecord> newRecords = dataGenerator.generateUniqueUpdates(instantTime, numUpdates);
int numDeletes = generateDeleteRecords(newRecords, dataGenerator, instantTime);
assertTrue(numDeletes > 0);
- HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, instantTime, table, newRecords.iterator(), partitionPath, fileId, new LocalTaskContextSupplier(),
+ HoodieWriteMergeHandle mergeHandle = new HoodieWriteMergeHandle(config, instantTime, table, newRecords.iterator(), partitionPath, fileId, new LocalTaskContextSupplier(),
    new HoodieBaseFile(fileGroup.getAllBaseFiles().findFirst().get()), Option.empty());
HoodieMergeHelper.newInstance().runMerge(table, mergeHandle);
WriteStatus writeStatus = mergeHandle.writeStatus;