danny0405 commented on a change in pull request #3348:
URL: https://github.com/apache/hudi/pull/3348#discussion_r677171295
##########
File path: hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
##########
@@ -111,53 +101,7 @@ public static void main(String[] args) throws Exception {
dataStream = transformer.get().apply(dataStream);
}
}
-
- DataStream<HoodieRecord> dataStream2 = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
-
- if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- dataStream2 = dataStream2.rebalance()
- .transform(
- "index_bootstrap",
- TypeInformation.of(HoodieRecord.class),
- new ProcessOperator<>(new BootstrapFunction<>(conf)))
- .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
- .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
- }
-
- DataStream<Object> pipeline = dataStream2
- // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
- .keyBy(HoodieRecord::getRecordKey)
- .transform(
- "bucket_assigner",
- TypeInformation.of(HoodieRecord.class),
- new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
Review comment:
The pipeline in `HoodieTableSink.java` is the canonical one, so we can
simply discard the duplicate logic here.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -325,4 +343,69 @@ public static long instantTimeDiffSeconds(String newInstantTime, String oldInsta
}
}
+ public static DataStreamSink<?> createHoodieDataStreamSink(RowType rowType, Configuration conf, int parallelism,
+ StreamWriteOperatorFactory<HoodieRecord> operatorFactory, DataStream<RowData> dataStream) {
+ DataStream<HoodieRecord> dataStream2 = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf),
+ TypeInformation.of(HoodieRecord.class));
+
+ if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ dataStream2 = getIndexBootstrapStream(conf, parallelism, dataStream2);
Review comment:
Move these methods into a separate class `HoodieWritePipelines`, and
consider a fluent API for building the various sub-pipelines.
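
As a rough sketch of what such a fluent API could look like: the block below is plain Java with a stand-in stream type `S` in place of Flink's `DataStream<HoodieRecord>`, so it runs without Flink on the classpath. The method names `from`, `then`, `thenIf`, and `stages` are hypothetical and not existing Hudi APIs; only the operator names `index_bootstrap` and `bucket_assigner` are taken from the diff above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical sketch of a fluent pipeline builder; not the actual Hudi class. */
final class HoodieWritePipelines {
  private HoodieWritePipelines() {
  }

  /** Starts a pipeline from a source stream (S stands in for DataStream<HoodieRecord>). */
  static <S> Builder<S> from(S source) {
    return new Builder<>(source);
  }

  static final class Builder<S> {
    private S stream;
    private final List<String> stages = new ArrayList<>();

    private Builder(S source) {
      this.stream = source;
    }

    /** Appends a named sub-pipeline unconditionally. */
    Builder<S> then(String name, UnaryOperator<S> stage) {
      stream = stage.apply(stream);
      stages.add(name);
      return this;
    }

    /** Appends a named sub-pipeline only when enabled, e.g. the index bootstrap step. */
    Builder<S> thenIf(boolean enabled, String name, UnaryOperator<S> stage) {
      return enabled ? then(name, stage) : this;
    }

    /** Returns the stream after all appended stages. */
    S build() {
      return stream;
    }

    /** Returns the names of the stages that were actually appended, in order. */
    List<String> stages() {
      return Collections.unmodifiableList(new ArrayList<>(stages));
    }
  }

  public static void main(String[] args) {
    // Stand-in for conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED).
    boolean bootstrapEnabled = false;
    // A String trace stands in for the real stream so the wiring is visible.
    Builder<String> builder = HoodieWritePipelines.from("source")
        .thenIf(bootstrapEnabled, "index_bootstrap", s -> s + " -> index_bootstrap")
        .then("bucket_assigner", s -> s + " -> bucket_assigner");
    System.out.println(builder.build());
    System.out.println(builder.stages());
  }
}
```

With something along these lines, `HoodieFlinkStreamer` and `HoodieTableSink` could share one chain, and the optional bootstrap step becomes a single `thenIf` call instead of an `if` block that reassigns `dataStream2`.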