lokeshj1703 commented on code in PR #13286:
URL: https://github.com/apache/hudi/pull/13286#discussion_r2112350988


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -144,6 +157,63 @@ protected JavaRDD<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData
     return HoodieJavaRDD.getJavaRDD(records);
   }
 
+  @Override
+  protected HoodieData<WriteStatus> convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    return HoodieJavaRDD.of(records);
+  }
+
+  @Override
+  protected HoodieData<HoodieRecord> repartitionByMDTFileSlice(HoodieData<HoodieRecord> records, int numPartitions) {

Review Comment:
   Removed it. The API was not used.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java:
##########
@@ -93,6 +93,11 @@ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, Hoo
     super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
   }
 
+  @Override
+  MetadataIndexGenerator getMetadataIndexGenerator() {
+    return null;

Review Comment:
   Addressed



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -144,6 +157,63 @@ protected JavaRDD<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData
     return HoodieJavaRDD.getJavaRDD(records);
   }
 
+  @Override
+  protected HoodieData<WriteStatus> convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    return HoodieJavaRDD.of(records);
+  }
+
+  @Override
+  protected HoodieData<HoodieRecord> repartitionByMDTFileSlice(HoodieData<HoodieRecord> records, int numPartitions) {
+    return HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(records).mapToPair(new PairFunction<HoodieRecord, Pair<String, String>, HoodieRecord>() {
+
+      @Override
+      public Tuple2<Pair<String, String>, HoodieRecord> call(HoodieRecord record) throws Exception {
+        return new Tuple2<>(Pair.of(record.getPartitionPath(), record.getCurrentLocation().getFileId()), record);
+      }
+    }).partitionBy(new Partitioner() {
+      @Override
+      public int numPartitions() {
+        return numPartitions;
+      }
+
+      @Override
+      public int getPartition(Object key) {
+        Pair<String, String> entry = (Pair<String, String>) key;
+        return mapPartitionKeyToSparkPartition(entry.getKey().concat(entry.getValue()), numPartitions);
+      }
+    }).values());
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> streamWriteToMetadataTable(Pair<List<Pair<String, String>>, HoodieData<HoodieRecord>> mdtRecordsHoodieData, String instantTime, boolean initialCall) {
+    JavaRDD<HoodieRecord> mdtRecords = HoodieJavaRDD.getJavaRDD(mdtRecordsHoodieData.getValue());
+
+    if (initialCall) {

Review Comment:
   Removed it. preWrite is a no-op for Spark.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.