xushiyan commented on code in PR #18022:
URL: https://github.com/apache/hudi/pull/18022#discussion_r2744151601


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -221,32 +231,117 @@ public DataStream<RowData> 
produceDataStream(StreamExecutionEnvironment execEnv)
         TypeInformation<RowData> typeInfo =
             (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
         OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
-        if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
-          StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
-              conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);
-          InputFormat<RowData, ?> inputFormat = getInputFormat(true);
-          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
-          SingleOutputStreamOperator<MergeOnReadInputSplit> 
monitorOperatorStream = execEnv.addSource(monitoringFunction, 
getSourceOperatorName("split_monitor"))
-              .uid(Pipelines.opUID("split_monitor", conf))
-              .setParallelism(1)
-              .setMaxParallelism(1);
-
-          DataStream<MergeOnReadInputSplit> sourceWithKey = 
addFileDistributionStrategy(monitorOperatorStream);
-
-          SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
-              .transform("split_reader", typeInfo, factory)
-              .uid(Pipelines.opUID("split_reader", conf))
-              .setParallelism(conf.get(FlinkOptions.READ_TASKS));
-          return new DataStreamSource<>(streamReadSource);
+
+        if (conf.get(FlinkOptions.READ_SOURCE_V2_ENABLED)) {
+          return produceNewSourceDataStream(execEnv);
         } else {
-          InputFormatSourceFunctionAdapter<RowData> func = new 
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
-          DataStreamSource<RowData> source = execEnv.addSource(func, 
asSummaryString(), typeInfo);
-          return 
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+          return produceLegacySourceDataStream(execEnv, typeInfo);
         }
       }
     };
   }
 
+  /**
+   * Produces a DataStream using the new FLIP-27 HoodieSource.
+   *
+   * @param execEnv the stream execution environment
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> 
produceNewSourceDataStream(StreamExecutionEnvironment execEnv) {
+    HoodieSource<RowData> hoodieSource = createHoodieSource();
+    DataStreamSource<RowData> source = execEnv.fromSource(
+        hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
+    return 
source.name(getSourceOperatorName("hudi_source")).uid(Pipelines.opUID("hudi_source",
 conf)).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+  }
+
+  /**
+   * Produces a DataStream using the legacy source implementation.
+   *
+   * @param execEnv the stream execution environment
+   * @param typeInfo type information for RowData
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> produceLegacySourceDataStream(
+      StreamExecutionEnvironment execEnv,
+      TypeInformation<RowData> typeInfo) {
+    if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
+      StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
+          conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);

Review Comment:
   The legacy source takes in a partition pruner. Does the new source handle 
partition pruning in some way?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -221,32 +231,117 @@ public DataStream<RowData> 
produceDataStream(StreamExecutionEnvironment execEnv)
         TypeInformation<RowData> typeInfo =
             (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
         OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
-        if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
-          StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
-              conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);
-          InputFormat<RowData, ?> inputFormat = getInputFormat(true);
-          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
-          SingleOutputStreamOperator<MergeOnReadInputSplit> 
monitorOperatorStream = execEnv.addSource(monitoringFunction, 
getSourceOperatorName("split_monitor"))
-              .uid(Pipelines.opUID("split_monitor", conf))
-              .setParallelism(1)
-              .setMaxParallelism(1);
-
-          DataStream<MergeOnReadInputSplit> sourceWithKey = 
addFileDistributionStrategy(monitorOperatorStream);
-
-          SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
-              .transform("split_reader", typeInfo, factory)
-              .uid(Pipelines.opUID("split_reader", conf))
-              .setParallelism(conf.get(FlinkOptions.READ_TASKS));
-          return new DataStreamSource<>(streamReadSource);
+
+        if (conf.get(FlinkOptions.READ_SOURCE_V2_ENABLED)) {
+          return produceNewSourceDataStream(execEnv);
         } else {
-          InputFormatSourceFunctionAdapter<RowData> func = new 
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
-          DataStreamSource<RowData> source = execEnv.addSource(func, 
asSummaryString(), typeInfo);
-          return 
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+          return produceLegacySourceDataStream(execEnv, typeInfo);
         }
       }
     };
   }
 
+  /**
+   * Produces a DataStream using the new FLIP-27 HoodieSource.
+   *
+   * @param execEnv the stream execution environment
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> 
produceNewSourceDataStream(StreamExecutionEnvironment execEnv) {
+    HoodieSource<RowData> hoodieSource = createHoodieSource();
+    DataStreamSource<RowData> source = execEnv.fromSource(
+        hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
+    return 
source.name(getSourceOperatorName("hudi_source")).uid(Pipelines.opUID("hudi_source",
 conf)).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+  }
+
+  /**
+   * Produces a DataStream using the legacy source implementation.
+   *
+   * @param execEnv the stream execution environment
+   * @param typeInfo type information for RowData
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> produceLegacySourceDataStream(
+      StreamExecutionEnvironment execEnv,
+      TypeInformation<RowData> typeInfo) {
+    if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
+      StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
+          conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);
+      InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+      OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = 
StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
+      SingleOutputStreamOperator<MergeOnReadInputSplit> monitorOperatorStream 
= execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+          .uid(Pipelines.opUID("split_monitor", conf))
+          .setParallelism(1)
+          .setMaxParallelism(1);
+
+      DataStream<MergeOnReadInputSplit> sourceWithKey = 
addFileDistributionStrategy(monitorOperatorStream);

Review Comment:
   Does the new source support setting a file distribution strategy?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -221,32 +231,117 @@ public DataStream<RowData> 
produceDataStream(StreamExecutionEnvironment execEnv)
         TypeInformation<RowData> typeInfo =
             (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
         OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
-        if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
-          StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
-              conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);
-          InputFormat<RowData, ?> inputFormat = getInputFormat(true);
-          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
-          SingleOutputStreamOperator<MergeOnReadInputSplit> 
monitorOperatorStream = execEnv.addSource(monitoringFunction, 
getSourceOperatorName("split_monitor"))
-              .uid(Pipelines.opUID("split_monitor", conf))
-              .setParallelism(1)
-              .setMaxParallelism(1);
-
-          DataStream<MergeOnReadInputSplit> sourceWithKey = 
addFileDistributionStrategy(monitorOperatorStream);
-
-          SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
-              .transform("split_reader", typeInfo, factory)
-              .uid(Pipelines.opUID("split_reader", conf))
-              .setParallelism(conf.get(FlinkOptions.READ_TASKS));
-          return new DataStreamSource<>(streamReadSource);
+
+        if (conf.get(FlinkOptions.READ_SOURCE_V2_ENABLED)) {
+          return produceNewSourceDataStream(execEnv);
         } else {
-          InputFormatSourceFunctionAdapter<RowData> func = new 
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
-          DataStreamSource<RowData> source = execEnv.addSource(func, 
asSummaryString(), typeInfo);
-          return 
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+          return produceLegacySourceDataStream(execEnv, typeInfo);
         }
       }
     };
   }
 
+  /**
+   * Produces a DataStream using the new FLIP-27 HoodieSource.
+   *
+   * @param execEnv the stream execution environment
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> 
produceNewSourceDataStream(StreamExecutionEnvironment execEnv) {
+    HoodieSource<RowData> hoodieSource = createHoodieSource();
+    DataStreamSource<RowData> source = execEnv.fromSource(
+        hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
+    return 
source.name(getSourceOperatorName("hudi_source")).uid(Pipelines.opUID("hudi_source",
 conf)).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+  }
+
+  /**
+   * Produces a DataStream using the legacy source implementation.
+   *
+   * @param execEnv the stream execution environment
+   * @param typeInfo type information for RowData
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> produceLegacySourceDataStream(
+      StreamExecutionEnvironment execEnv,
+      TypeInformation<RowData> typeInfo) {
+    if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
+      StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
+          conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);
+      InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+      OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = 
StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
+      SingleOutputStreamOperator<MergeOnReadInputSplit> monitorOperatorStream 
= execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+          .uid(Pipelines.opUID("split_monitor", conf))
+          .setParallelism(1)
+          .setMaxParallelism(1);
+
+      DataStream<MergeOnReadInputSplit> sourceWithKey = 
addFileDistributionStrategy(monitorOperatorStream);
+
+      SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
+          .transform("split_reader", typeInfo, factory)
+          .uid(Pipelines.opUID("split_reader", conf))
+          .setParallelism(conf.get(FlinkOptions.READ_TASKS));
+      return new DataStreamSource<>(streamReadSource);
+    } else {
+      InputFormatSourceFunctionAdapter<RowData> func = new 
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
+      DataStreamSource<RowData> source = execEnv.addSource(func, 
asSummaryString(), typeInfo);
+      return 
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+    }
+  }
+
+  /**
+   * Creates a new Flink FLIP-27 HoodieSource for reading data from the Hudi 
table.
+   *
+   * @return the configured HoodieSource instance

Review Comment:
   Naming fixes needed in the Javadoc: use "Hoodie" consistently, and correct the FLIP-27 reference.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -221,32 +231,117 @@ public DataStream<RowData> 
produceDataStream(StreamExecutionEnvironment execEnv)
         TypeInformation<RowData> typeInfo =
             (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
         OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
-        if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
-          StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
-              conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);
-          InputFormat<RowData, ?> inputFormat = getInputFormat(true);
-          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
-          SingleOutputStreamOperator<MergeOnReadInputSplit> 
monitorOperatorStream = execEnv.addSource(monitoringFunction, 
getSourceOperatorName("split_monitor"))
-              .uid(Pipelines.opUID("split_monitor", conf))
-              .setParallelism(1)
-              .setMaxParallelism(1);
-
-          DataStream<MergeOnReadInputSplit> sourceWithKey = 
addFileDistributionStrategy(monitorOperatorStream);
-
-          SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
-              .transform("split_reader", typeInfo, factory)
-              .uid(Pipelines.opUID("split_reader", conf))
-              .setParallelism(conf.get(FlinkOptions.READ_TASKS));
-          return new DataStreamSource<>(streamReadSource);
+
+        if (conf.get(FlinkOptions.READ_SOURCE_V2_ENABLED)) {
+          return produceNewSourceDataStream(execEnv);
         } else {
-          InputFormatSourceFunctionAdapter<RowData> func = new 
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
-          DataStreamSource<RowData> source = execEnv.addSource(func, 
asSummaryString(), typeInfo);
-          return 
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+          return produceLegacySourceDataStream(execEnv, typeInfo);
         }
       }
     };
   }
 
+  /**
+   * Produces a DataStream using the new FLIP-27 HoodieSource.
+   *
+   * @param execEnv the stream execution environment
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> 
produceNewSourceDataStream(StreamExecutionEnvironment execEnv) {
+    HoodieSource<RowData> hoodieSource = createHoodieSource();
+    DataStreamSource<RowData> source = execEnv.fromSource(
+        hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
+    return 
source.name(getSourceOperatorName("hudi_source")).uid(Pipelines.opUID("hudi_source",
 conf)).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+  }
+
+  /**
+   * Produces a DataStream using the legacy source implementation.
+   *
+   * @param execEnv the stream execution environment
+   * @param typeInfo type information for RowData
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> produceLegacySourceDataStream(
+      StreamExecutionEnvironment execEnv,
+      TypeInformation<RowData> typeInfo) {
+    if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
+      StreamReadMonitoringFunction monitoringFunction = new 
StreamReadMonitoringFunction(
+          conf, FilePathUtils.toFlinkPath(path), tableRowType, 
maxCompactionMemoryInBytes, partitionPruner);
+      InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+      OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = 
StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
+      SingleOutputStreamOperator<MergeOnReadInputSplit> monitorOperatorStream 
= execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+          .uid(Pipelines.opUID("split_monitor", conf))
+          .setParallelism(1)
+          .setMaxParallelism(1);
+
+      DataStream<MergeOnReadInputSplit> sourceWithKey = 
addFileDistributionStrategy(monitorOperatorStream);
+
+      SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
+          .transform("split_reader", typeInfo, factory)
+          .uid(Pipelines.opUID("split_reader", conf))
+          .setParallelism(conf.get(FlinkOptions.READ_TASKS));
+      return new DataStreamSource<>(streamReadSource);
+    } else {
+      InputFormatSourceFunctionAdapter<RowData> func = new 
InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
+      DataStreamSource<RowData> source = execEnv.addSource(func, 
asSummaryString(), typeInfo);
+      return 
source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+    }
+  }
+
+  /**
+   * Creates a new Flink FLIP-27 HoodieSource for reading data from the Hudi 
table.
+   *
+   * @return the configured HoodieSource instance
+   */
+  private HoodieSource<RowData> createHoodieSource() {
+    ValidationUtils.checkState(metaClient != null, "MetaClient must be 
initialized before creating HoodieSource");
+    ValidationUtils.checkState(hadoopConf != null, "Hadoop configuration must 
be initialized");
+    ValidationUtils.checkState(conf != null, "Configuration must be 
initialized");
+
+    HoodieSchema tableSchema =  !tableDataExists() ? inferSchemaFromDdl() : 
getTableSchema();
+    final DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+    final RowType requiredRowType = (RowType) 
getProducedDataType().notNull().getLogicalType();
+
+    HoodieScanContext context = createHoodieScanContext(rowType);
+    HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf, false, false);
+    HoodieFlinkEngineContext flinkEngineContext = new 
HoodieFlinkEngineContext(hadoopConf.unwrap());
+    HoodieFlinkTable<RowData> flinkTable = 
HoodieFlinkTable.create(writeConfig, flinkEngineContext);
+    FlinkReaderContextFactory readerContextFactory = new 
FlinkReaderContextFactory(metaClient);
+    HoodieSplitReaderFunction splitReaderFunction = new 
HoodieSplitReaderFunction(
+        flinkTable,
+        readerContextFactory.getContext(),
+        conf,
+        tableSchema,
+        HoodieSchemaConverter.convertToSchema(requiredRowType),
+        conf.get(FlinkOptions.MERGE_TYPE),
+        Option.empty());
+    return new HoodieSource<>(context, splitReaderFunction, new 
HoodieSourceSplitComparator(), metaClient, new HoodieRecordEmitter<>());
+  }
+
+  /**
+   * Creates a HoodieScanContext for configuring the scan operation.
+   *
+   * @param rowType the row type for the scan
+   * @return the configured HoodieScanContext instance
+   */
+  private HoodieScanContext createHoodieScanContext(RowType rowType) {
+    return new HoodieScanContext
+        .Builder()
+        .conf(conf)
+        .path(new Path(path.toUri().getPath()))
+        .cdcEnabled(conf.get(FlinkOptions.CDC_ENABLED))
+        .rowType(rowType)
+        .startInstant(conf.get(FlinkOptions.READ_START_COMMIT))
+        .endInstant(conf.get(FlinkOptions.READ_END_COMMIT))
+        .skipCompaction(conf.get(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
+        .skipClustering(conf.get(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
+        
.skipInsertOverwrite(conf.get(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE))
+        
.maxCompactionMemoryInBytes(conf.get(FlinkOptions.COMPACTION_MAX_MEMORY))
+        .isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
+        .build();

Review Comment:
   I don't see `maxPendingSplits` set here — should it be configured on this builder as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to