vinothchandar commented on code in PR #12236:
URL: https://github.com/apache/hudi/pull/12236#discussion_r1840979503


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -330,10 +393,22 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
       this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
       // commit to data table after committing to metadata table.
-      writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData());
+      // write to MDT FILES partition and commit

Review Comment:
   we need to nicely compartmentalize all this MDT vs DT write-stat splitting and committing, so we don't have to change every method call path. Will take some work.
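
One possible shape for that compartmentalization, sketched below with hypothetical stand-in types (these are not Hudi's actual classes; `WriteStatSplitter`, `SplitStats`, and the `metadataTable` flag are illustrative): a single helper owns the split, so individual call paths never filter stats themselves.

```java
import java.util.ArrayList;
import java.util.List;

public class WriteStatSplitter {

  // Minimal stand-in for Hudi's HoodieWriteStat, just enough to demo the split.
  static class WriteStat {
    final String path;
    final boolean metadataTable;
    WriteStat(String path, boolean metadataTable) {
      this.path = path;
      this.metadataTable = metadataTable;
    }
  }

  // Result of the split: data-table stats and metadata-table stats, computed once.
  static class SplitStats {
    final List<WriteStat> dataTableStats = new ArrayList<>();
    final List<WriteStat> metadataTableStats = new ArrayList<>();
  }

  // Single place that performs the MDT-vs-DT split; callers pass the combined
  // stats and receive both halves instead of each path doing its own filtering.
  static SplitStats split(List<WriteStat> combined) {
    SplitStats result = new SplitStats();
    for (WriteStat stat : combined) {
      if (stat.metadataTable) {
        result.metadataTableStats.add(stat);
      } else {
        result.dataTableStats.add(stat);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<WriteStat> combined = new ArrayList<>();
    combined.add(new WriteStat("dt/part/f1", false));
    combined.add(new WriteStat("mdt/files/f2", true));
    SplitStats split = split(combined);
    System.out.println(split.dataTableStats.size() + "," + split.metadataTableStats.size());
  }
}
```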



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -274,26 +317,39 @@ public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStat
         commitCallback = HoodieCommitCallbackFactory.create(config);
       }
       commitCallback.call(new HoodieWriteCommitCallbackMessage(
-          instantTime, config.getTableName(), config.getBasePath(), stats, Option.of(commitActionType), extraMetadata));
+          instantTime, config.getTableName(), config.getBasePath(), dataTableStats, Option.of(commitActionType), extraMetadata));
     }
     return true;
   }
 
   protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
-                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException {
+                        List<HoodieWriteStat> dataTablestats, List<HoodieWriteStat> mdtStats, HoodieData<WriteStatus> writeStatuses) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
-    finalizeWrite(table, instantTime, stats);
+    finalizeWrite(table, instantTime, dataTablestats);
     // do save internal schema to support Implicitly add columns in write process
     if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
         && metadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) {
       saveInternalSchema(table, instantTime, metadata);
     }
+    // generate Completion time
+    String completionTime = activeTimeline.createCompletionTime();
     // update Metadata table
-    writeTableMetadata(table, instantTime, metadata, writeStatuses);
+    if (metadataWriterMap.containsKey(instantTime) && metadataWriterMap.get(instantTime).isPresent()) {
+      try {
+        // wraps the commit in MDT
+        // todo: do we need to add completion time to MDT or not yet.
+        metadataWriterMap.get(instantTime).get().writeToFilesPartitionAndCommit(instantTime, context, mdtStats, metadata);
+        metadataWriterMap.get(instantTime).get().close();

Review Comment:
   answered my previous question here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -330,10 +393,22 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
       this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
       // commit to data table after committing to metadata table.
-      writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData());
+      // write to MDT FILES partition and commit

Review Comment:
   Kind of what @danny0405 is pointing to



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -298,12 +314,56 @@ protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean s
       table.getMetaClient().reloadActiveTimeline();
     }
     compactionTimer = metrics.getCompactionCtx();
+    // start commit in MDT if enabled
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = getMetadataWriterFunc.apply(compactionInstantTime, table.getMetaClient());
+    if (metadataWriterOpt.isPresent()) {

Review Comment:
   Could this not be done in a single place, i.e. where we start the commit for the write operation (whoever generates `compactionInstantTime`)?
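
The "single place" idea could look roughly like this sketch; all names (`InstantGeneratorSketch`, `createInstantAndStartMdtCommit`, the stand-in `MetadataWriter`) are hypothetical, not the PR's API. Whoever generates the compaction instant also opens the MDT commit, so individual table-service paths don't repeat it:

```java
import java.util.Optional;

public class InstantGeneratorSketch {

  // Stand-in for the metadata writer's startCommit step.
  static class MetadataWriter {
    String startedInstant;
    void startCommit(String instantTime) {
      startedInstant = instantTime;
    }
  }

  // One entry point that both creates the instant and opens it in the MDT.
  static String createInstantAndStartMdtCommit(Optional<MetadataWriter> mdtWriter) {
    String instantTime = "20240101120000"; // normally generated from the clock
    mdtWriter.ifPresent(w -> w.startCommit(instantTime));
    return instantTime;
  }

  public static void main(String[] args) {
    MetadataWriter writer = new MetadataWriter();
    String instant = createInstantAndStartMdtCommit(Optional.of(writer));
    // The MDT commit was opened with the same instant, in one place.
    System.out.println(instant.equals(writer.startedInstant));
  }
}
```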



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+public class MetadataIndexGenerator implements Serializable {

Review Comment:
   we need some sort of interface for generating the index records for each type of index, given a WriteStat.
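
A rough sketch of what such an interface could look like; the names here (`IndexRecordGenerator`, `SimpleWriteStat`, `FilesIndexGenerator`) are illustrative stand-ins, not Hudi classes. Each MDT partition type (FILES, COLUMN_STATS, RECORD_INDEX, ...) would get its own implementation:

```java
import java.util.Arrays;
import java.util.List;

public class IndexGeneratorSketch {

  // Minimal stand-in for HoodieWriteStat.
  static class SimpleWriteStat {
    final String partition;
    final String filePath;
    SimpleWriteStat(String partition, String filePath) {
      this.partition = partition;
      this.filePath = filePath;
    }
  }

  // One implementation per index type, each turning a write stat
  // into that index's records.
  interface IndexRecordGenerator {
    String partitionType();
    List<String> generateRecords(SimpleWriteStat stat);
  }

  // Example implementation for a FILES-style index.
  static class FilesIndexGenerator implements IndexRecordGenerator {
    public String partitionType() {
      return "FILES";
    }
    public List<String> generateRecords(SimpleWriteStat stat) {
      return Arrays.asList(stat.partition + "->" + stat.filePath);
    }
  }

  public static void main(String[] args) {
    IndexRecordGenerator gen = new FilesIndexGenerator();
    List<String> records = gen.generateRecords(new SimpleWriteStat("2024/01/01", "f1.parquet"));
    System.out.println(gen.partitionType() + ":" + records.get(0));
  }
}
```

With this shape, MetadataIndexGenerator would just iterate registered generators instead of hard-coding each index type.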



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -173,6 +179,35 @@ public BaseHoodieWriteClient(HoodieEngineContext context,
     this.metrics.emitIndexTypeMetrics(config.getIndexType().ordinal());
   }
 
+  protected Option<HoodieTableMetadataWriter> getMetadataWriter(
+      String triggeringInstantTimestamp) {
+    // Each engine is expected to override this and
+    // provide the actual metadata writer, if enabled.
+    isMetadataTableExists = false;
+    return Option.empty();
+  }
+
+  public void maybeDeleteMetadataTable(HoodieTableMetaClient metaClient) {

Review Comment:
   we should simplify all this enabled/disabled business for the MDT: just turn it on by default and make it the only mode, now that Flink has it turned on as well.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java:
##########
@@ -61,6 +61,7 @@ public class WriteStatus implements Serializable {
 
   private final List<Pair<HoodieRecordDelegate, Throwable>> failedRecords = new ArrayList<>();
 
+  private boolean isMetadataTable;

Review Comment:
   Does this track col-stats already? I guess yes



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -250,5 +260,9 @@ protected void setupWriteStatus() throws IOException {
     RuntimeStats runtimeStats = new RuntimeStats();
     runtimeStats.setTotalCreateTime(timer.endTimer());
     stat.setRuntimeStats(runtimeStats);
+
+    if (colStatsEnabled) {
+      attachColStats(stat, recordList, fieldsToIndex, writeSchemaWithMetaFields);

Review Comment:
   why buffer entire record?  can't we just extract value for column or 
maintain running stats
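
The "running stats" alternative could look like this sketch: update min/max/null counts per column as each value is written, instead of buffering whole records. This is an illustrative stand-in, not the handle's actual code:

```java
public class RunningColumnStats {
  private Comparable min;
  private Comparable max;
  private long nullCount;
  private long valueCount;

  // Fold one column value into the running aggregates; O(1) memory per column,
  // no record buffering.
  @SuppressWarnings("unchecked")
  void update(Comparable value) {
    valueCount++;
    if (value == null) {
      nullCount++;
      return;
    }
    if (min == null || value.compareTo(min) < 0) {
      min = value;
    }
    if (max == null || value.compareTo(max) > 0) {
      max = value;
    }
  }

  public static void main(String[] args) {
    RunningColumnStats stats = new RunningColumnStats();
    for (Integer v : new Integer[] {5, null, 2, 9}) {
      stats.update(v);
    }
    System.out.println(stats.min + "," + stats.max + "," + stats.nullCount + "," + stats.valueCount);
  }
}
```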



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -281,4 +304,21 @@ protected static Option<IndexedRecord> toAvroRecord(HoodieRecord record, Schema
       return Option.empty();
     }
   }
+
+  protected void attachColStats(HoodieWriteStat stat, List<HoodieRecord> recordList, List<Schema.Field> fieldsToIndex,

Review Comment:
   needs to be done on-the-fly



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1040,6 +1062,136 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
     initializeFromFilesystem(instantTime, partitionTypes, Option.empty());
   }
 
+  public void startCommit(String instantTime) {
+    BaseHoodieWriteClient<?, I, ?, O> writeClient = getWriteClient();
+
+    if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
+      // if this is a new commit being applied to metadata for the first time
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
+    } else {
+      // this code path refers to a re-attempted commit that:
+      //   1. got committed to metadata table, but failed in datatable.
+      //   2. failed while committing to metadata table
+      // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+      // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+      // are upserts to metadata table and so only a new delta commit will be created.
+      // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+      // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+      Option<HoodieInstant> existingInstant = metadataMetaClient.getActiveTimeline().filter(entry -> entry.getTimestamp().equals(instantTime))
+          .lastInstant();
+      LOG.info("{} completed commit at {} being applied to MDT.",
+          existingInstant.isPresent() ? "Already" : "Partially", instantTime);
+
+      // Rollback the previous commit
+      if (!writeClient.rollback(instantTime)) {
+        throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s from MDT", instantTime));
+      }
+      metadataMetaClient.reloadActiveTimeline();
+      reInitWriteClient();
+    }
+
+    getWriteClient().startCommitWithTime(instantTime, HoodieTimeline.DELTA_COMMIT_ACTION);
+  }
+
+  public void writeToFilesPartitionAndCommit(String instantTime, HoodieEngineContext context, List<HoodieWriteStat> partialMdtWriteStats, HoodieCommitMetadata metadata) {

Review Comment:
   can we break up the `writeXXX` method and the committing, i.e. call both from the layer above?
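
The suggested decomposition could be sketched like this; the class and method names are hypothetical stand-ins, not the PR's actual signatures. The writer exposes write and commit as separate steps, and the caller sequences them:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitWriteCommitSketch {

  // Stand-in metadata writer with the write and commit steps separated.
  static class MetadataWriter {
    final List<String> written = new ArrayList<>();
    boolean committed;

    // Step 1: write records to the FILES partition (no commit yet).
    void writeToFilesPartition(String instantTime, List<String> records) {
      written.addAll(records);
    }

    // Step 2: commit as a delta commit on the MDT timeline.
    void commit(String instantTime) {
      committed = true;
    }
  }

  public static void main(String[] args) {
    MetadataWriter writer = new MetadataWriter();
    // The caller (layer above) sequences the two steps explicitly, so other
    // call sites can reuse each step independently.
    writer.writeToFilesPartition("001", List.of("files-record"));
    writer.commit("001");
    System.out.println(writer.written.size() + "," + writer.committed);
  }
}
```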



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java:
##########
@@ -65,15 +70,16 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie
 
   protected HoodieFlinkTableServiceClient(HoodieEngineContext context,
                                           HoodieWriteConfig clientConfig,
-                                          Option<EmbeddedTimelineService> timelineService) {
-    super(context, clientConfig, timelineService);
+                                          Option<EmbeddedTimelineService> timelineService,

Review Comment:
   @danny0405 can you put together, or share, a diagram that shows the class dependencies in Flink, i.e. what code runs on the Driver/Coordinator vs the Executor, the classes involved, etc.
   
   I want to make sure this works easily for Flink as well.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java:
##########
@@ -76,10 +77,15 @@ public class WriteStatus implements Serializable {
   private final boolean trackSuccessRecords;
   private final transient Random random;
 
-  public WriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
+  public WriteStatus(Boolean trackSuccessRecords, Double failureFraction, Boolean isMetadataTable) {

Review Comment:
   all the naming needs to be revisited; not adding any code comments for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
