nsivabalan commented on code in PR #13286:
URL: https://github.com/apache/hudi/pull/13286#discussion_r2121931023


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1093,6 +1116,113 @@ public void buildMetadataPartitions(HoodieEngineContext 
engineContext, List<Hood
     initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
+  public void startCommit(String instantTime) {
+    ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes 
should be enabled for startCommit API");
+    BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
+
+    if 
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
 {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
+    } else {
+      // this code path refers to a re-attempted commit that:
+      //   1. got committed to metadata table, but failed in datatable.
+      //   2. failed while committing to metadata table
+      // for e.g., let's say compaction c1 on 1st attempt succeeded in 
metadata table and failed before committing to datatable.
+      // when retried again, data table will first rollback pending 
compaction. these will be applied to metadata table, but all changes
+      // are upserts to metadata table and so only a new delta commit will be 
created.
+      // once rollback is complete in datatable, compaction will be retried 
again, which will eventually hit this code block where the respective commit is
+      // already part of completed commit. So, we have to manually rollback 
the completed instant and proceed.
+      Option<HoodieInstant> existingInstant = 
metadataMetaClient.getActiveTimeline().filter(entry -> 
entry.requestedTime().equals(instantTime))
+          .lastInstant();
+      LOG.info("{} completed commit at {} being applied to MDT.",
+          existingInstant.isPresent() ? "Already" : "Partially", instantTime);
+
+      // Rollback the previous commit
+      if (!writeClient.rollback(instantTime)) {
+        throw new HoodieMetadataException(String.format("Failed to rollback 
deltacommit at %s from MDT", instantTime));
+      }
+      metadataMetaClient.reloadActiveTimeline();
+    }
+
+    getWriteClient().startCommitForMetadataTable(metadataMetaClient, 
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+  }
+
+  public void completeStreamingCommit(String instantTime, HoodieEngineContext 
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata 
metadata) {
+    List<HoodieWriteStat> allWriteStats = new 
ArrayList<>(metadataWriteStatsSoFar);
+    getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(), 
HoodieTimeline.DELTA_COMMIT_ACTION,
+        Collections.emptyMap(), Option.empty());
+  }
+
+  public HoodieData<WriteStatus> 
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String 
instantTime) {
+    // Generate HoodieRecords for MDT partitions which can be generated just 
by using one WriteStatus
+
+    List<MetadataPartitionType> mdtPartitionsToTag = new 
ArrayList<>(enabledPartitionTypes);
+    mdtPartitionsToTag.remove(FILES);
+    HoodieData<Pair<String, HoodieRecord>> mdtRecordsBasedOnWriteStatus = 
writeStatus.flatMap(
+        new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexGenerator(mdtPartitionsToTag,
 dataWriteConfig, storageConf, instantTime));
+
+    // tag records
+    Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> taggedMdtRecords = 
tagRecordsWithLocation(mdtRecordsBasedOnWriteStatus,
+        mdtPartitionsToTag.stream().map(mdtPartition -> 
mdtPartition.getPartitionPath()).collect(
+            Collectors.toSet()));
+
+    // write partial writes to mdt table (every partition except FILES)
+    HoodieData<WriteStatus> mdtWriteStatus = 
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedMdtRecords,
 instantTime));
+    // dag not yet de-referenced. do not invoke any action on mdtWriteStatus 
yet.
+    return mdtWriteStatus;
+  }
+
+  protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, 
HoodieData<HoodieRecord>> taggedMdtRecords, String instantTime) {

Review Comment:
   java docs. 



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests {@link HoodieCreateHandle}.
+ */
+public class TestMetadataWriterCommit extends BaseTestHandle {
+
+  @Test
+  public void testCreateHandleRLIStats() throws IOException {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withMetadataIndexColumnStats(false)
+            .withSecondaryIndexEnabled(false)
+            .build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = InProcessTimeGenerator.createNewInstantTime();
+
+    // create a parquet file and obtain corresponding write status
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = 
createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+    HoodieCommitMetadata commitMetadata = 
createCommitMetadata(writeStatus.getStat(), partitionPath);
+
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+
+    // create mdt writer
+    HoodieBackedTableMetadataWriter mdtWriter = 
(HoodieBackedTableMetadataWriter) 
SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
+        HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
+    HoodieTableMetaClient mdtMetaClient = 
HoodieTableMetaClient.builder().setBasePath(metaClient.getMetaPath() + 
"/metadata").setConf(storageConf).build();
+    assertEquals(2, 
mdtMetaClient.getActiveTimeline().filterCompletedInstants().countInstants());
+
+    // Create commit in MDT
+    mdtWriter.startCommit(instantTime);
+    HoodieData<WriteStatus> mdtWriteStatus = 
mdtWriter.streamWriteToMetadataPartitions(HoodieJavaRDD.of(Collections.singletonList(writeStatus),
 context, 1), instantTime);
+    List<HoodieWriteStat> mdtWriteStats = 
mdtWriteStatus.collectAsList().stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    mdtWriter.completeStreamingCommit(instantTime, context, mdtWriteStats, 
commitMetadata);

Review Comment:
   understand where FILES partition are invoked.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -147,6 +157,20 @@ protected JavaRDD<HoodieRecord> 
convertHoodieDataToEngineSpecificData(HoodieData
     return HoodieJavaRDD.getJavaRDD(records);
   }
 
+  @Override
+  protected HoodieData<WriteStatus> 
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    return HoodieJavaRDD.of(records);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> 
streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, 
HoodieData<HoodieRecord>> mdtRecordsHoodieData, String instantTime) {
+    JavaRDD<HoodieRecord> mdtRecords = 
HoodieJavaRDD.getJavaRDD(mdtRecordsHoodieData.getValue());
+    engineContext.setJobStatus(this.getClass().getSimpleName(), 
String.format("Upserting at %s into metadata table %s", instantTime, 
metadataWriteConfig.getTableName()));
+    // TODO: Introduce prepped upsert call after client APIs are added
+    JavaRDD<WriteStatus> metadataWriteStatusesSoFar = 
getWriteClient().upsertPreppedRecords(mdtRecords, instantTime, 
Option.of(mdtRecordsHoodieData.getKey()));

Review Comment:
   incorporate changes for isMetadataIndexSupportedForStreamingWrites



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1093,6 +1116,113 @@ public void buildMetadataPartitions(HoodieEngineContext 
engineContext, List<Hood
     initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
+  public void startCommit(String instantTime) {
+    ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes 
should be enabled for startCommit API");
+    BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
+
+    if 
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
 {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
+    } else {
+      // this code path refers to a re-attempted commit that:
+      //   1. got committed to metadata table, but failed in datatable.
+      //   2. failed while committing to metadata table
+      // for e.g., let's say compaction c1 on 1st attempt succeeded in 
metadata table and failed before committing to datatable.
+      // when retried again, data table will first rollback pending 
compaction. these will be applied to metadata table, but all changes
+      // are upserts to metadata table and so only a new delta commit will be 
created.
+      // once rollback is complete in datatable, compaction will be retried 
again, which will eventually hit this code block where the respective commit is
+      // already part of completed commit. So, we have to manually rollback 
the completed instant and proceed.
+      Option<HoodieInstant> existingInstant = 
metadataMetaClient.getActiveTimeline().filter(entry -> 
entry.requestedTime().equals(instantTime))
+          .lastInstant();
+      LOG.info("{} completed commit at {} being applied to MDT.",
+          existingInstant.isPresent() ? "Already" : "Partially", instantTime);
+
+      // Rollback the previous commit
+      if (!writeClient.rollback(instantTime)) {
+        throw new HoodieMetadataException(String.format("Failed to rollback 
deltacommit at %s from MDT", instantTime));
+      }
+      metadataMetaClient.reloadActiveTimeline();
+    }
+
+    getWriteClient().startCommitForMetadataTable(metadataMetaClient, 
instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+  }
+
+  public void completeStreamingCommit(String instantTime, HoodieEngineContext 
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata 
metadata) {
+    List<HoodieWriteStat> allWriteStats = new 
ArrayList<>(metadataWriteStatsSoFar);

Review Comment:
   fix here to account for all enabled partitions for which streaming writes 
are not enabled. but we need to trigger upsertPrepped for those left over 
partitons. 
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -147,6 +157,20 @@ protected JavaRDD<HoodieRecord> 
convertHoodieDataToEngineSpecificData(HoodieData
     return HoodieJavaRDD.getJavaRDD(records);
   }
 
+  @Override
+  protected HoodieData<WriteStatus> 
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    return HoodieJavaRDD.of(records);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> 
streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, 
HoodieData<HoodieRecord>> mdtRecordsHoodieData, String instantTime) {
+    JavaRDD<HoodieRecord> mdtRecords = 
HoodieJavaRDD.getJavaRDD(mdtRecordsHoodieData.getValue());
+    engineContext.setJobStatus(this.getClass().getSimpleName(), 
String.format("Upserting at %s into metadata table %s", instantTime, 
metadataWriteConfig.getTableName()));
+    // TODO: Introduce prepped upsert call after client APIs are added
+    JavaRDD<WriteStatus> metadataWriteStatusesSoFar = 
getWriteClient().upsertPreppedRecords(mdtRecords, instantTime, 
Option.of(mdtRecordsHoodieData.getKey()));

Review Comment:
   this is the 2nd api that we need to call for all partitions which is not 
supported w/ streaming flow. 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**
+ * For now this is a placeholder to generate all MDT records in one place.
+ * Once https://github.com/apache/hudi/pull/13226 is landed, we will leverage 
the new abstraction to generate MDT records.
+ */
+public class MetadataIndexGenerator implements Serializable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetadataIndexGenerator.class);
+
+  /**
+   * MDT record generation utility. This function is expected to be invoked 
from a map Partition call, where one spark task will receive
+   * one WriteStatus as input and the output contains prepared Metadata table 
records for all eligible partitions that can operate on one
+   * WriteStatus instance only.
+   */
+  static class WriteStatusBasedMetadataIndexGenerator implements 
SerializableFunction<WriteStatus, Iterator<Pair<String, HoodieRecord>>> {
+    List<MetadataPartitionType> enabledPartitionTypes;
+    HoodieWriteConfig dataWriteConfig;
+    StorageConfiguration<?> storageConf;
+    String instantTime;
+
+    public WriteStatusBasedMetadataIndexGenerator(List<MetadataPartitionType> 
enabledPartitionTypes, HoodieWriteConfig dataWriteConfig, 
StorageConfiguration<?> storageConf, String instantTime) {
+      this.enabledPartitionTypes = enabledPartitionTypes;
+      this.dataWriteConfig = dataWriteConfig;
+      this.storageConf = storageConf;
+      this.instantTime = instantTime;
+    }
+
+    @Override
+    public Iterator<Pair<String, HoodieRecord>> apply(WriteStatus writeStatus) 
throws Exception {
+      List<Pair<String, HoodieRecord>> allRecords = new ArrayList<>();

Review Comment:
   Siva to incroporate changes from his patch to account for 
isMetadataPartitionSupported for streaming writes



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -428,6 +432,20 @@ public void bootstrap(Option<Map<String, String>> 
extraMetadata) {
    */
   public abstract O upsertPreppedRecords(I preppedRecords, final String 
instantTime);
 
+  /**
+   * Upserts the given prepared records into the Hoodie table, at the supplied 
instantTime.
+   * <p>
+   * This implementation requires that the input records are already tagged, 
and de-duped if needed.
+   *
+   * @param preppedRecords  Prepared HoodieRecords to upsert
+   * @param instantTime     Instant time of the commit
+   * @param fileGroupIdsOpt Optional list of HoodieFileGroupIds impacted by 
the commit
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public O upsertPreppedRecords(I preppedRecords, final String instantTime, 
Option<List<HoodieFileGroupId>> fileGroupIdsOpt) {

Review Comment:
   should not be here. lets remove it



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java:
##########
@@ -120,6 +126,11 @@ protected JavaRDD<HoodieRecord> 
convertHoodieDataToEngineSpecificData(HoodieData
     return HoodieJavaRDD.getJavaRDD(records);
   }
 
+  @Override
+  protected HoodieData<WriteStatus> 
convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    return null;

Review Comment:
   why null ? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -36,6 +39,34 @@
  */
 public interface HoodieTableMetadataWriter<I,O> extends Serializable, 
AutoCloseable {
 
+  /**
+   * Starts a new commit in metadata table for optimized write flow.
+   * @param instantTime
+   */
+  void startCommit(String instantTime);
+
+  /**
+   * Prepare records and write to Metadata table for all eligible partitions 
except FILES partition. This will be used in optimized writes,
+   * where in data table writes statuses are maintained as HoodieData and 
based on that, we prepare records and write to Metadata table
+   * partitions (except FILES). Caution should be followed to ensure the 
action is not triggered on the incoming HoodieData < WriteStatus >
+   *   and for the writes to metadata table. Caller is expected to trigger 
collect just once for both set of HoodieData < WriteStatus >.
+   * @param writeStatus {@link HoodieData} of {@link WriteStatus} from data 
table writes.
+   * @param instantTime instant time of interest.
+   * @return {@link HoodieData} of {@link WriteStatus} for writes to metadata 
table.
+   */
+  HoodieData<WriteStatus> 
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String 
instantTime);
+
+  /**
+   * This api will be used in streaming writes to metadata flow, where in a 
write in data table is already written to all data table, all partitions in 
Metadata table
+   * using {@code #streamWriteToAllPartitions} and the action is triggered for 
all writes together. Post that, marker reconciliation of data table is executed 
and we
+   * complete the commit. This will also take care of executing marker 
reconciliation in metadata table for all metadata table partitions.
+   * @param instantTime instant time of interest.
+   * @param context {@link HoodieEngineContext} of interest.
+   * @param metadataWriteStatsSoFar List<HoodieWriteStat> for 
partial/streaming writes to metadata table completed so far.
+   * @param commitMetadata {@link HoodieCommitMetadata} of interest.
+   */
+  void completeStreamingCommit(String instantTime, HoodieEngineContext 
context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata 
commitMetadata);
+

Review Comment:
   understand why the other api is missing from here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to