This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3018c492e42 [HUDI-8564] Removing WriteStatus references in Hoodie Metadata writer flow (#12321)
3018c492e42 is described below
commit 3018c492e420a045d4614e96e5705543b1f09eb0
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Nov 27 21:45:21 2024 -0800
[HUDI-8564] Removing WriteStatus references in Hoodie Metadata writer flow (#12321)
- Fixing RLI (record level index) and SI (secondary index) record generation
in the MDT (metadata table) to not rely on RDD<WriteStatus>, which has edge
cases with Spark task retries. This patch instead does on-demand reads of base
and log files to generate the RLI and SI records for the MDT.
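For context, a minimal conceptual sketch (hypothetical names, JDK-only; not the
patch's actual code) of why on-demand reads suffice for RLI: the entries for a
file group can be derived by diffing the record keys of the newly written base
file against the previous base file of the same file slice, both read from
storage, so the result stays deterministic across Spark task retries:

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch only: derive RLI entries for one file group from on-demand key reads. */
public class RliOnDemandSketch {
  /** Returns (recordKey -> isDelete) pairs; isDelete=true means an RLI delete. */
  public static List<Map.Entry<String, Boolean>> rliEntries(Set<String> prevFileKeys,
                                                            Set<String> newFileKeys) {
    List<Map.Entry<String, Boolean>> out = new ArrayList<>();
    for (String key : newFileKeys) {
      if (!prevFileKeys.contains(key)) {
        out.add(new SimpleImmutableEntry<>(key, false)); // new key => RLI insert
      }
      // keys present in both files are updates; the fileId is unchanged,
      // so RLI needs no entry for them
    }
    for (String key : prevFileKeys) {
      if (!newFileKeys.contains(key)) {
        out.add(new SimpleImmutableEntry<>(key, true)); // dropped key => RLI delete
      }
    }
    return out;
  }
}

Only inserts and deletes produce RLI entries; updates are skipped, matching the
"for updates, we can skip updating RLI partition in MDT" note in the
getRecordIndexUpserts code removed further down.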
---------
Co-authored-by: vinoth chandar <[email protected]>
---
.../org/apache/hudi/client/BaseHoodieClient.java | 6 +-
.../hudi/client/BaseHoodieTableServiceClient.java | 14 +-
.../apache/hudi/client/BaseHoodieWriteClient.java | 15 +-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 4 +
.../metadata/HoodieBackedTableMetadataWriter.java | 85 +---
.../hudi/metadata/HoodieTableMetadataWriter.java | 8 +-
.../hudi/table/action/BaseActionExecutor.java | 6 +-
.../action/commit/BaseCommitActionExecutor.java | 4 +-
.../index/WriteStatBasedIndexingCatchupTask.java | 2 +-
.../common/testutils/HoodieMetadataTestTable.java | 12 +-
.../hudi/client/HoodieFlinkTableServiceClient.java | 14 +-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 13 +-
.../commit/BaseFlinkCommitActionExecutor.java | 4 +-
.../hudi/client/HoodieJavaTableServiceClient.java | 7 -
.../apache/hudi/client/HoodieJavaWriteClient.java | 2 +-
.../commit/BaseJavaCommitActionExecutor.java | 3 +-
.../hudi/client/SparkRDDTableServiceClient.java | 5 -
.../apache/hudi/client/SparkRDDWriteClient.java | 2 +-
.../SparkBootstrapCommitActionExecutor.java | 2 +-
.../commit/BaseSparkCommitActionExecutor.java | 2 +-
.../hudi/client/TestSparkRDDWriteClient.java | 2 +-
.../functional/TestConsistentBucketIndex.java | 2 +-
.../TestMetadataUtilRLIandSIRecordGeneration.java | 506 +++++++++++++++++++++
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 2 +-
.../TestHoodieSparkMergeOnReadTableCompaction.java | 4 +-
.../TestSparkNonBlockingConcurrencyControl.java | 11 +-
.../hudi/testutils/HoodieCleanerTestBase.java | 2 +-
.../apache/hudi/common/model/HoodieWriteStat.java | 17 +-
.../table/log/HoodieUnMergedLogRecordScanner.java | 30 +-
.../hudi/metadata/BaseFileRecordParsingUtils.java | 186 ++++++++
.../hudi/metadata/HoodieTableMetadataUtil.java | 197 ++++++++
.../hudi/common/model/TestHoodieWriteStat.java | 6 +
.../hudi/sink/clustering/ClusteringCommitSink.java | 4 +-
.../internal/DataSourceInternalWriterHelper.java | 2 +-
.../TestSparkConsistentBucketClustering.java | 4 +-
.../functional/TestSparkSortAndSizeClustering.java | 2 +-
.../hudi/functional/RecordLevelIndexTestBase.scala | 4 +-
.../hudi/functional/TestRecordLevelIndex.scala | 28 +-
.../offlinejob/HoodieOfflineJobTestBase.java | 2 +-
39 files changed, 1037 insertions(+), 184 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 9de7f647092..c054a817b52 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -24,7 +24,6 @@ import
org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
-import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -269,14 +268,13 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
* @param table {@link HoodieTable} of interest.
* @param instantTime instant time of the commit.
* @param metadata instance of {@link HoodieCommitMetadata}.
- * @param writeStatuses Write statuses of the commit
*/
- protected void writeTableMetadata(HoodieTable table, String instantTime,
HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
+ protected void writeTableMetadata(HoodieTable table, String instantTime,
HoodieCommitMetadata metadata) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to
metadata table: " + config.getTableName());
Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
if (metadataWriterOpt.isPresent()) {
try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get())
{
- metadataWriter.updateFromWriteStatuses(metadata, writeStatuses,
instantTime);
+ metadataWriter.update(metadata, instantTime);
} catch (Exception e) {
if (e instanceof HoodieException) {
throw (HoodieException) e;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index c6d1d8048fa..e96a6707247 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -31,7 +31,6 @@ import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.timeline.TimelineArchivers;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
-import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -341,7 +340,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
this.txnManager.beginTransaction(Option.of(compactionInstant),
Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
- writeTableMetadata(table, compactionCommitTime, metadata,
context.emptyHoodieData());
+ writeTableMetadata(table, compactionCommitTime, metadata);
LOG.info("Committing Compaction {}", compactionCommitTime);
LOG.debug("Compaction {} finished with result: {}",
compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
@@ -404,7 +403,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
preCommit(metadata);
finalizeWrite(table, logCompactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
- writeTableMetadata(table, logCompactionCommitTime, metadata,
context.emptyHoodieData());
+ writeTableMetadata(table, logCompactionCommitTime, metadata);
LOG.info("Committing Log Compaction {}", logCompactionCommitTime);
LOG.debug("Log Compaction {} finished with result {}",
logCompactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightLogCompaction(table,
logCompactionCommitTime, metadata);
@@ -490,7 +489,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
// TODO : Where is shouldComplete used ?
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
- completeClustering((HoodieReplaceCommitMetadata)
clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant,
Option.ofNullable(convertToWriteStatus(writeMetadata)));
+ completeClustering((HoodieReplaceCommitMetadata)
clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
}
return clusteringMetadata;
}
@@ -522,12 +521,9 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
protected abstract HoodieWriteMetadata<O>
convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);
- protected abstract HoodieData<WriteStatus>
convertToWriteStatus(HoodieWriteMetadata<T> writeMetadata);
-
private void completeClustering(HoodieReplaceCommitMetadata metadata,
HoodieTable table,
- String clusteringCommitTime,
- Option<HoodieData<WriteStatus>>
writeStatuses) {
+ String clusteringCommitTime) {
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
handleWriteErrors(writeStats, TableServiceType.CLUSTER);
final HoodieInstant clusteringInstant =
ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
@@ -542,7 +538,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
preCommit(metadata);
}
// Update table's metadata (table)
- writeTableMetadata(table, clusteringInstant.requestedTime(), metadata,
writeStatuses.orElseGet(context::emptyHoodieData));
+ writeTableMetadata(table, clusteringInstant.requestedTime(), metadata);
LOG.info("Committing Clustering {}", clusteringCommitTime);
LOG.debug("Clustering {} finished with result {}", clusteringCommitTime,
metadata);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 5781b0cb87a..0dffda05a59 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -34,7 +34,6 @@ import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -216,12 +215,12 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
String commitActionType, Map<String,
List<String>> partitionToReplacedFileIds,
Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc);
- public boolean commitStats(String instantTime, HoodieData<WriteStatus>
writeStatuses, List<HoodieWriteStat> stats, Option<Map<String, String>>
extraMetadata,
+ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
String commitActionType) {
- return commitStats(instantTime, writeStatuses, stats, extraMetadata,
commitActionType, Collections.emptyMap(), Option.empty());
+ return commitStats(instantTime, stats, extraMetadata, commitActionType,
Collections.emptyMap(), Option.empty());
}
- public boolean commitStats(String instantTime, HoodieData<WriteStatus>
writeStatuses, List<HoodieWriteStat> stats,
+ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String,
List<String>> partitionToReplaceFileIds,
Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
@@ -243,7 +242,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
if (extraPreCommitFunc.isPresent()) {
extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
}
- commit(table, commitActionType, instantTime, metadata, stats,
writeStatuses);
+ commit(table, commitActionType, instantTime, metadata, stats);
postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
} catch (IOException e) {
@@ -282,7 +281,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
}
protected void commit(HoodieTable table, String commitActionType, String
instantTime, HoodieCommitMetadata metadata,
- List<HoodieWriteStat> stats, HoodieData<WriteStatus>
writeStatuses) throws IOException {
+ List<HoodieWriteStat> stats) throws IOException {
LOG.info("Committing " + instantTime + " action " + commitActionType);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
@@ -293,7 +292,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
saveInternalSchema(table, instantTime, metadata);
}
// update Metadata table
- writeTableMetadata(table, instantTime, metadata, writeStatuses);
+ writeTableMetadata(table, instantTime, metadata);
activeTimeline.saveAsComplete(false,
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT,
commitActionType, instantTime),
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(),
metadata));
}
@@ -1627,7 +1626,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
// try to save history schemas
FileBasedInternalSchemaStorageManager schemasManager = new
FileBasedInternalSchemaStorageManager(metaClient);
schemasManager.persistHistorySchemaStr(instantTime,
SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
- commitStats(instantTime, context.emptyHoodieData(),
Collections.emptyList(), Option.of(extraMeta), commitActionType);
+ commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta),
commitActionType);
}
private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 1bf6f6b0138..707c86dd73a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -171,6 +171,10 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
try {
String latestValidFilePath = baseFileToMerge.getFileName();
writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
+ // At the moment, we only support SI for overwrite with latest payload.
So, we don't need to embed entire file slice here.
+ // HUDI-8518 will be taken up to fix it for any payload during which we
might require entire file slice to be set here.
+ // Already AppendHandle adds all logs file from current file slice to
HoodieDeltaWriteStat.
+ writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
HoodiePartitionMetadata partitionMetadata = new
HoodiePartitionMetadata(storage, instantTime,
new StoragePath(config.getBasePath()),
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index e05ab249dca..a09ea08eadf 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -25,7 +25,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
@@ -42,7 +41,6 @@ import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
@@ -1043,29 +1041,28 @@ public abstract class
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
/**
* Update from {@code HoodieCommitMetadata}.
- *
* @param commitMetadata {@code HoodieCommitMetadata}
* @param instantTime Timestamp at which the commit was performed
*/
@Override
- public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata,
HoodieData<WriteStatus> writeStatus, String instantTime) {
+ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
processAndCommit(instantTime, () -> {
Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime,
dataMetaClient,
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getMetadataConfig());
+ dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getWritesFileIdEncoding(),
+ dataWriteConfig.getMetadataConfig());
// Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
if (dataWriteConfig.isRecordIndexEnabled()) {
- HoodieData<HoodieRecord> updatesFromWriteStatuses =
getRecordIndexUpserts(writeStatus);
- HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
- partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(),
updatesFromWriteStatuses.union(additionalUpdates));
+ HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()),
commitMetadata);
+ partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(),
partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
}
updateFunctionalIndexIfPresent(commitMetadata, instantTime,
partitionToRecordMap);
- updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap,
writeStatus);
+ updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap,
instantTime);
return partitionToRecordMap;
});
closeInternal();
@@ -1079,7 +1076,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
engineContext, dataWriteConfig, commitMetadata, instantTime,
dataMetaClient,
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
- dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getMetadataConfig());
+ dataWriteConfig.getColumnStatsIndexParallelism(),
dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
+ dataWriteConfig.getWritesFileIdEncoding(),
dataWriteConfig.getMetadataConfig());
HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(records, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(),
records.union(additionalUpdates));
updateFunctionalIndexIfPresent(commitMetadata, instantTime,
partitionToRecordMap);
@@ -1126,7 +1124,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
return getFunctionalIndexRecords(partitionFilePathPairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf, instantTime);
}
- private void updateSecondaryIndexIfPresent(HoodieCommitMetadata
commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap,
HoodieData<WriteStatus> writeStatus) {
+ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata
commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap,
+ String instantTime) {
if (!dataWriteConfig.isSecondaryIndexEnabled()) {
return;
}
@@ -1145,7 +1144,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
.forEach(partition -> {
HoodieData<HoodieRecord> secondaryIndexRecords;
try {
- secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata,
partition, writeStatus);
+ secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata,
partition, instantTime);
} catch (Exception e) {
throw new HoodieMetadataException("Failed to get secondary index
updates for partition " + partition, e);
}
@@ -1153,20 +1152,13 @@ public abstract class
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
});
}
- private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+ private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
List<Pair<String, Pair<String, List<String>>>> partitionFilePairs =
getPartitionFilePairs(commitMetadata);
// Build a list of keys that need to be removed. A 'delete' record will be
emitted into the respective FileGroup of
- // the secondary index partition for each of these keys. For a commit
which is deleting/updating a lot of records, this
- // operation is going to be expensive (in CPU, memory and IO)
- List<String> keysToRemove = new ArrayList<>();
- writeStatus.collectAsList().forEach(status -> {
- status.getWrittenRecordDelegates().forEach(recordDelegate -> {
- // Consider those keys which were either updated or deleted in this
commit
- if (!recordDelegate.getNewLocation().isPresent() ||
(recordDelegate.getCurrentLocation().isPresent() &&
recordDelegate.getNewLocation().isPresent())) {
- keysToRemove.add(recordDelegate.getRecordKey());
- }
- });
- });
+ // the secondary index partition for each of these keys.
+ List<String> keysToRemove =
HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext,
commitMetadata, dataWriteConfig.getMetadataConfig(),
+ dataMetaClient, instantTime);
+
HoodieIndexDefinition indexDefinition =
getFunctionalIndexDefinition(indexPartition);
// Fetch the secondary keys that each of the record keys ('keysToRemove')
maps to
// This is obtained by scanning the entire secondary index partition in
the metadata table
@@ -1671,51 +1663,6 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
}
}
- /**
- * Return records that represent upserts to the record index due to write
operation on the dataset.
- *
- * @param writeStatuses {@code WriteStatus} from the write operation
- */
- private HoodieData<HoodieRecord>
getRecordIndexUpserts(HoodieData<WriteStatus> writeStatuses) {
- return writeStatuses.flatMap(writeStatus -> {
- List<HoodieRecord> recordList = new LinkedList<>();
- for (HoodieRecordDelegate recordDelegate :
writeStatus.getWrittenRecordDelegates()) {
- if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
- if (recordDelegate.getIgnoreIndexUpdate()) {
- continue;
- }
- HoodieRecord hoodieRecord;
- Option<HoodieRecordLocation> newLocation =
recordDelegate.getNewLocation();
- if (newLocation.isPresent()) {
- if (recordDelegate.getCurrentLocation().isPresent()) {
- // This is an update, no need to update index if the location
has not changed
- // newLocation should have the same fileID as currentLocation.
The instantTimes differ as newLocation's
- // instantTime refers to the current commit which was completed.
- if
(!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId()))
{
- final String msg = String.format("Detected update in location
of record with key %s from %s to %s. The fileID should not change.",
- recordDelegate, recordDelegate.getCurrentLocation().get(),
newLocation.get());
- LOG.error(msg);
- throw new HoodieMetadataException(msg);
- }
- // for updates, we can skip updating RLI partition in MDT
- } else {
- // Insert new record case
- hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
- recordDelegate.getRecordKey(),
recordDelegate.getPartitionPath(),
- newLocation.get().getFileId(),
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
- recordList.add(hoodieRecord);
- }
- } else {
- // Delete existing index for a deleted record
- hoodieRecord =
HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey());
- recordList.add(hoodieRecord);
- }
- }
- }
- return recordList.iterator();
- });
- }
-
private HoodieData<HoodieRecord>
getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata
replaceCommitMetadata) {
try (HoodieMetadataFileSystemView fsView = getMetadataView()) {
List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs =
replaceCommitMetadata
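Note on the getSecondaryIndexUpdates change above: the removed WriteStatus loop
encoded the rule for which record keys get a 'delete' emitted into the
secondary index partition, and the new on-demand path via
HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated has to reproduce that
rule. A hedged, self-contained restatement (hypothetical types, not Hudi's
API):

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Sketch only: the key-classification rule from the deleted loop. */
class SiKeysToRemoveSketch {
  static final class KeyLocations {
    final String recordKey;
    final Optional<String> currentLocation; // file group before this commit
    final Optional<String> newLocation;     // file group after this commit
    KeyLocations(String key, Optional<String> current, Optional<String> next) {
      this.recordKey = key; this.currentLocation = current; this.newLocation = next;
    }
  }

  static List<String> keysToRemove(List<KeyLocations> commitKeys) {
    List<String> out = new ArrayList<>();
    for (KeyLocations k : commitKeys) {
      // deleted: no new location; updated: both a current and a new location
      boolean deleted = !k.newLocation.isPresent();
      boolean updated = k.currentLocation.isPresent() && k.newLocation.isPresent();
      if (deleted || updated) {
        out.add(k.recordKey);
      }
    }
    return out;
  }
}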
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index 7578949a8e0..119c97f0fce 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -57,16 +56,15 @@ public interface HoodieTableMetadataWriter extends
Serializable, AutoCloseable {
/**
* Update the metadata table due to a COMMIT operation.
- *
* @param commitMetadata commit metadata of the operation of interest.
* @param instantTime instant time of the commit.
*/
- void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata,
HoodieData<WriteStatus> writeStatuses, String instantTime);
+ void update(HoodieCommitMetadata commitMetadata, String instantTime);
/**
* Update the metadata table due to a COMMIT or REPLACECOMMIT operation.
- * As compared to {@link #updateFromWriteStatuses(HoodieCommitMetadata,
HoodieData, String)}, this method
- * directly updates metadata with the given records, instead of first
converting {@link WriteStatus} to {@link HoodieRecord}.
+ * As compared to {@link #update(HoodieCommitMetadata, String)}, this method
+ * directly updates metadata with the given records, instead of generating
HoodieRecords based on HoodieCommitMetadata.
*
* @param commitMetadata commit metadata of the operation of interest.
* @param records records to update metadata with.
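Hedged usage sketch of the two update flavors this interface now exposes (the
records-based overload is inferred from the javadoc above; metadataWriterOpt,
commitMetadata, records, and instantTime are assumed in scope, and exception
handling is elided):

try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
  // post-patch path: derive the MDT records from the commit metadata
  metadataWriter.update(commitMetadata, instantTime);
  // or, with pre-built metadata records:
  // metadataWriter.update(commitMetadata, records, instantTime);
}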
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index c4ca5677832..5940129c91d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -21,8 +21,6 @@ package org.apache.hudi.table.action;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
@@ -72,7 +70,7 @@ public abstract class BaseActionExecutor<T, I, K, O, R>
implements Serializable
*
* @param metadata commit metadata of interest.
*/
- protected final void writeTableMetadata(HoodieCommitMetadata metadata,
HoodieData<WriteStatus> writeStatus, String actionType) {
+ protected final void writeTableMetadata(HoodieCommitMetadata metadata,
String actionType) {
// Recreate MDT for insert_overwrite_table operation.
if (table.getConfig().isMetadataTableEnabled()
&& WriteOperationType.INSERT_OVERWRITE_TABLE ==
metadata.getOperationType()) {
@@ -83,7 +81,7 @@ public abstract class BaseActionExecutor<T, I, K, O, R>
implements Serializable
Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
if (metadataWriterOpt.isPresent()) {
try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get())
{
- metadataWriter.updateFromWriteStatuses(metadata, writeStatus,
instantTime);
+ metadataWriter.update(metadata, instantTime);
} catch (Exception e) {
if (e instanceof HoodieException) {
throw (HoodieException) e;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index ea20983819f..055fd7e10ba 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -211,7 +211,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O,
R>
protected abstract void commit(HoodieWriteMetadata<O> result);
- protected void commit(HoodieData<WriteStatus> writeStatuses,
HoodieWriteMetadata<O> result, List<HoodieWriteStat> writeStats) {
+ protected void commit(HoodieWriteMetadata<O> result, List<HoodieWriteStat>
writeStats) {
String actionType = getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType + ",
operation Type " + operationType);
result.setCommitted(true);
@@ -222,7 +222,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O,
R>
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = result.getCommitMetadata().get();
- writeTableMetadata(metadata, writeStatuses, actionType);
+ writeTableMetadata(metadata, actionType);
// cannot serialize maps with null values
metadata.getExtraMetadata().entrySet().removeIf(entry ->
entry.getValue() == null);
activeTimeline.saveAsComplete(false,
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
index 727028e1eb6..25230049a76 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
@@ -50,6 +50,6 @@ public class WriteStatBasedIndexingCatchupTask extends
AbstractIndexingCatchupTa
public void updateIndexForWriteAction(HoodieInstant instant) throws
IOException {
HoodieCommitMetadata commitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(instant,
metaClient.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
- metadataWriter.updateFromWriteStatuses(commitMetadata,
engineContext.emptyHoodieData(), instant.requestedTime());
+ metadataWriter.update(commitMetadata, instant.requestedTime());
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index b4a2f5a2d46..14c4885b4d0 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -92,7 +92,7 @@ public class HoodieMetadataTestTable extends HoodieTestTable {
partitionToFilesNameLengthMap, bootstrap, true);
if (writer != null && !createInflightCommit) {
writer.performTableServices(Option.of(commitTime));
- writer.updateFromWriteStatuses(commitMetadata,
context.get().emptyHoodieData(), commitTime);
+ writer.update(commitMetadata, commitTime);
}
// DT should be committed after MDT.
if (!createInflightCommit) {
@@ -110,7 +110,7 @@ public class HoodieMetadataTestTable extends
HoodieTestTable {
public HoodieTestTable moveInflightCommitToComplete(String instantTime,
HoodieCommitMetadata metadata) throws IOException {
super.moveInflightCommitToComplete(instantTime, metadata);
if (writer != null) {
- writer.updateFromWriteStatuses(metadata,
context.get().emptyHoodieData(), instantTime);
+ writer.update(metadata, instantTime);
}
return this;
}
@@ -119,7 +119,7 @@ public class HoodieMetadataTestTable extends
HoodieTestTable {
public HoodieTestTable moveInflightCompactionToComplete(String instantTime,
HoodieCommitMetadata metadata) throws IOException {
super.moveInflightCompactionToComplete(instantTime, metadata);
if (writer != null) {
- writer.updateFromWriteStatuses(metadata,
context.get().emptyHoodieData(), instantTime);
+ writer.update(metadata, instantTime);
}
return this;
}
@@ -146,7 +146,7 @@ public class HoodieMetadataTestTable extends
HoodieTestTable {
public HoodieTestTable addCompaction(String instantTime,
HoodieCommitMetadata commitMetadata) throws Exception {
super.addCompaction(instantTime, commitMetadata);
if (writer != null) {
- writer.updateFromWriteStatuses(commitMetadata,
context.get().emptyHoodieData(), instantTime);
+ writer.update(commitMetadata, instantTime);
}
return this;
}
@@ -177,7 +177,7 @@ public class HoodieMetadataTestTable extends
HoodieTestTable {
HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception {
super.addReplaceCommit(instantTime, requestedReplaceMetadata,
inflightReplaceMetadata, completeReplaceMetadata);
if (writer != null) {
- writer.updateFromWriteStatuses(completeReplaceMetadata,
context.get().emptyHoodieData(), instantTime);
+ writer.update(completeReplaceMetadata, instantTime);
}
return this;
}
@@ -187,7 +187,7 @@ public class HoodieMetadataTestTable extends
HoodieTestTable {
HoodieReplaceCommitMetadata
completeReplaceMetadata) throws Exception {
super.addCluster(instantTime, requestedReplaceMetadata,
inflightReplaceMetadata, completeReplaceMetadata);
if (writer != null) {
- writer.updateFromWriteStatuses(completeReplaceMetadata,
context.get().emptyHoodieData(), instantTime);
+ writer.update(completeReplaceMetadata, instantTime);
}
return this;
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 5fac2d9a037..3874acdd429 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -20,8 +20,6 @@ package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
@@ -88,7 +86,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
// commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes.
We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata
table will result in conflicts since all of them updates the same partition.
- writeTableMetadata(table, compactionCommitTime, metadata,
context.emptyHoodieData());
+ writeTableMetadata(table, compactionCommitTime, metadata);
LOG.info("Committing Compaction {} finished with result {}.",
compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table,
compactionCommitTime, metadata);
} finally {
@@ -113,8 +111,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
protected void completeClustering(
HoodieReplaceCommitMetadata metadata,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>> table,
- String clusteringCommitTime,
- Option<HoodieData<WriteStatus>> writeStatuses) {
+ String clusteringCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect
clustering write status and commit clustering");
HoodieInstant clusteringInstant =
ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
table.getActiveTimeline(), table.getInstantGenerator()).get();
List<HoodieWriteStat> writeStats =
metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
@@ -135,7 +132,7 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
// commit to data table after committing to metadata table.
// We take the lock here to ensure all writes to metadata table happens
within a single lock (single writer).
// Because more than one write to metadata table will result in
conflicts since all of them updates the same partition.
- writeTableMetadata(table, clusteringCommitTime, metadata,
writeStatuses.orElseGet(context::emptyHoodieData));
+ writeTableMetadata(table, clusteringCommitTime, metadata);
LOG.info("Committing Clustering {} finished with result {}.",
clusteringCommitTime, metadata);
ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(
@@ -180,11 +177,6 @@ public class HoodieFlinkTableServiceClient<T> extends
BaseHoodieTableServiceClie
return writeMetadata;
}
- @Override
- protected HoodieData<WriteStatus>
convertToWriteStatus(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
- return HoodieListData.eager(writeMetadata.getWriteStatuses());
- }
-
@Override
protected HoodieTable createTable(HoodieWriteConfig config,
StorageConfiguration<?> storageConf, boolean skipValidation) {
return createTableAndValidate(config, HoodieFlinkTable::create,
skipValidation);
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3bac02db25a..ba3570a0b49 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.utils.TransactionUtils;
-import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -109,7 +108,7 @@ public class HoodieFlinkWriteClient<T> extends
.values().stream()
.map(duplicates ->
duplicates.stream().reduce(WriteStatMerger::merge).get())
.collect(Collectors.toList());
- return commitStats(instantTime, HoodieListData.eager(writeStatuses),
merged, extraMetadata, commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
+ return commitStats(instantTime, merged, extraMetadata, commitActionType,
partitionToReplacedFileIds, extraPreCommitFunc);
}
@Override
@@ -375,9 +374,8 @@ public class HoodieFlinkWriteClient<T> extends
private void completeClustering(
HoodieReplaceCommitMetadata metadata,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>> table,
- String clusteringCommitTime,
- Option<HoodieData<WriteStatus>> writeStatuses) {
- ((HoodieFlinkTableServiceClient<T>)
tableServiceClient).completeClustering(metadata, table, clusteringCommitTime,
writeStatuses);
+ String clusteringCommitTime) {
+ ((HoodieFlinkTableServiceClient<T>)
tableServiceClient).completeClustering(metadata, table, clusteringCommitTime);
}
@Override
@@ -394,11 +392,10 @@ public class HoodieFlinkWriteClient<T> extends
TableServiceType tableServiceType,
HoodieCommitMetadata metadata,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>> table,
- String commitInstant,
- Option<HoodieData<WriteStatus>> writeStatuses) {
+ String commitInstant) {
switch (tableServiceType) {
case CLUSTER:
- completeClustering((HoodieReplaceCommitMetadata) metadata, table,
commitInstant, writeStatuses);
+ completeClustering((HoodieReplaceCommitMetadata) metadata, table,
commitInstant);
break;
case COMPACT:
completeCompaction(metadata, table, commitInstant);
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 750ef71bc37..7a27830274f 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -123,8 +122,7 @@ public abstract class BaseFlinkCommitActionExecutor<T>
extends
@Override
protected void commit(HoodieWriteMetadata<List<WriteStatus>> result) {
- commit(HoodieListData.eager(result.getWriteStatuses()), result,
-
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+ commit(result,
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
}
protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>>
result) {
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
index 8dfbcae32ee..ac463e69b0d 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
@@ -21,8 +21,6 @@ package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ClusteringUtils;
@@ -64,11 +62,6 @@ public class HoodieJavaTableServiceClient<T> extends
BaseHoodieTableServiceClien
return writeMetadata;
}
- @Override
- protected HoodieData<WriteStatus>
convertToWriteStatus(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
- return HoodieListData.eager(writeMetadata.getWriteStatuses());
- }
-
@Override
protected HoodieTable<?, List<HoodieRecord<T>>, ?, List<WriteStatus>>
createTable(HoodieWriteConfig config, StorageConfiguration<?> storageConf,
boolean skipValidation) {
return createTableAndValidate(config, HoodieJavaTable::create,
skipValidation);
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 968454a1b3d..4742c25c557 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -88,7 +88,7 @@ public class HoodieJavaWriteClient<T> extends
Map<String, List<String>> partitionToReplacedFileIds,
Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
List<HoodieWriteStat> writeStats =
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
- return commitStats(instantTime, HoodieListData.eager(writeStatuses),
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
+ return commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index c968a1cdab7..f447f630d80 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -189,8 +189,7 @@ public abstract class BaseJavaCommitActionExecutor<T>
extends
@Override
protected void commit(HoodieWriteMetadata<List<WriteStatus>> result) {
- commit(HoodieListData.eager(result.getWriteStatuses()), result,
-
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+ commit(result,
result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
}
protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>>
result) {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
index 2f313b1ddd7..e7865064791 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -64,11 +64,6 @@ public class SparkRDDTableServiceClient<T> extends
BaseHoodieTableServiceClient<
return
writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
}
- @Override
- protected HoodieData<WriteStatus>
convertToWriteStatus(HoodieWriteMetadata<HoodieData<WriteStatus>>
writeMetadata) {
- return writeMetadata.getWriteStatuses();
- }
-
@Override
protected HoodieTable<?, HoodieData<HoodieRecord<T>>, ?,
HoodieData<WriteStatus>> createTable(HoodieWriteConfig config,
StorageConfiguration<?> storageConf, boolean skipValidation) {
return createTableAndValidate(config, HoodieSparkTable::create,
skipValidation);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 2e639b28192..12c9a0f80b7 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -90,7 +90,7 @@ public class SparkRDDWriteClient<T> extends
Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: "
+ config.getTableName());
List<HoodieWriteStat> writeStats =
writeStatuses.map(WriteStatus::getStat).collect();
- return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses),
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
+ return commitStats(instantTime, writeStats, extraMetadata,
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index e7175c55d9f..9b842064000 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -214,7 +214,7 @@ public class SparkBootstrapCommitActionExecutor<T>
LOG.info("Finished writing bootstrap index for source " +
config.getBootstrapSourceBasePath() + " in table "
+ config.getBasePath());
}
- commit(result.getWriteStatuses(), result,
bootstrapSourceAndStats.values().stream()
+ commit(result, bootstrapSourceAndStats.values().stream()
.flatMap(f ->
f.stream().map(Pair::getValue)).collect(Collectors.toList()));
LOG.info("Committing metadata bootstrap !!");
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 36902a8c3f2..94d8748166b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -289,7 +289,7 @@ public abstract class BaseSparkCommitActionExecutor<T>
extends
@Override
protected void commit(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
context.setJobStatus(this.getClass().getSimpleName(), "Commit write status
collect: " + config.getTableName());
- commit(result.getWriteStatuses(), result,
result.getWriteStats().isPresent()
+ commit(result, result.getWriteStats().isPresent()
? result.getWriteStats().get() :
result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
index a299b2f337a..303f83241f7 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -140,7 +140,7 @@ class TestSparkRDDWriteClient extends
SparkClientFunctionalTestHarness {
String metadataTableBasePath =
HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath());
List<Integer> metadataTableCacheIds0 =
context().getCachedDataIds(HoodieDataCacheKey.of(metadataTableBasePath,
instant0));
List<Integer> metadataTableCacheIds1 =
context().getCachedDataIds(HoodieDataCacheKey.of(metadataTableBasePath,
instant1));
- writeClient.commitStats(instant1, context().parallelize(writeStatuses, 1),
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ writeClient.commitStats(instant1,
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(), metaClient.getCommitActionType());
writeClient.close();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
index c790b3cebdc..812c17d3743 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
@@ -275,7 +275,7 @@ public class TestConsistentBucketIndex extends
HoodieSparkClientTestHarness {
}
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
if (doCommit) {
- boolean success = writeClient.commitStats(commitTime,
context.parallelize(writeStatues, 1),
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ boolean success = writeClient.commitStats(commitTime,
writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(), metaClient.getCommitActionType());
Assertions.assertTrue(success);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
new file mode 100644
index 00000000000..1a3c4bfa0e7
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.BaseFileRecordParsingUtils;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestMetadataUtilRLIandSIRecordGeneration extends
HoodieClientTestBase {
+
+ /**
+ * Tests various methods used for RLI and SI record generation flows.
+ * We test below methods
+ *
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(...).
This is used for RLI record generation.
+ * BaseFileRecordParsingUtils.getRecordKeyStatuses(...) // This is used in
both RLI and SI flow.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testRecordGenerationAPIsForCOW() throws IOException {
+ HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+ cleanupClients();
+ initMetaClient(tableType);
+ cleanupTimelineService();
+ initTimelineService();
+
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ HoodieWriteConfig writeConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build();
+
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
writeConfig)) {
+ // Insert
+ String commitTime = client.createNewInstantTime();
+ List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 100);
+ client.startCommitWithTime(commitTime);
+ List<WriteStatus> writeStatuses1 =
client.insert(jsc.parallelize(records1, 1), commitTime).collect();
+ assertNoWriteErrors(writeStatuses1);
+
+ // assert RLI records for a base file from 1st commit
+ String finalCommitTime = commitTime;
+ Map<String, String> recordKeyToPartitionMapping1 = new HashMap<>();
+ Map<String, String> fileIdToFileNameMapping1 = new HashMap<>();
+ writeStatuses1.forEach(writeStatus -> {
+ assertEquals(0, writeStatus.getStat().getNumDeletes());
+ // Fetch record keys for all files written.
+ try {
+ String writeStatFileId = writeStatus.getFileId();
+ if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) {
+ fileIdToFileNameMapping1.put(writeStatFileId,
writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/")
+ 1));
+ }
+
+ // call into generateRLIMetadataHoodieRecordsForBaseFile to fetch MDT RLI records for inserts and deletes.
+ Iterator<HoodieRecord> rliRecordsItr =
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
+ writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(),
finalCommitTime, metaClient.getStorage());
+ while (rliRecordsItr.hasNext()) {
+ HoodieRecord rliRecord = rliRecordsItr.next();
+ String key = rliRecord.getRecordKey();
+ String partition = ((HoodieMetadataPayload)
rliRecord.getData()).getRecordGlobalLocation().getPartitionPath();
+ recordKeyToPartitionMapping1.put(key, partition);
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Should not have failed ", e);
+ }
+ });
+
+ Map<String, String> expectedRecordToPartitionMapping1 = new HashMap<>();
+ records1.forEach(record ->
expectedRecordToPartitionMapping1.put(record.getRecordKey(),
record.getPartitionPath()));
+
+ assertEquals(expectedRecordToPartitionMapping1,
recordKeyToPartitionMapping1);
+
+ // let's update some records and assert the RLI records.
+ commitTime = client.createNewInstantTime();
+ client.startCommitWithTime(commitTime);
+ String finalCommitTime2 = commitTime;
+ List<HoodieRecord> deletes2 =
dataGen.generateUniqueDeleteRecords(commitTime, 30);
+ List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates(commitTime,
30);
+ List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime, 30);
+ List<HoodieRecord> records2 = new ArrayList<>();
+ records2.addAll(inserts2);
+ records2.addAll(updates2);
+ records2.addAll(deletes2);
+
+ List<WriteStatus> writeStatuses2 =
client.upsert(jsc.parallelize(records2, 1), commitTime).collect();
+ assertNoWriteErrors(writeStatuses2);
+
+ List<String> expectedInserts = inserts2.stream().map(record ->
record.getKey().getRecordKey()).collect(Collectors.toList());
+ List<String> expectedDeletes = deletes2.stream().map(record ->
record.getKey().getRecordKey()).collect(Collectors.toList());
+ List<String> actualInserts = new ArrayList<>();
+ List<String> actualDeletes = new ArrayList<>();
+ // only inserts and deletes will result in RLI records; let's validate that.
+ generateRliRecordsAndAssert(writeStatuses2, fileIdToFileNameMapping1,
finalCommitTime2, writeConfig, actualInserts, actualDeletes);
+
+ assertListEquality(expectedInserts, actualInserts);
+ assertListEquality(expectedDeletes, actualDeletes);
+
+ // let's validate the APIs in BaseFileRecordParsingUtils directly
+ actualInserts = new ArrayList<>();
+ actualDeletes = new ArrayList<>();
+ List<String> actualUpdates = new ArrayList<>();
+ List<String> expectedUpdates = updates2.stream().map(record ->
record.getKey().getRecordKey()).collect(Collectors.toList());
+ parseRecordKeysFromBaseFiles(writeStatuses2, fileIdToFileNameMapping1,
finalCommitTime2, writeConfig, actualInserts, actualDeletes, actualUpdates);
+ assertListEquality(expectedInserts, actualInserts);
+ assertListEquality(expectedDeletes, actualDeletes);
+ // we can't assert strict equality for updates because, with COW, we might just rewrite an existing parquet file, so more records will be deduced as updates.
+ // Hence we validate using contains.
+ expectedUpdates.forEach(entry ->
assertTrue(actualUpdates.contains(entry)));
+ }
+ }
+
+ /**
+ * Tests various methods used in the RLI and SI record generation flows with a MOR table; here the emphasis is on log files.
+ * We test the following methods:
+ * BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(...). This is used for RLI record generation.
+ * HoodieTableMetadataUtil.getRecordKeys(). This is used in both the RLI and SI flows.
+ * HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated() for HoodieCommitMetadata.
+ * <p>
+ * We also test a few ad hoc scenarios:
+ * - if any log file contains inserts, RLI and SI record generation should throw an exception.
+ * - RLI does not generate any records for compaction operations.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testRecordGenerationAPIsForMOR() throws IOException {
+ HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
+ cleanupClients();
+ initMetaClient(tableType);
+ cleanupTimelineService();
+ initTimelineService();
+
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ HoodieWriteConfig writeConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
+
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(2)
+ .withInlineCompaction(true)
+ .compactionSmallFileSize(0).build()).build();
+
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
writeConfig)) {
+ // Insert
+ String commitTime = client.createNewInstantTime();
+ List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 100);
+ client.startCommitWithTime(commitTime);
+ List<WriteStatus> writeStatuses1 =
client.insert(jsc.parallelize(records1, 1), commitTime).collect();
+ assertNoWriteErrors(writeStatuses1);
+
+ // assert RLI records for a base file from 1st commit
+ String finalCommitTime = commitTime;
+ Map<String, String> recordKeyToPartitionMapping1 = new HashMap<>();
+ Map<String, String> fileIdToFileNameMapping1 = new HashMap<>();
+ writeStatuses1.forEach(writeStatus -> {
+ assertEquals(0, writeStatus.getStat().getNumDeletes());
+ // Fetch record keys for all files written.
+ try {
+ String writeStatFileId = writeStatus.getFileId();
+ if (!fileIdToFileNameMapping1.containsKey(writeStatFileId)) {
+ fileIdToFileNameMapping1.put(writeStatFileId,
writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/")
+ 1));
+ }
+
+ // call into generateRLIMetadataHoodieRecordsForBaseFile to fetch MDT RLI records for inserts and deletes.
+ Iterator<HoodieRecord> rliRecordsItr =
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
+ writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(),
finalCommitTime, metaClient.getStorage());
+ while (rliRecordsItr.hasNext()) {
+ HoodieRecord rliRecord = rliRecordsItr.next();
+ String key = rliRecord.getRecordKey();
+ String partition = ((HoodieMetadataPayload)
rliRecord.getData()).getRecordGlobalLocation().getPartitionPath();
+ recordKeyToPartitionMapping1.put(key, partition);
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Should not have failed ", e);
+ }
+ });
+
+ Map<String, String> expectedRecordToPartitionMapping1 = new HashMap<>();
+ records1.forEach(record ->
expectedRecordToPartitionMapping1.put(record.getRecordKey(),
record.getPartitionPath()));
+
+ assertEquals(expectedRecordToPartitionMapping1,
recordKeyToPartitionMapping1);
+
+ // let's update some records and assert the RLI records.
+ commitTime = client.createNewInstantTime();
+ client.startCommitWithTime(commitTime);
+ List<HoodieRecord> deletes2 =
dataGen.generateUniqueDeleteRecords(commitTime, 30);
+ List<HoodieRecord> updates2 = dataGen.generateUniqueUpdates(commitTime,
30);
+ List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime, 30);
+ List<HoodieRecord> records2 = new ArrayList<>();
+ records2.addAll(inserts2);
+ records2.addAll(updates2);
+ records2.addAll(deletes2);
+
+ List<WriteStatus> writeStatuses2 =
client.upsert(jsc.parallelize(records2, 1), commitTime).collect();
+ assertNoWriteErrors(writeStatuses2);
+ assertRLIandSIRecordGenerationAPIs(inserts2, updates2, deletes2,
writeStatuses2, commitTime, writeConfig);
+
+ // trigger another round of inserts, updates and deletes.
+ commitTime = client.createNewInstantTime();
+ client.startCommitWithTime(commitTime);
+ String finalCommitTime3 = commitTime;
+ List<HoodieRecord> deletes3 =
dataGen.generateUniqueDeleteRecords(commitTime, 30);
+ List<HoodieRecord> updates3 = dataGen.generateUniqueUpdates(commitTime,
30);
+ List<HoodieRecord> inserts3 = dataGen.generateInserts(commitTime, 30);
+ List<HoodieRecord> records3 = new ArrayList<>();
+ records3.addAll(inserts3);
+ records3.addAll(updates3);
+ records3.addAll(deletes3);
+
+ List<WriteStatus> writeStatuses3 =
client.upsert(jsc.parallelize(records3, 1), commitTime).collect();
+ assertNoWriteErrors(writeStatuses3);
+ assertRLIandSIRecordGenerationAPIs(inserts3, updates3, deletes3,
writeStatuses3, finalCommitTime3, writeConfig);
+
+ // let's validate that if any log file contains inserts, fetching keys will fail.
+ HoodieWriteStat writeStat = writeStatuses3.get(1).getStat();
+ writeStat.setNumInserts(5);
+ HoodieCommitMetadata commitMetadata =
CommitUtils.buildMetadata(Collections.singletonList(writeStat),
Collections.emptyMap(),
+ Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(),
"commit");
+
+ try {
+ HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context,
commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3);
+ fail("Should not have reached here");
+ } catch (Exception e) {
+ // expected
+ }
+
+ // trigger compaction
+ Option<String> compactionInstantOpt =
client.scheduleCompaction(Option.empty());
+ assertTrue(compactionInstantOpt.isPresent());
+ HoodieWriteMetadata compactionWriteMetadata =
client.compact(compactionInstantOpt.get());
+ HoodieCommitMetadata compactionCommitMetadata = (HoodieCommitMetadata)
compactionWriteMetadata.getCommitMetadata().get();
+ // no RLI records should be generated for a compaction operation.
+
assertTrue(HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords(context,
compactionCommitMetadata, writeConfig.getMetadataConfig(),
+ metaClient, writeConfig.getWritesFileIdEncoding(),
compactionInstantOpt.get()).isEmpty());
+ }
+ }
+
+ private void assertRLIandSIRecordGenerationAPIs(List<HoodieRecord> inserts3,
List<HoodieRecord> updates3, List<HoodieRecord> deletes3,
+ List<WriteStatus>
writeStatuses3, String finalCommitTime3, HoodieWriteConfig writeConfig) {
+ List<String> expectedRLIInserts = inserts3.stream().map(record ->
record.getKey().getRecordKey()).collect(Collectors.toList());
+ List<String> expectedUpdates = updates3.stream().map(record ->
record.getKey().getRecordKey()).collect(Collectors.toList());
+ List<String> expectedRLIDeletes = deletes3.stream().map(record ->
record.getKey().getRecordKey()).collect(Collectors.toList());
+ List<String> expectedUpdatesAndDeletes = new ArrayList<>(expectedRLIDeletes);
+ expectedUpdatesAndDeletes.addAll(expectedUpdates);
+
+ // lets validate RLI record generation.
+ List<String> actualInserts = new ArrayList<>();
+ List<String> actualDeletes = new ArrayList<>();
+ List<String> actualUpdatesAndDeletes = new ArrayList<>();
+ generateRliRecordsAndAssert(writeStatuses3.stream().filter(writeStatus ->
!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(),
writeStatus.getPartitionPath())))
+ .collect(Collectors.toList()), Collections.emptyMap(),
finalCommitTime3, writeConfig, actualInserts, actualDeletes);
+
+ // let's also test HoodieTableMetadataUtil.getRecordKeys() for each individual log file touched as part of HoodieCommitMetadata.
+ // let's test deletes only, and also test both valid and deleted keys for log files.
+ // we have disabled small file handling, so updates and deletes will definitely go into log files.
+ String latestCommitTimestamp =
metaClient.reloadActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime();
+ Option<Schema> writerSchemaOpt = tryResolveSchemaForTable(metaClient);
+ List<String> finalActualDeletes = actualDeletes;
+ writeStatuses3.stream().filter(writeStatus ->
FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(),
writeStatus.getPartitionPath())))
+ .forEach(writeStatus -> {
+ try {
+ // used for RLI
+
finalActualDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath + "/"
+ writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
+ writeConfig.getMetadataConfig().getMaxReaderBufferSize(),
latestCommitTimestamp, false, true));
+
+ // used in SI flow
+
actualUpdatesAndDeletes.addAll(HoodieTableMetadataUtil.getRecordKeys(basePath +
"/" + writeStatus.getStat().getPath(), metaClient, writerSchemaOpt,
+ writeConfig.getMetadataConfig().getMaxReaderBufferSize(),
latestCommitTimestamp, true, true));
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed w/ IOException ", e);
+ }
+ });
+
+ assertListEquality(expectedRLIInserts, actualInserts);
+ assertListEquality(expectedRLIDeletes, actualDeletes);
+ assertListEquality(expectedUpdatesAndDeletes, actualUpdatesAndDeletes);
+ HoodieCommitMetadata commitMetadata =
CommitUtils.buildMetadata(writeStatuses3.stream().map(writeStatus ->
writeStatus.getStat()).collect(Collectors.toList()), Collections.emptyMap(),
+ Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(),
"commit");
+
+ // validate HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated for the entire CommitMetadata, which is used in the SI code path.
+ List<String> updatedOrDeletedKeys =
+ new
ArrayList<>(HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(context,
commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3));
+ List<String> expectedUpdatesOrDeletes = new ArrayList<>(expectedUpdates);
+ expectedUpdatesOrDeletes.addAll(expectedRLIDeletes);
+ assertListEquality(expectedUpdatesAndDeletes, updatedOrDeletedKeys);
+ }
+
+ @Test
+ public void testReducedByKeysForRLIRecords() throws IOException {
+ HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+ cleanupClients();
+ initMetaClient(tableType);
+ cleanupTimelineService();
+ initTimelineService();
+
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ HoodieWriteConfig writeConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build();
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
writeConfig)) {
+ String commitTime = client.createNewInstantTime();
+ List<HoodieRecord> inserts = dataGen.generateInserts(commitTime, 100);
+ List<HoodieRecord> deletes =
dataGen.generateUniqueDeleteRecords(commitTime, 20);
+ String randomFileId = UUID.randomUUID().toString() + "-0";
+ List<String> deletedRecordKeys = deletes.stream().map(record ->
record.getRecordKey()).collect(Collectors.toList());
+ List<HoodieRecord> adjustedInserts = inserts.stream().filter(record ->
!deletedRecordKeys.contains(record.getRecordKey())).collect(Collectors.toList());
+
+ List<HoodieRecord> insertRecords =
+ inserts.stream().map(record ->
HoodieMetadataPayload.createRecordIndexUpdate(record.getRecordKey(), "abc",
randomFileId, commitTime, 0))
+ .collect(Collectors.toList());
+ List<HoodieRecord> deleteRecords = inserts.stream().map(record ->
HoodieMetadataPayload.createRecordIndexDelete(record.getRecordKey()))
+ .collect(Collectors.toList());
+
+ List<HoodieRecord> recordsToTest = new ArrayList<>();
+ recordsToTest.addAll(adjustedInserts);
+ recordsToTest.addAll(deleteRecords);
+ // happy path: no dups; input and output are the same.
+ List<HoodieRecord> actualRecords =
HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2),
2).collectAsList();
+ assertHoodieRecordListEquality(actualRecords, recordsToTest);
+
+ // a few records have both inserts and deletes.
+ recordsToTest = new ArrayList<>();
+ recordsToTest.addAll(insertRecords);
+ recordsToTest.addAll(deleteRecords);
+ actualRecords =
HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2),
2).collectAsList();
+ List<HoodieRecord> expectedList = new ArrayList<>();
+ expectedList.addAll(insertRecords);
+ assertHoodieRecordListEquality(actualRecords, expectedList);
+
+ // a few deletes are duplicates; duplicate deletes are allowed.
+ recordsToTest = new ArrayList<>();
+ recordsToTest.addAll(adjustedInserts);
+ recordsToTest.addAll(deleteRecords);
+ recordsToTest.addAll(deleteRecords.subList(0, 10));
+ actualRecords =
HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2),
2).collectAsList();
+ expectedList = new ArrayList<>();
+ expectedList.addAll(adjustedInserts);
+ expectedList.addAll(deleteRecords);
+ assertHoodieRecordListEquality(actualRecords, expectedList);
+
+ // test the failure case: the same record having 2 inserts should fail.
+ recordsToTest = new ArrayList<>();
+ recordsToTest.addAll(adjustedInserts);
+ recordsToTest.addAll(adjustedInserts.subList(0, 5));
+ try {
+
HoodieTableMetadataUtil.reduceByKeys(context.parallelize(recordsToTest, 2),
2).collectAsList();
+ fail("Should not have reached here");
+ } catch (Exception e) {
+ // expected. no-op
+ assertTrue(e.getCause() instanceof HoodieIOException);
+ }
+ }
+ }
+
+ private void assertHoodieRecordListEquality(List<HoodieRecord> actualList,
List<HoodieRecord> expectedList) {
+ assertEquals(expectedList.size(), actualList.size());
+ List<String> expectedInsertRecordKeys =
expectedList.stream().filter(record -> !(record.getData() instanceof
EmptyHoodieRecordPayload))
+ .map(record -> record.getRecordKey()).collect(Collectors.toList());
+ List<String> expectedDeletedRecordKeys =
expectedList.stream().filter(record -> (record.getData() instanceof
EmptyHoodieRecordPayload))
+ .map(record -> record.getRecordKey()).collect(Collectors.toList());
+
+ List<String> actualInsertRecordKeys = actualList.stream().filter(record ->
!(record.getData() instanceof EmptyHoodieRecordPayload))
+ .map(record -> record.getRecordKey()).collect(Collectors.toList());
+ List<String> actualDeletedRecordKeys = actualList.stream().filter(record
-> (record.getData() instanceof EmptyHoodieRecordPayload))
+ .map(record -> record.getRecordKey()).collect(Collectors.toList());
+
+ assertListEquality(expectedInsertRecordKeys, actualInsertRecordKeys);
+ assertListEquality(expectedDeletedRecordKeys, actualDeletedRecordKeys);
+ }
+
+ private void assertListEquality(List<String> list1, List<String> list2) {
+ Collections.sort(list1);
+ Collections.sort(list2);
+ assertEquals(list1, list2);
+ }
+
+ private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient
dataTableMetaClient) {
+ if
(dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
== 0) {
+ return Option.empty();
+ }
+
+ try {
+ TableSchemaResolver schemaResolver = new
TableSchemaResolver(dataTableMetaClient);
+ return Option.of(schemaResolver.getTableAvroSchema());
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get latest columns for " +
dataTableMetaClient.getBasePath(), e);
+ }
+ }
+
+ private void generateRliRecordsAndAssert(List<WriteStatus> writeStatuses,
Map<String, String> fileIdToFileNameMapping, String commitTime,
+ HoodieWriteConfig writeConfig,
List<String> actualInserts,
+ List<String> actualDeletes) {
+ writeStatuses.forEach(writeStatus -> {
+ if
(!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(),
writeStatus.getPartitionPath()))) {
+ // Fetch record keys for all files written.
+ try {
+ String writeStatFileId = writeStatus.getFileId();
+ if (!fileIdToFileNameMapping.isEmpty()) {
+ assertEquals(writeStatus.getStat().getPrevBaseFile(),
fileIdToFileNameMapping.get(writeStatFileId));
+ }
+
+ Iterator<HoodieRecord> rliRecordsItr =
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
writeStatus.getStat(),
+ writeConfig.getWritesFileIdEncoding(), commitTime,
metaClient.getStorage());
+ while (rliRecordsItr.hasNext()) {
+ HoodieRecord rliRecord = rliRecordsItr.next();
+ String key = rliRecord.getRecordKey();
+ if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) {
+ actualDeletes.add(key);
+ } else {
+ actualInserts.add(key);
+ }
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Should not have failed ", e);
+ }
+ }
+ });
+ }
+
+ private void parseRecordKeysFromBaseFiles(List<WriteStatus> writeStatuses,
Map<String, String> fileIdToFileNameMapping, String commitTime,
+ HoodieWriteConfig writeConfig,
List<String> actualInserts,
+ List<String> actualDeletes,
List<String> actualUpdates) {
+ writeStatuses.forEach(writeStatus -> {
+ if
(!FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(),
writeStatus.getPartitionPath()))) {
+ // Fetch record keys for all files written.
+ try {
+ String writeStatFileId = writeStatus.getFileId();
+ if (!fileIdToFileNameMapping.isEmpty()) {
+ assertEquals(writeStatus.getStat().getPrevBaseFile(),
fileIdToFileNameMapping.get(writeStatFileId));
+ }
+
+ String partition = writeStatus.getStat().getPartitionPath();
+ String latestFileName =
FSUtils.getFileNameFromPath(writeStatus.getStat().getPath());
+
+ Set<BaseFileRecordParsingUtils.RecordStatus> recordStatusSet = new
HashSet<>();
+ recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.INSERT);
+ recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.UPDATE);
+ recordStatusSet.add(BaseFileRecordParsingUtils.RecordStatus.DELETE);
+
+ Map<BaseFileRecordParsingUtils.RecordStatus, List<String>>
recordKeyMappings =
BaseFileRecordParsingUtils.getRecordKeyStatuses(metaClient.getBasePath().toString(),
partition, latestFileName,
+ writeStatus.getStat().getPrevBaseFile(), storage,
recordStatusSet);
+ if
(recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.INSERT))
{
+
actualInserts.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.INSERT));
+ }
+ if
(recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.UPDATE))
{
+
actualUpdates.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.UPDATE));
+ }
+ if
(recordKeyMappings.containsKey(BaseFileRecordParsingUtils.RecordStatus.DELETE))
{
+
actualDeletes.addAll(recordKeyMappings.get(BaseFileRecordParsingUtils.RecordStatus.DELETE));
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Should not have failed ", e);
+ }
+ }
+ });
+ }
+}
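
For readers skimming the test above, the classification it performs on the RLI record iterator is worth calling out: records carrying an EmptyHoodieRecordPayload are deletes, everything else is an insert. A minimal, hedged sketch of that step (the class and method names here are illustrative, not part of the patch):

// Illustrative sketch only: classifies RLI metadata records the way the
// test's generateRliRecordsAndAssert helper does. RliRecordClassifier is
// a hypothetical name, not a class added by this patch.
import java.util.Iterator;
import java.util.List;

import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;

final class RliRecordClassifier {
  static void classify(Iterator<HoodieRecord> rliRecords,
                       List<String> insertKeys, List<String> deleteKeys) {
    while (rliRecords.hasNext()) {
      HoodieRecord rec = rliRecords.next();
      if (rec.getData() instanceof EmptyHoodieRecordPayload) {
        deleteKeys.add(rec.getRecordKey()); // delete marker in RLI
      } else {
        insertKeys.add(rec.getRecordKey()); // new insert location
      }
    }
  }
}
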
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index a290b1f3e96..ce7b1c26c9c 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -431,7 +431,7 @@ public class TestHoodieTimelineArchiver extends
HoodieSparkClientTestHarness {
});
commitMeta = generateCommitMetadata(instantTime, partToFileIds);
metadataWriter.performTableServices(Option.of(instantTime));
- metadataWriter.updateFromWriteStatuses(commitMeta,
context.emptyHoodieData(), instantTime);
+ metadataWriter.update(commitMeta, instantTime);
metaClient.getActiveTimeline().saveAsComplete(
INSTANT_GENERATOR.createNewInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, instantTime),
serializeCommitMetadata(metaClient.getCommitMetadataSerDe(),
commitMeta));
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index ef28980d9cf..73b9d7e8be2 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -135,7 +135,7 @@ public class TestHoodieSparkMergeOnReadTableCompaction
extends SparkClientFuncti
List<WriteStatus> writeStatuses = writeData(insertTime, 100, false);
Assertions.assertEquals(200, readTableTotalRecordsNum());
// commit the write. The records should be visible now even though the
compaction does not complete.
- client.commitStats(insertTime, context().parallelize(writeStatuses, 1),
writeStatuses.stream().map(WriteStatus::getStat)
+ client.commitStats(insertTime,
writeStatuses.stream().map(WriteStatus::getStat)
.collect(Collectors.toList()), Option.empty(),
metaClient.getCommitActionType());
Assertions.assertEquals(300, readTableTotalRecordsNum());
// after the compaction, total records should remain the same
@@ -207,7 +207,7 @@ public class TestHoodieSparkMergeOnReadTableCompaction
extends SparkClientFuncti
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
if (doCommit) {
List<HoodieWriteStat> writeStats =
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
- boolean committed = client.commitStats(instant,
context().parallelize(writeStatuses, 1), writeStats, Option.empty(),
metaClient.getCommitActionType());
+ boolean committed = client.commitStats(instant, writeStats,
Option.empty(), metaClient.getCommitActionType());
Assertions.assertTrue(committed);
}
metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
index b8f55c55a72..704d6e8420b 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
@@ -140,7 +140,6 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
// step to commit the 1st txn
client1.commitStats(
insertTime1,
- context().parallelize(writeStatuses1, 1),
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(),
metaClient.getCommitActionType());
@@ -148,7 +147,6 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
// step to commit the 2nd txn
client2.commitStats(
insertTime2,
- context().parallelize(writeStatuses2, 1),
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(),
metaClient.getCommitActionType());
@@ -185,7 +183,6 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
// step to commit the 1st txn
client1.commitStats(
insertTime1,
- context().parallelize(writeStatuses1, 1),
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(),
metaClient.getCommitActionType());
@@ -199,7 +196,6 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
List<WriteStatus> writeStatuses3 = writeData(client1, insertTime3,
dataset3, false, WriteOperationType.INSERT);
client1.commitStats(
insertTime3,
- context().parallelize(writeStatuses3, 1),
writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(),
metaClient.getCommitActionType());
@@ -233,7 +229,6 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
List<WriteStatus> writeStatuses0 = writeData(client0, insertTime0,
dataset0, false, WriteOperationType.BULK_INSERT, true);
client0.commitStats(
insertTime0,
- context().parallelize(writeStatuses0, 1),
writeStatuses0.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(),
metaClient.getCommitActionType());
@@ -273,7 +268,6 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
// step to commit the 1st txn
client1.commitStats(
insertTime1,
- context().parallelize(writeStatuses1, 1),
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(),
metaClient.getCommitActionType());
@@ -281,7 +275,6 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
// step to commit the 2nd txn
client2.commitStats(
insertTime2,
- context().parallelize(writeStatuses2, 1),
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(),
metaClient.getCommitActionType());
@@ -330,7 +323,6 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
// step to commit the 1st txn
client1.commitStats(
insertTime1,
- context().parallelize(writeStatuses1, 1),
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(),
metaClient.getCommitActionType());
@@ -339,7 +331,6 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
assertThrows(HoodieWriteConflictException.class, () -> {
client2.commitStats(
insertTime2,
- context().parallelize(writeStatuses2, 1),
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
Option.empty(),
metaClient.getCommitActionType());
@@ -552,7 +543,7 @@ public class TestSparkNonBlockingConcurrencyControl extends
SparkClientFunctiona
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
if (doCommit) {
List<HoodieWriteStat> writeStats =
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
- boolean committed = client.commitStats(instant,
context().parallelize(writeStatuses, 1), writeStats, Option.empty(),
metaClient.getCommitActionType());
+ boolean committed = client.commitStats(instant, writeStats,
Option.empty(), metaClient.getCommitActionType());
Assertions.assertTrue(committed);
}
metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
index 8d12c7da48d..0da2af63725 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
@@ -211,7 +211,7 @@ public class HoodieCleanerTestBase extends
HoodieClientTestBase {
HoodieCommitMetadata commitMeta = generateCommitMetadata(instantTime,
partToFileIds);
try (HoodieTableMetadataWriter metadataWriter = getMetadataWriter(config))
{
metadataWriter.performTableServices(Option.of(instantTime));
- metadataWriter.updateFromWriteStatuses(commitMeta,
context.emptyHoodieData(), instantTime);
+ metadataWriter.update(commitMeta, instantTime);
metaClient.getActiveTimeline().saveAsComplete(
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, instantTime),
serializeCommitMetadata(metaClient.getCommitMetadataSerDe(),
commitMeta));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 3c98a510317..6d7ca6d5182 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -159,6 +159,9 @@ public class HoodieWriteStat implements Serializable {
@Nullable
private Long maxEventTime;
+ @Nullable
+ private String prevBaseFile;
+
@Nullable
private RuntimeStats runtimeStats;
@@ -327,6 +330,14 @@ public class HoodieWriteStat implements Serializable {
this.fileSizeInBytes = fileSizeInBytes;
}
+ public String getPrevBaseFile() {
+ return prevBaseFile;
+ }
+
+ public void setPrevBaseFile(String prevBaseFile) {
+ this.prevBaseFile = prevBaseFile;
+ }
+
public Long getMinEventTime() {
return minEventTime;
}
@@ -370,9 +381,9 @@ public class HoodieWriteStat implements Serializable {
@Override
public String toString() {
return "HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path +
'\'' + ", prevCommit='" + prevCommit
- + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes +
", numUpdateWrites=" + numUpdateWrites
- + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" +
totalWriteErrors + ", tempPath='" + tempPath
- + '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats)
+ + '\'' + ", prevBaseFile=" + prevBaseFile + '\'' + ", numWrites=" +
numWrites + ", numDeletes=" + numDeletes
+ + ", numUpdateWrites=" + numUpdateWrites + ", totalWriteBytes=" +
totalWriteBytes + ", totalWriteErrors="
+ + totalWriteErrors + ", tempPath='" + tempPath + '\'' + ", cdcStats='"
+ JsonUtils.toString(cdcStats)
+ '\'' + ", partitionPath='" + partitionPath + '\'' + ",
totalLogRecords=" + totalLogRecords
+ ", totalLogFilesCompacted=" + totalLogFilesCompacted + ",
totalLogSizeCompacted=" + totalLogSizeCompacted
+ ", totalUpdatedRecordsCompacted=" + totalUpdatedRecordsCompacted +
", totalLogBlocks=" + totalLogBlocks
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 8b8d43449c6..c23c5f42f19 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -42,16 +43,19 @@ import java.util.stream.Collectors;
public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner {
private final LogRecordScannerCallback callback;
+ private final RecordDeletionCallback recordDeletionCallback;
private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String
basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean
reverseReader, int bufferSize,
- LogRecordScannerCallback callback,
Option<InstantRange> instantRange, InternalSchema internalSchema,
+ LogRecordScannerCallback callback,
RecordDeletionCallback recordDeletionCallback,
+ Option<InstantRange> instantRange,
InternalSchema internalSchema,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
super(storage, basePath, logFilePaths, readerSchema, latestInstantTime,
reverseReader, bufferSize, instantRange,
false, true, Option.empty(), internalSchema, Option.empty(),
enableOptimizedLogBlocksScan, recordMerger,
hoodieTableMetaClientOption);
this.callback = callback;
+ this.recordDeletionCallback = recordDeletionCallback;
}
/**
@@ -78,12 +82,16 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordScann
// payload pointing into a shared, mutable (underlying) buffer we
get a clean copy of
// it since these records will be put into queue of
BoundedInMemoryExecutor.
// Just call callback without merging
- callback.apply(hoodieRecord.copy());
+ if (callback != null) {
+ callback.apply(hoodieRecord.copy());
+ }
}
@Override
protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
- // no - op
+ if (recordDeletionCallback != null) {
+ recordDeletionCallback.apply(deleteRecord.getHoodieKey());
+ }
}
/**
@@ -95,6 +103,14 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordScann
void apply(HoodieRecord<?> record) throws Exception;
}
+ /**
+ * A callback for log record scanner to consume deleted HoodieKeys.
+ */
+ @FunctionalInterface
+ public interface RecordDeletionCallback {
+ void apply(HoodieKey deletedKey);
+ }
+
/**
* Builder used to build {@code HoodieUnMergedLogRecordScanner}.
*/
@@ -110,6 +126,7 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordScann
private Option<InstantRange> instantRange = Option.empty();
// specific configurations
private LogRecordScannerCallback callback;
+ private RecordDeletionCallback recordDeletionCallback;
private boolean enableOptimizedLogBlocksScan;
private HoodieRecordMerger recordMerger =
HoodiePreCombineAvroRecordMerger.INSTANCE;
private HoodieTableMetaClient hoodieTableMetaClient;
@@ -173,6 +190,11 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordScann
return this;
}
+ public Builder withRecordDeletionCallback(RecordDeletionCallback
recordDeletionCallback) {
+ this.recordDeletionCallback = recordDeletionCallback;
+ return this;
+ }
+
@Override
public Builder withOptimizedLogBlocksScan(boolean
enableOptimizedLogBlocksScan) {
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
@@ -197,7 +219,7 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordScann
ValidationUtils.checkArgument(recordMerger != null);
return new HoodieUnMergedLogRecordScanner(storage, basePath,
logFilePaths, readerSchema,
- latestInstantTime, reverseReader, bufferSize, callback, instantRange,
+ latestInstantTime, reverseReader, bufferSize, callback,
recordDeletionCallback, instantRange,
internalSchema, enableOptimizedLogBlocksScan, recordMerger,
Option.ofNullable(hoodieTableMetaClient));
}
}
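
A sketch of how the new RecordDeletionCallback composes with the existing record callback to collect both valid and deleted keys in a single unmerged scan, mirroring getRecordKeys() in HoodieTableMetadataUtil below (the wrapper method and parameter values are placeholders):

// Hedged sketch of the builder wiring; the wrapper method is illustrative.
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;

final class UnmergedScanExample {
  static Set<String> collectAllKeys(HoodieTableMetaClient metaClient, String logFilePath,
                                    Schema readerSchema, String latestInstantTime, int bufferSize) {
    Set<String> allKeys = new HashSet<>();
    HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
        .withStorage(metaClient.getStorage())
        .withBasePath(metaClient.getBasePath())
        .withLogFilePaths(Collections.singletonList(logFilePath))
        .withBufferSize(bufferSize)
        .withLatestInstantTime(latestInstantTime)
        .withReaderSchema(readerSchema)
        .withTableMetaClient(metaClient)
        // valid record keys
        .withLogRecordScannerCallback(record -> allKeys.add(record.getRecordKey()))
        // deleted record keys
        .withRecordDeletionCallback(deletedKey -> allKeys.add(deletedKey.getRecordKey()))
        .build();
    scanner.scan();
    return allKeys;
  }
}
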
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
new file mode 100644
index 00000000000..ac2739237f2
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+
+public class BaseFileRecordParsingUtils {
+
+ /**
+ * Generates RLI Metadata records for base files.
+ * If a base file is added to a new file group, all record keys are treated as inserts.
+ * If a base file is added to an existing file group, we read the previous base file in addition to the latest base file of interest,
+ * find deleted records, and generate RLI Metadata records for them in addition to new insert records.
+ *
+ * @param basePath base path of the table.
+ * @param writeStat {@link HoodieWriteStat} of interest.
+ * @param writesFileIdEncoding fileID encoding for the table.
+ * @param instantTime instant time of interest.
+ * @param storage instance of {@link HoodieStorage}.
+ * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
+ * @throws IOException
+ */
+ public static Iterator<HoodieRecord>
generateRLIMetadataHoodieRecordsForBaseFile(String basePath,
+
HoodieWriteStat writeStat,
+
Integer writesFileIdEncoding,
+
String instantTime,
+
HoodieStorage storage) throws IOException {
+ String partition = writeStat.getPartitionPath();
+ String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath());
+ String fileId = FSUtils.getFileId(latestFileName);
+
+ Set<RecordStatus> recordStatuses = new HashSet<>();
+ recordStatuses.add(RecordStatus.INSERT);
+ recordStatuses.add(RecordStatus.DELETE);
+ // for RLI, we are only interested in INSERTS and DELETES
+ Map<RecordStatus, List<String>> recordStatusListMap =
getRecordKeyStatuses(basePath, writeStat.getPartitionPath(), latestFileName,
writeStat.getPrevBaseFile(), storage,
+ recordStatuses);
+ List<HoodieRecord> hoodieRecords = new ArrayList<>();
+ if (recordStatusListMap.containsKey(RecordStatus.INSERT)) {
+
hoodieRecords.addAll(recordStatusListMap.get(RecordStatus.INSERT).stream()
+ .map(recordKey -> (HoodieRecord)
HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId,
+ instantTime, writesFileIdEncoding)).collect(toList()));
+ }
+
+ if (recordStatusListMap.containsKey(RecordStatus.DELETE)) {
+
hoodieRecords.addAll(recordStatusListMap.get(RecordStatus.DELETE).stream()
+ .map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()));
+ }
+
+ return hoodieRecords.iterator();
+ }
+
+ /**
+ * Fetches the list of record keys deleted or updated in the file referenced by the {@link HoodieWriteStat} passed in.
+ *
+ * @param basePath base path of the table.
+ * @param writeStat {@link HoodieWriteStat} instance of interest.
+ * @param storage {@link HoodieStorage} instance of interest.
+ * @return list of record keys deleted or updated.
+ * @throws IOException
+ */
+ @VisibleForTesting
+ public static List<String> getRecordKeysDeletedOrUpdated(String basePath,
+ HoodieWriteStat
writeStat,
+ HoodieStorage
storage) throws IOException {
+ String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath());
+ Set<RecordStatus> recordStatuses = new HashSet<>();
+ recordStatuses.add(RecordStatus.UPDATE);
+ recordStatuses.add(RecordStatus.DELETE);
+ // for secondary index, we are interested in UPDATES and DELETES.
+ return getRecordKeyStatuses(basePath, writeStat.getPartitionPath(),
latestFileName, writeStat.getPrevBaseFile(), storage,
+ recordStatuses).values().stream().flatMap((Function<List<String>,
Stream<String>>) Collection::stream).collect(toList());
+ }
+
+ /**
+ * Fetches record keys from the latest base file, grouped by {@link RecordStatus}, optionally comparing against the previous base file.
+ *
+ * @param basePath base path of the table.
+ * @param partition partition of interest.
+ * @param latestFileName name of the latest base file.
+ * @param prevFileName name of the previous base file in the same file group, or null for a new file group.
+ * @param storage {@link HoodieStorage} instance of interest.
+ * @param recordStatusesOfInterest set of {@link RecordStatus}es to fetch.
+ * @return map from {@link RecordStatus} to the matching record keys.
+ * @throws IOException
+ */
+ @VisibleForTesting
+ public static Map<RecordStatus, List<String>> getRecordKeyStatuses(String
basePath,
+ String partition,
+ String
latestFileName,
+ String
prevFileName,
+ HoodieStorage
storage,
+
Set<RecordStatus> recordStatusesOfInterest) throws IOException {
+ Set<String> recordKeysFromLatestBaseFile =
getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName);
+ if (prevFileName == null) {
+ if (recordStatusesOfInterest.contains(RecordStatus.INSERT)) {
+ return Collections.singletonMap(RecordStatus.INSERT, new
ArrayList<>(recordKeysFromLatestBaseFile));
+ } else {
+ // this is a new base file for a new file group, so everything is an insert and there are no updates or deletes to report.
+ return Collections.emptyMap();
+ }
+ } else {
+ // read from the previous base file and compute the difference to also generate delete records.
+ // we will return inserts, updates and deletes from this code block.
+ Set<String> recordKeysFromPreviousBaseFile =
getRecordKeysFromBaseFile(storage, basePath, partition, prevFileName);
+ Map<RecordStatus, List<String>> toReturn = new
HashMap<>(recordStatusesOfInterest.size());
+ if (recordStatusesOfInterest.contains(RecordStatus.DELETE)) {
+ toReturn.put(RecordStatus.DELETE,
recordKeysFromPreviousBaseFile.stream()
+ .filter(recordKey -> {
+ // deleted record
+ return !recordKeysFromLatestBaseFile.contains(recordKey);
+ }).collect(toList()));
+ }
+
+ List<String> updates = new ArrayList<>();
+ List<String> inserts = new ArrayList<>();
+ // do one pass and collect list of inserts and updates.
+ recordKeysFromLatestBaseFile.stream().forEach(recordKey -> {
+ if (recordKeysFromPreviousBaseFile.contains(recordKey)) {
+ updates.add(recordKey);
+ } else {
+ inserts.add(recordKey);
+ }
+ });
+ if (recordStatusesOfInterest.contains(RecordStatus.UPDATE)) {
+ toReturn.put(RecordStatus.UPDATE, updates);
+ }
+ if (recordStatusesOfInterest.contains(RecordStatus.INSERT)) {
+ toReturn.put(RecordStatus.INSERT, inserts);
+ }
+ return toReturn;
+ }
+ }
+
+ private static Set<String> getRecordKeysFromBaseFile(HoodieStorage storage,
String basePath, String partition, String fileName) throws IOException {
+ StoragePath dataFilePath = new StoragePath(basePath,
StringUtils.isNullOrEmpty(partition) ? fileName : (partition + Path.SEPARATOR)
+ fileName);
+ FileFormatUtils fileFormatUtils =
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.PARQUET);
+ return fileFormatUtils.readRowKeys(storage, dataFilePath);
+ }
+
+ public enum RecordStatus {
+ INSERT,
+ UPDATE,
+ DELETE
+ }
+
+}
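
The heart of getRecordKeyStatuses above is a two-set comparison between the previous and the latest base file's keys. A standalone sketch of that derivation using plain java.util (names here are illustrative):

// Hedged sketch of the set logic in getRecordKeyStatuses: deletes are keys
// present only in the previous base file, updates are keys present in both,
// inserts are keys present only in the latest base file.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class KeyDiffSketch {
  static List<String> deletes(Set<String> prevKeys, Set<String> latestKeys) {
    return prevKeys.stream().filter(k -> !latestKeys.contains(k)).collect(Collectors.toList());
  }

  static List<String> updates(Set<String> prevKeys, Set<String> latestKeys) {
    return latestKeys.stream().filter(prevKeys::contains).collect(Collectors.toList());
  }

  static List<String> inserts(Set<String> prevKeys, Set<String> latestKeys) {
    return latestKeys.stream().filter(k -> !prevKeys.contains(k)).collect(Collectors.toList());
  }
}
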
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 6f03b9e8209..ab63c153250 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -46,6 +46,9 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -54,6 +57,7 @@ import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -61,6 +65,7 @@ import
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -135,6 +140,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static java.util.stream.Collectors.toList;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
@@ -364,6 +370,7 @@ public class HoodieTableMetadataUtil {
boolean isColumnStatsIndexEnabled,
int columnStatsIndexParallelism,
List<String> targetColumnsForColumnStatsIndex,
+
Integer writesFileIdEncoding,
HoodieMetadataConfig metadataConfig) {
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new
HashMap<>();
final HoodieData<HoodieRecord> filesPartitionRecordsRDD =
context.parallelize(
@@ -385,6 +392,10 @@ public class HoodieTableMetadataUtil {
final HoodieData<HoodieRecord> partitionStatsRDD =
convertMetadataToPartitionStatsRecords(commitMetadata, context, dataMetaClient,
metadataConfig);
partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(),
partitionStatsRDD);
}
+ if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX)) {
+
partitionToRecordsMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
convertMetadataToRecordIndexRecords(context, commitMetadata, metadataConfig,
+ dataMetaClient, writesFileIdEncoding, instantTime));
+ }
return partitionToRecordsMap;
}
@@ -757,6 +768,163 @@ public class HoodieTableMetadataUtil {
});
}
+ @VisibleForTesting
+ public static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty() || commitMetadata.getOperationType() ==
WriteOperationType.COMPACT) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ HoodieFileFormat baseFileFormat =
dataTableMetaClient.getTableConfig().getBaseFileFormat();
+ // RLI cannot support logs having inserts with the current offering. So, let's validate that.
+ if (allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+ })) {
+ throw new HoodieIOException("RLI cannot support logs having inserts
with current offering. Would recommend disabling Record Level Index");
+ }
+
+ // we might need to set some additional variables if we need to process log files.
+ // for RLI on a MOR table, we only care about log files if they contain any deletes. If not, all entries in logs are considered updates,
+ // for which we do not need to generate new RLI records.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file with deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ // handle base files
+ if
(writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else if (FSUtils.isLogFile(fullFilePath)) {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ Set<String> deletedRecordKeys =
getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime, false,
true);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ } else {
+ throw new HoodieIOException("Unsupported file type " +
fullFilePath.toString() + " while generating MDT records");
+ }
+ });
+
+ // there are chances that the same record key from the data table has 2 entries (1 delete from an older partition and 1 insert to a newer partition).
+ // let's reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that the same record key from the data table has 2 entries (1 delete from an older partition and 1 insert to a newer partition).
+ * So, this method performs a reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after the RLI index lookup.
+ * @param parallelism parallelism to use.
+ * @return the reduced records, with at most one entry per record key.
+ */
+ @VisibleForTesting
+ public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord>
recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else if (isRecord1Deleted && isRecord2Deleted) {
+ // both are deletes; keep just 1 of them
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+ + record1.getData().toString() + ", record 2 : " +
record2.getData().toString());
+ }
+ }, parallelism).values();
+ }
+
+ @VisibleForTesting
+ public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext
engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ String instantTime)
{
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ HoodieFileFormat baseFileFormat =
dataTableMetaClient.getTableConfig().getBaseFileFormat();
+ // SI cannot support logs having inserts with the current offering. So, let's validate that.
+ if (allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+ })) {
+ throw new HoodieIOException("Secondary index cannot support logs
having inserts with current offering. Can you drop secondary index.");
+ }
+
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName);
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFiles) {
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ return engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ // handle base files
+ if
(writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat,
storage).iterator();
+ } else if (FSUtils.isLogFile(fullFilePath)) {
+ // for logs, every entry is either an update or a delete
+ return getRecordKeys(fullFilePath.toString(),
dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime, true,
true)
+ .iterator();
+ } else {
+ throw new HoodieIOException("Found unsupported file type " +
fullFilePath.toString() + ", while generating MDT records");
+ }
+ }).collectAsList();
+ } catch (Exception e) {
+ throw new HoodieException("Failed to fetch deleted record keys while
preparing MDT records", e);
+ }
+ }
+
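The flatMap above branches on file type before extracting keys; a minimal sketch of that dispatch follows, assuming plain string suffix checks in place of FSUtils and the table-config lookup (the extensions and paths below are illustrative, not the real Hudi naming rules).

    import java.util.Arrays;

    public class WriteStatDispatchSketch {
      // illustrative extensions; the real code asks the table config for the base file format
      private static final String BASE_FILE_EXT = ".parquet";
      private static final String LOG_FILE_MARKER = ".log.";

      // classify a relative file path the way the flatMap above branches on it
      static String classify(String path) {
        if (path.endsWith(BASE_FILE_EXT)) {
          return "base file: parse record keys deleted or updated from the base file";
        } else if (path.contains(LOG_FILE_MARKER)) {
          return "log file: every entry is either an update or a delete";
        }
        throw new IllegalArgumentException("Found unsupported file type " + path);
      }

      public static void main(String[] args) {
        Arrays.asList("p1/f1_1-0-1_001.parquet", "p1/.f1_001.log.1_1-0-1")
            .forEach(p -> System.out.println(p + " -> " + classify(p)));
      }
    }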
   private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime,
                                                     Map<String, Map<String, Long>> partitionToFilesMap) {
     InstantGenerator factory = dataTableMetaClient.getInstantGenerator();
@@ -1309,6 +1477,35 @@ public class HoodieTableMetadataUtil {
return Collections.emptyList();
}
+  @VisibleForTesting
+  public static Set<String> getRecordKeys(String filePath, HoodieTableMetaClient datasetMetaClient,
+                                          Option<Schema> writerSchemaOpt, int maxBufferSize,
+                                          String latestCommitTimestamp, boolean includeValidKeys,
+                                          boolean includeDeletedKeys) throws IOException {
+    if (writerSchemaOpt.isPresent()) {
+      // read log file records without merging
+      Set<String> allRecordKeys = new HashSet<>();
+      HoodieUnMergedLogRecordScanner.Builder builder = HoodieUnMergedLogRecordScanner.newBuilder()
+          .withStorage(datasetMetaClient.getStorage())
+          .withBasePath(datasetMetaClient.getBasePath())
+          .withLogFilePaths(Collections.singletonList(filePath))
+          .withBufferSize(maxBufferSize)
+          .withLatestInstantTime(latestCommitTimestamp)
+          .withReaderSchema(writerSchemaOpt.get())
+          .withTableMetaClient(datasetMetaClient);
+      if (includeValidKeys) {
+        builder.withLogRecordScannerCallback(record -> allRecordKeys.add(record.getRecordKey()));
+      }
+      if (includeDeletedKeys) {
+        builder.withRecordDeletionCallback(deletedKey -> allRecordKeys.add(deletedKey.getRecordKey()));
+      }
+      HoodieUnMergedLogRecordScanner scanner = builder.build();
+      scanner.scan();
+      return allRecordKeys;
+    }
+    return Collections.emptySet();
+  }
+
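getRecordKeys above collects keys through scanner callbacks rather than materializing records; here is a minimal sketch of that callback-registration pattern, with a hypothetical Scanner type standing in for HoodieUnMergedLogRecordScanner and its builder.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Consumer;

    public class CallbackScannerSketch {
      // hypothetical scanner: pushes each key it encounters into registered callbacks
      static final class Scanner {
        private final List<Consumer<String>> onValid = new ArrayList<>();
        private final List<Consumer<String>> onDeleted = new ArrayList<>();

        Scanner withValidKeyCallback(Consumer<String> cb) {
          onValid.add(cb);
          return this;
        }

        Scanner withDeletionCallback(Consumer<String> cb) {
          onDeleted.add(cb);
          return this;
        }

        // scans a fake "log" of key -> isDeleted entries
        void scan(Map<String, Boolean> log) {
          log.forEach((key, deleted) -> (deleted ? onDeleted : onValid).forEach(cb -> cb.accept(key)));
        }
      }

      public static void main(String[] args) {
        Set<String> allRecordKeys = new HashSet<>();
        Scanner scanner = new Scanner()
            .withValidKeyCallback(allRecordKeys::add)   // includeValidKeys
            .withDeletionCallback(allRecordKeys::add);  // includeDeletedKeys
        Map<String, Boolean> log = new LinkedHashMap<>();
        log.put("k1", false);
        log.put("k2", true);
        scanner.scan(log);
        System.out.println(allRecordKeys); // contains k1 and k2
      }
    }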
/**
    * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by
    * the {@link LogicalTypes.Decimal} Avro logical type
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
index a99e4848299..54d6b7a7ab7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
@@ -53,6 +53,12 @@ public class TestHoodieWriteStat {
writeStat.setPath(basePath, finalizeFilePath);
     assertEquals(finalizeFilePath, new StoragePath(basePath, writeStat.getPath()));
+    // test prev base file
+    StoragePath prevBaseFilePath = new StoragePath(partitionPath, FSUtils.makeBaseFileName(instantTime, "2-0-3", fileName,
+        HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension()));
+    writeStat.setPrevBaseFile(prevBaseFilePath.toString());
+    assertEquals(prevBaseFilePath.toString(), writeStat.getPrevBaseFile());
+
// test for null tempFilePath
writeStat = new HoodieWriteStat();
writeStat.setPath(basePath, finalizeFilePath);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index e773f3ed117..f28f811ba59 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink.clustering;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.TableServiceType;
@@ -218,8 +217,7 @@ public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> {
}
// commit the clustering
this.table.getMetaClient().reloadActiveTimeline();
-    this.writeClient.completeTableService(
-        TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant, Option.of(HoodieListData.lazy(writeMetadata.getWriteStatuses())));
+    this.writeClient.completeTableService(TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
     clusteringMetrics.updateCommitMetrics(instant, writeMetadata.getCommitMetadata().get());
// whether to clean up the input base parquet files used for clustering
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 1c9e2bb7579..d9db9cd51d1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -84,7 +84,7 @@ public class DataSourceInternalWriterHelper {
public void commit(List<WriteStatus> writeStatuses) {
try {
       List<HoodieWriteStat> writeStatList = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      writeClient.commitStats(instantTime, writeClient.getEngineContext().parallelize(writeStatuses), writeStatList, Option.of(extraMetadata),
+      writeClient.commitStats(instantTime, writeStatList, Option.of(extraMetadata),
           CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
index 7f2663d1051..a96dc752ee6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
@@ -305,7 +305,7 @@ public class TestSparkConsistentBucketClustering extends HoodieSparkClientTestHa
List<WriteStatus> writeStatues = writeData(writeTime, 2000, false);
// Cannot schedule clustering if there is in-flight writer
Assertions.assertFalse(writeClient.scheduleClustering(Option.empty()).isPresent());
-    Assertions.assertTrue(writeClient.commitStats(writeTime, context.parallelize(writeStatues, 1), writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+    Assertions.assertTrue(writeClient.commitStats(writeTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(), metaClient.getCommitActionType()));
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -341,7 +341,7 @@ public class TestSparkConsistentBucketClustering extends HoodieSparkClientTestHa
     List<WriteStatus> writeStatues = writeClient.upsert(writeRecords, commitTime).collect();
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
if (doCommit) {
-      Assertions.assertTrue(writeClient.commitStats(commitTime, context.parallelize(writeStatues, 1), writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+      Assertions.assertTrue(writeClient.commitStats(commitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
          Option.empty(), metaClient.getCommitActionType()));
}
metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
index bc7bc5edfc5..96b4d425102 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
@@ -157,7 +157,7 @@ public class TestSparkSortAndSizeClustering extends HoodieSparkClientTestHarness
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
if (doCommit) {
-      Assertions.assertTrue(writeClient.commitStats(commitTime, context.parallelize(writeStatues, 1), writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+      Assertions.assertTrue(writeClient.commitStats(commitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(), metaClient.getCommitActionType()));
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index e69c866ed18..15b52a68b8e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -322,9 +322,9 @@ class RecordLevelIndexTestBase extends HoodieSparkClientTestBase {
       writeConfig.getRecordIndexMinFileGroupCount, writeConfig.getRecordIndexMaxFileGroupCount,
       writeConfig.getRecordIndexGrowthFactor, writeConfig.getRecordIndexMaxFileGroupSizeBytes)
     assertEquals(estimatedFileGroupCount, getFileGroupCountForRecordIndex(writeConfig))
-    val prevDf = mergedDfList.last.drop("tip_history")
+    val prevDf = mergedDfList.last.drop("tip_history", "_hoodie_is_deleted")
     val nonMatchingRecords = readDf.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
-      "_hoodie_partition_path", "_hoodie_file_name", "tip_history")
+      "_hoodie_partition_path", "_hoodie_file_name", "tip_history", "_hoodie_is_deleted")
       .join(prevDf, prevDf.columns, "leftanti")
     assertEquals(0, nonMatchingRecords.count())
     assertEquals(readDf.count(), prevDf.count())
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 14d56db3160..5693f1804e1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -71,7 +71,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
val dataGen1 = HoodieTestDataGenerator.createTestGeneratorFirstPartition()
val dataGen2 = HoodieTestDataGenerator.createTestGeneratorSecondPartition()
- // batch1 inserts
+ // batch1 inserts (5 records)
val instantTime1 = getNewInstantTime()
     val latestBatch = recordsToStrings(dataGen1.generateInserts(instantTime1, 5)).asScala.toSeq
var operation = INSERT_OPERATION_OPT_VAL
@@ -111,7 +111,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true")
val instantTime3 = getNewInstantTime()
- // batch3. updates to partition2
+    // batch3: update 2 of the records newly inserted into partition2 in commit 2
     val latestBatch3 = recordsToStrings(dataGen2.generateUniqueUpdates(instantTime3, 2)).asScala.toSeq
     val latestBatchDf3 = spark.read.json(spark.sparkContext.parallelize(latestBatch3, 1))
latestBatchDf3.cache()
@@ -240,7 +240,29 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
.save(basePath)
val prevDf = mergedDfList.last
mergedDfList = mergedDfList :+ prevDf.except(deleteDf)
- validateDataAndRecordIndices(hudiOpts)
+ validateDataAndRecordIndices(hudiOpts, deleteDf)
+ }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIForDeletesWithHoodieIsDeletedColumn(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    val insertDf = doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    insertDf.cache()
+
+    val deleteBatch = recordsToStrings(dataGen.generateUniqueDeleteRecords(getNewInstantTime, 1)).asScala
+    val deleteDf = spark.read.json(spark.sparkContext.parallelize(deleteBatch.toSeq, 1))
+    deleteDf.cache()
+    val recordKeyToDelete = deleteDf.collectAsList().get(0).getAs("_row_key").asInstanceOf[String]
+    deleteDf.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val prevDf = mergedDfList.last
+    mergedDfList = mergedDfList :+ prevDf.filter(row => row.getAs("_row_key").asInstanceOf[String] != recordKeyToDelete)
+    validateDataAndRecordIndices(hudiOpts, deleteDf)
}
@ParameterizedTest
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
index 2daafb37a1d..90aa040dac9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
@@ -96,7 +96,7 @@ public class HoodieOfflineJobTestBase extends UtilitiesTestBase {
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
if (doCommit) {
       List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      boolean committed = client.commitStats(instant, context.parallelize(writeStatuses, 1), writeStats, Option.empty(), metaClient.getCommitActionType());
+      boolean committed = client.commitStats(instant, writeStats, Option.empty(), metaClient.getCommitActionType());
Assertions.assertTrue(committed);
}
metaClient = HoodieTableMetaClient.reload(metaClient);