the-other-tim-brown commented on code in PR #13295:
URL: https://github.com/apache/hudi/pull/13295#discussion_r2103652998


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -148,6 +152,24 @@ public JavaRDD<WriteStatus> 
upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
     return postWrite(resultRDD, instantTime, table);
   }
 
+  /**
+   * Used for streaming writes to metadata table.
+   */
+  @Override
+  public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> 
preppedRecords, String instantTime, Option<List<Pair<String, String>>> 
partitionFileIdPairsOpt) {
+    ValidationUtils.checkArgument(isMetadataTable, "This version of upsert 
prepped can only be invoked for metadata table");
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table =
+        initTable(WriteOperationType.UPSERT_PREPPED, 
Option.ofNullable(instantTime));
+    table.validateUpsertSchema();
+    if (!(partitionFileIdPairsOpt.isPresent() && 
partitionFileIdPairsOpt.get().get(0).getKey().equals(FILES.getPartitionPath())))
 {

Review Comment:
   Should you check if any of the inputs contain the `FILES` partition?
   
   You can also rewrite this with `map` for a more functional style:
   ```
   partitionFileIdPairsOpt.map(partitionFileIdPairs -> 
partitionFileIdPairs.get(0).getKey().equals(FILES.getPartitionPath())).orElse(false)
   ```



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -148,6 +152,24 @@ public JavaRDD<WriteStatus> 
upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
     return postWrite(resultRDD, instantTime, table);
   }
 
+  /**
+   * Used for streaming writes to metadata table.

Review Comment:
   Nitpick, but please add more context to this Javadoc: explain how this method 
is used and why it is only applicable to the metadata table (MDT).



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertCommitActionExecutor.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.storage.StorageLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+
+/**
+ * Upsert commit action executor for Metadata table.
+ *
+ * @param <T>
+ */
+public class SparkMetadataTableUpsertCommitActionExecutor<T> extends 
SparkUpsertPreppedDeltaCommitActionExecutor<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableUpsertCommitActionExecutor.class);
+
+  private static final HashMap<String, WorkloadStat> EMPTY_MAP = new 
HashMap<>();
+  private static final WorkloadStat PLACEHOLDER_GLOBAL_STAT = new 
WorkloadStat();
+
+  private final List<Pair<String, String>> mdtPartitionPathFileGroupIdList;
+
+  public SparkMetadataTableUpsertCommitActionExecutor(HoodieSparkEngineContext 
context, HoodieWriteConfig config, HoodieTable table, String instantTime,
+                                                      
HoodieData<HoodieRecord<T>> preppedRecords, List<Pair<String, String>> 
mdtPartitionPathFileGroupIdList) {
+    super(context, config, table, instantTime, preppedRecords);
+    this.mdtPartitionPathFileGroupIdList = mdtPartitionPathFileGroupIdList;
+  }
+
+  @Override
+  protected boolean shouldPersistInputRecords(JavaRDD<HoodieRecord<T>> 
inputRDD) {
+    return inputRDD.getStorageLevel() == StorageLevel.NONE();
+  }
+
+  @Override
+  protected WorkloadProfile 
prepareWorkloadProfileAndSaveToInflight(HoodieData<HoodieRecord<T>> 
inputRecordsWithClusteringUpdate) {
+    // create workload profile only when we are writing to FILES partition in 
Metadata table.
+    WorkloadProfile workloadProfile = new WorkloadProfile(Pair.of(EMPTY_MAP, 
PLACEHOLDER_GLOBAL_STAT));
+    // with streaming writes support, we might write to metadata table 
multiple times for the same instant times.
+    // ie. writeClient.startCommit(t1), writeClient.upsert(batch1, t1), 
writeClient.upsert(batch2, t1), writeClient.commit(t1, ...)
+    // So, here we are generating inflight file only in the last known writes, 
which we know will only have FILES partition.
+    if (mdtPartitionPathFileGroupIdList.size() == 1 && 
mdtPartitionPathFileGroupIdList.get(0).getKey().equals(FILES.getPartitionPath()))
 {

Review Comment:
   Can you just check if there is already an `inflight` instant on the timeline 
here?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertCommitActionExecutor.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.storage.StorageLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+
+/**
+ * Upsert commit action executor for Metadata table.
+ *
+ * @param <T>
+ */
+public class SparkMetadataTableUpsertCommitActionExecutor<T> extends 
SparkUpsertPreppedDeltaCommitActionExecutor<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableUpsertCommitActionExecutor.class);
+
+  private static final HashMap<String, WorkloadStat> EMPTY_MAP = new 
HashMap<>();
+  private static final WorkloadStat PLACEHOLDER_GLOBAL_STAT = new 
WorkloadStat();
+
+  private final List<Pair<String, String>> mdtPartitionPathFileGroupIdList;
+
+  public SparkMetadataTableUpsertCommitActionExecutor(HoodieSparkEngineContext 
context, HoodieWriteConfig config, HoodieTable table, String instantTime,
+                                                      
HoodieData<HoodieRecord<T>> preppedRecords, List<Pair<String, String>> 
mdtPartitionPathFileGroupIdList) {
+    super(context, config, table, instantTime, preppedRecords);
+    this.mdtPartitionPathFileGroupIdList = mdtPartitionPathFileGroupIdList;
+  }
+
+  @Override
+  protected boolean shouldPersistInputRecords(JavaRDD<HoodieRecord<T>> 
inputRDD) {
+    return inputRDD.getStorageLevel() == StorageLevel.NONE();
+  }
+
+  @Override
+  protected WorkloadProfile 
prepareWorkloadProfileAndSaveToInflight(HoodieData<HoodieRecord<T>> 
inputRecordsWithClusteringUpdate) {
+    // create workload profile only when we are writing to FILES partition in 
Metadata table.
+    WorkloadProfile workloadProfile = new WorkloadProfile(Pair.of(EMPTY_MAP, 
PLACEHOLDER_GLOBAL_STAT));
+    // with streaming writes support, we might write to metadata table 
multiple times for the same instant times.
+    // ie. writeClient.startCommit(t1), writeClient.upsert(batch1, t1), 
writeClient.upsert(batch2, t1), writeClient.commit(t1, ...)
+    // So, here we are generating inflight file only in the last known writes, 
which we know will only have FILES partition.
+    if (mdtPartitionPathFileGroupIdList.size() == 1 && 
mdtPartitionPathFileGroupIdList.get(0).getKey().equals(FILES.getPartitionPath()))
 {
+      saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
+    }
+    return workloadProfile;
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    List<BucketInfo> bucketInfoList = new ArrayList<>();
+    Map<String, Integer> fileIdToSparkPartitionIndexMap = new HashMap<>();
+    int counter = 0;
+    while (counter < mdtPartitionPathFileGroupIdList.size()) {
+      Pair<String, String> partitionPathFileIdPair = 
mdtPartitionPathFileGroupIdList.get(counter);
+      fileIdToSparkPartitionIndexMap.put(partitionPathFileIdPair.getValue(), 
counter);
+      bucketInfoList.add(new BucketInfo(BucketType.UPDATE, 
partitionPathFileIdPair.getValue(), partitionPathFileIdPair.getKey()));
+      counter++;
+    }
+    return new SparkMetadataTableUpsertPartitioner(bucketInfoList, 
fileIdToSparkPartitionIndexMap);

Review Comment:
   Nitpick: add `<>` since the partitioner has a type arg



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestSparkMetadataTableUpsertPartitioner.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestSparkMetadataTableUpsertPartitioner extends 
HoodieClientTestBase {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestSparkMetadataTableUpsertPartitioner.class);

Review Comment:
   Log is unused and can be removed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertCommitActionExecutor.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.storage.StorageLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+
+/**
+ * Upsert commit action executor for Metadata table.
+ *
+ * @param <T>
+ */
+public class SparkMetadataTableUpsertCommitActionExecutor<T> extends 
SparkUpsertPreppedDeltaCommitActionExecutor<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableUpsertCommitActionExecutor.class);
+
+  private static final HashMap<String, WorkloadStat> EMPTY_MAP = new 
HashMap<>();
+  private static final WorkloadStat PLACEHOLDER_GLOBAL_STAT = new 
WorkloadStat();

Review Comment:
   Just double checking the `WorkloadStat` will not be modified by any other 
code since this is shared between commits and threads in the same JVM.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##########
@@ -126,6 +128,14 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> 
upsertPrepped(HoodieEngineCo
     return new 
SparkUpsertPreppedDeltaCommitActionExecutor<>((HoodieSparkEngineContext) 
context, config, this, instantTime, preppedRecords).execute();
   }
 
+  @Override
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> 
upsertPrepped(HoodieEngineContext context, String instantTime,

Review Comment:
   Since some of this functionality is only for MDT, would it be better to have 
an MDT version of `HoodieSparkMergeOnReadTable`?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -148,6 +152,24 @@ public JavaRDD<WriteStatus> 
upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
     return postWrite(resultRDD, instantTime, table);
   }
 
+  /**
+   * Used for streaming writes to metadata table.
+   */
+  @Override
+  public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> 
preppedRecords, String instantTime, Option<List<Pair<String, String>>> 
partitionFileIdPairsOpt) {
+    ValidationUtils.checkArgument(isMetadataTable, "This version of upsert 
prepped can only be invoked for metadata table");
+    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table =
+        initTable(WriteOperationType.UPSERT_PREPPED, 
Option.ofNullable(instantTime));
+    table.validateUpsertSchema();
+    if (!(partitionFileIdPairsOpt.isPresent() && 
partitionFileIdPairsOpt.get().get(0).getKey().equals(FILES.getPartitionPath())))
 {
+      // we do not want to call prewrite more than once for the same instant 
writing to metadata table twice.

Review Comment:
   Is there a better way to do this? I am guessing we only ever want to call 
pre-write once regardless of the writer, so maybe we can have some way to track 
that it was already called for the instant within the write client.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertCommitActionExecutor.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.storage.StorageLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+
+/**
+ * Upsert commit action executor for Metadata table.
+ *
+ * @param <T>
+ */
+public class SparkMetadataTableUpsertCommitActionExecutor<T> extends 
SparkUpsertPreppedDeltaCommitActionExecutor<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableUpsertCommitActionExecutor.class);

Review Comment:
   This is unused and can be removed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMetadataTableUpsertCommitActionExecutor.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import 
org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.storage.StorageLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+
+/**
+ * Upsert commit action executor for Metadata table.
+ *
+ * @param <T>
+ */
+public class SparkMetadataTableUpsertCommitActionExecutor<T> extends 
SparkUpsertPreppedDeltaCommitActionExecutor<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMetadataTableUpsertCommitActionExecutor.class);
+
+  private static final HashMap<String, WorkloadStat> EMPTY_MAP = new 
HashMap<>();

Review Comment:
   Can you use `Collections.emptyMap()` instead?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestSparkMetadataTableUpsertPartitioner.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestSparkMetadataTableUpsertPartitioner extends 
HoodieClientTestBase {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestSparkMetadataTableUpsertPartitioner.class);
+  private static final Random RANDOM = new Random(0xDEED);
+  private static final String RANDOM_INSTANT_TIME = "100000";
+
+  @Test
+  public void testUpsertPartitioner() throws Exception {

Review Comment:
   nitpick: `throws Exception` is not required



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to