YuweiXiao commented on code in PR #6737:
URL: https://github.com/apache/hudi/pull/6737#discussion_r990982599
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java:
##########
@@ -18,23 +18,40 @@
package org.apache.hudi.sink.bucket;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.flink.configuration.Configuration;
/**
* Operator for {@link BucketStreamWriteFunction}.
- *
- * @param <I> The input type
*/
-public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
+public class BucketStreamWriteOperator {
- public BucketStreamWriteOperator(Configuration conf) {
- super(new BucketStreamWriteFunction<>(conf));
+ public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
+ String bucketEngineType = conf.get(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE);
+ if
(bucketEngineType.equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.SIMPLE.name()))
{
+ return WriteOperatorFactory.instance(conf, new
SimpleBucketStreamWriteOperator<>(conf));
+ } else if
(bucketEngineType.equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name()))
{
+ return WriteOperatorFactory.instance(conf, new
ConsistentBucketStreamWriteOperator<>(conf));
+ } else {
+ throw new HoodieException("Unknown bucket index engine type: " +
bucketEngineType);
+ }
}
- public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
- return WriteOperatorFactory.instance(conf, new
BucketStreamWriteOperator<>(conf));
+ public static class SimpleBucketStreamWriteOperator<I> extends
AbstractWriteOperator<I> {
+ public SimpleBucketStreamWriteOperator(Configuration conf) {
+ super(new BucketStreamWriteFunction<>(conf));
+ }
}
+
+ public static class ConsistentBucketStreamWriteOperator<I> extends
AbstractWriteOperator<I> {
+ public ConsistentBucketStreamWriteOperator(Configuration conf) {
Review Comment:
I thought putting them together would give a clearer code layout. I will pull
them out and add a factory class to construct the operators.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]