lokeshj1703 commented on code in PR #13286:
URL: https://github.com/apache/hudi/pull/13286#discussion_r2112350988
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -144,6 +157,63 @@ protected JavaRDD<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData
     return HoodieJavaRDD.getJavaRDD(records);
   }
+  @Override
+  protected HoodieData<WriteStatus> convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    return HoodieJavaRDD.of(records);
+  }
+
+  @Override
+  protected HoodieData<HoodieRecord> repartitionByMDTFileSlice(HoodieData<HoodieRecord> records, int numPartitions) {
Review Comment:
Removed it. The API was not used anywhere.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java:
##########
@@ -93,6 +93,11 @@ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, Hoo
     super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp);
   }
+ @Override
+ MetadataIndexGenerator getMetadataIndexGenerator() {
+ return null;
Review Comment:
Addressed
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -144,6 +157,63 @@ protected JavaRDD<HoodieRecord> convertHoodieDataToEngineSpecificData(HoodieData
     return HoodieJavaRDD.getJavaRDD(records);
   }
+  @Override
+  protected HoodieData<WriteStatus> convertEngineSpecificDataToHoodieData(JavaRDD<WriteStatus> records) {
+    return HoodieJavaRDD.of(records);
+  }
+
+  @Override
+  protected HoodieData<HoodieRecord> repartitionByMDTFileSlice(HoodieData<HoodieRecord> records, int numPartitions) {
+    return HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(records).mapToPair(new PairFunction<HoodieRecord, Pair<String, String>, HoodieRecord>() {
+
+      @Override
+      public Tuple2<Pair<String, String>, HoodieRecord> call(HoodieRecord record) throws Exception {
+        return new Tuple2<>(Pair.of(record.getPartitionPath(), record.getCurrentLocation().getFileId()), record);
+      }
+    }).partitionBy(new Partitioner() {
+      @Override
+      public int numPartitions() {
+        return numPartitions;
+      }
+
+      @Override
+      public int getPartition(Object key) {
+        Pair<String, String> entry = (Pair<String, String>) key;
+        return mapPartitionKeyToSparkPartition(entry.getKey().concat(entry.getValue()), numPartitions);
+      }
+    }).values());
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> streamWriteToMetadataTable(Pair<List<Pair<String, String>>, HoodieData<HoodieRecord>> mdtRecordsHoodieData, String instantTime, boolean initialCall) {
+    JavaRDD<HoodieRecord> mdtRecords = HoodieJavaRDD.getJavaRDD(mdtRecordsHoodieData.getValue());
+
+    if (initialCall) {
Review Comment:
Removed it. preWrite is a no-op for Spark.
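For context on the `repartitionByMDTFileSlice` snippet above: it keys each record by `(partitionPath, fileId)` and routes it through a custom `Partitioner` so all records for one metadata-table file slice land in the same Spark partition. Below is a minimal standalone sketch of that key-to-bucket mapping, with no Spark dependency. The hash function here is an assumption for illustration only; Hudi's actual `mapPartitionKeyToSparkPartition` helper may hash differently.

```java
// Hypothetical standalone sketch of mapping a composite (partitionPath, fileId)
// key to a stable partition index in [0, numPartitions), mimicking the custom
// Partitioner in the diff above. Not Hudi's actual implementation.
public class FileSlicePartitionSketch {

  // Concatenate the two key parts and hash into a bucket.
  // Math.floorMod guards against negative hashCode values.
  static int partitionFor(String partitionPath, String fileId, int numPartitions) {
    String key = partitionPath.concat(fileId);
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  public static void main(String[] args) {
    int numPartitions = 4;
    int p1 = partitionFor("2024/05/01", "file-001", numPartitions);
    int p2 = partitionFor("2024/05/01", "file-001", numPartitions);
    // The same key is always assigned to the same partition,
    // and the result stays within [0, numPartitions).
    System.out.println(p1 == p2);
    System.out.println(p1 >= 0 && p1 < numPartitions);
  }
}
```

The point of the stable key-to-bucket mapping is that every record destined for the same metadata-table file slice is processed by the same task, which is what lets the writer stream directly into per-file-slice log blocks.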
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]