This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ad789edf [FLINK-38723][common/composer] Add getFlinkConf method to Context. (#4179)
7ad789edf is described below

commit 7ad789edf9aaf7cd3fddc38f2218caa91794b896
Author: Kunni <[email protected]>
AuthorDate: Wed Nov 26 16:04:39 2025 +0800

    [FLINK-38723][common/composer] Add getFlinkConf method to Context. (#4179)
---
 .../apache/flink/cdc/common/factories/Factory.java |   6 ++
 .../flink/cdc/common/factories/FactoryHelper.java  |  19 ++++
 .../flink/translator/DataSinkTranslator.java       |   3 +-
 .../flink/translator/DataSourceTranslator.java     |   3 +-
 .../composer/flink/FlinkPipelineComposerTest.java  | 101 +++++++++++++++++++++
 .../org.apache.flink.cdc.common.factories.Factory  |   4 +-
 6 files changed, 133 insertions(+), 3 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java
index 07675d02e..b8a13b744 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.common.factories;
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.configuration.ConfigOption;
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 
 import java.util.Set;
 
@@ -78,5 +79,10 @@ public interface Factory {
         * <p>The class loader is in particular useful for discovering factories.
          */
         ClassLoader getClassLoader();
+
+        /** Returns the flink configuration of the current session. */
+        default ReadableConfig getFlinkConf() {
+            return new org.apache.flink.configuration.Configuration();
+        }
     }
 }
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
index 62bdcb8b1..8fa928883 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java
@@ -168,14 +168,28 @@ public class FactoryHelper {
         private final Configuration factoryConfiguration;
         private final ClassLoader classLoader;
         private final Configuration pipelineConfiguration;
+        private final ReadableConfig flinkConf;
 
         public DefaultContext(
                 Configuration factoryConfiguration,
                 Configuration pipelineConfiguration,
                 ClassLoader classLoader) {
+            this(
+                    factoryConfiguration,
+                    pipelineConfiguration,
+                    classLoader,
+                    new org.apache.flink.configuration.Configuration());
+        }
+
+        public DefaultContext(
+                Configuration factoryConfiguration,
+                Configuration pipelineConfiguration,
+                ClassLoader classLoader,
+                ReadableConfig flinkConf) {
             this.factoryConfiguration = factoryConfiguration;
             this.pipelineConfiguration = pipelineConfiguration;
             this.classLoader = classLoader;
+            this.flinkConf = flinkConf;
         }
 
         @Override
@@ -192,5 +206,10 @@ public class FactoryHelper {
         public ClassLoader getClassLoader() {
             return classLoader;
         }
+
+        @Override
+        public ReadableConfig getFlinkConf() {
+            return flinkConf;
+        }
     }
 }
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
index 64e1b2359..aa530300b 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
@@ -76,7 +76,8 @@ public class DataSinkTranslator {
                 new FactoryHelper.DefaultContext(
                         sinkDef.getConfig(),
                         pipelineConfig,
-                        Thread.currentThread().getContextClassLoader()));
+                        Thread.currentThread().getContextClassLoader(),
+                        env.getConfiguration()));
     }
 
     public void translate(
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
index 833c83dc0..e4f684f8a 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -97,7 +97,8 @@ public class DataSourceTranslator {
                 new FactoryHelper.DefaultContext(
                         sourceDef.getConfig(),
                         pipelineConfig,
-                        Thread.currentThread().getContextClassLoader());
+                        Thread.currentThread().getContextClassLoader(),
+                        env.getConfiguration());
         return sourceFactory.createDataSource(context);
     }
 
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java
index 193f7d1fb..15bcd0495 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java
@@ -17,17 +17,25 @@
 
 package org.apache.flink.cdc.composer.flink;
 
+import org.apache.flink.cdc.common.configuration.ConfigOption;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.factories.DataSinkFactory;
+import org.apache.flink.cdc.common.factories.DataSourceFactory;
 import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.sink.EventSinkProvider;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.source.DataSource;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
 import org.apache.flink.cdc.composer.definition.SinkDef;
 import org.apache.flink.cdc.composer.definition.SourceDef;
 import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
 import org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1;
 import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSource;
+import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
+import org.apache.flink.configuration.DeploymentOptions;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 
@@ -38,8 +46,11 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.stream.Stream;
 
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** A test for the {@link FlinkPipelineComposer}. */
@@ -72,6 +83,96 @@ class FlinkPipelineComposerTest {
                 .isEqualTo("0.0.0.0");
     }
 
+    @Test
+    void testGettingFlinkConfiguration() {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        new SourceDef(TestDataSourceFactory.IDENTIFIER, null, new Configuration()),
+                        new SinkDef(TestDataSinkFactory.IDENTIFIER, null, new Configuration()),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new Configuration());
+
+        assertThatCode(() -> composer.compose(pipelineDef)).doesNotThrowAnyException();
+    }
+
+    /** A dummy {@link DataSinkFactory} that validates the execution target. */
+    public static class TestDataSinkFactory implements DataSinkFactory {
+
+        public static final String IDENTIFIER = "test-sink-factory";
+
+        @Override
+        public DataSink createDataSink(Context context) {
+            // This option has no default value.
+            String target = context.getFlinkConf().get(DeploymentOptions.TARGET);
+            if (!"local".equals(target)) {
+                throw new IllegalArgumentException(
+                        "The flink configuration is invalid. Please check the pipeline configuration.");
+            }
+            return new DataSink() {
+                @Override
+                public EventSinkProvider getEventSinkProvider() {
+                    return null;
+                }
+
+                @Override
+                public MetadataApplier getMetadataApplier() {
+                    return schemaChangeEvent -> {};
+                }
+            };
+        }
+
+        @Override
+        public String identifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public Set<ConfigOption<?>> requiredOptions() {
+            return new HashSet<>();
+        }
+
+        @Override
+        public Set<ConfigOption<?>> optionalOptions() {
+            return new HashSet<>();
+        }
+    }
+
+    /** A dummy {@link DataSourceFactory} that validates the execution target. */
+    public static class TestDataSourceFactory implements DataSourceFactory {
+
+        public static final String IDENTIFIER = "test-source-factory";
+
+        @Override
+        public DataSource createDataSource(Context context) {
+            // This option has no default value.
+            String target = context.getFlinkConf().get(DeploymentOptions.TARGET);
+            if (!"local".equals(target)) {
+                throw new IllegalArgumentException(
+                        "The flink configuration is invalid. Please check the pipeline configuration.");
+            }
+            return new ValuesDataSource(
+                    ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE, Integer.MAX_VALUE);
+        }
+
+        @Override
+        public String identifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public Set<ConfigOption<?>> requiredOptions() {
+            return new HashSet<>();
+        }
+
+        @Override
+        public Set<ConfigOption<?>> optionalOptions() {
+            return new HashSet<>();
+        }
+    }
+
     @ParameterizedTest
     @MethodSource
     void testInvalidPipelineConfiguration(Configuration pipelineConfig) {
diff --git a/flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory b/flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
index 97da54827..274faa007 100644
--- a/flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
+++ b/flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
@@ -16,4 +16,6 @@ org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1
 org.apache.flink.cdc.composer.utils.factory.DataSinkFactory2
 org.apache.flink.cdc.composer.utils.factory.DataSourceFactory1
 org.apache.flink.cdc.composer.utils.factory.DataSourceFactory2
-org.apache.flink.cdc.composer.testsource.factory.DistributedDataSourceFactory
\ No newline at end of file
+org.apache.flink.cdc.composer.testsource.factory.DistributedDataSourceFactory
+org.apache.flink.cdc.composer.flink.FlinkPipelineComposerTest$TestDataSinkFactory
+org.apache.flink.cdc.composer.flink.FlinkPipelineComposerTest$TestDataSourceFactory
\ No newline at end of file

Reply via email to