This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0e527404547 [HUDI-9405] Adding new writer apis and impl to Metadata
writer to support streaming writes (#13286)
0e527404547 is described below
commit 0e52740454768f2e81449c6472d71fbf9496857f
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jun 13 15:43:57 2025 +0530
[HUDI-9405] Adding new writer apis and impl to Metadata writer to support
streaming writes (#13286)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 216 ++++++++++++++++++---
.../hudi/metadata/HoodieTableMetadataWriter.java | 45 +++++
.../hudi/metadata/MetadataIndexGenerator.java | 114 +++++++++++
.../hudi/metadata/TestMetadataIndexGenerator.java | 121 ++++++++++++
.../FlinkHoodieBackedTableMetadataWriter.java | 20 +-
.../JavaHoodieBackedTableMetadataWriter.java | 18 ++
.../SparkHoodieBackedTableMetadataWriter.java | 47 ++++-
...ieBackedTableMetadataWriterTableVersionSix.java | 23 ++-
.../hudi/metadata/SparkMetadataWriterFactory.java | 5 +
.../java/org/apache/hudi/io/BaseTestHandle.java | 69 +++++++
.../io/KeyGeneratorForDataGeneratorRecords.java | 41 ++++
.../java/org/apache/hudi/io/TestAppendHandle.java | 96 +++++++++
.../java/org/apache/hudi/io/TestCreateHandle.java | 84 ++++++++
.../java/org/apache/hudi/io/TestMergeHandle.java | 84 ++++++++
.../apache/hudi/io/TestMetadataWriterCommit.java | 148 ++++++++++++++
15 files changed, 1103 insertions(+), 28 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 53c37db3ebd..33db4b9e822 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
@@ -92,6 +93,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -146,6 +148,9 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
// from the metadata payload schema.
private static final String RECORD_KEY_FIELD_NAME =
HoodieMetadataPayload.KEY_FIELD_NAME;
+ // tracks the list of MDT partitions that support writing to the metadata table in a streaming manner.
+ private static final List<MetadataPartitionType>
STREAMING_WRITES_SUPPORTED_PARTITIONS = Arrays.asList(RECORD_INDEX);
+
// Average size of a record saved within the record index.
// Record index has a fixed size schema. This has been calculated based on
experiments with default settings
// for block size (1MB), compression (GZ) and disabling the hudi metadata
fields.
@@ -165,7 +170,17 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
// Is the MDT bootstrapped and ready to be read from
boolean initialized = false;
+ boolean streamingWritesEnabled = false;
private HoodieTableFileSystemView metadataView;
+ private Option<MetadataIndexGenerator> metadataIndexGenerator;
+
+ protected HoodieBackedTableMetadataWriter(StorageConfiguration<?>
storageConf,
+ HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy,
+ HoodieEngineContext engineContext,
+ Option<String>
inflightInstantTimestamp) {
+ this(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext,
inflightInstantTimestamp, false);
+ }
/**
* Hudi backed table metadata writer.
@@ -175,12 +190,15 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
* @param failedWritesCleaningPolicy Cleaning policy on failed writes
* @param engineContext Engine context
* @param inflightInstantTimestamp Timestamp of any instant in progress
+ * @param streamingWrites For streaming writes, a long-living write client is needed instead of being closed after each write.
+ * This flag helps decide the lifecycle of the write client.
*/
protected HoodieBackedTableMetadataWriter(StorageConfiguration<?>
storageConf,
HoodieWriteConfig writeConfig,
HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy,
HoodieEngineContext engineContext,
- Option<String>
inflightInstantTimestamp) {
+ Option<String>
inflightInstantTimestamp,
+ boolean streamingWrites) {
this.dataWriteConfig = writeConfig;
this.engineContext = engineContext;
this.storageConf = storageConf;
@@ -199,12 +217,20 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
}
}
ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT
Reader should have been opened post initialization");
+
+ this.metadataIndexGenerator = streamingWrites ?
Option.of(initializeMetadataIndexGenerator()) : Option.empty();
+ this.streamingWritesEnabled = streamingWrites;
}
List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig
metadataConfig, HoodieTableMetaClient metaClient) {
return MetadataPartitionType.getEnabledPartitions(metadataConfig,
metaClient);
}
+ /**
+ * Returns the utilities for metadata index generation.
+ */
+ abstract MetadataIndexGenerator initializeMetadataIndexGenerator();
+
private void mayBeReinitMetadataReader() {
if (metadata == null || metadataMetaClient == null ||
metadata.getMetadataFileSystemView() == null) {
initMetadataReader();
@@ -359,10 +385,9 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
/**
* Initialize the Metadata Table by listing files and partitions from the
file system.
*
- * @param dataTableInstantTime data table instant time on which the
metadata table
- * is initialized upon
- * @param partitionsToInit list of MDT partitions to initialize
- * @param inflightInstantTimestamp current action instant responsible for
this initialization
+ * @param dataTableInstantTime Timestamp to use for the commit
+ * @param partitionsToInit List of MDT partitions to initialize
+ * @param inflightInstantTimestamp Current action instant responsible for
this initialization
*/
private boolean initializeFromFilesystem(String dataTableInstantTime,
List<MetadataPartitionType> partitionsToInit, Option<String>
inflightInstantTimestamp) throws IOException {
Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
@@ -722,12 +747,13 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
/**
* Fetch record locations from FileSlice snapshot.
- * @param engineContext context ot use.
- * @param partitionFileSlicePairs list of pairs of partition and file slice.
+ *
+ * @param engineContext context to use.
+ * @param partitionFileSlicePairs list of pairs of partition and file
slice.
* @param recordIndexMaxParallelism parallelism to use.
- * @param activeModule active module of interest.
- * @param metaClient metaclient instance to use.
- * @param dataWriteConfig write config to use.
+ * @param activeModule active module of interest.
+ * @param metaClient metaclient instance to use.
+ * @param dataWriteConfig write config to use.
* @return
*/
private static <T> HoodieData<HoodieRecord>
readRecordKeysFromFileSliceSnapshot(HoodieEngineContext engineContext,
@@ -846,7 +872,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
/**
* Function to find hoodie partitions and list files in them in parallel.
*
- * @param initializationTime Files which have a timestamp after this are
neglected
+ * @param initializationTime Files which have a timestamp after this are
neglected
* @param pendingDataInstants Pending instants on data set
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
@@ -905,7 +931,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
/**
* Function to find hoodie partitions and list files in them in parallel
from MDT.
*
- * @param initializationTime Files which have a timestamp after this are
neglected
+ * @param initializationTime Files which have a timestamp after this are
neglected
* @param pendingDataInstants Files coming from pending instants are
neglected
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
@@ -1093,6 +1119,124 @@ public abstract class
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
+ public void startCommit(String instantTime) {
+ ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes
should be enabled for startCommit API");
+
+ if
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
{
+ // if this is a new commit being applied to metadata for the first time
+ LOG.info("New commit at {} being applied to MDT.", instantTime);
+ } else {
+ throw new HoodieMetadataException("Starting the same commit in Metadata
table more than once w/o rolling back : " + instantTime);
+ }
+
+ // this is where we might instantiate the write client to metadata table
for the first time.
+ getWriteClient().startCommitForMetadataTable(metadataMetaClient,
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+ }
+
+ @Override
+ public HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime) {
+ List<MetadataPartitionType> partitionsToTag = new
ArrayList<>(enabledPartitionTypes);
+ partitionsToTag.remove(FILES);
+ partitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+ if (partitionsToTag.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ HoodieData<HoodieRecord> untaggedRecords = writeStatus.flatMap(
+ new
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(partitionsToTag,
dataWriteConfig));
+
+ // tag records w/ location
+ Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>>
hoodieFileGroupsToUpdateAndTaggedMdtRecords =
tagRecordsWithLocationForStreamingWrites(untaggedRecords,
+
partitionsToTag.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
+
+ // write partial writes to MDT table (for those partitions where streaming
writes are enabled)
+ HoodieData<WriteStatus> writeStatusCollection =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
instantTime));
+ // dag not yet de-referenced. do not invoke any action on
writeStatusCollection yet.
+ return writeStatusCollection;
+ }
+
+ /**
+ * Upsert the tagged records to metadata table in the streaming flow.
+ * @param fileGroupIdToTaggedRecords Pair of {@link List} of {@link HoodieFileGroupId} referring to the list of all file group ids that could receive updates
+ * and {@link HoodieData} of tagged {@link
HoodieRecord}s.
+ * @param instantTime Instant time of interest.
+ */
+ protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> fileGroupIdToTaggedRecords, String instantTime) {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
+ @Override
+ public void completeStreamingCommit(String instantTime, HoodieEngineContext
context, List<HoodieWriteStat> partialWriteStats, HoodieCommitMetadata
metadata) {
+ List<HoodieWriteStat> allWriteStats = new ArrayList<>(partialWriteStats);
+ // update metadata for leftover partitions which do not have streaming writes support.
+ allWriteStats.addAll(prepareAndWriteToNonStreamingPartitions(context,
metadata, instantTime).map(WriteStatus::getStat).collectAsList());
+ getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(),
HoodieTimeline.DELTA_COMMIT_ACTION,
+ Collections.emptyMap(), Option.empty());
+ }
+
+ private HoodieData<WriteStatus>
prepareAndWriteToNonStreamingPartitions(HoodieEngineContext context,
HoodieCommitMetadata commitMetadata, String instantTime) {
+ Set<String> partitionsToUpdate =
getNonStreamingMetadataPartitionsToUpdate();
+ Map<String, HoodieData<HoodieRecord>> mdtPartitionsAndUnTaggedRecords =
new BatchMetadataConversionFunction(instantTime, commitMetadata,
partitionsToUpdate)
+ .convertMetadata();
+
+ HoodieData<HoodieRecord> untaggedMdtRecords = context.emptyHoodieData();
+ for (Map.Entry<String, HoodieData<HoodieRecord>> entry:
mdtPartitionsAndUnTaggedRecords.entrySet()) {
+ untaggedMdtRecords = untaggedMdtRecords.union(entry.getValue());
+ }
+
+ // write to mdt table for non-streaming mdt partitions
+ Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> taggedRecords =
tagRecordsWithLocationForStreamingWrites(untaggedMdtRecords,
partitionsToUpdate);
+ HoodieData<WriteStatus> writeStatusHoodieData =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedRecords,
instantTime));
+ return writeStatusHoodieData;
+ }
+
+ private Set<String> getNonStreamingMetadataPartitionsToUpdate() {
+ Set<String> toReturn =
enabledPartitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+ STREAMING_WRITES_SUPPORTED_PARTITIONS.forEach(metadataPartitionType ->
toReturn.remove(metadataPartitionType.getPartitionPath()));
+ return toReturn;
+ }
+
+ /**
+ * Returns a pair of the list of updated file group ids across the enabled partitions and the tagged MDT records.
+ * @param untaggedRecords Untagged MDT records.
+ */
+ protected Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>>
tagRecordsWithLocationForStreamingWrites(HoodieData<HoodieRecord>
untaggedRecords,
+
Set<String> enabledMetadataPartitions) {
+ List<HoodieFileGroupId> updatedFileGroupIds = new ArrayList<>();
+ // Fetch latest file slices for all enabled MDT partitions
+ Map<String, List<FileSlice>> partitionToLatestFileSlices = new HashMap<>();
+ try (HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+ enabledMetadataPartitions.forEach(partitionName -> {
+ List<FileSlice> fileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
+ if (fileSlices.isEmpty()) {
+ // scheduling or initializing of INDEX only initializes the file group and does not add a commit
+ // so if there are no committed file slices, look for inflight slices
+
ValidationUtils.checkState(dataMetaClient.getTableConfig().getMetadataPartitionsInflight().contains(partitionName),
+ String.format("Partition %s should be part of inflight metadata
partitions here %s", partitionName,
dataMetaClient.getTableConfig().getMetadataPartitionsInflight()));
+ fileSlices =
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
+ }
+ partitionToLatestFileSlices.put(partitionName, fileSlices);
+ final int fileGroupCount = fileSlices.size();
+ fileSlices.forEach(fileSlice ->
updatedFileGroupIds.add(fileSlice.getFileGroupId()));
+ ValidationUtils.checkArgument(fileGroupCount > 0,
String.format("FileGroup count for MDT partition %s should be > 0",
partitionName));
+ });
+ }
+
+ HoodieData<HoodieRecord> taggedRecords = untaggedRecords.map(mdtRecord -> {
+ String mdtPartition = mdtRecord.getPartitionPath();
+ List<FileSlice> latestFileSlices =
partitionToLatestFileSlices.get(mdtPartition);
+ FileSlice slice =
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(mdtRecord.getRecordKey(),
+ latestFileSlices.size()));
+ mdtRecord.unseal();
+ mdtRecord.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+ mdtRecord.seal();
+ return mdtRecord;
+ });
+
+ return Pair.of(updatedFileGroupIds, taggedRecords);
+ }
+
/**
* Update from {@code HoodieCommitMetadata}.
*
@@ -1102,26 +1246,44 @@ public abstract class
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
@Override
public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
mayBeReinitMetadataReader();
- processAndCommit(instantTime, () -> {
+ processAndCommit(instantTime, new
BatchMetadataConversionFunction(instantTime, commitMetadata,
getMetadataPartitionsToUpdate()));
+ closeInternal();
+ }
+
+ /**
+ * Hoodie commit metadata conversion function for non-streaming writes.
+ */
+ private class BatchMetadataConversionFunction implements
ConvertMetadataFunction {
+
+ private final HoodieCommitMetadata commitMetadata;
+ private final String instantTime;
+ private final Set<String> partitionsToUpdate;
+ public BatchMetadataConversionFunction(String instantTime,
HoodieCommitMetadata commitMetadata, Set<String> partitionsToUpdate) {
+ this.instantTime = instantTime;
+ this.commitMetadata = commitMetadata;
+ this.partitionsToUpdate = partitionsToUpdate;
+ }
+
+ @Override
+ public Map<String, HoodieData<HoodieRecord>> convertMetadata() {
Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime,
dataMetaClient, getTableMetadata(),
dataWriteConfig.getMetadataConfig(),
- getMetadataPartitionsToUpdate(),
dataWriteConfig.getBloomFilterType(),
+ partitionsToUpdate, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.getWritesFileIdEncoding(), getEngineType(),
Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
// Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
- if
(getMetadataPartitionsToUpdate().contains(RECORD_INDEX.getPartitionPath())) {
+ if (partitionsToUpdate.contains(RECORD_INDEX.getPartitionPath())) {
HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()),
commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(),
partitionToRecordMap.get(RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
}
updateExpressionIndexIfPresent(commitMetadata, instantTime,
partitionToRecordMap);
updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap,
instantTime);
return partitionToRecordMap;
- });
- closeInternal();
+ }
}
/**
@@ -1369,10 +1531,12 @@ public abstract class
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
*/
protected abstract I
convertHoodieDataToEngineSpecificData(HoodieData<HoodieRecord> records);
+ protected abstract HoodieData<WriteStatus>
convertEngineSpecificDataToHoodieData(O records);
+
protected void commitInternal(String instantTime, Map<String,
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
Option<BulkInsertPartitioner>
bulkInsertPartitioner) {
ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is
not fully initialized yet.");
- Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> result =
prepRecords(partitionRecordsMap);
+ Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> result =
tagRecordsWithLocation(partitionRecordsMap, isInitializing);
HoodieData<HoodieRecord> preppedRecords = result.getKey();
I preppedRecordInputs =
convertHoodieDataToEngineSpecificData(preppedRecords);
@@ -1424,6 +1588,8 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
protected abstract void upsertAndCommit(BaseHoodieWriteClient<?, I, ?, O>
writeClient, String instantTime, I preppedRecordInputs);
+ protected abstract void upsertAndCommit(BaseHoodieWriteClient<?, I, ?, O>
writeClient, String instantTime, I preppedRecordInputs, List<HoodieFileGroupId>
fileGroupsIdsToUpdate);
+
/**
* Rolls back any failed writes if cleanup policy is EAGER. If any writes
were cleaned up, the meta client is reloaded.
* @param dataWriteConfig write config for the data table
@@ -1468,7 +1634,7 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
* @return Pair of {@link HoodieData} of {@link HoodieRecord} referring to
the prepared records to be ingested to metadata table and
* List of {@link HoodieFileGroupId} containing all file groups from the
partitions being written in metadata table.
*/
- protected Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>>
prepRecords(Map<String, HoodieData<HoodieRecord>> partitionRecordsMap) {
+ protected Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>>
tagRecordsWithLocation(Map<String, HoodieData<HoodieRecord>>
partitionRecordsMap, boolean isInitializing) {
// The result set
HoodieData<HoodieRecord> allPartitionRecords =
engineContext.emptyHoodieData();
try (HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
@@ -1479,8 +1645,10 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
if (fileSlices.isEmpty()) {
- // scheduling of INDEX only initializes the file group and not add
commit
+ // scheduling or initializing of INDEX only initializes the file group and does not add a commit
// so if there are no committed file slices, look for inflight slices
+ ValidationUtils.checkState(isInitializing ||
dataMetaClient.getTableConfig().getMetadataPartitionsInflight().contains(partitionName),
+ String.format("Partition %s should be part of inflight metadata
partitions here %s", partitionName,
dataMetaClient.getTableConfig().getMetadataPartitionsInflight()));
fileSlices =
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
}
final int fileGroupCount = fileSlices.size();
@@ -1547,11 +1715,11 @@ public abstract class
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
String metadataTableName = writeClient.getConfig().getTableName();
boolean tableNameExists = StringUtils.nonEmpty(metadataTableName);
String executionDurationMetricName = tableNameExists
- ? String.format("%s.%s", metadataTableName,
HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_DURATION)
- : HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_DURATION;
+ ? String.format("%s.%s", metadataTableName,
HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_DURATION)
+ : HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_DURATION;
String executionStatusMetricName = tableNameExists
- ? String.format("%s.%s", metadataTableName,
HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_STATUS)
- : HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_STATUS;
+ ? String.format("%s.%s", metadataTableName,
HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_STATUS)
+ : HoodieMetadataMetrics.TABLE_SERVICE_EXECUTION_STATUS;
long timeSpent = metadataTableServicesTimer.endTimer();
metrics.ifPresent(m -> m.setMetric(executionDurationMetricName,
timeSpent));
if (allTableServicesExecutedSuccessfullyOrSkipped) {
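
For illustration, the per-record routing done by the new tagRecordsWithLocationForStreamingWrites above boils down to hashing each record key into the partition's fixed set of latest file slices. A condensed sketch (imports and enclosing class omitted; partitionToLatestFileSlices is assumed to be populated from the latest or inflight file slices, as in the method above):

    static HoodieRecord routeToFileGroup(HoodieRecord mdtRecord,
                                         Map<String, List<FileSlice>> partitionToLatestFileSlices) {
      List<FileSlice> slices = partitionToLatestFileSlices.get(mdtRecord.getPartitionPath());
      // deterministic hash of the record key into [0, slices.size())
      FileSlice slice = slices.get(
          HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(mdtRecord.getRecordKey(), slices.size()));
      mdtRecord.unseal();
      mdtRecord.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
      mdtRecord.seal();
      return mdtRecord;
    }
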
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index 116d76bbc0b..8ffdb0a2720 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -22,8 +22,11 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
@@ -36,6 +39,48 @@ import java.util.List;
*/
public interface HoodieTableMetadataWriter<I,O> extends Serializable,
AutoCloseable {
+ /**
+ * Starts a new commit in metadata table for streaming write flow.
+ *
+ * @param instantTime The instant time of interest.
+ */
+ void startCommit(String instantTime);
+
+ /**
+ * Prepares records and writes to the MDT table for all eligible partitions except the FILES partition.
+ *
+ * <p>This will be used in streaming writes, where the data table write-statuses are maintained as HoodieData;
+ * this method prepares records and writes them to the MDT table partitions (except FILES).
+ *
+ * <p>Caution: no actions should be triggered on the incoming HoodieData<WriteStatus>
+ * or on the writes to the metadata table. The caller is expected to trigger #collect
+ * just once for both sets of HoodieData<WriteStatus>.
+ *
+ * @param writeStatus {@link HoodieData} of {@link WriteStatus} from data
table writes.
+ * @param instantTime instant time of interest.
+ *
+ * @return {@link HoodieData} of {@link WriteStatus} for writes to metadata
table.
+ */
+ HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime);
+
+ /**
+ * Completes the commit in the metadata table for the streaming write flow.
+ *
+ * <p>The streaming write workflow:
+ *
+ * <ol>
+ * <li>writes the inputs in the data table to all data files;</li>
+ * <li>updates metadata table partitions in {@link #streamWriteToMetadataPartitions};</li>
+ * <li>finalizes the writes for the data table;</li>
+ * <li>caller invokes this method to update the metadata table for the remaining non-streaming partitions (including FILES), and finally completes the commit in the metadata table.</li>
+ * </ol>
+ *
+ * @param instantTime Instant time of interest.
+ * @param context The engine context {@link HoodieEngineContext}.
+ * @param partialWriteStats List<HoodieWriteStat> for partial/streaming
writes to metadata table completed so far.
+ * @param commitMetadata The data table {@link HoodieCommitMetadata}.
+ */
+ void completeStreamingCommit(String instantTime, HoodieEngineContext
context, List<HoodieWriteStat> partialWriteStats, HoodieCommitMetadata
commitMetadata);
+
/**
* Builds the given metadata partitions to create index.
*
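
Taken together, the interface methods above imply a caller-side flow roughly like the following sketch. Names such as writeToDataTable, inputRecords, dataWriteStatuses, engineContext, and commitMetadata are illustrative placeholders, not part of this change; the real orchestration lives in the engine-specific write clients:

    metadataWriter.startCommit(instantTime);

    // 1. data table writes produce a lazily evaluated HoodieData<WriteStatus>
    HoodieData<WriteStatus> dataWriteStatuses = writeToDataTable(inputRecords, instantTime); // hypothetical helper

    // 2. stream the same write statuses into the streaming-enabled MDT partitions
    HoodieData<WriteStatus> mdtWriteStatuses =
        metadataWriter.streamWriteToMetadataPartitions(dataWriteStatuses, instantTime);

    // 3. de-reference each dataset exactly once, per the caution above
    List<HoodieWriteStat> partialMdtStats =
        mdtWriteStatuses.map(WriteStatus::getStat).collectAsList();

    // 4. finalize the data table commit, then complete the MDT commit for the
    //    remaining non-streaming partitions (including FILES)
    metadataWriter.completeStreamingCommit(instantTime, engineContext, partialMdtStats, commitMetadata);
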
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java
new file mode 100644
index 00000000000..f8809d31291
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**
+ * For now this is a placeholder to generate all MDT records in one place.
+ * Once <a href="https://github.com/apache/hudi/pull/13226">Refactor MDT
update logic with Indexer</a> is landed,
+ * we will leverage the new abstraction to generate MDT records.
+ */
+public class MetadataIndexGenerator implements Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MetadataIndexGenerator.class);
+
+ /**
+ * MDT record transformation utility. This function is expected to be invoked from a mapPartitions call,
+ * where one Spark task receives one WriteStatus as input and the output contains prepared MDT table
+ * records for all eligible partitions that can operate on a single WriteStatus instance.
+ */
+ static class WriteStatusBasedMetadataIndexMapper implements
SerializableFunction<WriteStatus, Iterator<HoodieRecord>> {
+ private final List<MetadataPartitionType> enabledPartitionTypes;
+ private final HoodieWriteConfig dataWriteConfig;
+
+ public WriteStatusBasedMetadataIndexMapper(List<MetadataPartitionType>
enabledPartitionTypes, HoodieWriteConfig dataWriteConfig) {
+ this.enabledPartitionTypes = enabledPartitionTypes;
+ this.dataWriteConfig = dataWriteConfig;
+ }
+
+ @Override
+ public Iterator<HoodieRecord> apply(WriteStatus writeStatus) throws
Exception {
+ List<HoodieRecord> allRecords = new ArrayList<>();
+ if (enabledPartitionTypes.contains(RECORD_INDEX)) {
+ allRecords.addAll(processWriteStatusForRLI(writeStatus,
dataWriteConfig));
+ }
+ // yet to add support for more partitions.
+ // bloom filter
+ // secondary index
+ // expression index.
+ return allRecords.iterator();
+ }
+ }
+
+ protected static List<HoodieRecord> processWriteStatusForRLI(WriteStatus
writeStatus, HoodieWriteConfig dataWriteConfig) {
+ List<HoodieRecord> allRecords = new ArrayList<>();
+ for (HoodieRecordDelegate recordDelegate :
writeStatus.getWrittenRecordDelegates()) {
+ if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+ if (recordDelegate.getIgnoreIndexUpdate()) {
+ continue;
+ }
+ HoodieRecord hoodieRecord;
+ Option<HoodieRecordLocation> newLocation =
recordDelegate.getNewLocation();
+ if (newLocation.isPresent()) {
+ if (recordDelegate.getCurrentLocation().isPresent()) {
+ // This is an update, no need to update index if the location has
not changed
+ // newLocation should have the same fileID as currentLocation. The
instantTimes differ as newLocation's
+ // instantTime refers to the current commit which was completed.
+ if
(!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId()))
{
+ final String msg = String.format("Detected update in location of
record with key %s from %s to %s. The fileID should not change.",
+ recordDelegate, recordDelegate.getCurrentLocation().get(),
newLocation.get());
+ LOG.error(msg);
+ throw new HoodieMetadataException(msg);
+ }
+ // for updates, we can skip updating RLI partition in MDT
+ } else {
+ // Insert new record case
+ hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
+ recordDelegate.getRecordKey(),
recordDelegate.getPartitionPath(),
+ newLocation.get().getFileId(),
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
+ allRecords.add(hoodieRecord);
+ }
+ } else {
+ // Delete existing index for a deleted record
+ hoodieRecord =
HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey());
+ allRecords.add(hoodieRecord);
+ }
+ }
+ }
+ return allRecords;
+ }
+}
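
As wired up in HoodieBackedTableMetadataWriter above, this mapper is applied with flatMap over the data table write statuses. A minimal sketch, assuming partitionsToTag, dataWriteConfig and dataTableWriteStatuses are already available in the caller (same package, imports omitted):

    List<MetadataPartitionType> partitionsToTag =
        Collections.singletonList(MetadataPartitionType.RECORD_INDEX);
    HoodieData<HoodieRecord> untaggedRliRecords = dataTableWriteStatuses.flatMap(
        new MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(partitionsToTag, dataWriteConfig));
    // per written record delegate: inserts yield a record-index update, deletes yield a
    // record-index delete, and in-place updates (same fileId) are skipped
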
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java
new file mode 100644
index 00000000000..188e93099ff
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestMetadataIndexGenerator extends HoodieCommonTestHarness {
+
+ @Test
+ public void testRLIIndexMapperWithInsertsAndUpserts() throws Exception {
+ // Generate write status with inserts and upserts
+ WriteStatus writeStatus = new WriteStatus(true, 0.0d);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ List<HoodieRecord> records = dataGenerator.generateInserts("001", 10);
+ for (int i = 0; i < records.size(); i++) {
+ HoodieRecord record = records.get(i);
+ String fileId = UUID.randomUUID().toString();
+ if (i < 5) {
+ // 5 updates
+ record.setCurrentLocation(new
HoodieRecordLocation(InProcessTimeGenerator.createNewInstantTime(), fileId));
+ }
+ // all records get a new location; the remaining 5 without a current location are inserts
+ record.setNewLocation(new
HoodieRecordLocation(InProcessTimeGenerator.createNewInstantTime(), fileId));
+ writeStatus.markSuccess(record, Option.empty());
+ }
+
+ // Generate RLI records
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("random")
+ .build();
+ MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper
writeStatusBasedMetadataIndexMapper = new
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(
+ Collections.singletonList(MetadataPartitionType.RECORD_INDEX),
writeConfig);
+ Iterator<HoodieRecord> rliRecords =
writeStatusBasedMetadataIndexMapper.apply(writeStatus);
+ AtomicInteger totalRLIRecords = new AtomicInteger();
+ rliRecords.forEachRemaining(rliRecord -> {
+ totalRLIRecords.getAndIncrement();
+
+ // verify RLI metadata
+ HoodieRecord<HoodieMetadataPayload> record = rliRecord;
+ assertEquals(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
record.getKey().getPartitionPath());
+ HoodieRecordIndexInfo recordIndexInfo =
record.getData().recordIndexMetadata;
+ assertNotNull(recordIndexInfo);
+ assertTrue(StringUtils.nonEmpty(recordIndexInfo.getPartitionName()));
+ assertNotNull(recordIndexInfo.getFileIdHighBits());
+ assertNotNull(recordIndexInfo.getFileIdLowBits());
+ });
+ // RLI is only updated for inserts
+ assertEquals(5, totalRLIRecords.get());
+ }
+
+ @Test
+ public void testRLIIndexGeneratorWithDeletes() throws Exception {
+ // Generate write status with deletes
+ WriteStatus writeStatus = new WriteStatus(true, 0.0d);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ List<HoodieRecord> records = dataGenerator.generateInserts("001", 10);
+ for (int i = 0; i < records.size(); i++) {
+ HoodieRecord record = records.get(i);
+ String fileId = UUID.randomUUID().toString();
+ record.setCurrentLocation(new
HoodieRecordLocation(InProcessTimeGenerator.createNewInstantTime(), fileId));
+ writeStatus.markSuccess(record, Option.empty());
+ }
+
+ // Generate RLI records
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("random")
+ .build();
+ MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper
writeStatusBasedMetadataIndexMapper = new
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(
+ Collections.singletonList(MetadataPartitionType.RECORD_INDEX),
writeConfig);
+ Iterator<HoodieRecord> rliRecords =
writeStatusBasedMetadataIndexMapper.apply(writeStatus);
+ AtomicInteger totalRLIRecords = new AtomicInteger();
+ rliRecords.forEachRemaining(rliRecord -> {
+ totalRLIRecords.getAndIncrement();
+
+ // verify RLI record payload is EmptyHoodieRecordPayload
+ HoodieRecord<EmptyHoodieRecordPayload> record = rliRecord;
+ assertEquals(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
record.getKey().getPartitionPath());
+ assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
+ });
+ // RLI delete records are generated for every deleted record
+ assertEquals(10, totalRLIRecords.get());
+ }
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 9ebf0753964..e1911da1f67 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -22,9 +22,11 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -110,6 +112,11 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
return records.collectAsList();
}
+ @Override
+ protected HoodieData<WriteStatus>
convertEngineSpecificDataToHoodieData(List<WriteStatus> records) {
+ return HoodieListData.lazy(records);
+ }
+
@Override
protected void bulkCommit(String instantTime, String partitionName,
HoodieData<HoodieRecord> records, int fileGroupCount) {
// TODO: functional and secondary index are not supported with Flink yet,
but we should fix the partition name when we support them.
@@ -120,7 +127,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
protected void commitInternal(String instantTime, Map<String,
HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
Option<BulkInsertPartitioner>
bulkInsertPartitioner) {
ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is
not fully initialized yet.");
- HoodieData<HoodieRecord> preppedRecords =
prepRecords(partitionRecordsMap).getKey();
+ HoodieData<HoodieRecord> preppedRecords =
tagRecordsWithLocation(partitionRecordsMap, isInitializing).getKey();
List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
// Flink engine does not optimize initialCommit to MDT as bulk insert is
not yet supported
@@ -209,6 +216,17 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
writeClient.commit(instantTime, writeStatusJavaRDD);
}
+ @Override
+ protected void upsertAndCommit(BaseHoodieWriteClient<?, List<HoodieRecord>,
?, List<WriteStatus>> writeClient, String instantTime, List<HoodieRecord>
preppedRecordInputs,
+ List<HoodieFileGroupId>
fileGroupsIdsToUpdate) {
+ throw new UnsupportedOperationException("Not implemented for Flink engine
yet");
+ }
+
+ @Override
+ MetadataIndexGenerator initializeMetadataIndexGenerator() {
+ throw new UnsupportedOperationException("Streaming writes are not
supported for Flink");
+ }
+
@Override
protected EngineType getEngineType() {
return EngineType.FLINK;
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
index c820f4539d5..149ddf6407f 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
@@ -22,9 +22,11 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -61,6 +63,11 @@ public class JavaHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetada
super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext,
inflightInstantTimestamp);
}
+ @Override
+ MetadataIndexGenerator initializeMetadataIndexGenerator() {
+ throw new UnsupportedOperationException("Streaming writes are not
supported for Java");
+ }
+
public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
HoodieWriteConfig writeConfig,
HoodieEngineContext context,
@@ -102,6 +109,11 @@ public class JavaHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetada
return records.collectAsList();
}
+ @Override
+ protected HoodieData<WriteStatus>
convertEngineSpecificDataToHoodieData(List<WriteStatus> records) {
+ return HoodieListData.lazy(records);
+ }
+
@Override
protected void bulkCommit(String instantTime, String partitionName,
HoodieData<HoodieRecord> records, int fileGroupCount) {
commitInternal(instantTime, Collections.singletonMap(partitionName,
records), true, Option.of(new JavaHoodieMetadataBulkInsertPartitioner()));
@@ -120,6 +132,12 @@ public class JavaHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetada
writeClient.commit(instantTime, writeStatusJavaRDD);
}
+ @Override
+ protected void upsertAndCommit(BaseHoodieWriteClient<?, List<HoodieRecord>,
?, List<WriteStatus>> writeClient, String instantTime, List<HoodieRecord>
preppedRecordInputs,
+ List<HoodieFileGroupId>
fileGroupsIdsToUpdate) {
+ throw new UnsupportedOperationException("Not implemented for Java engine
yet");
+ }
+
@Override
protected BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>>
initializeWriteClient() {
return new HoodieJavaWriteClient(engineContext, metadataWriteConfig);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 6a4088f0879..a98d7ae47d6 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -116,7 +117,16 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy,
HoodieEngineContext engineContext,
Option<String>
inflightInstantTimestamp) {
- super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
inflightInstantTimestamp);
+ this(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
inflightInstantTimestamp, false);
+ }
+
+ SparkHoodieBackedTableMetadataWriter(StorageConfiguration<?> hadoopConf,
+ HoodieWriteConfig writeConfig,
+ HoodieFailedWritesCleaningPolicy
failedWritesCleaningPolicy,
+ HoodieEngineContext engineContext,
+ Option<String> inflightInstantTimestamp,
+ boolean streamingWrites) {
+ super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
inflightInstantTimestamp, streamingWrites);
}
@Override
@@ -147,7 +157,23 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
}
@Override
- protected void bulkInsertAndCommit(BaseHoodieWriteClient<?,
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs,
+ protected HoodieData<WriteStatus>
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+ return HoodieJavaRDD.of(records);
+ }
+
+ @Override
+ public JavaRDD<WriteStatus>
streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> fileGroupIdToTaggedRecords, String instantTime) {
+ JavaRDD<HoodieRecord> mdtRecords =
HoodieJavaRDD.getJavaRDD(fileGroupIdToTaggedRecords.getValue());
+ engineContext.setJobStatus(this.getClass().getSimpleName(),
String.format("Upserting with instant %s into metadata table %s", instantTime,
metadataWriteConfig.getTableName()));
+ JavaRDD<WriteStatus> metadataWriteStatusesSoFar =
getSparkWriteClient(Option.empty()).upsertPreppedRecords(mdtRecords,
instantTime, Option.of(
+ fileGroupIdToTaggedRecords.getKey()));
+ return metadataWriteStatusesSoFar;
+ }
+
+ @Override
+ protected void bulkInsertAndCommit(BaseHoodieWriteClient<?,
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient,
+ String instantTime,
+ JavaRDD<HoodieRecord> preppedRecordInputs,
Option<BulkInsertPartitioner>
bulkInsertPartitioner) {
JavaRDD<WriteStatus> writeStatusJavaRDD =
writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime,
bulkInsertPartitioner);
writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
@@ -159,6 +185,15 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
}
+ @Override
+ protected void upsertAndCommit(BaseHoodieWriteClient<?,
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient,
+ String instantTime,
+ JavaRDD<HoodieRecord> preppedRecordInputs,
+ List<HoodieFileGroupId>
fileGroupsIdsToUpdate) {
+ JavaRDD<WriteStatus> writeStatusJavaRDD =
getSparkWriteClient(Option.of(writeClient)).upsertPreppedRecords(preppedRecordInputs,
instantTime, Option.of(fileGroupsIdsToUpdate));
+ writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
+ }
+
@Override
protected void bulkCommit(
String instantTime, String partitionName, HoodieData<HoodieRecord>
records,
@@ -241,6 +276,14 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
return exprIndexRecords;
}
+ protected MetadataIndexGenerator initializeMetadataIndexGenerator() {
+ return new MetadataIndexGenerator();
+ }
+
+ protected SparkRDDMetadataWriteClient
getSparkWriteClient(Option<BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?,
JavaRDD<WriteStatus>>> writeClientOpt) {
+ return ((SparkRDDMetadataWriteClient)
writeClientOpt.orElse(getWriteClient()));
+ }
+
@Override
public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?,
JavaRDD<WriteStatus>> initializeWriteClient() {
return new SparkRDDMetadataWriteClient(engineContext, metadataWriteConfig,
Option.empty());
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index 802a2a66a85..1aa2a88a038 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -19,6 +19,7 @@
package org.apache.hudi.metadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDMetadataWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -57,6 +59,7 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
public class SparkHoodieBackedTableMetadataWriterTableVersionSix extends
HoodieBackedTableMetadataWriterTableVersionSix<JavaRDD<HoodieRecord>,
JavaRDD<WriteStatus>> {
@@ -92,6 +95,11 @@ public class
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext,
inflightInstantTimestamp);
}
+ @Override
+ MetadataIndexGenerator initializeMetadataIndexGenerator() {
+ throw new UnsupportedOperationException("Streaming writes are not
supported for Spark table version six");
+ }
+
@Override
protected void initRegistry() {
if (metadataWriteConfig.isMetricsOn()) {
@@ -119,6 +127,11 @@ public class
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
return HoodieJavaRDD.getJavaRDD(records);
}
+ @Override
+ protected HoodieData<WriteStatus>
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+ throw new HoodieNotSupportedException("Unsupported flow for table version
6");
+ }
+
@Override
protected void bulkInsertAndCommit(BaseHoodieWriteClient<?,
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs,
Option<BulkInsertPartitioner>
bulkInsertPartitioner) {
@@ -132,6 +145,13 @@ public class
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
}
+ @Override
+ protected void upsertAndCommit(BaseHoodieWriteClient<?,
JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String
instantTime, JavaRDD<HoodieRecord> preppedRecordInputs,
+ List<HoodieFileGroupId>
fileGroupsIdsToUpdate) {
+ JavaRDD<WriteStatus> writeStatusJavaRDD =
writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
+ writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(),
DELTA_COMMIT_ACTION, Collections.emptyMap());
+ }
+
@Override
protected void bulkCommit(
String instantTime, String partitionName, HoodieData<HoodieRecord>
records,
@@ -148,7 +168,8 @@ public class
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
String actionType =
CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION,
HoodieTableType.MERGE_ON_READ);
writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime,
actionType);
- writeClient.deletePartitions(partitionsToDrop, instantTime);
+ HoodieWriteResult result = writeClient.deletePartitions(partitionsToDrop,
instantTime);
+ writeClient.commit(instantTime, result.getWriteStatuses(), Option.empty(),
REPLACE_COMMIT_ACTION, result.getPartitionToReplaceFileIds());
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
index 3c6c9c4c1c3..3e5507213cb 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkMetadataWriterFactory.java
@@ -49,6 +49,11 @@ public class SparkMetadataWriterFactory {
}
}
+ public static HoodieTableMetadataWriter
createWithStreamingWrites(StorageConfiguration<?> conf, HoodieWriteConfig
writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+
HoodieEngineContext context, Option<String> inflightInstantTimestamp) {
+ return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig,
failedWritesCleaningPolicy, context, inflightInstantTimestamp, true);
+ }
+
public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
HoodieWriteConfig writeConfig, HoodieEngineContext context, HoodieTableConfig
tableConfig) {
return create(conf, writeConfig, context, Option.empty(), tableConfig);
}
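
A hedged usage sketch of the new factory entry point; storageConf, writeConfig, engineContext and instantTime are assumed to be built by the caller, and EAGER is just one possible cleaning policy:

    HoodieTableMetadataWriter metadataWriter = SparkMetadataWriterFactory.createWithStreamingWrites(
        storageConf, writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, engineContext, Option.empty());
    metadataWriter.startCommit(instantTime);
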
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/BaseTestHandle.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/BaseTestHandle.java
new file mode 100644
index 00000000000..910c5549e7a
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/BaseTestHandle.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for write handle unit tests.
+ */
+public class BaseTestHandle extends HoodieSparkClientTestHarness {
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initSparkContexts("TestHoodieRowCreateHandle");
+ initPath();
+ initHoodieStorage();
+ initTestDataGenerator();
+ initMetaClient();
+ initTimelineService();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanupResources();
+ }
+
+ Pair<WriteStatus, List<HoodieRecord>> createParquetFile(HoodieWriteConfig config, HoodieTable table, String partitionPath,
+ String fileId, String instantTime, HoodieTestDataGenerator dataGenerator) {
+ List<HoodieRecord> records = dataGenerator.generateInserts(instantTime, 100);
+ Map<String, HoodieRecord> recordMap = new HashMap<>();
+ for (int i = 0; i < records.size(); i++) {
+ recordMap.put(String.valueOf(i), records.get(i));
+ }
+ HoodieCreateHandle handle = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileId, recordMap, new LocalTaskContextSupplier());
+ handle.write();
+ handle.close();
+ return Pair.of(handle.writeStatus, records);
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/KeyGeneratorForDataGeneratorRecords.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/KeyGeneratorForDataGeneratorRecords.java
new file mode 100644
index 00000000000..b93e32a87a0
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/KeyGeneratorForDataGeneratorRecords.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
+
+import org.apache.avro.generic.GenericRecord;
+
+public class KeyGeneratorForDataGeneratorRecords extends BuiltinKeyGenerator {
+
+ public KeyGeneratorForDataGeneratorRecords(TypedProperties config) {
+ super(config);
+ }
+
+ @Override
+ public String getRecordKey(GenericRecord record) {
+ return record.get("_row_key").toString();
+ }
+
+ @Override
+ public String getPartitionPath(GenericRecord record) {
+ return record.get("partition_path").toString();
+ }
+}
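Because the handle tests below disable populateMetaFields, record keys and partition paths have to be re-derived from the generated payload fields (_row_key, partition_path), which is what this key generator does. A minimal sketch of how it gets wired into a write config follows; the class name, basePath argument and the plain HoodieWriteConfig builder are assumptions standing in for the test-harness helpers used in these tests.

import org.apache.hudi.config.HoodieWriteConfig;

// Illustrative sketch only; KeyGenWiringSketch is not part of the patch.
public class KeyGenWiringSketch {

  static HoodieWriteConfig buildConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withPopulateMetaFields(false)
        // Resolve keys and partition paths from the generated records instead of Hudi meta fields.
        .withKeyGenerator(KeyGeneratorForDataGeneratorRecords.class.getCanonicalName())
        .build();
  }
}

Handles that need the key generator directly, such as the HoodieMergeHandle constructed in TestMergeHandle below, can instantiate it from the config via new KeyGeneratorForDataGeneratorRecords(config.getProps()).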
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestAppendHandle.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestAppendHandle.java
new file mode 100644
index 00000000000..770627b35eb
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestAppendHandle.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieAppendHandle}.
+ */
+public class TestAppendHandle extends BaseTestHandle {
+
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void testAppendHandleRLIStats(boolean populateMetaFields) {
+ // init config and table
+ HoodieWriteConfig config = getConfigBuilder(basePath)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+ .withPopulateMetaFields(populateMetaFields)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+ .build();
+
+ HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+ // one round per partition
+ String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+ // init some args
+ String fileId = UUID.randomUUID().toString();
+ String instantTime = "000";
+
+ config.setSchema(TRIP_EXAMPLE_SCHEMA);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
+ // create parquet file
+ createParquetFile(config, table, partitionPath, fileId, instantTime, dataGenerator);
+ // generate update records
+ instantTime = "001";
+ List<HoodieRecord> records = dataGenerator.generateUniqueUpdates(instantTime, 50);
+ HoodieAppendHandle handle = new HoodieAppendHandle(config, instantTime, table, partitionPath, fileId, records.iterator(), new LocalTaskContextSupplier());
+ Map<String, HoodieRecord> recordMap = new HashMap<>();
+ for (int i = 0; i < records.size(); i++) {
+ recordMap.put(String.valueOf(i), records.get(i));
+ }
+ // write the update records
+ handle.write(recordMap);
+ WriteStatus writeStatus = handle.writeStatus;
+ handle.close();
+
+ assertEquals(records.size(), writeStatus.getTotalRecords());
+ assertEquals(0, writeStatus.getTotalErrorRecords());
+ // validate write status has all record delegates
+ if (populateMetaFields) {
+ assertEquals(records.size(), writeStatus.getWrittenRecordDelegates().size());
+ for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
+ assertTrue(recordDelegate.getNewLocation().isPresent());
+ assertEquals(fileId, recordDelegate.getNewLocation().get().getFileId());
+ assertEquals(instantTime, recordDelegate.getNewLocation().get().getInstantTime());
+ }
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestCreateHandle.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestCreateHandle.java
new file mode 100644
index 00000000000..672bb342879
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestCreateHandle.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieCreateHandle}.
+ */
+public class TestCreateHandle extends BaseTestHandle {
+
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void testCreateHandleRLIStats(boolean populateMetaFields) {
+ // init config and table
+ HoodieWriteConfig config = getConfigBuilder(basePath)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+ .withPopulateMetaFields(populateMetaFields)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+ .build();
+
+ HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+ // one round per partition
+ String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+
+ // init some args
+ String fileId = UUID.randomUUID().toString();
+ String instantTime = "000";
+
+ config.setSchema(TRIP_EXAMPLE_SCHEMA);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
+ Pair<WriteStatus, List<HoodieRecord>> statusListPair = createParquetFile(config, table, partitionPath, fileId, instantTime, dataGenerator);
+ WriteStatus writeStatus = statusListPair.getLeft();
+ List<HoodieRecord> records = statusListPair.getRight();
+
+ assertEquals(records.size(), writeStatus.getTotalRecords());
+ assertEquals(0, writeStatus.getTotalErrorRecords());
+ // validate write status has all record delegates
+ if (populateMetaFields) {
+ assertEquals(records.size(), writeStatus.getWrittenRecordDelegates().size());
+ for (HoodieRecordDelegate recordDelegate : writeStatus.getWrittenRecordDelegates()) {
+ assertTrue(recordDelegate.getNewLocation().isPresent());
+ assertEquals(fileId, recordDelegate.getNewLocation().get().getFileId());
+ assertEquals(instantTime, recordDelegate.getNewLocation().get().getInstantTime());
+ }
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMergeHandle.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMergeHandle.java
new file mode 100644
index 00000000000..c7035727ce5
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMergeHandle.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests {@link HoodieMergeHandle}.
+ */
+public class TestMergeHandle extends BaseTestHandle {
+
+ @Test
+ public void testMergeHandleRLIStats() throws IOException {
+ // init config and table
+ HoodieWriteConfig config = getConfigBuilder(basePath)
+ .withPopulateMetaFields(false)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+ .withKeyGenerator(KeyGeneratorForDataGeneratorRecords.class.getCanonicalName())
+ .build();
+ HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, new HoodieLocalEngineContext(storageConf), metaClient);
+ // one round per partition
+ String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+ // init some args
+ String fileId = UUID.randomUUID().toString();
+ String instantTime = "000";
+
+ // Create a parquet file
+ config.setSchema(TRIP_EXAMPLE_SCHEMA);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
+ Pair<WriteStatus, List<HoodieRecord>> statusListPair = createParquetFile(config, table, partitionPath, fileId, instantTime, dataGenerator);
+ WriteStatus writeStatus = statusListPair.getLeft();
+ List<HoodieRecord> records = statusListPair.getRight();
+ assertEquals(records.size(), writeStatus.getTotalRecords());
+ assertEquals(0, writeStatus.getTotalErrorRecords());
+
+ instantTime = "001";
+ List<HoodieRecord> updates = dataGenerator.generateUniqueUpdates(instantTime, 10);
+ HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, instantTime, table, updates.iterator(), partitionPath, fileId, table.getTaskContextSupplier(),
+ new HoodieBaseFile(writeStatus.getStat().getPath()), Option.of(new KeyGeneratorForDataGeneratorRecords(config.getProps())));
+ HoodieMergeHelper.newInstance().runMerge(table, mergeHandle);
+ writeStatus = mergeHandle.writeStatus;
+ // verify stats after merge
+ assertEquals(records.size(), writeStatus.getStat().getNumWrites());
+ assertEquals(10, writeStatus.getStat().getNumUpdateWrites());
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
new file mode 100644
index 00000000000..0cbbdb23ea6
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Unit tests for the metadata writer streaming write APIs.
+ */
+public class TestMetadataWriterCommit extends BaseTestHandle {
+
+ @Test
+ public void testStreamingWritesToMetadataTable() throws IOException {
+ // init config and table
+ HoodieWriteConfig config = getConfigBuilder(basePath)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withEnableRecordIndex(true)
+ .withMetadataIndexColumnStats(false)
+ .withSecondaryIndexEnabled(false)
+ .withStreamingWriteEnabled(true)
+ .build())
+ .build();
+
+ HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+ // one round per partition
+ String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+
+ // init some args
+ String fileId = UUID.randomUUID().toString();
+ String instantTime = InProcessTimeGenerator.createNewInstantTime();
+
+ // create a parquet file and obtain corresponding write status
+ config.setSchema(TRIP_EXAMPLE_SCHEMA);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
+ Pair<WriteStatus, List<HoodieRecord>> statusListPair = createParquetFile(config, table, partitionPath, fileId, instantTime, dataGenerator);
+ WriteStatus writeStatus = statusListPair.getLeft();
+ List<HoodieRecord> records = statusListPair.getRight();
+ HoodieCommitMetadata commitMetadata = createCommitMetadata(writeStatus.getStat(), partitionPath);
+
+ assertEquals(records.size(), writeStatus.getTotalRecords());
+ assertEquals(0, writeStatus.getTotalErrorRecords());
+
+ // create mdt writer
+ HoodieBackedTableMetadataWriter mdtWriter = (HoodieBackedTableMetadataWriter) SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
+ HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
+ HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder().setBasePath(metaClient.getMetaPath() + "/metadata").setConf(storageConf).build();
+ assertEquals(2, mdtMetaClient.getActiveTimeline().filterCompletedInstants().countInstants());
+
+ // Create commit in MDT
+ mdtWriter = (HoodieBackedTableMetadataWriter) SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
+ HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
+ mdtWriter.startCommit(instantTime);
+ HoodieData<WriteStatus> mdtWriteStatus = mdtWriter.streamWriteToMetadataPartitions(HoodieJavaRDD.of(Collections.singletonList(writeStatus), context, 1), instantTime);
+ List<HoodieWriteStat> mdtWriteStats = mdtWriteStatus.collectAsList().stream().map(WriteStatus::getStat).collect(Collectors.toList());
+ mdtWriter.completeStreamingCommit(instantTime, context, mdtWriteStats, commitMetadata);
+ // 2 bootstrap commits for the 2 enabled partitions, plus 1 commit due to the streaming update
+ assertEquals(3, mdtMetaClient.reloadActiveTimeline().filterCompletedInstants().countInstants());
+
+ // verify commit metadata
+ HoodieCommitMetadata mdtCommitMetadata = mdtMetaClient.getActiveTimeline().readCommitMetadata(mdtMetaClient.getActiveTimeline().lastInstant().get());
+ // 2 partitions should be seen in the commit metadata - FILES and RECORD_INDEX
+ assertEquals(2, mdtCommitMetadata.getPartitionToWriteStats().size());
+ assertEquals(1, mdtCommitMetadata.getPartitionToWriteStats().get(FILES.getPartitionPath()).size());
+ assertEquals(10, mdtCommitMetadata.getPartitionToWriteStats().get(RECORD_INDEX.getPartitionPath()).size());
+ assertFalse(mdtCommitMetadata.getPartitionToWriteStats().containsKey(COLUMN_STATS.getPartitionPath()));
+
+ // Create commit in MDT with col stats enabled
+ config.getMetadataConfig().setValue(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS, "true");
+ instantTime = InProcessTimeGenerator.createNewInstantTime();
+ mdtWriter = (HoodieBackedTableMetadataWriter) SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
+ HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
+ mdtWriter.startCommit(instantTime);
+ mdtWriteStatus = mdtWriter.streamWriteToMetadataPartitions(HoodieJavaRDD.of(Collections.singletonList(writeStatus), context, 1), instantTime);
+ mdtWriteStats = mdtWriteStatus.collectAsList().stream().map(WriteStatus::getStat).collect(Collectors.toList());
+ mdtWriter.completeStreamingCommit(instantTime, context, mdtWriteStats, commitMetadata);
+ // 3 bootstrap commits for the 3 enabled partitions, plus 2 commits due to streaming updates
+ assertEquals(5, mdtMetaClient.reloadActiveTimeline().filterCompletedInstants().countInstants());
+
+ // Verify commit metadata
+ mdtCommitMetadata = mdtMetaClient.getActiveTimeline().readCommitMetadata(mdtMetaClient.getActiveTimeline().lastInstant().get());
+ // 3 partitions should be seen in the commit metadata - FILES, RECORD_INDEX and COLUMN_STATS
+ assertEquals(3, mdtCommitMetadata.getPartitionToWriteStats().size());
+ assertEquals(1, mdtCommitMetadata.getPartitionToWriteStats().get(FILES.getPartitionPath()).size());
+ assertEquals(10, mdtCommitMetadata.getPartitionToWriteStats().get(RECORD_INDEX.getPartitionPath()).size());
+ assertEquals(2, mdtCommitMetadata.getPartitionToWriteStats().get(COLUMN_STATS.getPartitionPath()).size());
+ }
+
+ public static HoodieCommitMetadata createCommitMetadata(HoodieWriteStat writeStat, String partitionPath) {
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ commitMetadata.addMetadata("test", "test");
+ commitMetadata.addWriteStat(partitionPath, writeStat);
+ commitMetadata.setOperationType(WriteOperationType.INSERT);
+ return commitMetadata;
+ }
+}