the-other-tim-brown commented on code in PR #13286:
URL: https://github.com/apache/hudi/pull/13286#discussion_r2134835649


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -36,6 +39,37 @@
  */
 public interface HoodieTableMetadataWriter<I,O> extends Serializable, AutoCloseable {
 
+  /**
+   * Starts a new commit in the metadata table for the optimized write flow.
+   * @param instantTime
+   */
+  void startCommit(String instantTime);
+
+  /**
+   * Prepares records and writes them to all eligible metadata table partitions except the FILES partition. This is used in optimized writes,
+   * where data table write statuses are maintained as HoodieData and, based on them, records are prepared and written to the metadata table
+   * partitions (except FILES). Care should be taken to ensure no action is triggered either on the incoming HoodieData < WriteStatus >
+   * or on the writes to the metadata table. The caller is expected to trigger collect just once for both sets of HoodieData < WriteStatus >.
+   * @param writeStatus {@link HoodieData} of {@link WriteStatus} from data table writes.
+   * @param instantTime instant time of interest.
+   * @return {@link HoodieData} of {@link WriteStatus} for writes to the metadata table.
+   */
+  HoodieData<WriteStatus> streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String instantTime);

Review Comment:
   Streaming is one of those terms that can mean different things in different contexts. Is there another term we could use to describe this write pattern?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -161,6 +185,13 @@ protected void upsertAndCommit(BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?
     writeClient.commit(instantTime, writeStatusJavaRDD, Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
   }
 
+  @Override
+  protected void upsertAndCommit(BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String instantTime, JavaRDD<HoodieRecord> preppedRecordInputs,
+                                 List<HoodieFileGroupId> mdtFileGroupsIdsToUpdate) {
+    JavaRDD<WriteStatus> writeStatusJavaRDD = ((SparkRDDMetadataWriteClient)writeClient).upsertPreppedRecords(preppedRecordInputs, instantTime, Option.of(mdtFileGroupsIdsToUpdate));

Review Comment:
   It may be cleaner to move this cast to a helper method.
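   A minimal sketch of what that helper could look like. The class and method names below are simplified placeholders, not the actual Hudi types from the diff:
   
   ```java
   public class CastHelperSketch {
     // Placeholder stand-ins for BaseHoodieWriteClient / SparkRDDMetadataWriteClient.
     static class BaseWriteClient {
     }
   
     static class MetadataWriteClient extends BaseWriteClient {
       String upsertPreppedRecords(String records, String instantTime) {
         return "upserted " + records + " at " + instantTime;
       }
     }
   
     // Helper that owns the downcast so call sites never repeat the cast
     // expression, and a wrong client type fails fast with a clear message.
     static MetadataWriteClient asMetadataWriteClient(BaseWriteClient client) {
       if (!(client instanceof MetadataWriteClient)) {
         throw new IllegalArgumentException(
             "Expected a metadata write client but got " + client.getClass().getName());
       }
       return (MetadataWriteClient) client;
     }
   
     public static void main(String[] args) {
       BaseWriteClient client = new MetadataWriteClient();
       System.out.println(asMetadataWriteClient(client).upsertPreppedRecords("r1", "001"));
     }
   }
   ```
   
   This also gives one place to improve the error message if the cast ever fails at runtime.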



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -36,6 +39,37 @@
  */
 public interface HoodieTableMetadataWriter<I,O> extends Serializable, AutoCloseable {
 
+  /**
+   * Starts a new commit in metadata table for optimized write flow.

Review Comment:
   Right now we have a special flow in the BaseHoodieWriteClient for starting 
commits for the metadata table. Would it be possible to have that flow through 
this method instead?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -200,12 +218,17 @@ protected HoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf,
       }
     }
     ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT Reader should have been opened post initialization");
+
+    this.metadataIndexGenerator = streamingWrites ? Option.of(getMetadataIndexGenerator()) : Option.empty();
+    this.streamingWritesEnabled = streamingWrites;
   }
 
   List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig metadataConfig, HoodieTableMetaClient metaClient) {
     return MetadataPartitionType.getEnabledPartitions(metadataConfig, metaClient);
   }
 
+  abstract MetadataIndexGenerator getMetadataIndexGenerator();

Review Comment:
   nitpick: the naming of this overlaps with the getter for the instance 
variable. Can this be named something like `initializeMetadataIndexGenerator` 
instead?
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**
+ * For now this is a placeholder to generate all MDT records in one place.
+ * Once https://github.com/apache/hudi/pull/13226 is landed, we will leverage the new abstraction to generate MDT records.
+ */
+public class MetadataIndexGenerator implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetadataIndexGenerator.class);
+
+  /**
+   * MDT record generation utility. This function is expected to be invoked from a mapPartitions call, where one Spark task receives
+   * one WriteStatus as input and outputs prepared metadata table records, for all eligible partitions, that can be derived from that
+   * single WriteStatus instance.
+   */
+  static class WriteStatusBasedMetadataIndexGenerator implements SerializableFunction<WriteStatus, Iterator<HoodieRecord>> {
+    List<MetadataPartitionType> enabledPartitionTypes;
+    HoodieWriteConfig dataWriteConfig;
+    StorageConfiguration<?> storageConf;
+    String instantTime;
Review Comment:
   make this `private final`?
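   A sketch of the suggestion, with the fields assigned once in the constructor. Field types are simplified placeholders for the Hudi types in the diff:
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   
   public class GeneratorFieldsSketch {
     // Hypothetical stand-in for WriteStatusBasedMetadataIndexGenerator with
     // its captured state made private final, as the review suggests.
     static class WriteStatusBasedGenerator {
       private final List<String> enabledPartitionTypes;
       private final String instantTime;
   
       WriteStatusBasedGenerator(List<String> enabledPartitionTypes, String instantTime) {
         // Unmodifiable view so the final field's contents cannot change later.
         this.enabledPartitionTypes = Collections.unmodifiableList(enabledPartitionTypes);
         this.instantTime = instantTime;
       }
   
       String describe() {
         return instantTime + ":" + String.join(",", enabledPartitionTypes);
       }
     }
   
     public static void main(String[] args) {
       WriteStatusBasedGenerator gen =
           new WriteStatusBasedGenerator(Arrays.asList("RECORD_INDEX", "COLUMN_STATS"), "001");
       System.out.println(gen.describe());
     }
   }
   ```
   
   Since the generator is serialized to executors, immutable state also avoids surprises after closure capture.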



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Unit tests {@link HoodieCreateHandle}.

Review Comment:
   Is this the right class referenced?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -148,6 +158,20 @@ protected JavaRDD<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData
     return HoodieJavaRDD.getJavaRDD(records);
   }
 
+  @Override
+  protected HoodieData<WriteStatus> convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    return HoodieJavaRDD.of(records);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords, String instantTime) {
+    JavaRDD<HoodieRecord> mdtRecords = HoodieJavaRDD.getJavaRDD(hoodieFileGroupsToUpdateAndTaggedMdtRecords.getValue());
+    engineContext.setJobStatus(this.getClass().getSimpleName(), String.format("Upserting at %s into metadata table %s", instantTime, metadataWriteConfig.getTableName()));
+    JavaRDD<WriteStatus> metadataWriteStatusesSoFar = ((SparkRDDMetadataWriteClient)getWriteClient()).upsertPreppedRecords(mdtRecords, instantTime, Option.of(

Review Comment:
   nitpick: formatting - put a space after `(SparkRDDMetadataWriteClient)`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1096,6 +1119,126 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
     initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
+  public void startCommit(String instantTime) {
+    ValidationUtils.checkState(streamingWritesEnabled, "Streaming writes should be enabled for startCommit API");
+
+    if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
+    } else {
+      throw new HoodieMetadataException("Starting the same commit in Metadata table more than once w/o rolling back : " + instantTime);
+    }
+
+    // this is where we might instantiate the write client to the metadata table for the first time.
+    getWriteClient().startCommitForMetadataTable(metadataMetaClient, instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String instantTime) {
+    List<MetadataPartitionType> mdtPartitionsToTag = new ArrayList<>(enabledPartitionTypes);
+    mdtPartitionsToTag.remove(FILES);
+    mdtPartitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+    if (mdtPartitionsToTag.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    HoodieData<HoodieRecord> untaggedMdtRecords = writeStatus.flatMap(
+        new MetadataIndexGenerator.WriteStatusBasedMetadataIndexGenerator(mdtPartitionsToTag, dataWriteConfig, storageConf, instantTime));
+
+    // tag records w/ location
+    Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords = tagRecordsWithLocationWithStreamingWrites(untaggedMdtRecords,
+        mdtPartitionsToTag.stream().map(mdtPartition -> mdtPartition.getPartitionPath()).collect(Collectors.toSet()));
+
+    // write partial writes to the mdt table (for those partitions where streaming writes are enabled)
+    HoodieData<WriteStatus> mdtWriteStatusHoodieData = convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords, instantTime));
+    // dag not yet de-referenced. do not invoke any action on mdtWriteStatusHoodieData yet.
+    return mdtWriteStatusHoodieData;
+  }
+
+  /**
+   * Upsert the tagged records to the metadata table in the streaming flow.
+   * @param hoodieFileGroupsToUpdateAndTaggedMdtRecords Pair of {@link List} of {@link HoodieFileGroupId} referring to all file group ids that could receive updates
+   *                                                    and {@link HoodieData} of tagged {@link HoodieRecord}s.
+   * @param instantTime instant time of interest.
+   * @return engine-specific write statuses for the metadata table writes.
+   */
+  protected O streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords, String instantTime) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  @Override
+  public void completeStreamingCommit(String instantTime, HoodieEngineContext context, List<HoodieWriteStat> metadataWriteStatsSoFar, HoodieCommitMetadata metadata) {
+    List<HoodieWriteStat> allWriteStats = new ArrayList<>(metadataWriteStatsSoFar);
+    // update metadata for leftover partitions which do not have streaming writes support.
+    allWriteStats.addAll(prepareAndWriteToNonStreamingPartitions(context, metadata, instantTime).map(writeStatus -> writeStatus.getStat()).collectAsList());
+    getWriteClient().commitStats(instantTime, allWriteStats, Option.empty(), HoodieTimeline.DELTA_COMMIT_ACTION,
+        Collections.emptyMap(), Option.empty());
+  }
+
+  private HoodieData<WriteStatus> prepareAndWriteToNonStreamingPartitions(HoodieEngineContext context, HoodieCommitMetadata commitMetadata, String instantTime) {
+    Set<String> mdtPartitionsToUpdate = getNonStreamingMetadataPartitionsToUpdate();
+    Map<String, HoodieData<HoodieRecord>> mdtPartitionsAndUnTaggedRecords = new MdtPartitionRecordGeneratorBatchMode(instantTime, commitMetadata, mdtPartitionsToUpdate)
+        .convertMetadata();
+
+    HoodieData<HoodieRecord> untaggedMdtRecords = context.emptyHoodieData();
+    for (Map.Entry<String, HoodieData<HoodieRecord>> entry: mdtPartitionsAndUnTaggedRecords.entrySet()) {
+      untaggedMdtRecords = untaggedMdtRecords.union(entry.getValue());
+    }
+
+    // write to the mdt table for non-streaming mdt partitions
+    Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> taggedRecords = tagRecordsWithLocationWithStreamingWrites(untaggedMdtRecords, mdtPartitionsToUpdate);
+    HoodieData<WriteStatus> mdtWriteStatusHoodieData = convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(taggedRecords, instantTime));
+    return mdtWriteStatusHoodieData;
+  }
+
+  private Set<String> getNonStreamingMetadataPartitionsToUpdate() {
+    Set<String> toReturn = enabledPartitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+    STREAMING_WRITES_SUPPORTED_PARTITIONS.forEach(metadataPartitionType -> toReturn.remove(metadataPartitionType.getPartitionPath()));
+    return toReturn;
+  }
+
+  /**
+   * Returns a list of pairs of partition name and MDT fileId updated in the partition, along with the tagged MDT records.
+   * @param untaggedMdtRecords Untagged MDT records
+   */
+  protected Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> tagRecordsWithLocationWithStreamingWrites(HoodieData<HoodieRecord> untaggedMdtRecords,
+                                                                                                              Set<String> enabledMetadataPartitions) {
+    List<HoodieFileGroupId> updatedMDTFileGroupIds = new ArrayList<>();
+    // Fetch latest file slices for all enabled MDT partitions
+    Map<String, List<FileSlice>> mdtPartitionLatestFileSlices = new HashMap<>();
+    try (HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient)) {
+      enabledMetadataPartitions.forEach(partitionName -> {
+        List<FileSlice> fileSlices =
+            HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), partitionName);
+        if (fileSlices.isEmpty()) {
+          // scheduling of INDEX only initializes the file group and does not add a commit

Review Comment:
   Is this case generally applicable, or should the code check the metadata partition type? Would it be possible the file groups were from failed commits?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestCreateHandle.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieCreateHandle}.
+ */
+@SuppressWarnings("checkstyle:LineLength")

Review Comment:
   why is this needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
