nsivabalan commented on code in PR #13286:
URL: https://github.com/apache/hudi/pull/13286#discussion_r2121931023
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1093,6 +1116,113 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
+ public void startCommit(String instantTime) {
+ ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes
should be enabled for startCommit API");
+ BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
+
+ if
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
{
+ // if this is a new commit being applied to metadata for the first time
+ LOG.info("New commit at {} being applied to MDT.", instantTime);
+ } else {
+ // this code path refers to a re-attempted commit that:
+ // 1. got committed to metadata table, but failed in datatable.
+ // 2. failed while committing to metadata table
+ // for e.g., let's say compaction c1 on 1st attempt succeeded in
metadata table and failed before committing to datatable.
+ // when retried again, data table will first rollback pending
compaction. these will be applied to metadata table, but all changes
+ // are upserts to metadata table and so only a new delta commit will be
created.
+ // once rollback is complete in datatable, compaction will be retried
again, which will eventually hit this code block where the respective commit is
+ // already part of completed commit. So, we have to manually rollback
the completed instant and proceed.
+ Option<HoodieInstant> existingInstant =
metadataMetaClient.getActiveTimeline().filter(entry ->
entry.requestedTime().equals(instantTime))
+ .lastInstant();
+ LOG.info("{} completed commit at {} being applied to MDT.",
+ existingInstant.isPresent() ? "Already" : "Partially", instantTime);
+
+ // Rollback the previous commit
+ if (!writeClient.rollback(instantTime)) {
+ throw new HoodieMetadataException(String.format("Failed to rollback
deltacommit at %s from MDT", instantTime));
+ }
+ metadataMetaClient.reloadActiveTimeline();
+ }
+
+ getWriteClient().startCommitForMetadataTable(metadataMetaClient,
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+ }
+
+ public void completeStreamingCommit(String instantTime, HoodieEngineContext
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata
metadata) {
+ List<HoodieWriteStat> allWriteStats = new
ArrayList<>(metadataWriteStatsSoFar);
+ getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(),
HoodieTimeline.DELTA_COMMIT_ACTION,
+ Collections.emptyMap(), Option.empty());
+ }
+
+ public HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime) {
+ // Generate HoodieRecords for MDT partitions which can be generated just
by using one WriteStatus
+
+ List<MetadataPartitionType> mdtPartitionsToTag = new
ArrayList<>(enabledPartitionTypes);
+ mdtPartitionsToTag.remove(FILES);
+ HoodieData<Pair<String, HoodieRecord>> mdtRecordsBasedOnWriteStatus =
writeStatus.flatMap(
+ new
MetadataIndexGenerator.WriteStatusBasedMetadataIndexGenerator(mdtPartitionsToTag,
dataWriteConfig, storageConf, instantTime));
+
+ // tag records
+ Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> taggedMdtRecords =
tagRecordsWithLocation(mdtRecordsBasedOnWriteStatus,
+ mdtPartitionsToTag.stream().map(mdtPartition ->
mdtPartition.getPartitionPath()).collect(
+ Collectors.toSet()));
+
+ // write partial writes to mdt table (every partition except FILES)
+ HoodieData<WriteStatus> mdtWriteStatus =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedMdtRecords,
instantTime));
+ // dag not yet de-referenced. do not invoke any action on mdtWriteStatus
yet.
+ return mdtWriteStatus;
+ }
+
+ protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> taggedMdtRecords, String instantTime) {
Review Comment:
Please add Javadoc for this method.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests {@link HoodieCreateHandle}.
+ */
+public class TestMetadataWriterCommit extends BaseTestHandle {
+
+ @Test
+ public void testCreateHandleRLIStats() throws IOException {
+ // init config and table
+ HoodieWriteConfig config = getConfigBuilder(basePath)
+
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withEnableRecordIndex(true)
+ .withMetadataIndexColumnStats(false)
+ .withSecondaryIndexEnabled(false)
+ .build())
+ .build();
+
+ HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+ // one round per partition
+ String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+
+ // init some args
+ String fileId = UUID.randomUUID().toString();
+ String instantTime = InProcessTimeGenerator.createNewInstantTime();
+
+ // create a parquet file and obtain corresponding write status
+ config.setSchema(TRIP_EXAMPLE_SCHEMA);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[] {partitionPath});
+ Pair<WriteStatus, List<HoodieRecord>> statusListPair =
createParquetFile(config, table, partitionPath, fileId, instantTime,
dataGenerator);
+ WriteStatus writeStatus = statusListPair.getLeft();
+ List<HoodieRecord> records = statusListPair.getRight();
+ HoodieCommitMetadata commitMetadata =
createCommitMetadata(writeStatus.getStat(), partitionPath);
+
+ assertEquals(records.size(), writeStatus.getTotalRecords());
+ assertEquals(0, writeStatus.getTotalErrorRecords());
+
+ // create mdt writer
+ HoodieBackedTableMetadataWriter mdtWriter =
(HoodieBackedTableMetadataWriter)
SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
+ HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
+ HoodieTableMetaClient mdtMetaClient =
HoodieTableMetaClient.builder().setBasePath(metaClient.getMetaPath() +
"/metadata").setConf(storageConf).build();
+ assertEquals(2,
mdtMetaClient.getActiveTimeline().filterCompletedInstants().countInstants());
+
+ // Create commit in MDT
+ mdtWriter.startCommit(instantTime);
+ HoodieData<WriteStatus> mdtWriteStatus =
mdtWriter.streamWriteToMetadataPartitions(HoodieJavaRDD.of(Collections.singletonList(writeStatus),
context, 1), instantTime);
+ List<HoodieWriteStat> mdtWriteStats =
mdtWriteStatus.collectAsList().stream().map(WriteStatus::getStat).collect(Collectors.toList());
+ mdtWriter.completeStreamingCommit(instantTime, context, mdtWriteStats,
commitMetadata);
Review Comment:
Help me understand where the writes to the FILES partition are invoked.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -147,6 +157,20 @@ protected JavaRDD<HoodieRecord>
convertHoodieDataToEngineSpecificData(HoodieData
return HoodieJavaRDD.getJavaRDD(records);
}
+ @Override
+ protected HoodieData<WriteStatus>
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+ return HoodieJavaRDD.of(records);
+ }
+
+ @Override
+ public JavaRDD<WriteStatus>
streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> mdtRecordsHoodieData, String instantTime) {
+ JavaRDD<HoodieRecord> mdtRecords =
HoodieJavaRDD.getJavaRDD(mdtRecordsHoodieData.getValue());
+ engineContext.setJobStatus(this.getClass().getSimpleName(),
String.format("Upserting at %s into metadata table %s", instantTime,
metadataWriteConfig.getTableName()));
+ // TODO: Introduce prepped upsert call after client APIs are added
+ JavaRDD<WriteStatus> metadataWriteStatusesSoFar =
getWriteClient().upsertPreppedRecords(mdtRecords, instantTime,
Option.of(mdtRecordsHoodieData.getKey()));
Review Comment:
incorporate changes for isMetadataIndexSupportedForStreamingWrites
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1093,6 +1116,113 @@ public void buildMetadataPartitions(HoodieEngineContext
engineContext, List<Hood
initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
}
+ public void startCommit(String instantTime) {
+ ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes
should be enabled for startCommit API");
+ BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
+
+ if
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
{
+ // if this is a new commit being applied to metadata for the first time
+ LOG.info("New commit at {} being applied to MDT.", instantTime);
+ } else {
+ // this code path refers to a re-attempted commit that:
+ // 1. got committed to metadata table, but failed in datatable.
+ // 2. failed while committing to metadata table
+ // for e.g., let's say compaction c1 on 1st attempt succeeded in
metadata table and failed before committing to datatable.
+ // when retried again, data table will first rollback pending
compaction. these will be applied to metadata table, but all changes
+ // are upserts to metadata table and so only a new delta commit will be
created.
+ // once rollback is complete in datatable, compaction will be retried
again, which will eventually hit this code block where the respective commit is
+ // already part of completed commit. So, we have to manually rollback
the completed instant and proceed.
+ Option<HoodieInstant> existingInstant =
metadataMetaClient.getActiveTimeline().filter(entry ->
entry.requestedTime().equals(instantTime))
+ .lastInstant();
+ LOG.info("{} completed commit at {} being applied to MDT.",
+ existingInstant.isPresent() ? "Already" : "Partially", instantTime);
+
+ // Rollback the previous commit
+ if (!writeClient.rollback(instantTime)) {
+ throw new HoodieMetadataException(String.format("Failed to rollback
deltacommit at %s from MDT", instantTime));
+ }
+ metadataMetaClient.reloadActiveTimeline();
+ }
+
+ getWriteClient().startCommitForMetadataTable(metadataMetaClient,
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+ }
+
+ public void completeStreamingCommit(String instantTime, HoodieEngineContext
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata
metadata) {
+ List<HoodieWriteStat> allWriteStats = new
ArrayList<>(metadataWriteStatsSoFar);
Review Comment:
Fix here to account for all enabled partitions for which streaming writes
are not enabled; we still need to trigger upsertPrepped for those left-over
partitions.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -147,6 +157,20 @@ protected JavaRDD<HoodieRecord>
convertHoodieDataToEngineSpecificData(HoodieData
return HoodieJavaRDD.getJavaRDD(records);
}
+ @Override
+ protected HoodieData<WriteStatus>
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+ return HoodieJavaRDD.of(records);
+ }
+
+ @Override
+ public JavaRDD<WriteStatus>
streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> mdtRecordsHoodieData, String instantTime) {
+ JavaRDD<HoodieRecord> mdtRecords =
HoodieJavaRDD.getJavaRDD(mdtRecordsHoodieData.getValue());
+ engineContext.setJobStatus(this.getClass().getSimpleName(),
String.format("Upserting at %s into metadata table %s", instantTime,
metadataWriteConfig.getTableName()));
+ // TODO: Introduce prepped upsert call after client APIs are added
+ JavaRDD<WriteStatus> metadataWriteStatusesSoFar =
getWriteClient().upsertPreppedRecords(mdtRecords, instantTime,
Option.of(mdtRecordsHoodieData.getKey()));
Review Comment:
This is the second API that we need to call for all partitions that are not
supported with the streaming flow.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**
+ * For now this is a placeholder to generate all MDT records in one place.
+ * Once https://github.com/apache/hudi/pull/13226 is landed, we will leverage
the new abstraction to generate MDT records.
+ */
+public class MetadataIndexGenerator implements Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MetadataIndexGenerator.class);
+
+ /**
+ * MDT record generation utility. This function is expected to be invoked
from a map Partition call, where one spark task will receive
+ * one WriteStatus as input and the output contains prepared Metadata table
records for all eligible partitions that can operate on one
+ * WriteStatus instance only.
+ */
+ static class WriteStatusBasedMetadataIndexGenerator implements
SerializableFunction<WriteStatus, Iterator<Pair<String, HoodieRecord>>> {
+ List<MetadataPartitionType> enabledPartitionTypes;
+ HoodieWriteConfig dataWriteConfig;
+ StorageConfiguration<?> storageConf;
+ String instantTime;
+
+ public WriteStatusBasedMetadataIndexGenerator(List<MetadataPartitionType>
enabledPartitionTypes, HoodieWriteConfig dataWriteConfig,
StorageConfiguration<?> storageConf, String instantTime) {
+ this.enabledPartitionTypes = enabledPartitionTypes;
+ this.dataWriteConfig = dataWriteConfig;
+ this.storageConf = storageConf;
+ this.instantTime = instantTime;
+ }
+
+ @Override
+ public Iterator<Pair<String, HoodieRecord>> apply(WriteStatus writeStatus)
throws Exception {
+ List<Pair<String, HoodieRecord>> allRecords = new ArrayList<>();
Review Comment:
Siva to incorporate changes from his patch to account for
isMetadataPartitionSupported for streaming writes.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -428,6 +432,20 @@ public void bootstrap(Option<Map<String, String>>
extraMetadata) {
*/
public abstract O upsertPreppedRecords(I preppedRecords, final String
instantTime);
+ /**
+ * Upserts the given prepared records into the Hoodie table, at the supplied
instantTime.
+ * <p>
+ * This implementation requires that the input records are already tagged,
and de-duped if needed.
+ *
+ * @param preppedRecords Prepared HoodieRecords to upsert
+ * @param instantTime Instant time of the commit
+ * @param fileGroupIdsOpt Optional list of HoodieFileGroupIds impacted by
the commit
+ * @return Collection of WriteStatus to inspect errors and counts
+ */
+ public O upsertPreppedRecords(I preppedRecords, final String instantTime,
Option<List<HoodieFileGroupId>> fileGroupIdsOpt) {
Review Comment:
This should not be here; let's remove it.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java:
##########
@@ -120,6 +126,11 @@ protected JavaRDD<HoodieRecord>
convertHoodieDataToEngineSpecificData(HoodieData
return HoodieJavaRDD.getJavaRDD(records);
}
+ @Override
+ protected HoodieData<WriteStatus>
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+ return null;
Review Comment:
Why return null here?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -36,6 +39,34 @@
*/
public interface HoodieTableMetadataWriter<I,O> extends Serializable,
AutoCloseable {
+ /**
+ * Starts a new commit in metadata table for optimized write flow.
+ * @param instantTime
+ */
+ void startCommit(String instantTime);
+
+ /**
+ * Prepare records and write to Metadata table for all eligible partitions
except FILES partition. This will be used in optimized writes,
+ * where in data table writes statuses are maintained as HoodieData and
based on that, we prepare records and write to Metadata table
+ * partitions (except FILES). Caution should be followed to ensure the
action is not triggered on the incoming HoodieData < WriteStatus >
+ * and for the writes to metadata table. Caller is expected to trigger
collect just once for both set of HoodieData < WriteStatus >.
+ * @param writeStatus {@link HoodieData} of {@link WriteStatus} from data
table writes.
+ * @param instantTime instant time of interest.
+ * @return {@link HoodieData} of {@link WriteStatus} for writes to metadata
table.
+ */
+ HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime);
+
+ /**
+ * This api will be used in streaming writes to metadata flow, where in a
write in data table is already written to all data table, all partitions in
Metadata table
+ * using {@code #streamWriteToAllPartitions} and the action is triggered for
all writes together. Post that, marker reconciliation of data table is executed
and we
+ * complete the commit. This will also take care of executing marker
reconciliation in metadata table for all metadata table partitions.
+ * @param instantTime instant time of interest.
+ * @param context {@link HoodieEngineContext} of interest.
+ * @param metadataWriteStatsSoFar List<HoodieWriteStat> for
partial/streaming writes to metadata table completed so far.
+ * @param commitMetadata {@link HoodieCommitMetadata} of interest.
+ */
+ void completeStreamingCommit(String instantTime, HoodieEngineContext
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata
commitMetadata);
+
Review Comment:
Help me understand why the other API is missing from this interface.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]