danny0405 commented on a change in pull request #3348:
URL: https://github.com/apache/hudi/pull/3348#discussion_r677171295



##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
##########
@@ -111,53 +101,7 @@ public static void main(String[] args) throws Exception {
         dataStream = transformer.get().apply(dataStream);
       }
     }
-
-    DataStream<HoodieRecord> dataStream2 = dataStream.map(new 
RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class));
-
-    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      dataStream2 = dataStream2.rebalance()
-          .transform(
-              "index_bootstrap",
-              TypeInformation.of(HoodieRecord.class),
-              new ProcessOperator<>(new BootstrapFunction<>(conf)))
-          
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
-          .uid("uid_index_bootstrap_" + 
conf.getString(FlinkOptions.TABLE_NAME));
-    }
-
-    DataStream<Object> pipeline = dataStream2
-        // Key-by record key, to avoid multiple subtasks write to a bucket at 
the same time
-        .keyBy(HoodieRecord::getRecordKey)
-        .transform(
-            "bucket_assigner",
-            TypeInformation.of(HoodieRecord.class),
-            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))

Review comment:
       The pipeline in `HoodieTableSink.java` is the only real thing and we can 
just discard the logic here.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -325,4 +343,69 @@ public static long instantTimeDiffSeconds(String 
newInstantTime, String oldInsta
     }
   }
 
+  public static DataStreamSink<?> createHoodieDataStreamSink(RowType rowType, 
Configuration conf, int parallelism,
+                                                             
StreamWriteOperatorFactory<HoodieRecord> operatorFactory, DataStream<RowData> 
dataStream) {
+    DataStream<HoodieRecord> dataStream2 = dataStream.map(new 
RowDataToHoodieFunction<>(rowType, conf),
+            TypeInformation.of(HoodieRecord.class));
+
+    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      dataStream2 = getIndexBootstrapStream(conf, parallelism, dataStream2);

Review comment:
       Move these methods into a separate class `HoodieWritePipelines` and 
maybe you can create a fluent API to build all kinds of sub-pipelines.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to