nsivabalan commented on code in PR #13402:
URL: https://github.com/apache/hudi/pull/13402#discussion_r2145329234


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieMetadataWriteWrapper.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstraction for data table write client and table service client to write to metadata table.
+ */
+public class HoodieMetadataWriteWrapper {
+
+  // Cached HoodieTableMetadataWriter for each action in data table. This will be cleaned up when action is completed or when write client is closed.
+  protected Map<String, Option<HoodieTableMetadataWriter>> metadataWriterMap = new ConcurrentHashMap<>();
+
+  /**
+   * Called by data table write client and data table table service client to perform streaming write to metadata table.
+   * @param table {@link HoodieTable} instance for data table of interest.
+   * @param dataTableWriteStatuses {@link WriteStatus} from data table writes.
+   * @param instantTime instant time of interest.
+   * @return {@link HoodieData} of {@link WriteStatus} referring to both data table writes and partial metadata table writes.
+   */
+  public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime) {
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = getMetadataWriter(instantTime, table);
+    if (metadataWriterOpt.isPresent()) {
+      return streamWriteToMetadataTable(dataTableWriteStatuses, metadataWriterOpt, table, instantTime);
+    } else {
+      throw new HoodieMetadataException("Cannot instantiate metadata writer for the table of interest " + table.getMetaClient().getBasePath());
+    }
+  }
+
+  // to be invoked by write client or table service client.
+
+  /**
+   * To be invoked by write client or table service client to write to metadata table.
+   * When streaming writes are enabled, writes to left over metadata partitions which was not covered in {@link #streamWriteToMetadataTable(HoodieTable, HoodieData, String)}
+   * will be invoked here.
+   * If not, writes take the legacy way of writing to metadata table.
+   * @param table {@link HoodieTable} instance for data table of interest.
+   * @param instantTime instant time of interest.
+   * @param metadata {@link HoodieCommitMetadata} of interest.
+   * @param metadataWriteStatsSoFar List of {@link HoodieWriteStat}s referring to partial writes completed in metadata table with streaming writes.
+   */
+  public void writeToMetadataTable(HoodieTable table, String instantTime,
+                                   HoodieCommitMetadata metadata, List<HoodieWriteStat> metadataWriteStatsSoFar) {
+    writeToMetadataTable(table, instantTime, metadataWriteStatsSoFar, metadata);
+  }

Review Comment:
   This API is invoked from both WriteClient and TableServiceClient, so I would like to keep it generic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.