nsivabalan commented on a change in pull request #2438:
URL: https://github.com/apache/hudi/pull/2438#discussion_r641925742



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -326,6 +328,16 @@ private boolean onDeltaSyncShutdown(boolean error) {
     @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
     public String checkpoint = null;
 
+    /**
+     * 1. string: topicName,partition number 0:offset value,partition number 1:offset value

Review comment:
       This format is specific to Kafka, so let's call that out. Other sources could format their checkpoints differently.
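To make the Kafka-specific shape concrete, here is a minimal, hypothetical parser for the string checkpoint documented in this Javadoc (`topicName,partition:offset,...`). The class name `KafkaStringCheckpoint` is made up for illustration and is not the parsing code Hudi uses; it assumes one topic per checkpoint, as in the example `hudi_topic,0:100,1:101,2:201`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Hudi): parses a Kafka-style string checkpoint of the form
 * "topicName,partition:offset,partition:offset,...", e.g. "hudi_topic,0:100,1:101,2:201".
 * Assumes a single topic per checkpoint string.
 */
public class KafkaStringCheckpoint {

  final String topic;
  final Map<Integer, Long> offsets; // partition number -> offset, in checkpoint order

  KafkaStringCheckpoint(String topic, Map<Integer, Long> offsets) {
    this.topic = topic;
    this.offsets = offsets;
  }

  static KafkaStringCheckpoint parse(String checkpoint) {
    String[] parts = checkpoint.split(",");
    // First field is the topic name; at least one partition:offset pair must follow.
    if (parts.length < 2 || parts[0].isEmpty()) {
      throw new IllegalArgumentException(
          "Expected topicName,partition:offset,... but got: " + checkpoint);
    }
    Map<Integer, Long> offsets = new LinkedHashMap<>();
    for (int i = 1; i < parts.length; i++) {
      String[] kv = parts[i].split(":");
      if (kv.length != 2) {
        throw new IllegalArgumentException("Malformed partition:offset entry: " + parts[i]);
      }
      offsets.put(Integer.parseInt(kv[0].trim()), Long.parseLong(kv[1].trim()));
    }
    return new KafkaStringCheckpoint(parts[0], offsets);
  }

  public static void main(String[] args) {
    KafkaStringCheckpoint cp = parse("hudi_topic,0:100,1:101,2:201");
    System.out.println(cp.topic + " " + cp.offsets); // hudi_topic {0=100, 1=101, 2=201}
  }
}
```

A file-based or database source would have no notion of partitions or offsets here, which is why the Javadoc should say this format applies to Kafka.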

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -326,6 +328,16 @@ private boolean onDeltaSyncShutdown(boolean error) {
     @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
     public String checkpoint = null;
 
+    /**
+     * 1. string: topicName,partition number 0:offset value,partition number 1:offset value
+     * 2. timestamp: kafka offset timestamp
+     * example
+     * 1. hudi_topic,0:100,1:101,2:201
+     * 2. 1621947081
+     */
+    @Parameter(names = {"--checkpoint-type"}, description = "Checkpoint type, divided into timestamp or string offset")
+    public String checkpointType = "string";

Review comment:
       I am torn between "string", "default", and "regular" as the default checkpoint type. @n3nash: any thoughts? We are looking to introduce a new config called checkpoint type, and it needs a default value. This patch adds a new checkpoint type, "timestamp", for the Kafka source.
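A rough sketch of how the proposed types could be modeled, assuming an enum. The name `CheckpointType`, its constants, and the `matches` shape check are all hypothetical and not from the patch. The open question in this comment reduces to the single `DEFAULT` constant; `STRING` is used here only because it mirrors the patch's current default of `"string"`.

```java
import java.util.Locale;

/**
 * Hypothetical model of the proposed --checkpoint-type values (not Hudi code).
 * STRING:    "topicName,partition:offset,..." e.g. "hudi_topic,0:100,1:101,2:201"
 * TIMESTAMP: a single numeric timestamp, e.g. "1621947081"
 */
public enum CheckpointType {
  STRING,
  TIMESTAMP;

  // The value under discussion; swapping the default is a one-line change.
  static final CheckpointType DEFAULT = STRING;

  /** Null or blank falls back to DEFAULT; unknown names throw IllegalArgumentException. */
  static CheckpointType fromConfig(String value) {
    if (value == null || value.trim().isEmpty()) {
      return DEFAULT;
    }
    return CheckpointType.valueOf(value.trim().toUpperCase(Locale.ROOT));
  }

  /** Rough check that a checkpoint string has the shape of the Javadoc examples for this type. */
  boolean matches(String checkpoint) {
    return this == TIMESTAMP
        ? checkpoint.matches("\\d+")
        : checkpoint.matches("[^,]+(,\\d+:\\d+)+");
  }

  public static void main(String[] args) {
    System.out.println(fromConfig(null));                // STRING
    System.out.println(fromConfig("timestamp"));         // TIMESTAMP
    System.out.println(TIMESTAMP.matches("1621947081")); // true
  }
}
```

An enum name like `DEFAULT` or `REGULAR` would work the same way here; the choice only affects what users type on the command line.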

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -38,6 +38,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;

Review comment:
       Can we revert the unintended changes in this file?

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -326,6 +328,16 @@ private boolean onDeltaSyncShutdown(boolean error) {
     @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
     public String checkpoint = null;
 
+    /**
+     * 1. string: topicName,partition number 0:offset value,partition number 1:offset value
+     * 2. timestamp: kafka offset timestamp
+     * example
+     * 1. hudi_topic,0:100,1:101,2:201
+     * 2. 1621947081
+     */
+    @Parameter(names = {"--checkpoint-type"}, description = "Checkpoint type, divided into timestamp or string offset")
+    public String checkpointType = "string";

Review comment:
       Sorry, why do we have this config in two places? It is defined as a top-level config in HoodieDeltaStreamer.Config, but in KafkaOffsetGen you are accessing it as "hoodie.deltastreamer.source.kafka.checkpoint.type". Maybe we should rely on that config param and remove it from the top level, since it applies only to Kafka for now.
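A minimal sketch of the suggestion, assuming the Kafka source reads the type only from its own properties. `java.util.Properties` stands in for the properties object the source actually receives, the class name is hypothetical, and the `"string"` default is an assumption since the naming is still under discussion.

```java
import java.util.Properties;

/**
 * Hypothetical sketch (not Hudi code): the Kafka source is the single place that
 * resolves the checkpoint type, so no top-level HoodieDeltaStreamer.Config flag is needed.
 */
public class KafkaCheckpointTypeConfig {

  // Property key as quoted in the review comment.
  static final String CHECKPOINT_TYPE_PROP = "hoodie.deltastreamer.source.kafka.checkpoint.type";
  // Assumed default; the review has not settled on the name yet.
  static final String DEFAULT_CHECKPOINT_TYPE = "string";

  /** Returns the configured checkpoint type, or the default when the property is unset. */
  static String checkpointType(Properties props) {
    return props.getProperty(CHECKPOINT_TYPE_PROP, DEFAULT_CHECKPOINT_TYPE);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(checkpointType(props)); // string
    props.setProperty(CHECKPOINT_TYPE_PROP, "timestamp");
    System.out.println(checkpointType(props)); // timestamp
  }
}
```

Keeping one source of truth avoids the case where the CLI flag and the source property disagree and it is unclear which one wins.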




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
