This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new b361db58a [FLINK-36007][cdc-composer] Loading factory and added jar 
in one search
b361db58a is described below

commit b361db58ab19021a7ec23742cf7779f9e2eb65b5
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Aug 9 23:12:04 2024 +0800

    [FLINK-36007][cdc-composer] Loading factory and added jar in one search
    
    This closes #3520.
---
 .../cdc/composer/flink/FlinkPipelineComposer.java  | 28 ++----------------
 .../flink/translator/DataSinkTranslator.java       | 24 +++++++++++++++
 .../flink/translator/DataSourceTranslator.java     | 34 ++++++++++++----------
 .../cdc/composer/utils/FactoryDiscoveryUtils.java  | 11 ++++---
 .../composer/utils/FactoryDiscoveryUtilsTest.java  |  8 -----
 5 files changed, 53 insertions(+), 52 deletions(-)

diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index ffb328f1f..ca4378ad1 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -18,24 +18,19 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.factories.DataSinkFactory;
-import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.composer.PipelineComposer;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
-import org.apache.flink.cdc.composer.definition.SinkDef;
 import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
 import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
 import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
 import org.apache.flink.cdc.composer.flink.translator.PartitioningTranslator;
 import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
 import org.apache.flink.cdc.composer.flink.translator.TransformTranslator;
-import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -130,7 +125,9 @@ public class FlinkPipelineComposer implements 
PipelineComposer {
                         pipelineDef.getUdfs());
 
         // Build DataSink in advance as schema operator requires 
MetadataApplier
-        DataSink dataSink = createDataSink(pipelineDef.getSink(), 
pipelineDef.getConfig());
+        DataSinkTranslator sinkTranslator = new DataSinkTranslator();
+        DataSink dataSink =
+                sinkTranslator.createDataSink(pipelineDef.getSink(), 
pipelineDef.getConfig(), env);
 
         stream =
                 schemaOperatorTranslator.translate(
@@ -152,7 +149,6 @@ public class FlinkPipelineComposer implements 
PipelineComposer {
                         dataSink.getDataChangeEventHashFunctionProvider());
 
         // Build Sink Operator
-        DataSinkTranslator sinkTranslator = new DataSinkTranslator();
         sinkTranslator.translate(
                 pipelineDef.getSink(), stream, dataSink, 
schemaOperatorIDGenerator.generate());
 
@@ -163,24 +159,6 @@ public class FlinkPipelineComposer implements 
PipelineComposer {
                 env, 
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
     }
 
-    private DataSink createDataSink(SinkDef sinkDef, Configuration 
pipelineConfig) {
-        // Search the data sink factory
-        DataSinkFactory sinkFactory =
-                FactoryDiscoveryUtils.getFactoryByIdentifier(
-                        sinkDef.getType(), DataSinkFactory.class);
-
-        // Include sink connector JAR
-        FactoryDiscoveryUtils.getJarPathByIdentifier(sinkDef.getType(), 
DataSinkFactory.class)
-                .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
-
-        // Create data sink
-        return sinkFactory.createDataSink(
-                new FactoryHelper.DefaultContext(
-                        sinkDef.getConfig(),
-                        pipelineConfig,
-                        Thread.currentThread().getContextClassLoader()));
-    }
-
     private void addFrameworkJars() {
         try {
             Set<URI> frameworkJars = new HashSet<>();
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
index 1ac924e24..efbdb8686 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
@@ -22,12 +22,17 @@ import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.factories.DataSinkFactory;
+import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.common.sink.EventSinkProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkFunctionProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
 import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
+import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
 import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
 import 
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -53,6 +58,25 @@ public class DataSinkTranslator {
     private static final String SINK_WRITER_PREFIX = "Sink Writer: ";
     private static final String SINK_COMMITTER_PREFIX = "Sink Committer: ";
 
+    public DataSink createDataSink(
+            SinkDef sinkDef, Configuration pipelineConfig, 
StreamExecutionEnvironment env) {
+        // Search the data sink factory
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier(
+                        sinkDef.getType(), DataSinkFactory.class);
+
+        // Include sink connector JAR
+        FactoryDiscoveryUtils.getJarPathByIdentifier(sinkFactory)
+                .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
+
+        // Create data sink
+        return sinkFactory.createDataSink(
+                new FactoryHelper.DefaultContext(
+                        sinkDef.getConfig(),
+                        pipelineConfig,
+                        Thread.currentThread().getContextClassLoader()));
+    }
+
     public void translate(
             SinkDef sinkDef,
             DataStream<Event> input,
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
index ee9b17d7b..7b631c6f1 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -42,22 +42,8 @@ public class DataSourceTranslator {
 
     public DataStreamSource<Event> translate(
             SourceDef sourceDef, StreamExecutionEnvironment env, Configuration 
pipelineConfig) {
-        // Search the data source factory
-        DataSourceFactory sourceFactory =
-                FactoryDiscoveryUtils.getFactoryByIdentifier(
-                        sourceDef.getType(), DataSourceFactory.class);
-
         // Create data source
-        DataSource dataSource =
-                sourceFactory.createDataSource(
-                        new FactoryHelper.DefaultContext(
-                                sourceDef.getConfig(),
-                                pipelineConfig,
-                                
Thread.currentThread().getContextClassLoader()));
-
-        // Add source JAR to environment
-        FactoryDiscoveryUtils.getJarPathByIdentifier(sourceDef.getType(), 
DataSourceFactory.class)
-                .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
+        DataSource dataSource = createDataSource(sourceDef, env, 
pipelineConfig);
 
         // Get source provider
         final int sourceParallelism = 
pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
@@ -91,6 +77,24 @@ public class DataSourceTranslator {
         }
     }
 
+    private DataSource createDataSource(
+            SourceDef sourceDef, StreamExecutionEnvironment env, Configuration 
pipelineConfig) {
+        // Search the data source factory
+        DataSourceFactory sourceFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier(
+                        sourceDef.getType(), DataSourceFactory.class);
+        // Add source JAR to environment
+        FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory)
+                .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
+        DataSource dataSource =
+                sourceFactory.createDataSource(
+                        new FactoryHelper.DefaultContext(
+                                sourceDef.getConfig(),
+                                pipelineConfig,
+                                
Thread.currentThread().getContextClassLoader()));
+        return dataSource;
+    }
+
     private String generateDefaultSourceName(SourceDef sourceDef) {
         return String.format("Flink CDC Event Source: %s", 
sourceDef.getType());
     }
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
index 4f7649d9e..1fd9e1692 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.cdc.common.factories.Factory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -42,6 +44,7 @@ public class FactoryDiscoveryUtils {
     private FactoryDiscoveryUtils() {}
 
     /** Returns the {@link Factory} for the given identifier. */
+    @Nonnull
     @SuppressWarnings("unchecked")
     public static <T extends Factory> T getFactoryByIdentifier(
             String identifier, Class<T> factoryClass) {
@@ -88,10 +91,8 @@ public class FactoryDiscoveryUtils {
     /**
      * Return the path of the jar file that contains the {@link Factory} for 
the given identifier.
      */
-    public static <T extends Factory> Optional<URL> getJarPathByIdentifier(
-            String identifier, Class<T> factoryClass) {
+    public static <T extends Factory> Optional<URL> getJarPathByIdentifier(T 
factory) {
         try {
-            T factory = getFactoryByIdentifier(identifier, factoryClass);
             URL url = 
factory.getClass().getProtectionDomain().getCodeSource().getLocation();
             String urlString = url.toString();
             if (urlString.contains("usrlib")) {
@@ -110,7 +111,9 @@ public class FactoryDiscoveryUtils {
             return Optional.of(url);
         } catch (Exception e) {
             throw new RuntimeException(
-                    String.format("Failed to search JAR by factory identifier 
\"%s\"", identifier),
+                    String.format(
+                            "Failed to search JAR by factory identifier 
\"%s\"",
+                            factory.identifier()),
                     e);
         }
     }
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
index 5f6a9e62e..497b100ae 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
@@ -48,12 +48,4 @@ class FactoryDiscoveryUtilsTest {
                 .hasMessageStartingWith(
                         "Cannot find factory with identifier 
\"data-sink-factory-3\" in the classpath");
     }
-
-    @Test
-    void getJarPathByIdentifier() {
-        assertThat(
-                        FactoryDiscoveryUtils.getJarPathByIdentifier(
-                                "data-source-factory-1", Factory.class))
-                .isNotPresent();
-    }
 }

Reply via email to