garyli1019 commented on a change in pull request #1486: [HUDI-759] Integrate checkpoint provider with delta streamer
URL: https://github.com/apache/incubator-hudi/pull/1486#discussion_r406555030
 
 

 ##########
 File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
 ##########
 @@ -90,35 +90,33 @@
 
   public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
     this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
-        getDefaultHiveConf(jssc.hadoopConfiguration()));
+        jssc.hadoopConfiguration(), null);
   }
 
   public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, TypedProperties props) throws IOException {
     this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
-        getDefaultHiveConf(jssc.hadoopConfiguration()), props);
+        jssc.hadoopConfiguration(), props);
   }
 
-  public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
-                             TypedProperties properties) throws IOException {
-    this.cfg = cfg;
-    this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf, properties);
+  public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration hiveConf) throws IOException {
+    this(cfg, jssc, fs, hiveConf, null);
   }
 
-  public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException {
+  public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration hiveConf,
+                             TypedProperties properties) throws IOException {
+    if (cfg.initialCheckpointProvider != null && cfg.bootstrapFromPath != null && cfg.checkpoint == null) {
+      InitialCheckPointProvider checkPointProvider =
+          UtilHelpers.createInitialCheckpointProvider(cfg.initialCheckpointProvider, new Path(cfg.bootstrapFromPath), fs);
+      cfg.checkpoint = checkPointProvider.getCheckpoint();
 
 Review comment:
   I think this depends on how we design the migration flow for the user.
   In my own setup, I first used the Spark datasource to do a bulkInsert that converts all the plain parquet files to Hudi format, and then ran a second job using the delta streamer to read from Kafka. So this initialCheckpointProvider would apply to the first delta streamer job after switching sources.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
