the-other-tim-brown commented on code in PR #13402:
URL: https://github.com/apache/hudi/pull/13402#discussion_r2145775792


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -317,33 +318,40 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?, 
I, ?, T> table, String c
     }
     compactionTimer = metrics.getCompactionCtx();
     HoodieWriteMetadata<T> writeMetadata = table.compact(context, 
compactionInstantTime);
-    HoodieWriteMetadata<O> compactionWriteMetadata = 
convertToOutputMetadata(writeMetadata);
+    HoodieWriteMetadata<T> processedWriteMetadata = 
processWriteMetadata(table, writeMetadata, compactionInstantTime);
+    HoodieWriteMetadata<O> compactionWriteMetadata = 
convertToOutputMetadata(processedWriteMetadata);
     if (shouldComplete) {
       commitCompaction(compactionInstantTime, compactionWriteMetadata, 
Option.of(table));
     }
     return compactionWriteMetadata;
   }
 
+  protected HoodieWriteMetadata<T> processWriteMetadata(HoodieTable table, 
HoodieWriteMetadata<T> writeMetadata, String instantTime) {
+    return writeMetadata;
+  }
+
   public void commitCompaction(String compactionInstantTime, 
HoodieWriteMetadata<O> compactionWriteMetadata, Option<HoodieTable> tableOpt) {
     // dereferencing the write dag for compaction for the first time.
-    List<HoodieWriteStat> writeStats = 
triggerWritesAndFetchWriteStats(compactionWriteMetadata);
+    Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> 
dataTableAndMetadataTableHoodieWriteStats = 
triggerWritesAndFetchWriteStats(compactionWriteMetadata);
     // Fetch commit metadata from HoodieWriteMetadata and update 
HoodieWriteStat
-    
CommonClientUtils.stitchCompactionHoodieWriteStats(compactionWriteMetadata, 
writeStats);
+    
CommonClientUtils.stitchCompactionHoodieWriteStats(compactionWriteMetadata, 
dataTableAndMetadataTableHoodieWriteStats.getKey());
     metrics.emitCompactionCompleted();
 
     HoodieTable table = tableOpt.orElseGet(() -> createTable(config, 
context.getStorageConf()));
-    completeCompaction(compactionWriteMetadata.getCommitMetadata().get(), 
table, compactionInstantTime);
+    completeCompaction(compactionWriteMetadata.getCommitMetadata().get(), 
table, compactionInstantTime, 
dataTableAndMetadataTableHoodieWriteStats.getValue());
   }
 
   /**
    * The API triggers the data write and fetches the corresponding write stats 
using the write metadata.
+   * When streaming writes to metadata table is enabled, writes to metadata 
table is expected to be triggered here and the List of {@link HoodieWriteStat} 
to be returned
+   * as part of this call.
    */
-  protected abstract List<HoodieWriteStat> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<O> writeMetadata);
+  protected abstract Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<O> writeMetadata);
 
   /**
    * Commit Compaction and track metrics.
    */
-  protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable 
table, String compactionCommitTime) {
+  protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable 
table, String compactionCommitTime, List<HoodieWriteStat> 
metadataWriteStatsSoFar) {

Review Comment:
   nit on naming: `metadataWriteStatsSoFar` could be 
`currentMetadataWriteStats`? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1185,6 +1203,13 @@ public HoodieWriteMetadata<O> logCompact(String 
logCompactionInstantTime) {
     return logCompact(logCompactionInstantTime, false);
   }
 
+  /**
+   * Commit Log Compaction and track metrics.
+   **/
+  protected void completeLogCompaction(HoodieCommitMetadata metadata, 
HoodieTable table, String logCompactionCommitTime) {

Review Comment:
   This method is unused; is it still required?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieMetadataWriteWrapper.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstraction for data table write client and table service client to write 
to metadata table.
+ */
+public class HoodieMetadataWriteWrapper {

Review Comment:
   I think we can come up with a better name than `Wrapper`, maybe something 
that indicates this is for streaming writes? `StreamingMetadataWriteHandler` or 
something similar?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -36,17 +39,58 @@
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class SparkRDDTableServiceClient<T> extends 
BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, 
HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
+
+  private HoodieMetadataWriteWrapper metadataWriterWrapper = new 
HoodieMetadataWriteWrapper();
   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
                                        Option<EmbeddedTimelineService> 
timelineService) {
     super(context, clientConfig, timelineService);
   }
 
   @Override
-  protected List<HoodieWriteStat> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
writeMetadata) {
-    return writeMetadata.getWriteStatuses().map(writeStatus -> 
writeStatus.getStat()).collect();
+  protected Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
tableServiceWriteMetadata) {
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both data table and metadata 
table gets triggered at this juncture.
+    // If not, writes to data table gets triggered here.
+    // When streaming writes are enabled, data table's WriteStatus is expected 
to contain all stats required to generate metadata table records and so each 
object will be larger.
+    // So, here we are dropping all additional stats and error records to 
retain only the required information and prevent collecting large objects on 
the driver.
+    List<SparkRDDWriteClient.SlimWriteStats> writeStatusMetadataTrackerList = 
tableServiceWriteMetadata.getWriteStatuses()
+        .map(writeStatus -> new 
SparkRDDWriteClient.SlimWriteStats(writeStatus.isMetadataTable(), 
writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(),

Review Comment:
   This is similar to the code in SparkRDDWriteClient. We can expose a static 
method on `SlimWriteStats` like `static List<SlimWriteStats> 
from(JavaRDD<WriteStatus> writeStatuses)` to keep the code consistent between 
the two clients.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1539,6 +1543,7 @@ protected void commitInternal(String instantTime, 
Map<String, HoodieData<HoodieR
     Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> result = 
tagRecordsWithLocation(partitionRecordsMap, isInitializing);
     HoodieData<HoodieRecord> preppedRecords = result.getKey();
     I preppedRecordInputs = 
convertHoodieDataToEngineSpecificData(preppedRecords);
+    List<HoodieFileGroupId> updatedMDTFileGroupIds = result.getValue();

Review Comment:
   This is unused; is that intentional?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -36,17 +39,58 @@
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class SparkRDDTableServiceClient<T> extends 
BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, 
HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
+
+  private HoodieMetadataWriteWrapper metadataWriterWrapper = new 
HoodieMetadataWriteWrapper();
   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
                                        Option<EmbeddedTimelineService> 
timelineService) {
     super(context, clientConfig, timelineService);
   }
 
   @Override
-  protected List<HoodieWriteStat> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
writeMetadata) {
-    return writeMetadata.getWriteStatuses().map(writeStatus -> 
writeStatus.getStat()).collect();
+  protected Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
tableServiceWriteMetadata) {
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both data table and metadata 
table gets triggered at this juncture.
+    // If not, writes to data table gets triggered here.
+    // When streaming writes are enabled, data table's WriteStatus is expected 
to contain all stats required to generate metadata table records and so each 
object will be larger.
+    // So, here we are dropping all additional stats and error records to 
retain only the required information and prevent collecting large objects on 
the driver.
+    List<SparkRDDWriteClient.SlimWriteStats> writeStatusMetadataTrackerList = 
tableServiceWriteMetadata.getWriteStatuses()
+        .map(writeStatus -> new 
SparkRDDWriteClient.SlimWriteStats(writeStatus.isMetadataTable(), 
writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(),
+            writeStatus.getStat())).collect();
+
+    List<HoodieWriteStat> dataTableWriteStats = 
writeStatusMetadataTrackerList.stream().filter(entry -> 
!entry.isMetadataTable()).map(entry -> 
entry.getWriteStat()).collect(Collectors.toList());
+    List<HoodieWriteStat> mdtWriteStats = 
writeStatusMetadataTrackerList.stream().filter(entry -> 
entry.isMetadataTable()).map(entry -> 
entry.getWriteStat()).collect(Collectors.toList());
+
+    if (isMetadataTable) {
+      ValidationUtils.checkArgument(dataTableWriteStats.isEmpty(), "For 
Metadata table,"
+          + "we do not expect any writes having WriteStatus referring to data 
table. ");

Review Comment:
   Nitpick: trim trailing space



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieMetadataWriteWrapper.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstraction for data table write client and table service client to write 
to metadata table.
+ */
+public class HoodieMetadataWriteWrapper {
+
+  // Cached HoodieTableMetadataWriter for each action in data table. This will 
be cleaned up when action is completed or when write client is closed.
+  protected Map<String, Option<HoodieTableMetadataWriter>> metadataWriterMap = 
new ConcurrentHashMap<>();

Review Comment:
   Can this be `private final`? Is it expected to undergo concurrent 
operations? If not, let's use a plain `HashMap`.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieMetadataWriteWrapper.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstraction for data table write client and table service client to write 
to metadata table.
+ */
+public class HoodieMetadataWriteWrapper {
+
+  // Cached HoodieTableMetadataWriter for each action in data table. This will 
be cleaned up when action is completed or when write client is closed.
+  protected Map<String, Option<HoodieTableMetadataWriter>> metadataWriterMap = 
new ConcurrentHashMap<>();
+
+  /**
+   * Called by data table write client and data table table service client to 
perform streaming write to metadata table.
+   * @param table {@link HoodieTable} instance for data table of interest.
+   * @param dataTableWriteStatuses {@link WriteStatus} from data table writes.
+   * @param instantTime instant time of interest.

Review Comment:
   Is this the instant time in the metadata table or the data table?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -36,17 +39,58 @@
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class SparkRDDTableServiceClient<T> extends 
BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, 
HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
+
+  private HoodieMetadataWriteWrapper metadataWriterWrapper = new 
HoodieMetadataWriteWrapper();

Review Comment:
   make this `final`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -270,7 +269,7 @@ protected void init(String fileId, 
Iterator<HoodieRecord<T>> newRecordsItr) {
       // update the new location of the record, so we know where to find it 
next
       if (needsUpdateLocation()) {
         record.unseal();
-        record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+        record.setNewLocation(newRecordLocation);

Review Comment:
   The `fileId` input to the method is unused; should it be removed now?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -70,6 +70,7 @@ public class SparkRDDWriteClient<T> extends
     BaseHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkRDDWriteClient.class);
+  private HoodieMetadataWriteWrapper metadataWriteWrapper = new 
HoodieMetadataWriteWrapper();

Review Comment:
   make this `final` as well?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java:
##########
@@ -36,17 +39,58 @@
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class SparkRDDTableServiceClient<T> extends 
BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, 
HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
+
+  private HoodieMetadataWriteWrapper metadataWriterWrapper = new 
HoodieMetadataWriteWrapper();
   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
                                        Option<EmbeddedTimelineService> 
timelineService) {
     super(context, clientConfig, timelineService);
   }
 
   @Override
-  protected List<HoodieWriteStat> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
writeMetadata) {
-    return writeMetadata.getWriteStatuses().map(writeStatus -> 
writeStatus.getStat()).collect();
+  protected Pair<List<HoodieWriteStat>, List<HoodieWriteStat>> 
triggerWritesAndFetchWriteStats(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
tableServiceWriteMetadata) {
+    // Triggering the dag for writes.
+    // If streaming writes are enabled, writes to both data table and metadata 
table gets triggered at this juncture.
+    // If not, writes to data table gets triggered here.
+    // When streaming writes are enabled, data table's WriteStatus is expected 
to contain all stats required to generate metadata table records and so each 
object will be larger.
+    // So, here we are dropping all additional stats and error records to 
retain only the required information and prevent collecting large objects on 
the driver.
+    List<SparkRDDWriteClient.SlimWriteStats> writeStatusMetadataTrackerList = 
tableServiceWriteMetadata.getWriteStatuses()
+        .map(writeStatus -> new 
SparkRDDWriteClient.SlimWriteStats(writeStatus.isMetadataTable(), 
writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(),
+            writeStatus.getStat())).collect();
+
+    List<HoodieWriteStat> dataTableWriteStats = 
writeStatusMetadataTrackerList.stream().filter(entry -> 
!entry.isMetadataTable()).map(entry -> 
entry.getWriteStat()).collect(Collectors.toList());
+    List<HoodieWriteStat> mdtWriteStats = 
writeStatusMetadataTrackerList.stream().filter(entry -> 
entry.isMetadataTable()).map(entry -> 
entry.getWriteStat()).collect(Collectors.toList());
+
+    if (isMetadataTable) {
+      ValidationUtils.checkArgument(dataTableWriteStats.isEmpty(), "For 
Metadata table,"
+          + "we do not expect any writes having WriteStatus referring to data 
table. ");
+      dataTableWriteStats.clear();
+      dataTableWriteStats.addAll(mdtWriteStats);
+      mdtWriteStats.clear();
+    }
+    return Pair.of(dataTableWriteStats, mdtWriteStats);
+  }
+
+  @Override
+  protected HoodieWriteMetadata<HoodieData<WriteStatus>> 
processWriteMetadata(HoodieTable table, 
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata, String instantTime) 
{
+    if (!isMetadataTable && config.isMetadataTableEnabled() && 
config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion()))
 {

Review Comment:
   This condition is the same as line 87, and I think there are similar checks 
in `SparkRDDWriteClient`. Is there a common place to put this logic to keep 
it consistent?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to