nsivabalan commented on code in PR #8043:
URL: https://github.com/apache/hudi/pull/8043#discussion_r1119363467


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -620,6 +620,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Whether to enable commit conflict checking or not 
during early "
           + "conflict detection.");
 
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID 
= ConfigProperty
+      .key("hoodie.deltastreamer.multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Define Unique Id for source to be used in commit 
checkpoint");

Review Comment:
   lets add documentation that for a single deltastreamer, users don't need to 
set this. and clarify what is meant by multiwriter source. its not general 
multi-writer that users generally relate to. i.e. one writer using 
deltastreamer and another writer using spark-datasource.
   this is specifically targetting use-cases where multiple wirters each using 
deltastreamer to write to same target table. 
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -1061,4 +1105,8 @@ private Set<String> getPartitionColumns(KeyGenerator 
keyGenerator, TypedProperti
     String partitionColumns = 
SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
     return 
Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
   }
+
+  public String getId() {

Review Comment:
   minor. getMultiwriterIdentifier



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -149,6 +154,7 @@ public class DeltaSync implements Serializable, Closeable {
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
+  private static final ObjectMapper OM = new ObjectMapper();

Review Comment:
   can we expand OM to "OBJECT_MAPPER"



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1911,6 +1920,80 @@ public void testKafkaTimestampType() throws Exception {
     TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, 
sqlContext);
   }
 
+  @Test
+  public void testDeltaStreamerMultiwriterCheckpoint() throws Exception {
+    // prep parquet source
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesMultiCheckpoint" + testNum;
+    int parquetRecords = 100;
+    HoodieTestDataGenerator dataGenerator = 
prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, true,
+        HoodieTestDataGenerator.TRIP_SCHEMA, 
HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+
+    prepareParquetDFSSource(true, true, "source_uber.avsc", 
"target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "driver");
+
+    // delta streamer w/ parquet source
+    String tableBasePath = basePath + "/test_multi_checkpoint" + testNum;
+    HoodieDeltaStreamer.Config parquetCfg = 
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
+        Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
+        true, Integer.MAX_VALUE, false, null, null, "timestamp", null);
+    parquetCfg.configs = new ArrayList<>();
+    parquetCfg.configs.add(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key() + 
"=parquet");
+    //parquetCfg.continuousMode = false;
+    HoodieDeltaStreamer parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
+    parquetDs.sync();
+    TestHelpers.assertRecordCount(100, tableBasePath, sqlContext);
+
+    // prep json kafka source
+    topicName = "topic" + testNum;
+    prepareJsonKafkaDFSFiles(20, true, topicName);
+    Map<String, String> kafkaExtraProps = new HashMap<>();
+    kafkaExtraProps.put(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key(), "kafka");
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", 
topicName, kafkaExtraProps);
+    // delta streamer w/ json kafka source
+    HoodieDeltaStreamer kafkaDs = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
JsonKafkaSource.class.getName(),
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, Integer.MAX_VALUE, false, null, null, "timestamp", null), 
jsc);
+    kafkaDs.sync();
+    int totalExpectedRecords = parquetRecords + 20;
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, 
sqlContext);
+    //parquet again
+    prepareParquetDFSUpdates(parquetRecords, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, 
HoodieTestDataGenerator.AVRO_TRIP_SCHEMA,
+        dataGenerator, "001");
+
+    //parquetCfg.continuousMode = false;
+    parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
+    parquetDs.sync();
+    TestHelpers.assertRecordCount(parquetRecords * 2 + 20, tableBasePath, 
sqlContext);
+
+    HoodieTableMetaClient metaClient = 
HoodieTestUtils.init(jsc.hadoopConfiguration(), tableBasePath);
+    List<HoodieInstant> instants = 
metaClient.getCommitsTimeline().getInstants();
+
+    ObjectMapper om = new ObjectMapper();
+    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+        
.fromBytes(metaClient.getCommitsTimeline().getInstantDetails(instants.get(0)).get(),
 HoodieCommitMetadata.class);
+    Map<String,String>  m = 
om.readValue(commitMetadata.getExtraMetadata().get(CHECKPOINT_KEY), Map.class);

Review Comment:
   lets use good naming even in tests. "m" for instance. 



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1911,6 +1920,80 @@ public void testKafkaTimestampType() throws Exception {
     TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, 
sqlContext);
   }
 
+  @Test
+  public void testDeltaStreamerMultiwriterCheckpoint() throws Exception {
+    // prep parquet source
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesMultiCheckpoint" + testNum;
+    int parquetRecords = 100;
+    HoodieTestDataGenerator dataGenerator = 
prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, true,
+        HoodieTestDataGenerator.TRIP_SCHEMA, 
HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+
+    prepareParquetDFSSource(true, true, "source_uber.avsc", 
"target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "driver");
+
+    // delta streamer w/ parquet source
+    String tableBasePath = basePath + "/test_multi_checkpoint" + testNum;
+    HoodieDeltaStreamer.Config parquetCfg = 
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
+        Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
+        true, Integer.MAX_VALUE, false, null, null, "timestamp", null);
+    parquetCfg.configs = new ArrayList<>();
+    parquetCfg.configs.add(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key() + 
"=parquet");
+    //parquetCfg.continuousMode = false;
+    HoodieDeltaStreamer parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
+    parquetDs.sync();
+    TestHelpers.assertRecordCount(100, tableBasePath, sqlContext);
+
+    // prep json kafka source
+    topicName = "topic" + testNum;
+    prepareJsonKafkaDFSFiles(20, true, topicName);
+    Map<String, String> kafkaExtraProps = new HashMap<>();
+    kafkaExtraProps.put(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key(), "kafka");
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", 
topicName, kafkaExtraProps);
+    // delta streamer w/ json kafka source
+    HoodieDeltaStreamer kafkaDs = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
JsonKafkaSource.class.getName(),
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, Integer.MAX_VALUE, false, null, null, "timestamp", null), 
jsc);
+    kafkaDs.sync();
+    int totalExpectedRecords = parquetRecords + 20;
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, 
sqlContext);
+    //parquet again
+    prepareParquetDFSUpdates(parquetRecords, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, 
HoodieTestDataGenerator.AVRO_TRIP_SCHEMA,
+        dataGenerator, "001");
+
+    //parquetCfg.continuousMode = false;

Review Comment:
   remove extraneous lines



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to