This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7ad789edf [FLINK-38723][common/composer] Add getFlinkConf method to
Context. (#4179)
7ad789edf is described below
commit 7ad789edf9aaf7cd3fddc38f2218caa91794b896
Author: Kunni <[email protected]>
AuthorDate: Wed Nov 26 16:04:39 2025 +0800
[FLINK-38723][common/composer] Add getFlinkConf method to Context. (#4179)
---
.../apache/flink/cdc/common/factories/Factory.java | 6 ++
.../flink/cdc/common/factories/FactoryHelper.java | 19 ++++
.../flink/translator/DataSinkTranslator.java | 3 +-
.../flink/translator/DataSourceTranslator.java | 3 +-
.../composer/flink/FlinkPipelineComposerTest.java | 101 +++++++++++++++++++++
.../org.apache.flink.cdc.common.factories.Factory | 4 +-
6 files changed, 133 insertions(+), 3 deletions(-)
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java
index 07675d02e..b8a13b744 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.common.factories;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import java.util.Set;
@@ -78,5 +79,10 @@ public interface Factory {
* <p>The class loader is in particular useful for discovering
factories.
*/
ClassLoader getClassLoader();
+
+ /** Returns the flink configuration of the current session. */
+ default ReadableConfig getFlinkConf() {
+ return new org.apache.flink.configuration.Configuration();
+ }
}
}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
index 62bdcb8b1..8fa928883 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
@@ -168,14 +168,28 @@ public class FactoryHelper {
private final Configuration factoryConfiguration;
private final ClassLoader classLoader;
private final Configuration pipelineConfiguration;
+ private final ReadableConfig flinkConf;
public DefaultContext(
Configuration factoryConfiguration,
Configuration pipelineConfiguration,
ClassLoader classLoader) {
+ this(
+ factoryConfiguration,
+ pipelineConfiguration,
+ classLoader,
+ new org.apache.flink.configuration.Configuration());
+ }
+
+ public DefaultContext(
+ Configuration factoryConfiguration,
+ Configuration pipelineConfiguration,
+ ClassLoader classLoader,
+ ReadableConfig flinkConf) {
this.factoryConfiguration = factoryConfiguration;
this.pipelineConfiguration = pipelineConfiguration;
this.classLoader = classLoader;
+ this.flinkConf = flinkConf;
}
@Override
@@ -192,5 +206,10 @@ public class FactoryHelper {
public ClassLoader getClassLoader() {
return classLoader;
}
+
+ @Override
+ public ReadableConfig getFlinkConf() {
+ return flinkConf;
+ }
}
}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
index 64e1b2359..aa530300b 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
@@ -76,7 +76,8 @@ public class DataSinkTranslator {
new FactoryHelper.DefaultContext(
sinkDef.getConfig(),
pipelineConfig,
- Thread.currentThread().getContextClassLoader()));
+ Thread.currentThread().getContextClassLoader(),
+ env.getConfiguration()));
}
public void translate(
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
index 833c83dc0..e4f684f8a 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -97,7 +97,8 @@ public class DataSourceTranslator {
new FactoryHelper.DefaultContext(
sourceDef.getConfig(),
pipelineConfig,
- Thread.currentThread().getContextClassLoader());
+ Thread.currentThread().getContextClassLoader(),
+ env.getConfiguration());
return sourceFactory.createDataSource(context);
}
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java
index 193f7d1fb..15bcd0495 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java
@@ -17,17 +17,25 @@
package org.apache.flink.cdc.composer.flink;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
+import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.sink.EventSinkProvider;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1;
import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSource;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -38,8 +46,11 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.stream.Stream;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** A test for the {@link FlinkPipelineComposer}. */
@@ -72,6 +83,96 @@ class FlinkPipelineComposerTest {
.isEqualTo("0.0.0.0");
}
+ @Test
+ void testGettingFlinkConfiguration() {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ new SourceDef(TestDataSourceFactory.IDENTIFIER, null,
new Configuration()),
+ new SinkDef(TestDataSinkFactory.IDENTIFIER, null, new
Configuration()),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new Configuration());
+
+ assertThatCode(() ->
composer.compose(pipelineDef)).doesNotThrowAnyException();
+ }
+
+ /** A dummy {@link DataSinkFactory} that validates the execution target. */
+ public static class TestDataSinkFactory implements DataSinkFactory {
+
+ public static final String IDENTIFIER = "test-sink-factory";
+
+ @Override
+ public DataSink createDataSink(Context context) {
+ // This option has no default value.
+ String target =
context.getFlinkConf().get(DeploymentOptions.TARGET);
+ if (!"local".equals(target)) {
+ throw new IllegalArgumentException(
+ "The flink configuration is invalid. Please check the
pipeline configuration.");
+ }
+ return new DataSink() {
+ @Override
+ public EventSinkProvider getEventSinkProvider() {
+ return null;
+ }
+
+ @Override
+ public MetadataApplier getMetadataApplier() {
+ return schemaChangeEvent -> {};
+ }
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return new HashSet<>();
+ }
+ }
+
+ /** A dummy {@link DataSourceFactory} that validates the execution target.
*/
+ public static class TestDataSourceFactory implements DataSourceFactory {
+
+ public static final String IDENTIFIER = "test-source-factory";
+
+ @Override
+ public DataSource createDataSource(Context context) {
+ // This option has no default value.
+ String target =
context.getFlinkConf().get(DeploymentOptions.TARGET);
+ if (!"local".equals(target)) {
+ throw new IllegalArgumentException(
+ "The flink configuration is invalid. Please check the
pipeline configuration.");
+ }
+ return new ValuesDataSource(
+
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE, Integer.MAX_VALUE);
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return new HashSet<>();
+ }
+ }
+
@ParameterizedTest
@MethodSource
void testInvalidPipelineConfiguration(Configuration pipelineConfig) {
diff --git
a/flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
b/flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
index 97da54827..274faa007 100644
---
a/flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
+++
b/flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
@@ -16,4 +16,6 @@ org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1
org.apache.flink.cdc.composer.utils.factory.DataSinkFactory2
org.apache.flink.cdc.composer.utils.factory.DataSourceFactory1
org.apache.flink.cdc.composer.utils.factory.DataSourceFactory2
-org.apache.flink.cdc.composer.testsource.factory.DistributedDataSourceFactory
\ No newline at end of file
+org.apache.flink.cdc.composer.testsource.factory.DistributedDataSourceFactory
+org.apache.flink.cdc.composer.flink.FlinkPipelineComposerTest$TestDataSinkFactory
+org.apache.flink.cdc.composer.flink.FlinkPipelineComposerTest$TestDataSourceFactory
\ No newline at end of file