This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b5838791abb feat: enable new source integration in `HoodieTableSource` (#18022)
8b5838791abb is described below

commit 8b5838791abb0c799fe8380e8fa0b56ec86d957a
Author: Peter Huang <[email protected]>
AuthorDate: Thu Jan 29 20:42:40 2026 -0800

    feat: enable new source integration in `HoodieTableSource` (#18022)
---
 .../apache/hudi/configuration/FlinkOptions.java    |   6 +
 .../{ScanContext.java => HoodieScanContext.java}   |  12 +-
 .../java/org/apache/hudi/source/HoodieSource.java  |  15 +-
 .../HoodieContinuousSplitEnumerator.java           |   6 +-
 .../source/split/DefaultHoodieSplitDiscover.java   |   6 +-
 .../org/apache/hudi/table/HoodieTableSource.java   | 146 +++++++++++--
 ...ScanContext.java => TestHoodieScanContext.java} | 243 ++++++++++++++++++---
 .../org/apache/hudi/source/TestHoodieSource.java   |  18 +-
 .../TestHoodieContinuousSplitEnumerator.java       |   8 +-
 .../function/TestHoodieSplitReaderFunction.java    |   1 -
 .../split/TestDefaultHoodieSplitDiscover.java      |  22 +-
 .../apache/hudi/table/TestHoodieTableSource.java   | 231 ++++++++++++++++++++
 12 files changed, 618 insertions(+), 96 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index ca03296a8599..e4795950e51e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -440,6 +440,12 @@ public class FlinkOptions extends HoodieConfig {
           + "the avg read splits number per-second would be 
'read.splits.limit'/'read.streaming.check-interval', by "
           + "default no limit");
 
+  public static final ConfigOption<Boolean> READ_SOURCE_V2_ENABLED = ConfigOptions
+      .key("read.source-v2.enabled")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether to use Flink FLIP27 new source to consume data files.");
+
   @AdvancedConfig
   public static final ConfigOption<Boolean> READ_CDC_FROM_CHANGELOG = ConfigOptions
       .key("read.cdc.from.changelog")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
similarity index 95%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
index 4e47f2150ccf..db2f25970698 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
@@ -29,10 +29,10 @@ import java.io.Serializable;
 import java.time.Duration;
 
 /**
- * Hudi source scan context.
+ * Hudi source scan context for finding completed commits for streaming and incremental read.
  */
 @Internal
-public class ScanContext implements Serializable {
+public class HoodieScanContext implements Serializable {
   private final Configuration conf;
   private final Path path;
   private final RowType rowType;
@@ -52,7 +52,7 @@ public class ScanContext implements Serializable {
   // is streaming mode
   private final boolean isStreaming;
 
-  public ScanContext(
+  public HoodieScanContext(
       Configuration conf,
       Path path,
       RowType rowType,
@@ -132,7 +132,7 @@ public class ScanContext implements Serializable {
   }
 
   /**
-   * Builder for {@link ScanContext}.
+   * Builder for {@link HoodieScanContext}.
    */
   public static class Builder {
     private Configuration conf;
@@ -208,8 +208,8 @@ public class ScanContext implements Serializable {
       return this;
     }
 
-    public ScanContext build() {
-      return new ScanContext(
+    public HoodieScanContext build() {
+      return new HoodieScanContext(
           conf,
           path,
           rowType,
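
As a quick orientation (illustrative sketch, not code from this commit), the renamed class keeps the fluent Builder contract exercised by the tests below; the helper name and literal values here are made up, every setter exists in this diff, and the Path type is the Flink one used elsewhere in the patch:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.hudi.source.HoodieScanContext;

    // Assemble a scan context for an unbounded read; values are illustrative.
    static HoodieScanContext newScanContext(Configuration conf, RowType rowType) {
      return new HoodieScanContext.Builder()
          .conf(conf)                                     // Flink job configuration
          .path(new Path("/tmp/hudi_table"))              // placeholder table base path
          .rowType(rowType)                               // table row type
          .startInstant("20240101000000")                 // illustrative start commit
          .maxCompactionMemoryInBytes(100 * 1024 * 1024)
          .maxPendingSplits(1000)
          .skipCompaction(false)
          .skipClustering(false)
          .skipInsertOverwrite(false)
          .cdcEnabled(false)
          .isStreaming(true)                              // unbounded streaming read
          .build();
    }
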
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
index cd4db5173c6e..a5dc05f07c42 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
@@ -55,20 +55,25 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * Hoodie Source implementation.
- * @param <T> record Type
+ * Hudi Flink Source V2 implementation for Flink streaming and batch reads.
+ *
+ * <p>This source supports both bounded (batch) and unbounded (streaming) modes
+ * based on the configuration. It uses Flink's new Source API @see FLIP-27 to
+ * provide efficient reading of Hudi tables.
+ *
+ * @param <T> the record type to emit
  */
 public class HoodieSource<T> implements Source<T, HoodieSourceSplit, HoodieSplitEnumeratorState> {
   private static final Logger LOG = LoggerFactory.getLogger(HoodieSource.class);
 
-  private final ScanContext scanContext;
+  private final HoodieScanContext scanContext;
   private final SplitReaderFunction<T> readerFunction;
   private final SerializableComparator<HoodieSourceSplit> splitComparator;
   private final HoodieTableMetaClient metaClient;
   private final HoodieRecordEmitter<T> recordEmitter;
 
-  HoodieSource(
-      ScanContext scanContext,
+  public HoodieSource(
+      HoodieScanContext scanContext,
       SplitReaderFunction<T> readerFunction,
       SerializableComparator<HoodieSourceSplit> splitComparator,
       HoodieTableMetaClient metaClient,
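
For orientation (illustrative sketch, not code from this commit), a FLIP-27 source such as this one is attached to a job with fromSource rather than addSource; the same wiring appears in HoodieTableSource#produceNewSourceDataStream later in this diff, and the helper name here is made up:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.source.HoodieSource;

    // Attach an already-built HoodieSource to the job graph; plain reads need no watermarks.
    static DataStreamSource<RowData> attachHoodieSource(
        StreamExecutionEnvironment env, HoodieSource<RowData> hoodieSource) {
      return env.fromSource(hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
    }
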
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
index b45acd60374c..5f4a3e1d7ded 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.source.enumerator;
 
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.HoodieScanContext;
 import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
 import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
 import org.apache.hudi.source.split.HoodieSourceSplit;
@@ -40,7 +40,7 @@ public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerat
   private final SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext;
   private final HoodieSplitProvider splitProvider;
   private final HoodieContinuousSplitDiscover splitDiscover;
-  private final ScanContext scanContext;
+  private final HoodieScanContext scanContext;
 
   /**
   * Instant for the last enumerated commit. Next incremental enumeration should be based off
@@ -52,7 +52,7 @@ public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerat
       SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext,
       HoodieSplitProvider splitProvider,
       HoodieContinuousSplitDiscover splitDiscover,
-      ScanContext scanContext,
+      HoodieScanContext scanContext,
       Option<HoodieSplitEnumeratorState> enumStateOpt) {
     super(enumeratorContext, splitProvider);
     this.enumeratorContext = enumeratorContext;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
index 718b78fe966e..1cd2d7586d93 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
@@ -20,7 +20,7 @@ package org.apache.hudi.source.split;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.source.IncrementalInputSplits;
-import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.HoodieScanContext;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,11 +32,11 @@ public class DefaultHoodieSplitDiscover implements HoodieContinuousSplitDiscover
   private static final Logger LOG = LoggerFactory.getLogger(DefaultHoodieSplitDiscover.class);
 
   private final HoodieTableMetaClient metaClient;
-  private final ScanContext scanContext;
+  private final HoodieScanContext scanContext;
   private final IncrementalInputSplits incrementalInputSplits;
 
   public DefaultHoodieSplitDiscover(
-      ScanContext scanContext,
+      HoodieScanContext scanContext,
       HoodieTableMetaClient metaClient) {
     this.scanContext = scanContext;
     this.metaClient = metaClient;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 32fe9e5d9f8f..97fca224ee04 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
 import org.apache.hudi.adapter.InputFormatSourceFunctionAdapter;
 import org.apache.hudi.adapter.TableFunctionProviderAdapter;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -35,7 +36,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsInference;
@@ -47,6 +50,8 @@ import org.apache.hudi.source.ExpressionEvaluators;
 import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.source.FileIndex;
+import org.apache.hudi.source.HoodieScanContext;
+import org.apache.hudi.source.HoodieSource;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
@@ -54,6 +59,9 @@ import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionBucketIdFunc;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.source.prune.PrimaryKeyPruners;
+import org.apache.hudi.source.reader.HoodieRecordEmitter;
+import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplitComparator;
 import org.apache.hudi.source.rebalance.partitioner.StreamReadAppendPartitioner;
 import org.apache.hudi.source.rebalance.partitioner.StreamReadBucketIndexPartitioner;
 import org.apache.hudi.source.rebalance.selector.StreamReadAppendKeySelector;
@@ -63,6 +71,7 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.table.format.FlinkReaderContextFactory;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.cdc.CdcInputFormat;
 import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
@@ -74,6 +83,7 @@ import org.apache.hudi.table.lookup.HoodieLookupTableReader;
 import org.apache.hudi.util.ChangelogModes;
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.ExpressionUtils;
+import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.InputFormats;
 import org.apache.hudi.util.SerializableSchema;
@@ -81,11 +91,13 @@ import org.apache.hudi.util.StreamerUtil;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -105,7 +117,6 @@ import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.hadoop.fs.Path;
 
 import javax.annotation.Nullable;
 
@@ -141,7 +152,6 @@ public class HoodieTableSource implements
     SupportsReadingMetadata,
     Serializable {
   private static final long serialVersionUID = 1L;
-
   private static final long NO_LIMIT_CONSTANT = -1;
 
   private final StorageConfiguration<org.apache.hadoop.conf.Configuration> hadoopConf;
@@ -221,32 +231,122 @@ public class HoodieTableSource implements
         TypeInformation<RowData> typeInfo =
             (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
         OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
-        if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
-          StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
-              conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, partitionPruner);
-          InputFormat<RowData, ?> inputFormat = getInputFormat(true);
-          OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
-          SingleOutputStreamOperator<MergeOnReadInputSplit> monitorOperatorStream = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
-              .uid(Pipelines.opUID("split_monitor", conf))
-              .setParallelism(1)
-              .setMaxParallelism(1);
-
-          DataStream<MergeOnReadInputSplit> sourceWithKey = addFileDistributionStrategy(monitorOperatorStream);
-
-          SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
-              .transform("split_reader", typeInfo, factory)
-              .uid(Pipelines.opUID("split_reader", conf))
-              .setParallelism(conf.get(FlinkOptions.READ_TASKS));
-          return new DataStreamSource<>(streamReadSource);
+
+        if (conf.get(FlinkOptions.READ_SOURCE_V2_ENABLED)) {
+          return produceNewSourceDataStream(execEnv);
         } else {
-          InputFormatSourceFunctionAdapter<RowData> func = new InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
-          DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
-          return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+          return produceLegacySourceDataStream(execEnv, typeInfo);
         }
       }
     };
   }
 
+  /**
+   * Produces a DataStream using the new FLIP-27 HoodieSource.
+   *
+   * @param execEnv the stream execution environment
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> produceNewSourceDataStream(StreamExecutionEnvironment execEnv) {
+    HoodieSource<RowData> hoodieSource = createHoodieSource();
+    DataStreamSource<RowData> source = execEnv.fromSource(
+        hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
+
+
+
+    return source.name(getSourceOperatorName("hudi_source")).uid(Pipelines.opUID("hudi_source", conf)).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+  }
+
+  /**
+   * Produces a DataStream using the legacy source implementation.
+   *
+   * @param execEnv the stream execution environment
+   * @param typeInfo type information for RowData
+   * @return the configured DataStream
+   */
+  private DataStream<RowData> produceLegacySourceDataStream(
+      StreamExecutionEnvironment execEnv,
+      TypeInformation<RowData> typeInfo) {
+    if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
+      StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
+          conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, partitionPruner);
+      InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+      OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
+      SingleOutputStreamOperator<MergeOnReadInputSplit> monitorOperatorStream = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+          .uid(Pipelines.opUID("split_monitor", conf))
+          .setParallelism(1)
+          .setMaxParallelism(1);
+
+      DataStream<MergeOnReadInputSplit> sourceWithKey = addFileDistributionStrategy(monitorOperatorStream);
+
+      SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
+          .transform("split_reader", typeInfo, factory)
+          .uid(Pipelines.opUID("split_reader", conf))
+          .setParallelism(conf.get(FlinkOptions.READ_TASKS));
+      return new DataStreamSource<>(streamReadSource);
+    } else {
+      InputFormatSourceFunctionAdapter<RowData> func = new InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
+      DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
+      return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+    }
+  }
+
+  /**
+   * Creates a new Hudi Flink Source V2 for reading data from the Hudi table.
+   *
+   * @see <a href="FLIP-27">https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface</a>
+   *
+   * @return the configured HoodieSource instance
+   */
+  private HoodieSource<RowData> createHoodieSource() {
+    ValidationUtils.checkState(metaClient != null, "MetaClient must be initialized before creating HoodieSource");
+    ValidationUtils.checkState(hadoopConf != null, "Hadoop configuration must be initialized");
+    ValidationUtils.checkState(conf != null, "Configuration must be initialized");
+
+    HoodieSchema tableSchema =  !tableDataExists() ? inferSchemaFromDdl() : getTableSchema();
+    final DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
+    final RowType rowType = (RowType) rowDataType.getLogicalType();
+    final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
+
+    HoodieScanContext context = createHoodieScanContext(rowType);
+    HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
+    HoodieFlinkEngineContext flinkEngineContext = new HoodieFlinkEngineContext(hadoopConf.unwrap());
+    HoodieFlinkTable<RowData> flinkTable = HoodieFlinkTable.create(writeConfig, flinkEngineContext);
+    FlinkReaderContextFactory readerContextFactory = new FlinkReaderContextFactory(metaClient);
+    HoodieSplitReaderFunction splitReaderFunction = new HoodieSplitReaderFunction(
+        flinkTable,
+        readerContextFactory.getContext(),
+        conf,
+        tableSchema,
+        HoodieSchemaConverter.convertToSchema(requiredRowType),
+        conf.get(FlinkOptions.MERGE_TYPE),
+        Option.empty());
+    return new HoodieSource<>(context, splitReaderFunction, new HoodieSourceSplitComparator(), metaClient, new HoodieRecordEmitter<>());
+  }
+
+  /**
+   * Creates a HoodieScanContext for configuring the scan operation.
+   *
+   * @param rowType the row type for the scan
+   * @return the configured HoodieScanContext instance
+   */
+  private HoodieScanContext createHoodieScanContext(RowType rowType) {
+    return new HoodieScanContext
+        .Builder()
+        .conf(conf)
+        .path(new Path(path.toUri().getPath()))
+        .cdcEnabled(conf.get(FlinkOptions.CDC_ENABLED))
+        .rowType(rowType)
+        .startInstant(conf.get(FlinkOptions.READ_START_COMMIT))
+        .endInstant(conf.get(FlinkOptions.READ_END_COMMIT))
+        .skipCompaction(conf.get(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
+        .skipClustering(conf.get(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
+        .skipInsertOverwrite(conf.get(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE))
+        .maxCompactionMemoryInBytes(conf.get(FlinkOptions.COMPACTION_MAX_MEMORY))
+        .isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
+        .build();
+  }
+
   /**
   * Specify the file distribution strategy based on different upstream writing mechanisms,
    *  to prevent hot spot issues during stream reading.
@@ -593,7 +693,7 @@ public class HoodieTableSource implements
     }
 
     return new CopyOnWriteInputFormat(
-        FilePathUtils.toFlinkPaths(paths),
+        paths,
         this.schema.getColumnNames().toArray(new String[0]),
         this.schema.getColumnDataTypes().toArray(new DataType[0]),
         this.requiredPos,
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieScanContext.java
similarity index 60%
rename from hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java
rename to hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieScanContext.java
index eb4b5cf47850..3bfce66d3eef 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieScanContext.java
@@ -34,16 +34,16 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Test cases for {@link ScanContext}.
+ * Test cases for {@link HoodieScanContext}.
  */
-public class TestScanContext {
+public class TestHoodieScanContext {
 
   @Test
   public void testGetConf() throws Exception {
     Configuration conf = new Configuration();
     conf.set(FlinkOptions.PATH, "/tmp/test");
 
-    ScanContext scanContext = createTestScanContext(conf,  new Path("/tmp/test"),
+    HoodieScanContext scanContext = createTestScanContext(conf,  new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
 
@@ -57,7 +57,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     Path expectedPath = new Path("/tmp/test/table");
 
-    ScanContext scanContext = createTestScanContext(conf, expectedPath,
+    HoodieScanContext scanContext = createTestScanContext(conf, expectedPath,
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
 
@@ -69,7 +69,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     RowType rowType = TestConfigurations.ROW_TYPE;
 
-    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
         rowType, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
 
@@ -82,7 +82,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     String expectedInstant = "20231201000000000";
 
-    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, expectedInstant, 100 * 1024 * 1024,
         1000, false, false, false, false);
 
@@ -95,7 +95,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     long expectedMemory = 1024L * 1024L * 1024L; // 1GB
 
-    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", expectedMemory,
         1000, false, false, false, false);
 
@@ -108,7 +108,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     long expectedMaxPendingSplits = 5000L;
 
-    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         expectedMaxPendingSplits, false, false, false, false);
 
@@ -120,12 +120,12 @@ public class TestScanContext {
   public void testSkipCompaction() throws Exception {
     Configuration conf = new Configuration();
 
-    ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, true, false, false, false);
     assertTrue(scanContextTrue.skipCompaction(), "Skip compaction should be true");
 
-    ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
     assertFalse(scanContextFalse.skipCompaction(), "Skip compaction should be false");
@@ -135,12 +135,12 @@ public class TestScanContext {
   public void testSkipClustering() throws Exception {
     Configuration conf = new Configuration();
 
-    ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, true, false, false);
     assertTrue(scanContextTrue.skipClustering(), "Skip clustering should be true");
 
-    ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
     assertFalse(scanContextFalse.skipClustering(), "Skip clustering should be false");
@@ -150,12 +150,12 @@ public class TestScanContext {
   public void testSkipInsertOverwrite() throws Exception {
     Configuration conf = new Configuration();
 
-    ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, true, false);
     assertTrue(scanContextTrue.skipInsertOverwrite(), "Skip insert overwrite should be true");
 
-    ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
     assertFalse(scanContextFalse.skipInsertOverwrite(), "Skip insert overwrite should be false");
@@ -165,12 +165,12 @@ public class TestScanContext {
   public void testCdcEnabled() throws Exception {
     Configuration conf = new Configuration();
 
-    ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, true);
     assertTrue(scanContextTrue.cdcEnabled(), "CDC should be enabled");
 
-    ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
     assertFalse(scanContextFalse.cdcEnabled(), "CDC should be disabled");
@@ -181,7 +181,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 5);
 
-    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
     Duration scanInterval = scanContext.getScanInterval();
@@ -195,7 +195,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     // Not setting READ_STREAMING_CHECK_INTERVAL to use default
 
-    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
     Duration scanInterval = scanContext.getScanInterval();
@@ -214,7 +214,7 @@ public class TestScanContext {
     long maxCompactionMemory = 2L * 1024L * 1024L * 1024L; // 2GB
     long maxPendingSplits = 10000L;
 
-    ScanContext scanContext = createTestScanContext(conf, path,
+    HoodieScanContext scanContext = createTestScanContext(conf, path,
         TestConfigurations.ROW_TYPE, startInstant, maxCompactionMemory,
         maxPendingSplits, true, true, true, true);
 
@@ -234,7 +234,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     Path path = new Path("/tmp/test");
 
-    ScanContext scanContext = new ScanContext.Builder()
+    HoodieScanContext scanContext = new HoodieScanContext.Builder()
         .conf(conf)
         .path(path)
         .rowType(TestConfigurations.ROW_TYPE)
@@ -256,8 +256,8 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     Path path = new Path("/tmp/test");
 
-    ScanContext.Builder builder = new ScanContext.Builder();
-    ScanContext scanContext = builder
+    HoodieScanContext.Builder builder = new HoodieScanContext.Builder();
+    HoodieScanContext scanContext = builder
         .conf(conf)
         .path(path)
         .rowType(TestConfigurations.ROW_TYPE)
@@ -286,7 +286,7 @@ public class TestScanContext {
     Path path = new Path("/tmp/test");
     String endInstant = "20240201000000";
 
-    ScanContext scanContext = new ScanContext.Builder()
+    HoodieScanContext scanContext = new HoodieScanContext.Builder()
         .conf(conf)
         .path(path)
         .rowType(TestConfigurations.ROW_TYPE)
@@ -308,13 +308,13 @@ public class TestScanContext {
   public void testIsStreaming() throws Exception {
     Configuration conf = new Configuration();
 
-    ScanContext streamingScanContext = createTestScanContext(conf, new Path("/tmp/test"),
+    HoodieScanContext streamingScanContext = createTestScanContext(conf, new Path("/tmp/test"),
         TestConfigurations.ROW_TYPE, "20240101000000", 100 * 1024 * 1024,
         1000, false, false, false, false);
 
     assertFalse(streamingScanContext.isStreaming(), "Should not be streaming by default");
 
-    ScanContext batchScanContext = new ScanContext.Builder()
+    HoodieScanContext batchScanContext = new HoodieScanContext.Builder()
         .conf(conf)
         .path(new Path("/tmp/test"))
         .rowType(TestConfigurations.ROW_TYPE)
@@ -336,7 +336,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     String endCommit = "20240201000000";
 
-    ScanContext scanContext = new ScanContext.Builder()
+    HoodieScanContext scanContext = new HoodieScanContext.Builder()
         .conf(conf)
         .path(new Path("/tmp/test"))
         .rowType(TestConfigurations.ROW_TYPE)
@@ -358,7 +358,7 @@ public class TestScanContext {
     Configuration conf = new Configuration();
     Path path = new Path("/tmp/test");
 
-    ScanContext.Builder builder = new ScanContext.Builder()
+    HoodieScanContext.Builder builder = new HoodieScanContext.Builder()
         .conf(conf)
         .path(path)
         .rowType(TestConfigurations.ROW_TYPE)
@@ -370,8 +370,8 @@ public class TestScanContext {
         .skipInsertOverwrite(false)
         .cdcEnabled(false);
 
-    ScanContext scanContext1 = builder.build();
-    ScanContext scanContext2 = builder.build();
+    HoodieScanContext scanContext1 = builder.build();
+    HoodieScanContext scanContext2 = builder.build();
 
     // Both should be valid but independent instances
     assertNotNull(scanContext1);
@@ -379,7 +379,7 @@ public class TestScanContext {
   }
 
   // Helper method to create ScanContext using the Builder
-  private ScanContext createTestScanContext(
+  private HoodieScanContext createTestScanContext(
       Configuration conf,
       Path path,
       RowType rowType,
@@ -390,7 +390,7 @@ public class TestScanContext {
       boolean skipClustering,
       boolean skipInsertOverwrite,
       boolean cdcEnabled) throws Exception {
-    return new ScanContext.Builder()
+    return new HoodieScanContext.Builder()
         .conf(conf)
         .path(path)
         .rowType(rowType)
@@ -403,4 +403,185 @@ public class TestScanContext {
         .cdcEnabled(cdcEnabled)
         .build();
   }
+
+  @Test
+  public void testStreamingModeConfiguration() throws Exception {
+    Configuration conf = new Configuration();
+    Path path = new Path("/tmp/test");
+
+    // Test with isStreaming = true
+    HoodieScanContext streamingContext = new HoodieScanContext.Builder()
+        .conf(conf)
+        .path(path)
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20240101000000")
+        .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+        .maxPendingSplits(1000)
+        .skipCompaction(false)
+        .skipClustering(false)
+        .skipInsertOverwrite(false)
+        .cdcEnabled(false)
+        .isStreaming(true)
+        .build();
+
+    assertTrue(streamingContext.isStreaming(), "Streaming mode should be enabled");
+
+    // Test with isStreaming = false (batch mode)
+    HoodieScanContext batchContext = new HoodieScanContext.Builder()
+        .conf(conf)
+        .path(path)
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20240101000000")
+        .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+        .maxPendingSplits(1000)
+        .skipCompaction(false)
+        .skipClustering(false)
+        .skipInsertOverwrite(false)
+        .cdcEnabled(false)
+        .isStreaming(false)
+        .build();
+
+    assertFalse(batchContext.isStreaming(), "Batch mode should be enabled");
+  }
+
+  @Test
+  public void testStreamingModeDefault() throws Exception {
+    Configuration conf = new Configuration();
+    Path path = new Path("/tmp/test");
+
+    HoodieScanContext scanContext = new HoodieScanContext.Builder()
+        .conf(conf)
+        .path(path)
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20240101000000")
+        .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+        .maxPendingSplits(1000)
+        .skipCompaction(false)
+        .skipClustering(false)
+        .skipInsertOverwrite(false)
+        .cdcEnabled(false)
+        .build();
+
+    assertFalse(scanContext.isStreaming(), "Streaming mode should default to false");
+  }
+
+  @Test
+  public void testBuilderWithAllStreamingFlags() throws Exception {
+    Configuration conf = new Configuration();
+    Path path = new Path("/tmp/test");
+
+    HoodieScanContext scanContext = new HoodieScanContext.Builder()
+        .conf(conf)
+        .path(path)
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20240101000000")
+        .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+        .maxPendingSplits(1000)
+        .skipCompaction(true)
+        .skipClustering(true)
+        .skipInsertOverwrite(true)
+        .cdcEnabled(true)
+        .isStreaming(true)
+        .build();
+
+    assertTrue(scanContext.skipCompaction(), "skipCompaction should be true");
+    assertTrue(scanContext.skipClustering(), "skipClustering should be true");
+    assertTrue(scanContext.skipInsertOverwrite(), "skipInsertOverwrite should be true");
+    assertTrue(scanContext.cdcEnabled(), "cdcEnabled should be true");
+    assertTrue(scanContext.isStreaming(), "isStreaming should be true");
+  }
+
+  @Test
+  public void testBuilderWithStartAndEndInstants() throws Exception {
+    Configuration conf = new Configuration();
+    Path path = new Path("/tmp/test");
+    String startInstant = "20240101000000";
+    String endInstant = "20240201000000";
+
+    HoodieScanContext scanContext = new HoodieScanContext.Builder()
+        .conf(conf)
+        .path(path)
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant(startInstant)
+        .endInstant(endInstant)
+        .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+        .maxPendingSplits(1000)
+        .skipCompaction(false)
+        .skipClustering(false)
+        .skipInsertOverwrite(false)
+        .cdcEnabled(false)
+        .build();
+
+    assertEquals(startInstant, scanContext.getStartCommit(), "Start instant should match");
+    assertEquals(endInstant, scanContext.getEndCommit(), "End instant should match");
+  }
+
+  @Test
+  public void testBuilderWithCustomScanInterval() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 10);
+    Path path = new Path("/tmp/test");
+
+    HoodieScanContext scanContext = new HoodieScanContext.Builder()
+        .conf(conf)
+        .path(path)
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20240101000000")
+        .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+        .maxPendingSplits(1000)
+        .skipCompaction(false)
+        .skipClustering(false)
+        .skipInsertOverwrite(false)
+        .cdcEnabled(false)
+        .build();
+
+    Duration scanInterval = scanContext.getScanInterval();
+    assertEquals(10, scanInterval.getSeconds(), "Scan interval should be 10 seconds");
+  }
+
+  @Test
+  public void testBuilderWithLargeMemoryConfiguration() throws Exception {
+    Configuration conf = new Configuration();
+    Path path = new Path("/tmp/test");
+    long largeMemory = 10L * 1024L * 1024L * 1024L; // 10GB
+
+    HoodieScanContext scanContext = new HoodieScanContext.Builder()
+        .conf(conf)
+        .path(path)
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20240101000000")
+        .maxCompactionMemoryInBytes(largeMemory)
+        .maxPendingSplits(1000)
+        .skipCompaction(false)
+        .skipClustering(false)
+        .skipInsertOverwrite(false)
+        .cdcEnabled(false)
+        .build();
+
+    assertEquals(largeMemory, scanContext.getMaxCompactionMemoryInBytes(),
+        "Large memory configuration should be preserved");
+  }
+
+  @Test
+  public void testBuilderWithHighMaxPendingSplits() throws Exception {
+    Configuration conf = new Configuration();
+    Path path = new Path("/tmp/test");
+    long highPendingSplits = 100000L;
+
+    HoodieScanContext scanContext = new HoodieScanContext.Builder()
+        .conf(conf)
+        .path(path)
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20240101000000")
+        .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+        .maxPendingSplits(highPendingSplits)
+        .skipCompaction(false)
+        .skipClustering(false)
+        .skipInsertOverwrite(false)
+        .cdcEnabled(false)
+        .build();
+
+    assertEquals(highPendingSplits, scanContext.getMaxPendingSplits(),
+        "High max pending splits should be preserved");
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index dd06b90e15f5..75d47c0e70ce 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -71,7 +71,7 @@ public class TestHoodieSource {
   File tempDir;
 
   private HoodieTableMetaClient mockMetaClient;
-  private ScanContext scanContext;
+  private HoodieScanContext scanContext;
   private SplitReaderFunction<RowData> mockReaderFunction;
   private SerializableComparator<HoodieSourceSplit> mockComparator;
   private HoodieRecordEmitter<RowData> mockRecordEmitter;
@@ -102,7 +102,7 @@ public class TestHoodieSource {
     conf.set(FlinkOptions.PATH, tempDir.getAbsolutePath());
     conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 60);
 
-    scanContext = new ScanContext.Builder()
+    scanContext = new HoodieScanContext.Builder()
         .conf(conf)
         .path(new Path(tempDir.getAbsolutePath()))
         .rowType(TestConfigurations.ROW_TYPE)
@@ -214,7 +214,7 @@ public class TestHoodieSource {
     Configuration conf = new Configuration();
     conf.set(FlinkOptions.PATH, tempDir.getAbsolutePath());
 
-    ScanContext streamingScanContext = new ScanContext.Builder()
+    HoodieScanContext streamingScanContext = new HoodieScanContext.Builder()
         .conf(conf)
         .path(new Path(tempDir.getAbsolutePath()))
         .rowType(TestConfigurations.ROW_TYPE)
@@ -278,7 +278,7 @@ public class TestHoodieSource {
     // Insert test data
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
     metaClient.reloadActiveTimeline();
-    ScanContext scanContext = createScanContext(conf);
+    HoodieScanContext scanContext = createScanContext(conf);
 
     HoodieSource<RowData> source = new HoodieSource<>(
         scanContext,
@@ -384,7 +384,7 @@ public class TestHoodieSource {
     Configuration conf = new Configuration();
     conf.set(FlinkOptions.PATH, tempDir.getAbsolutePath());
 
-    ScanContext streamingScanContext = new ScanContext.Builder()
+    HoodieScanContext streamingScanContext = new HoodieScanContext.Builder()
         .conf(conf)
         .path(new Path(tempDir.getAbsolutePath()))
         .rowType(TestConfigurations.ROW_TYPE)
@@ -418,7 +418,7 @@ public class TestHoodieSource {
     Configuration conf = new Configuration();
     conf.set(FlinkOptions.PATH, tempDir.getAbsolutePath());
 
-    ScanContext streamingScanContext = new ScanContext.Builder()
+    HoodieScanContext streamingScanContext = new HoodieScanContext.Builder()
         .conf(conf)
         .path(new Path(tempDir.getAbsolutePath()))
         .rowType(TestConfigurations.ROW_TYPE)
@@ -529,16 +529,16 @@ public class TestHoodieSource {
     return mockEnumContext;
   }
 
-  private ScanContext createScanContext(Configuration conf) throws Exception {
+  private HoodieScanContext createScanContext(Configuration conf) throws Exception {
     return createScanContextWithSkipOptions(conf, false, false, false);
   }
 
-  private ScanContext createScanContextWithSkipOptions(
+  private HoodieScanContext createScanContextWithSkipOptions(
       Configuration conf,
       boolean skipCompaction,
       boolean skipClustering,
       boolean skipInsertOverwrite) throws Exception {
-    return new ScanContext.Builder()
+    return new HoodieScanContext.Builder()
         .conf(conf)
         .path(new Path(tempDir.getAbsolutePath()))
         .rowType(TestConfigurations.ROW_TYPE)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index 833b77b3dd64..a3400576f105 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -20,7 +20,7 @@ package org.apache.hudi.source.enumerator;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.HoodieScanContext;
 import org.apache.hudi.source.split.DefaultHoodieSplitProvider;
 import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
 import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
@@ -58,7 +58,7 @@ public class TestHoodieContinuousSplitEnumerator {
   private MockSplitEnumeratorContext context;
   private DefaultHoodieSplitProvider splitProvider;
   private MockContinuousSplitDiscover splitDiscover;
-  private ScanContext scanContext;
+  private HoodieScanContext scanContext;
   private HoodieContinuousSplitEnumerator enumerator;
   private HoodieSourceSplit split1;
   private HoodieSourceSplit split2;
@@ -396,7 +396,7 @@ public class TestHoodieContinuousSplitEnumerator {
   /**
    * Test implementation of ScanContext for testing.
    */
-  private static class TestScanContext extends ScanContext {
+  private static class TestScanContext extends HoodieScanContext {
     private TestScanContext(
         Configuration conf,
         Path path,
@@ -442,7 +442,7 @@ public class TestHoodieContinuousSplitEnumerator {
         return this;
       }
 
-      public ScanContext build() {
+      public HoodieScanContext build() {
         return new TestScanContext(conf, path, rowType, startInstant, maxPendingSplits);
       }
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
index 49e03546d5fd..2a3ab40fd400 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.table.HoodieFlinkTable;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.junit.jupiter.api.BeforeEach;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
index 3c437e0e7e35..a562916a784d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.HoodieScanContext;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
@@ -61,7 +61,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
         .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
     String lastInstant = commitsTimeline.lastInstant().get().getCompletionTime();
 
-    ScanContext scanContext = createScanContext(conf);
+    HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
         scanContext, metaClient);
 
@@ -91,7 +91,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
 
     metaClient.reloadActiveTimeline();
 
-    ScanContext scanContext = createScanContext(conf);
+    HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
         scanContext, metaClient);
 
@@ -113,7 +113,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
     // Insert test data
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
-    ScanContext scanContext = createScanContext(conf);
+    HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
         scanContext, metaClient);
 
@@ -135,7 +135,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
     // Insert test data
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
-    ScanContext scanContext = createScanContext(conf);
+    HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
         scanContext, metaClient);
 
@@ -162,7 +162,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
     HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
     String firstCompletionTime = firstInstant.getCompletionTime();
 
-    ScanContext scanContext = createScanContext(conf);
+    HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
         scanContext, metaClient);
 
@@ -184,7 +184,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
     // Insert test data
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
 
-    ScanContext scanContext = createScanContextWithSkipOptions(conf, true, true, false);
+    HoodieScanContext scanContext = createScanContextWithSkipOptions(conf, true, true, false);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
         scanContext, metaClient);
 
@@ -199,7 +199,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
     metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
 
-    ScanContext scanContext = createScanContext(conf);
+    HoodieScanContext scanContext = createScanContext(conf);
     DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
         scanContext, metaClient);
 
@@ -208,16 +208,16 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
 
   // Helper methods
 
-  private ScanContext createScanContext(Configuration conf) throws Exception {
+  private HoodieScanContext createScanContext(Configuration conf) throws Exception {
     return createScanContextWithSkipOptions(conf, false, false, false);
   }
 
-  private ScanContext createScanContextWithSkipOptions(
+  private HoodieScanContext createScanContextWithSkipOptions(
       Configuration conf,
       boolean skipCompaction,
       boolean skipClustering,
       boolean skipInsertOverwrite) throws Exception {
-    return new ScanContext.Builder()
+    return new HoodieScanContext.Builder()
         .conf(conf)
         .path(new Path(basePath))
         .rowType(TestConfigurations.ROW_TYPE)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 4e8713b53bb1..3c275c4c5da6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -557,4 +557,235 @@ public class TestHoodieTableSource {
         Arrays.asList(ref, literal),
         DataTypes.BOOLEAN());
   }
+
+  @Test
+  void testNewHoodieSourceCreationWithFlag() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource should be created 
successfully");
+
+    // Verify that the table source can be used in streaming mode with new source
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    HoodieTableSource streamingSource = createHoodieTableSource(conf);
+    assertNotNull(streamingSource, "Streaming HoodieTableSource should be created successfully");
+  }
+
+  @Test
+  void testNewHoodieSourceWithBatchMode() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.READ_AS_STREAMING, false);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "Batch mode HoodieTableSource with new source 
should be created");
+  }
+
+  @Test
+  void testNewHoodieSourceWithStreamingMode() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "Streaming mode HoodieTableSource with new 
source should be created");
+  }
+
+  @Test
+  void testNewHoodieSourceWithCDCEnabled() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.CDC_ENABLED, true);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with CDC enabled should be 
created");
+  }
+
+  @Test
+  void testNewHoodieSourceWithReadRange() throws Exception {
+    beforeEach();
+    String firstCommitTime = TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
+    TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+    String secondCommitTime = TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
+
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.READ_START_COMMIT, firstCommitTime);
+    conf.set(FlinkOptions.READ_END_COMMIT, secondCommitTime);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with read range should be 
created");
+  }
+
+  @Test
+  void testNewHoodieSourceWithSkipFlags() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE, true);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with skip flags should be 
created");
+  }
+
+  @Test
+  void testNewHoodieSourceBackwardCompatibility() throws Exception {
+    beforeEach();
+    // Test that old behavior still works when READ_SOURCE_V2_ENABLED is false
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, false);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with old source should still 
work");
+  }
+
+  @Test
+  void testNewHoodieSourceWithCopyOnWriteTable() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with COW table should be 
created");
+  }
+
+  @Test
+  void testNewHoodieSourceWithMergeOnReadTable() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with MOR table should be 
created");
+  }
+
+  @Test
+  void testNewHoodieSourceWithSnapshotQuery() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with snapshot query should 
be created");
+  }
+
+  @Test
+  void testNewHoodieSourceWithIncrementalQuery() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_INCREMENTAL);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with incremental query 
should be created");
+  }
+
+  @Test
+  void testNewHoodieSourceWithProjectionPushdown() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    // Apply projection - only select uuid and name columns
+    int[][] projections = new int[][] {{0}, {1}};
+    DataType producedType = DataTypes.ROW(
+        DataTypes.FIELD("uuid", DataTypes.STRING()),
+        DataTypes.FIELD("name", DataTypes.STRING())
+    ).bridgedTo(RowData.class);
+
+    tableSource.applyProjection(projections, producedType);
+    assertNotNull(tableSource, "HoodieTableSource with projection should 
work");
+  }
+
+  @Test
+  void testNewHoodieSourceWithFilterPushdown() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+
+    // Apply filter - uuid = 'id1'
+    FieldReferenceExpression ref = new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0);
+    ValueLiteralExpression literal = new ValueLiteralExpression("id1", DataTypes.STRING().notNull());
+    ResolvedExpression filterExpr = CallExpression.permanent(
+        BuiltInFunctionDefinitions.EQUALS,
+        Arrays.asList(ref, literal),
+        DataTypes.BOOLEAN());
+
+    tableSource.applyFilters(Collections.singletonList(filterExpr));
+    assertNotNull(tableSource, "HoodieTableSource with filter should work");
+  }
+
+  @Test
+  void testNewHoodieSourceWithLimitPushdown() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    tableSource.applyLimit(100);
+    assertNotNull(tableSource, "HoodieTableSource with limit should work");
+  }
+
+  @Test
+  void testNewHoodieSourceWithPartitionedTable() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with partitioned table 
should be created");
+  }
+
+  @Test
+  void testNewHoodieSourceWithNonPartitionedTable() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.PARTITION_PATH_FIELD, "");
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with non-partitioned table 
should be created");
+  }
+
+  @Test
+  void testNewHoodieSourceCopyRetainsConfig() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    HoodieTableSource copiedSource = (HoodieTableSource) tableSource.copy();
+
+    assertNotNull(copiedSource, "Copied source should not be null");
+    assertEquals(tableSource.getConf().get(FlinkOptions.READ_SOURCE_V2_ENABLED),
+                 copiedSource.getConf().get(FlinkOptions.READ_SOURCE_V2_ENABLED),
+                 "READ_SOURCE_V2_ENABLED flag should be retained in copy");
+  }
+
+  @Test
+  void testNewHoodieSourceWithMaxCompactionMemory() throws Exception {
+    beforeEach();
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+    conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 512);
+
+    HoodieTableSource tableSource = createHoodieTableSource(conf);
+    assertNotNull(tableSource, "HoodieTableSource with custom compaction 
memory should be created");
+  }
 }
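
For anyone trying the new flag outside of these tests, here is a minimal, illustrative sketch (not part of this commit) of toggling the v2 source on a Flink Configuration. The class name and standalone setup are hypothetical; only FlinkOptions.READ_SOURCE_V2_ENABLED and FlinkOptions.READ_AS_STREAMING are taken from the patch above.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class SourceV2FlagSketch {
      public static void main(String[] args) {
        // Flink configuration carrying the Hudi read options.
        Configuration conf = new Configuration();

        // Switch from the legacy source to the new source implementation and
        // read the table as a stream, mirroring the streaming tests above.
        conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
        conf.set(FlinkOptions.READ_AS_STREAMING, true);

        // Leaving READ_SOURCE_V2_ENABLED unset (or false) keeps the old
        // behavior, as exercised by testNewHoodieSourceBackwardCompatibility.
      }
    }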
