This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8b5838791abb feat: enable new source integration in `HoodieTableSource` (#18022)
8b5838791abb is described below
commit 8b5838791abb0c799fe8380e8fa0b56ec86d957a
Author: Peter Huang <[email protected]>
AuthorDate: Thu Jan 29 20:42:40 2026 -0800
feat: enable new source integration in `HoodieTableSource` (#18022)
---
.../apache/hudi/configuration/FlinkOptions.java | 6 +
.../{ScanContext.java => HoodieScanContext.java} | 12 +-
.../java/org/apache/hudi/source/HoodieSource.java | 15 +-
.../HoodieContinuousSplitEnumerator.java | 6 +-
.../source/split/DefaultHoodieSplitDiscover.java | 6 +-
.../org/apache/hudi/table/HoodieTableSource.java | 146 +++++++++++--
...ScanContext.java => TestHoodieScanContext.java} | 243 ++++++++++++++++++---
.../org/apache/hudi/source/TestHoodieSource.java | 18 +-
.../TestHoodieContinuousSplitEnumerator.java | 8 +-
.../function/TestHoodieSplitReaderFunction.java | 1 -
.../split/TestDefaultHoodieSplitDiscover.java | 22 +-
.../apache/hudi/table/TestHoodieTableSource.java | 231 ++++++++++++++++++++
12 files changed, 618 insertions(+), 96 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index ca03296a8599..e4795950e51e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -440,6 +440,12 @@ public class FlinkOptions extends HoodieConfig {
+ "the avg read splits number per-second would be 'read.splits.limit'/'read.streaming.check-interval', by "
+ "default no limit");
+ public static final ConfigOption<Boolean> READ_SOURCE_V2_ENABLED = ConfigOptions
+ .key("read.source-v2.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to use Flink FLIP27 new source to consume data files.");
+
@AdvancedConfig
public static final ConfigOption<Boolean> READ_CDC_FROM_CHANGELOG = ConfigOptions
.key("read.cdc.from.changelog")
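Editor note: for context, a minimal sketch of how a read job would opt into the new flag introduced above, assuming the option is consumed through the regular FlinkOptions configuration path shown in this patch; the table path and the wrapper class are illustrative only.

// Hedged sketch: enabling the FLIP-27 based Hudi source for a Flink read job.
// The option keys come from FlinkOptions; the base path is a placeholder.
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class EnableSourceV2Sketch {
  public static Configuration readConf() {
    Configuration conf = new Configuration();
    conf.set(FlinkOptions.PATH, "/tmp/hudi/demo_table");   // placeholder table base path
    conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);   // switch to the new HoodieSource
    conf.set(FlinkOptions.READ_AS_STREAMING, true);        // batch reads keep this at false
    return conf;
  }
}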
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
similarity index 95%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java
rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
index 4e47f2150ccf..db2f25970698 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieScanContext.java
@@ -29,10 +29,10 @@ import java.io.Serializable;
import java.time.Duration;
/**
- * Hudi source scan context.
+ * Hudi source scan context for finding completed commits for streaming and incremental read.
*/
@Internal
-public class ScanContext implements Serializable {
+public class HoodieScanContext implements Serializable {
private final Configuration conf;
private final Path path;
private final RowType rowType;
@@ -52,7 +52,7 @@ public class ScanContext implements Serializable {
// is streaming mode
private final boolean isStreaming;
- public ScanContext(
+ public HoodieScanContext(
Configuration conf,
Path path,
RowType rowType,
@@ -132,7 +132,7 @@ public class ScanContext implements Serializable {
}
/**
- * Builder for {@link ScanContext}.
+ * Builder for {@link HoodieScanContext}.
*/
public static class Builder {
private Configuration conf;
@@ -208,8 +208,8 @@ public class ScanContext implements Serializable {
return this;
}
- public ScanContext build() {
- return new ScanContext(
+ public HoodieScanContext build() {
+ return new HoodieScanContext(
conf,
path,
rowType,
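Editor note: as a quick illustration, a hedged sketch of building the renamed HoodieScanContext through its Builder, exercising the same setters used by TestHoodieScanContext later in this patch. The path, instant, and size values are placeholders, and the Path type is assumed to be Flink's org.apache.flink.core.fs.Path (the import is not visible in this hunk).

// Hedged sketch only; values are illustrative, not taken from the patch.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.source.HoodieScanContext;

public class ScanContextBuilderSketch {
  static HoodieScanContext build(Configuration conf, RowType rowType) {
    return new HoodieScanContext.Builder()
        .conf(conf)
        .path(new Path("/tmp/hudi/demo_table"))        // placeholder base path
        .rowType(rowType)
        .startInstant("20240101000000")                // placeholder commit instant
        .maxCompactionMemoryInBytes(100 * 1024 * 1024) // 100 MB
        .maxPendingSplits(1000)
        .skipCompaction(false)
        .skipClustering(false)
        .skipInsertOverwrite(false)
        .cdcEnabled(false)
        .isStreaming(true)                             // streaming read; defaults to false
        .build();
  }
}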
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
index cd4db5173c6e..a5dc05f07c42 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java
@@ -55,20 +55,25 @@ import java.util.List;
import java.util.stream.Collectors;
/**
- * Hoodie Source implementation.
- * @param <T> record Type
+ * Hudi Flink Source V2 implementation for Flink streaming and batch reads.
+ *
+ * <p>This source supports both bounded (batch) and unbounded (streaming) modes
+ * based on the configuration. It uses Flink's new Source API (FLIP-27) to
+ * provide efficient reading of Hudi tables.
+ *
+ * @param <T> the record type to emit
*/
public class HoodieSource<T> implements Source<T, HoodieSourceSplit, HoodieSplitEnumeratorState> {
private static final Logger LOG = LoggerFactory.getLogger(HoodieSource.class);
- private final ScanContext scanContext;
+ private final HoodieScanContext scanContext;
private final SplitReaderFunction<T> readerFunction;
private final SerializableComparator<HoodieSourceSplit> splitComparator;
private final HoodieTableMetaClient metaClient;
private final HoodieRecordEmitter<T> recordEmitter;
- HoodieSource(
- ScanContext scanContext,
+ public HoodieSource(
+ HoodieScanContext scanContext,
SplitReaderFunction<T> readerFunction,
SerializableComparator<HoodieSourceSplit> splitComparator,
HoodieTableMetaClient metaClient,
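Editor note: for readers new to FLIP-27, a hedged sketch of how a HoodieSource instance is attached to a Flink job, mirroring the produceNewSourceDataStream(...) method added to HoodieTableSource further down in this patch; the source instance is assumed to be built the same way HoodieTableSource#createHoodieSource builds it.

// Hedged sketch mirroring produceNewSourceDataStream(...) in this patch.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.source.HoodieSource;

public class HoodieSourceWiringSketch {
  static DataStreamSource<RowData> wire(StreamExecutionEnvironment env, HoodieSource<RowData> hoodieSource) {
    // FLIP-27 sources are attached with fromSource(); no event-time watermarks
    // are emitted here, matching the patch.
    return env.fromSource(hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
  }
}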
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
index b45acd60374c..5f4a3e1d7ded 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
@@ -19,7 +19,7 @@
package org.apache.hudi.source.enumerator;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.HoodieScanContext;
import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
import org.apache.hudi.source.split.HoodieSourceSplit;
@@ -40,7 +40,7 @@ public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerat
private final SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext;
private final HoodieSplitProvider splitProvider;
private final HoodieContinuousSplitDiscover splitDiscover;
- private final ScanContext scanContext;
+ private final HoodieScanContext scanContext;
/**
* Instant for the last enumerated commit. Next incremental enumeration should be based off
@@ -52,7 +52,7 @@ public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerat
SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext,
HoodieSplitProvider splitProvider,
HoodieContinuousSplitDiscover splitDiscover,
- ScanContext scanContext,
+ HoodieScanContext scanContext,
Option<HoodieSplitEnumeratorState> enumStateOpt) {
super(enumeratorContext, splitProvider);
this.enumeratorContext = enumeratorContext;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
index 718b78fe966e..1cd2d7586d93 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
@@ -20,7 +20,7 @@ package org.apache.hudi.source.split;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.source.IncrementalInputSplits;
-import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.HoodieScanContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,11 +32,11 @@ public class DefaultHoodieSplitDiscover implements HoodieContinuousSplitDiscover
private static final Logger LOG = LoggerFactory.getLogger(DefaultHoodieSplitDiscover.class);
private final HoodieTableMetaClient metaClient;
- private final ScanContext scanContext;
+ private final HoodieScanContext scanContext;
private final IncrementalInputSplits incrementalInputSplits;
public DefaultHoodieSplitDiscover(
- ScanContext scanContext,
+ HoodieScanContext scanContext,
HoodieTableMetaClient metaClient) {
this.scanContext = scanContext;
this.metaClient = metaClient;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 32fe9e5d9f8f..97fca224ee04 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.adapter.InputFormatSourceFunctionAdapter;
import org.apache.hudi.adapter.TableFunctionProviderAdapter;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -35,7 +36,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
@@ -47,6 +50,8 @@ import org.apache.hudi.source.ExpressionEvaluators;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.source.FileIndex;
+import org.apache.hudi.source.HoodieScanContext;
+import org.apache.hudi.source.HoodieSource;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
@@ -54,6 +59,9 @@ import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionBucketIdFunc;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
+import org.apache.hudi.source.reader.HoodieRecordEmitter;
+import org.apache.hudi.source.reader.function.HoodieSplitReaderFunction;
+import org.apache.hudi.source.split.HoodieSourceSplitComparator;
import org.apache.hudi.source.rebalance.partitioner.StreamReadAppendPartitioner;
import org.apache.hudi.source.rebalance.partitioner.StreamReadBucketIndexPartitioner;
import org.apache.hudi.source.rebalance.selector.StreamReadAppendKeySelector;
@@ -63,6 +71,7 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.table.format.FlinkReaderContextFactory;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
@@ -74,6 +83,7 @@ import org.apache.hudi.table.lookup.HoodieLookupTableReader;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.ExpressionUtils;
+import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.InputFormats;
import org.apache.hudi.util.SerializableSchema;
@@ -81,11 +91,13 @@ import org.apache.hudi.util.StreamerUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -105,7 +117,6 @@ import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.hadoop.fs.Path;
import javax.annotation.Nullable;
@@ -141,7 +152,6 @@ public class HoodieTableSource implements
SupportsReadingMetadata,
Serializable {
private static final long serialVersionUID = 1L;
-
private static final long NO_LIMIT_CONSTANT = -1;
private final StorageConfiguration<org.apache.hadoop.conf.Configuration> hadoopConf;
@@ -221,32 +231,122 @@ public class HoodieTableSource implements
TypeInformation<RowData> typeInfo = (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
OptionsInference.setupSourceTasks(conf, execEnv.getParallelism());
- if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
- StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
- conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, partitionPruner);
- InputFormat<RowData, ?> inputFormat = getInputFormat(true);
- OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
- SingleOutputStreamOperator<MergeOnReadInputSplit> monitorOperatorStream = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
- .uid(Pipelines.opUID("split_monitor", conf))
- .setParallelism(1)
- .setMaxParallelism(1);
-
- DataStream<MergeOnReadInputSplit> sourceWithKey = addFileDistributionStrategy(monitorOperatorStream);
-
- SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
- .transform("split_reader", typeInfo, factory)
- .uid(Pipelines.opUID("split_reader", conf))
- .setParallelism(conf.get(FlinkOptions.READ_TASKS));
- return new DataStreamSource<>(streamReadSource);
+
+ if (conf.get(FlinkOptions.READ_SOURCE_V2_ENABLED)) {
+ return produceNewSourceDataStream(execEnv);
} else {
- InputFormatSourceFunctionAdapter<RowData> func = new InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
- DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
- return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+ return produceLegacySourceDataStream(execEnv, typeInfo);
}
}
};
}
+ /**
+ * Produces a DataStream using the new FLIP-27 HoodieSource.
+ *
+ * @param execEnv the stream execution environment
+ * @return the configured DataStream
+ */
+ private DataStream<RowData> produceNewSourceDataStream(StreamExecutionEnvironment execEnv) {
+ HoodieSource<RowData> hoodieSource = createHoodieSource();
+ DataStreamSource<RowData> source = execEnv.fromSource(
+ hoodieSource, WatermarkStrategy.noWatermarks(), "hudi_source");
+
+
+
+ return source.name(getSourceOperatorName("hudi_source")).uid(Pipelines.opUID("hudi_source", conf)).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+ }
+
+ /**
+ * Produces a DataStream using the legacy source implementation.
+ *
+ * @param execEnv the stream execution environment
+ * @param typeInfo type information for RowData
+ * @return the configured DataStream
+ */
+ private DataStream<RowData> produceLegacySourceDataStream(
+ StreamExecutionEnvironment execEnv,
+ TypeInformation<RowData> typeInfo) {
+ if (conf.get(FlinkOptions.READ_AS_STREAMING)) {
+ StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
+ conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, partitionPruner);
+ InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+ OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
+ SingleOutputStreamOperator<MergeOnReadInputSplit> monitorOperatorStream = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+ .uid(Pipelines.opUID("split_monitor", conf))
+ .setParallelism(1)
+ .setMaxParallelism(1);
+
+ DataStream<MergeOnReadInputSplit> sourceWithKey = addFileDistributionStrategy(monitorOperatorStream);
+
+ SingleOutputStreamOperator<RowData> streamReadSource = sourceWithKey
+ .transform("split_reader", typeInfo, factory)
+ .uid(Pipelines.opUID("split_reader", conf))
+ .setParallelism(conf.get(FlinkOptions.READ_TASKS));
+ return new DataStreamSource<>(streamReadSource);
+ } else {
+ InputFormatSourceFunctionAdapter<RowData> func = new InputFormatSourceFunctionAdapter<>(getInputFormat(), typeInfo);
+ DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
+ return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.get(FlinkOptions.READ_TASKS));
+ }
+ }
+
+ /**
+ * Creates a new Hudi Flink Source V2 for reading data from the Hudi table.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
+ *
+ * @return the configured HoodieSource instance
+ */
+ private HoodieSource<RowData> createHoodieSource() {
+ ValidationUtils.checkState(metaClient != null, "MetaClient must be initialized before creating HoodieSource");
+ ValidationUtils.checkState(hadoopConf != null, "Hadoop configuration must be initialized");
+ ValidationUtils.checkState(conf != null, "Configuration must be initialized");
+
+ HoodieSchema tableSchema = !tableDataExists() ? inferSchemaFromDdl() : getTableSchema();
+ final DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
+ final RowType rowType = (RowType) rowDataType.getLogicalType();
+ final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
+
+ HoodieScanContext context = createHoodieScanContext(rowType);
+ HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
+ HoodieFlinkEngineContext flinkEngineContext = new HoodieFlinkEngineContext(hadoopConf.unwrap());
+ HoodieFlinkTable<RowData> flinkTable = HoodieFlinkTable.create(writeConfig, flinkEngineContext);
+ FlinkReaderContextFactory readerContextFactory = new FlinkReaderContextFactory(metaClient);
+ HoodieSplitReaderFunction splitReaderFunction = new HoodieSplitReaderFunction(
+ flinkTable,
+ readerContextFactory.getContext(),
+ conf,
+ tableSchema,
+ HoodieSchemaConverter.convertToSchema(requiredRowType),
+ conf.get(FlinkOptions.MERGE_TYPE),
+ Option.empty());
+ return new HoodieSource<>(context, splitReaderFunction, new HoodieSourceSplitComparator(), metaClient, new HoodieRecordEmitter<>());
+ }
+
+ /**
+ * Creates a HoodieScanContext for configuring the scan operation.
+ *
+ * @param rowType the row type for the scan
+ * @return the configured HoodieScanContext instance
+ */
+ private HoodieScanContext createHoodieScanContext(RowType rowType) {
+ return new HoodieScanContext
+ .Builder()
+ .conf(conf)
+ .path(new Path(path.toUri().getPath()))
+ .cdcEnabled(conf.get(FlinkOptions.CDC_ENABLED))
+ .rowType(rowType)
+ .startInstant(conf.get(FlinkOptions.READ_START_COMMIT))
+ .endInstant(conf.get(FlinkOptions.READ_END_COMMIT))
+ .skipCompaction(conf.get(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
+ .skipClustering(conf.get(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
+ .skipInsertOverwrite(conf.get(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE))
+ .maxCompactionMemoryInBytes(conf.get(FlinkOptions.COMPACTION_MAX_MEMORY))
+ .isStreaming(conf.get(FlinkOptions.READ_AS_STREAMING))
+ .build();
+ }
+
/**
* Specify the file distribution strategy based on different upstream writing mechanisms,
* to prevent hot spot issues during stream reading.
@@ -593,7 +693,7 @@ public class HoodieTableSource implements
}
return new CopyOnWriteInputFormat(
- FilePathUtils.toFlinkPaths(paths),
+ paths,
this.schema.getColumnNames().toArray(new String[0]),
this.schema.getColumnDataTypes().toArray(new DataType[0]),
this.requiredPos,
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieScanContext.java
similarity index 60%
rename from hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java
rename to hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieScanContext.java
index eb4b5cf47850..3bfce66d3eef 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieScanContext.java
@@ -34,16 +34,16 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Test cases for {@link ScanContext}.
+ * Test cases for {@link HoodieScanContext}.
*/
-public class TestScanContext {
+public class TestHoodieScanContext {
@Test
public void testGetConf() throws Exception {
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, "/tmp/test");
- ScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, false);
@@ -57,7 +57,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
Path expectedPath = new Path("/tmp/test/table");
- ScanContext scanContext = createTestScanContext(conf, expectedPath,
+ HoodieScanContext scanContext = createTestScanContext(conf, expectedPath,
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, false);
@@ -69,7 +69,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
RowType rowType = TestConfigurations.ROW_TYPE;
- ScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
rowType, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, false);
@@ -82,7 +82,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
String expectedInstant = "20231201000000000";
- ScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, expectedInstant, 100 * 1024 * 1024,
1000, false, false, false, false);
@@ -95,7 +95,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
long expectedMemory = 1024L * 1024L * 1024L; // 1GB
- ScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", expectedMemory,
1000, false, false, false, false);
@@ -108,7 +108,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
long expectedMaxPendingSplits = 5000L;
- ScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
expectedMaxPendingSplits, false, false, false, false);
@@ -120,12 +120,12 @@ public class TestScanContext {
public void testSkipCompaction() throws Exception {
Configuration conf = new Configuration();
- ScanContext scanContextTrue = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContextTrue = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, true, false, false, false);
assertTrue(scanContextTrue.skipCompaction(), "Skip compaction should be
true");
- ScanContext scanContextFalse = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContextFalse = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, false);
assertFalse(scanContextFalse.skipCompaction(), "Skip compaction should be
false");
@@ -135,12 +135,12 @@ public class TestScanContext {
public void testSkipClustering() throws Exception {
Configuration conf = new Configuration();
- ScanContext scanContextTrue = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContextTrue = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, true, false, false);
assertTrue(scanContextTrue.skipClustering(), "Skip clustering should be
true");
- ScanContext scanContextFalse = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContextFalse = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, false);
assertFalse(scanContextFalse.skipClustering(), "Skip clustering should be
false");
@@ -150,12 +150,12 @@ public class TestScanContext {
public void testSkipInsertOverwrite() throws Exception {
Configuration conf = new Configuration();
- ScanContext scanContextTrue = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContextTrue = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, true, false);
assertTrue(scanContextTrue.skipInsertOverwrite(), "Skip insert overwrite
should be true");
- ScanContext scanContextFalse = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContextFalse = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, false);
assertFalse(scanContextFalse.skipInsertOverwrite(), "Skip insert overwrite
should be false");
@@ -165,12 +165,12 @@ public class TestScanContext {
public void testCdcEnabled() throws Exception {
Configuration conf = new Configuration();
- ScanContext scanContextTrue = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContextTrue = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, true);
assertTrue(scanContextTrue.cdcEnabled(), "CDC should be enabled");
- ScanContext scanContextFalse = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContextFalse = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, false);
assertFalse(scanContextFalse.cdcEnabled(), "CDC should be disabled");
@@ -181,7 +181,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 5);
- ScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, false);
Duration scanInterval = scanContext.getScanInterval();
@@ -195,7 +195,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
// Not setting READ_STREAMING_CHECK_INTERVAL to use default
- ScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext scanContext = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
1000, false, false, false, false);
Duration scanInterval = scanContext.getScanInterval();
@@ -214,7 +214,7 @@ public class TestScanContext {
long maxCompactionMemory = 2L * 1024L * 1024L * 1024L; // 2GB
long maxPendingSplits = 10000L;
- ScanContext scanContext = createTestScanContext(conf, path,
+ HoodieScanContext scanContext = createTestScanContext(conf, path,
TestConfigurations.ROW_TYPE, startInstant, maxCompactionMemory,
maxPendingSplits, true, true, true, true);
@@ -234,7 +234,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
Path path = new Path("/tmp/test");
- ScanContext scanContext = new ScanContext.Builder()
+ HoodieScanContext scanContext = new HoodieScanContext.Builder()
.conf(conf)
.path(path)
.rowType(TestConfigurations.ROW_TYPE)
@@ -256,8 +256,8 @@ public class TestScanContext {
Configuration conf = new Configuration();
Path path = new Path("/tmp/test");
- ScanContext.Builder builder = new ScanContext.Builder();
- ScanContext scanContext = builder
+ HoodieScanContext.Builder builder = new HoodieScanContext.Builder();
+ HoodieScanContext scanContext = builder
.conf(conf)
.path(path)
.rowType(TestConfigurations.ROW_TYPE)
@@ -286,7 +286,7 @@ public class TestScanContext {
Path path = new Path("/tmp/test");
String endInstant = "20240201000000";
- ScanContext scanContext = new ScanContext.Builder()
+ HoodieScanContext scanContext = new HoodieScanContext.Builder()
.conf(conf)
.path(path)
.rowType(TestConfigurations.ROW_TYPE)
@@ -308,13 +308,13 @@ public class TestScanContext {
public void testIsStreaming() throws Exception {
Configuration conf = new Configuration();
- ScanContext streamingScanContext = createTestScanContext(conf, new
Path("/tmp/test"),
+ HoodieScanContext streamingScanContext = createTestScanContext(conf, new
Path("/tmp/test"),
TestConfigurations.ROW_TYPE, "20240101000000", 100 * 1024 * 1024,
1000, false, false, false, false);
assertFalse(streamingScanContext.isStreaming(), "Should not be streaming
by default");
- ScanContext batchScanContext = new ScanContext.Builder()
+ HoodieScanContext batchScanContext = new HoodieScanContext.Builder()
.conf(conf)
.path(new Path("/tmp/test"))
.rowType(TestConfigurations.ROW_TYPE)
@@ -336,7 +336,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
String endCommit = "20240201000000";
- ScanContext scanContext = new ScanContext.Builder()
+ HoodieScanContext scanContext = new HoodieScanContext.Builder()
.conf(conf)
.path(new Path("/tmp/test"))
.rowType(TestConfigurations.ROW_TYPE)
@@ -358,7 +358,7 @@ public class TestScanContext {
Configuration conf = new Configuration();
Path path = new Path("/tmp/test");
- ScanContext.Builder builder = new ScanContext.Builder()
+ HoodieScanContext.Builder builder = new HoodieScanContext.Builder()
.conf(conf)
.path(path)
.rowType(TestConfigurations.ROW_TYPE)
@@ -370,8 +370,8 @@ public class TestScanContext {
.skipInsertOverwrite(false)
.cdcEnabled(false);
- ScanContext scanContext1 = builder.build();
- ScanContext scanContext2 = builder.build();
+ HoodieScanContext scanContext1 = builder.build();
+ HoodieScanContext scanContext2 = builder.build();
// Both should be valid but independent instances
assertNotNull(scanContext1);
@@ -379,7 +379,7 @@ public class TestScanContext {
}
// Helper method to create ScanContext using the Builder
- private ScanContext createTestScanContext(
+ private HoodieScanContext createTestScanContext(
Configuration conf,
Path path,
RowType rowType,
@@ -390,7 +390,7 @@ public class TestScanContext {
boolean skipClustering,
boolean skipInsertOverwrite,
boolean cdcEnabled) throws Exception {
- return new ScanContext.Builder()
+ return new HoodieScanContext.Builder()
.conf(conf)
.path(path)
.rowType(rowType)
@@ -403,4 +403,185 @@ public class TestScanContext {
.cdcEnabled(cdcEnabled)
.build();
}
+
+ @Test
+ public void testStreamingModeConfiguration() throws Exception {
+ Configuration conf = new Configuration();
+ Path path = new Path("/tmp/test");
+
+ // Test with isStreaming = true
+ HoodieScanContext streamingContext = new HoodieScanContext.Builder()
+ .conf(conf)
+ .path(path)
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant("20240101000000")
+ .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+ .maxPendingSplits(1000)
+ .skipCompaction(false)
+ .skipClustering(false)
+ .skipInsertOverwrite(false)
+ .cdcEnabled(false)
+ .isStreaming(true)
+ .build();
+
+ assertTrue(streamingContext.isStreaming(), "Streaming mode should be
enabled");
+
+ // Test with isStreaming = false (batch mode)
+ HoodieScanContext batchContext = new HoodieScanContext.Builder()
+ .conf(conf)
+ .path(path)
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant("20240101000000")
+ .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+ .maxPendingSplits(1000)
+ .skipCompaction(false)
+ .skipClustering(false)
+ .skipInsertOverwrite(false)
+ .cdcEnabled(false)
+ .isStreaming(false)
+ .build();
+
+ assertFalse(batchContext.isStreaming(), "Batch mode should be enabled");
+ }
+
+ @Test
+ public void testStreamingModeDefault() throws Exception {
+ Configuration conf = new Configuration();
+ Path path = new Path("/tmp/test");
+
+ HoodieScanContext scanContext = new HoodieScanContext.Builder()
+ .conf(conf)
+ .path(path)
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant("20240101000000")
+ .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+ .maxPendingSplits(1000)
+ .skipCompaction(false)
+ .skipClustering(false)
+ .skipInsertOverwrite(false)
+ .cdcEnabled(false)
+ .build();
+
+ assertFalse(scanContext.isStreaming(), "Streaming mode should default to
false");
+ }
+
+ @Test
+ public void testBuilderWithAllStreamingFlags() throws Exception {
+ Configuration conf = new Configuration();
+ Path path = new Path("/tmp/test");
+
+ HoodieScanContext scanContext = new HoodieScanContext.Builder()
+ .conf(conf)
+ .path(path)
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant("20240101000000")
+ .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+ .maxPendingSplits(1000)
+ .skipCompaction(true)
+ .skipClustering(true)
+ .skipInsertOverwrite(true)
+ .cdcEnabled(true)
+ .isStreaming(true)
+ .build();
+
+ assertTrue(scanContext.skipCompaction(), "skipCompaction should be true");
+ assertTrue(scanContext.skipClustering(), "skipClustering should be true");
+ assertTrue(scanContext.skipInsertOverwrite(), "skipInsertOverwrite should
be true");
+ assertTrue(scanContext.cdcEnabled(), "cdcEnabled should be true");
+ assertTrue(scanContext.isStreaming(), "isStreaming should be true");
+ }
+
+ @Test
+ public void testBuilderWithStartAndEndInstants() throws Exception {
+ Configuration conf = new Configuration();
+ Path path = new Path("/tmp/test");
+ String startInstant = "20240101000000";
+ String endInstant = "20240201000000";
+
+ HoodieScanContext scanContext = new HoodieScanContext.Builder()
+ .conf(conf)
+ .path(path)
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant(startInstant)
+ .endInstant(endInstant)
+ .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+ .maxPendingSplits(1000)
+ .skipCompaction(false)
+ .skipClustering(false)
+ .skipInsertOverwrite(false)
+ .cdcEnabled(false)
+ .build();
+
+ assertEquals(startInstant, scanContext.getStartCommit(), "Start instant
should match");
+ assertEquals(endInstant, scanContext.getEndCommit(), "End instant should
match");
+ }
+
+ @Test
+ public void testBuilderWithCustomScanInterval() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 10);
+ Path path = new Path("/tmp/test");
+
+ HoodieScanContext scanContext = new HoodieScanContext.Builder()
+ .conf(conf)
+ .path(path)
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant("20240101000000")
+ .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+ .maxPendingSplits(1000)
+ .skipCompaction(false)
+ .skipClustering(false)
+ .skipInsertOverwrite(false)
+ .cdcEnabled(false)
+ .build();
+
+ Duration scanInterval = scanContext.getScanInterval();
+ assertEquals(10, scanInterval.getSeconds(), "Scan interval should be 10
seconds");
+ }
+
+ @Test
+ public void testBuilderWithLargeMemoryConfiguration() throws Exception {
+ Configuration conf = new Configuration();
+ Path path = new Path("/tmp/test");
+ long largeMemory = 10L * 1024L * 1024L * 1024L; // 10GB
+
+ HoodieScanContext scanContext = new HoodieScanContext.Builder()
+ .conf(conf)
+ .path(path)
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant("20240101000000")
+ .maxCompactionMemoryInBytes(largeMemory)
+ .maxPendingSplits(1000)
+ .skipCompaction(false)
+ .skipClustering(false)
+ .skipInsertOverwrite(false)
+ .cdcEnabled(false)
+ .build();
+
+ assertEquals(largeMemory, scanContext.getMaxCompactionMemoryInBytes(),
+ "Large memory configuration should be preserved");
+ }
+
+ @Test
+ public void testBuilderWithHighMaxPendingSplits() throws Exception {
+ Configuration conf = new Configuration();
+ Path path = new Path("/tmp/test");
+ long highPendingSplits = 100000L;
+
+ HoodieScanContext scanContext = new HoodieScanContext.Builder()
+ .conf(conf)
+ .path(path)
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant("20240101000000")
+ .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+ .maxPendingSplits(highPendingSplits)
+ .skipCompaction(false)
+ .skipClustering(false)
+ .skipInsertOverwrite(false)
+ .cdcEnabled(false)
+ .build();
+
+ assertEquals(highPendingSplits, scanContext.getMaxPendingSplits(),
+ "High max pending splits should be preserved");
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
index dd06b90e15f5..75d47c0e70ce 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestHoodieSource.java
@@ -71,7 +71,7 @@ public class TestHoodieSource {
File tempDir;
private HoodieTableMetaClient mockMetaClient;
- private ScanContext scanContext;
+ private HoodieScanContext scanContext;
private SplitReaderFunction<RowData> mockReaderFunction;
private SerializableComparator<HoodieSourceSplit> mockComparator;
private HoodieRecordEmitter<RowData> mockRecordEmitter;
@@ -102,7 +102,7 @@ public class TestHoodieSource {
conf.set(FlinkOptions.PATH, tempDir.getAbsolutePath());
conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 60);
- scanContext = new ScanContext.Builder()
+ scanContext = new HoodieScanContext.Builder()
.conf(conf)
.path(new Path(tempDir.getAbsolutePath()))
.rowType(TestConfigurations.ROW_TYPE)
@@ -214,7 +214,7 @@ public class TestHoodieSource {
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, tempDir.getAbsolutePath());
- ScanContext streamingScanContext = new ScanContext.Builder()
+ HoodieScanContext streamingScanContext = new HoodieScanContext.Builder()
.conf(conf)
.path(new Path(tempDir.getAbsolutePath()))
.rowType(TestConfigurations.ROW_TYPE)
@@ -278,7 +278,7 @@ public class TestHoodieSource {
// Insert test data
TestData.writeData(TestData.DATA_SET_INSERT, conf);
metaClient.reloadActiveTimeline();
- ScanContext scanContext = createScanContext(conf);
+ HoodieScanContext scanContext = createScanContext(conf);
HoodieSource<RowData> source = new HoodieSource<>(
scanContext,
@@ -384,7 +384,7 @@ public class TestHoodieSource {
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, tempDir.getAbsolutePath());
- ScanContext streamingScanContext = new ScanContext.Builder()
+ HoodieScanContext streamingScanContext = new HoodieScanContext.Builder()
.conf(conf)
.path(new Path(tempDir.getAbsolutePath()))
.rowType(TestConfigurations.ROW_TYPE)
@@ -418,7 +418,7 @@ public class TestHoodieSource {
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, tempDir.getAbsolutePath());
- ScanContext streamingScanContext = new ScanContext.Builder()
+ HoodieScanContext streamingScanContext = new HoodieScanContext.Builder()
.conf(conf)
.path(new Path(tempDir.getAbsolutePath()))
.rowType(TestConfigurations.ROW_TYPE)
@@ -529,16 +529,16 @@ public class TestHoodieSource {
return mockEnumContext;
}
- private ScanContext createScanContext(Configuration conf) throws Exception {
+ private HoodieScanContext createScanContext(Configuration conf) throws Exception {
return createScanContextWithSkipOptions(conf, false, false, false);
}
- private ScanContext createScanContextWithSkipOptions(
+ private HoodieScanContext createScanContextWithSkipOptions(
Configuration conf,
boolean skipCompaction,
boolean skipClustering,
boolean skipInsertOverwrite) throws Exception {
- return new ScanContext.Builder()
+ return new HoodieScanContext.Builder()
.conf(conf)
.path(new Path(tempDir.getAbsolutePath()))
.rowType(TestConfigurations.ROW_TYPE)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
index 833b77b3dd64..a3400576f105 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -20,7 +20,7 @@ package org.apache.hudi.source.enumerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.HoodieScanContext;
import org.apache.hudi.source.split.DefaultHoodieSplitProvider;
import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
@@ -58,7 +58,7 @@ public class TestHoodieContinuousSplitEnumerator {
private MockSplitEnumeratorContext context;
private DefaultHoodieSplitProvider splitProvider;
private MockContinuousSplitDiscover splitDiscover;
- private ScanContext scanContext;
+ private HoodieScanContext scanContext;
private HoodieContinuousSplitEnumerator enumerator;
private HoodieSourceSplit split1;
private HoodieSourceSplit split2;
@@ -396,7 +396,7 @@ public class TestHoodieContinuousSplitEnumerator {
/**
* Test implementation of ScanContext for testing.
*/
- private static class TestScanContext extends ScanContext {
+ private static class TestScanContext extends HoodieScanContext {
private TestScanContext(
Configuration conf,
Path path,
@@ -442,7 +442,7 @@ public class TestHoodieContinuousSplitEnumerator {
return this;
}
- public ScanContext build() {
+ public HoodieScanContext build() {
return new TestScanContext(conf, path, rowType, startInstant,
maxPendingSplits);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
index 49e03546d5fd..2a3ab40fd400 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/reader/function/TestHoodieSplitReaderFunction.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.table.HoodieFlinkTable;
-
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.junit.jupiter.api.BeforeEach;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
index 3c437e0e7e35..a562916a784d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.HoodieScanContext;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -61,7 +61,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
.filter(hoodieInstant ->
hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
String lastInstant =
commitsTimeline.lastInstant().get().getCompletionTime();
- ScanContext scanContext = createScanContext(conf);
+ HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
scanContext, metaClient);
@@ -91,7 +91,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
metaClient.reloadActiveTimeline();
- ScanContext scanContext = createScanContext(conf);
+ HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
scanContext, metaClient);
@@ -113,7 +113,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
// Insert test data
TestData.writeData(TestData.DATA_SET_INSERT, conf);
- ScanContext scanContext = createScanContext(conf);
+ HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
scanContext, metaClient);
@@ -135,7 +135,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
// Insert test data
TestData.writeData(TestData.DATA_SET_INSERT, conf);
- ScanContext scanContext = createScanContext(conf);
+ HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
scanContext, metaClient);
@@ -162,7 +162,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
String firstCompletionTime = firstInstant.getCompletionTime();
- ScanContext scanContext = createScanContext(conf);
+ HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
scanContext, metaClient);
@@ -184,7 +184,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
// Insert test data
TestData.writeData(TestData.DATA_SET_INSERT, conf);
- ScanContext scanContext = createScanContextWithSkipOptions(conf, true,
true, false);
+ HoodieScanContext scanContext = createScanContextWithSkipOptions(conf, true, true, false);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
scanContext, metaClient);
@@ -199,7 +199,7 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
Configuration conf = TestConfigurations.getDefaultConf(basePath);
- ScanContext scanContext = createScanContext(conf);
+ HoodieScanContext scanContext = createScanContext(conf);
DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
scanContext, metaClient);
@@ -208,16 +208,16 @@ public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
// Helper methods
- private ScanContext createScanContext(Configuration conf) throws Exception {
+ private HoodieScanContext createScanContext(Configuration conf) throws Exception {
return createScanContextWithSkipOptions(conf, false, false, false);
}
- private ScanContext createScanContextWithSkipOptions(
+ private HoodieScanContext createScanContextWithSkipOptions(
Configuration conf,
boolean skipCompaction,
boolean skipClustering,
boolean skipInsertOverwrite) throws Exception {
- return new ScanContext.Builder()
+ return new HoodieScanContext.Builder()
.conf(conf)
.path(new Path(basePath))
.rowType(TestConfigurations.ROW_TYPE)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 4e8713b53bb1..3c275c4c5da6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -557,4 +557,235 @@ public class TestHoodieTableSource {
Arrays.asList(ref, literal),
DataTypes.BOOLEAN());
}
+
+ @Test
+ void testNewHoodieSourceCreationWithFlag() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource should be created
successfully");
+
+ // Verify that the table source can be used in streaming mode with new
source
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ HoodieTableSource streamingSource = createHoodieTableSource(conf);
+ assertNotNull(streamingSource, "Streaming HoodieTableSource should be
created successfully");
+ }
+
+ @Test
+ void testNewHoodieSourceWithBatchMode() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.READ_AS_STREAMING, false);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "Batch mode HoodieTableSource with new source
should be created");
+ }
+
+ @Test
+ void testNewHoodieSourceWithStreamingMode() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "Streaming mode HoodieTableSource with new
source should be created");
+ }
+
+ @Test
+ void testNewHoodieSourceWithCDCEnabled() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.CDC_ENABLED, true);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with CDC enabled should be
created");
+ }
+
+ @Test
+ void testNewHoodieSourceWithReadRange() throws Exception {
+ beforeEach();
+ String firstCommitTime =
TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
+ TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+ String secondCommitTime =
TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
+
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.READ_START_COMMIT, firstCommitTime);
+ conf.set(FlinkOptions.READ_END_COMMIT, secondCommitTime);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with read range should be
created");
+ }
+
+ @Test
+ void testNewHoodieSourceWithSkipFlags() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
+ conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
+ conf.set(FlinkOptions.READ_STREAMING_SKIP_INSERT_OVERWRITE, true);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with skip flags should be
created");
+ }
+
+ @Test
+ void testNewHoodieSourceBackwardCompatibility() throws Exception {
+ beforeEach();
+ // Test that old behavior still works when READ_SOURCE_V2_ENABLED is false
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, false);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with old source should still
work");
+ }
+
+ @Test
+ void testNewHoodieSourceWithCopyOnWriteTable() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with COW table should be
created");
+ }
+
+ @Test
+ void testNewHoodieSourceWithMergeOnReadTable() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with MOR table should be
created");
+ }
+
+ @Test
+ void testNewHoodieSourceWithSnapshotQuery() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with snapshot query should
be created");
+ }
+
+ @Test
+ void testNewHoodieSourceWithIncrementalQuery() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_INCREMENTAL);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with incremental query
should be created");
+ }
+
+ @Test
+ void testNewHoodieSourceWithProjectionPushdown() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ // Apply projection - only select uuid and name columns
+ int[][] projections = new int[][] {{0}, {1}};
+ DataType producedType = DataTypes.ROW(
+ DataTypes.FIELD("uuid", DataTypes.STRING()),
+ DataTypes.FIELD("name", DataTypes.STRING())
+ ).bridgedTo(RowData.class);
+
+ tableSource.applyProjection(projections, producedType);
+ assertNotNull(tableSource, "HoodieTableSource with projection should
work");
+ }
+
+ @Test
+ void testNewHoodieSourceWithFilterPushdown() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+
+ // Apply filter - uuid = 'id1'
+ FieldReferenceExpression ref = new FieldReferenceExpression("uuid",
DataTypes.STRING(), 0, 0);
+ ValueLiteralExpression literal = new ValueLiteralExpression("id1",
DataTypes.STRING().notNull());
+ ResolvedExpression filterExpr = CallExpression.permanent(
+ BuiltInFunctionDefinitions.EQUALS,
+ Arrays.asList(ref, literal),
+ DataTypes.BOOLEAN());
+
+ tableSource.applyFilters(Collections.singletonList(filterExpr));
+ assertNotNull(tableSource, "HoodieTableSource with filter should work");
+ }
+
+ @Test
+ void testNewHoodieSourceWithLimitPushdown() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ tableSource.applyLimit(100);
+ assertNotNull(tableSource, "HoodieTableSource with limit should work");
+ }
+
+ @Test
+ void testNewHoodieSourceWithPartitionedTable() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with partitioned table
should be created");
+ }
+
+ @Test
+ void testNewHoodieSourceWithNonPartitionedTable() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.PARTITION_PATH_FIELD, "");
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with non-partitioned table
should be created");
+ }
+
+ @Test
+ void testNewHoodieSourceCopyRetainsConfig() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ HoodieTableSource copiedSource = (HoodieTableSource) tableSource.copy();
+
+ assertNotNull(copiedSource, "Copied source should not be null");
+ assertNotNull(copiedSource, "Copied source should not be null");
+ assertEquals(tableSource.getConf().get(FlinkOptions.READ_SOURCE_V2_ENABLED),
+ copiedSource.getConf().get(FlinkOptions.READ_SOURCE_V2_ENABLED),
+ "READ_SOURCE_V2_ENABLED flag should be retained in copy");
+ }
+
+ @Test
+ void testNewHoodieSourceWithMaxCompactionMemory() throws Exception {
+ beforeEach();
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.READ_SOURCE_V2_ENABLED, true);
+ conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 512);
+
+ HoodieTableSource tableSource = createHoodieTableSource(conf);
+ assertNotNull(tableSource, "HoodieTableSource with custom compaction
memory should be created");
+ }
}