This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new b361db58a [FLINK-36007][cdc-composer] Load factory and add jar
in one search
b361db58a is described below
commit b361db58ab19021a7ec23742cf7779f9e2eb65b5
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Aug 9 23:12:04 2024 +0800
[FLINK-36007][cdc-composer] Load factory and add jar in one search
This closes #3520.
---
.../cdc/composer/flink/FlinkPipelineComposer.java | 28 ++----------------
.../flink/translator/DataSinkTranslator.java | 24 +++++++++++++++
.../flink/translator/DataSourceTranslator.java | 34 ++++++++++++----------
.../cdc/composer/utils/FactoryDiscoveryUtils.java | 11 ++++---
.../composer/utils/FactoryDiscoveryUtilsTest.java | 8 -----
5 files changed, 53 insertions(+), 52 deletions(-)
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index ffb328f1f..ca4378ad1 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -18,24 +18,19 @@
package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.factories.DataSinkFactory;
-import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
-import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
import org.apache.flink.cdc.composer.flink.translator.PartitioningTranslator;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
import org.apache.flink.cdc.composer.flink.translator.TransformTranslator;
-import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -130,7 +125,9 @@ public class FlinkPipelineComposer implements
PipelineComposer {
pipelineDef.getUdfs());
// Build DataSink in advance as schema operator requires
MetadataApplier
- DataSink dataSink = createDataSink(pipelineDef.getSink(),
pipelineDef.getConfig());
+ DataSinkTranslator sinkTranslator = new DataSinkTranslator();
+ DataSink dataSink =
+ sinkTranslator.createDataSink(pipelineDef.getSink(),
pipelineDef.getConfig(), env);
stream =
schemaOperatorTranslator.translate(
@@ -152,7 +149,6 @@ public class FlinkPipelineComposer implements
PipelineComposer {
dataSink.getDataChangeEventHashFunctionProvider());
// Build Sink Operator
- DataSinkTranslator sinkTranslator = new DataSinkTranslator();
sinkTranslator.translate(
pipelineDef.getSink(), stream, dataSink,
schemaOperatorIDGenerator.generate());
@@ -163,24 +159,6 @@ public class FlinkPipelineComposer implements
PipelineComposer {
env,
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
}
- private DataSink createDataSink(SinkDef sinkDef, Configuration
pipelineConfig) {
- // Search the data sink factory
- DataSinkFactory sinkFactory =
- FactoryDiscoveryUtils.getFactoryByIdentifier(
- sinkDef.getType(), DataSinkFactory.class);
-
- // Include sink connector JAR
- FactoryDiscoveryUtils.getJarPathByIdentifier(sinkDef.getType(),
DataSinkFactory.class)
- .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
-
- // Create data sink
- return sinkFactory.createDataSink(
- new FactoryHelper.DefaultContext(
- sinkDef.getConfig(),
- pipelineConfig,
- Thread.currentThread().getContextClassLoader()));
- }
-
private void addFrameworkJars() {
try {
Set<URI> frameworkJars = new HashSet<>();
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
index 1ac924e24..efbdb8686 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
@@ -22,12 +22,17 @@ import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.factories.DataSinkFactory;
+import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkFunctionProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
+import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
import
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -53,6 +58,25 @@ public class DataSinkTranslator {
private static final String SINK_WRITER_PREFIX = "Sink Writer: ";
private static final String SINK_COMMITTER_PREFIX = "Sink Committer: ";
+ public DataSink createDataSink(
+ SinkDef sinkDef, Configuration pipelineConfig,
StreamExecutionEnvironment env) {
+ // Search the data sink factory
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier(
+ sinkDef.getType(), DataSinkFactory.class);
+
+ // Include sink connector JAR
+ FactoryDiscoveryUtils.getJarPathByIdentifier(sinkFactory)
+ .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
+
+ // Create data sink
+ return sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ sinkDef.getConfig(),
+ pipelineConfig,
+ Thread.currentThread().getContextClassLoader()));
+ }
+
public void translate(
SinkDef sinkDef,
DataStream<Event> input,
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
index ee9b17d7b..7b631c6f1 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java
@@ -42,22 +42,8 @@ public class DataSourceTranslator {
public DataStreamSource<Event> translate(
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration
pipelineConfig) {
- // Search the data source factory
- DataSourceFactory sourceFactory =
- FactoryDiscoveryUtils.getFactoryByIdentifier(
- sourceDef.getType(), DataSourceFactory.class);
-
// Create data source
- DataSource dataSource =
- sourceFactory.createDataSource(
- new FactoryHelper.DefaultContext(
- sourceDef.getConfig(),
- pipelineConfig,
-
Thread.currentThread().getContextClassLoader()));
-
- // Add source JAR to environment
- FactoryDiscoveryUtils.getJarPathByIdentifier(sourceDef.getType(),
DataSourceFactory.class)
- .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
+ DataSource dataSource = createDataSource(sourceDef, env,
pipelineConfig);
// Get source provider
final int sourceParallelism =
pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
@@ -91,6 +77,24 @@ public class DataSourceTranslator {
}
}
+ private DataSource createDataSource(
+ SourceDef sourceDef, StreamExecutionEnvironment env, Configuration
pipelineConfig) {
+ // Search the data source factory
+ DataSourceFactory sourceFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier(
+ sourceDef.getType(), DataSourceFactory.class);
+ // Add source JAR to environment
+ FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory)
+ .ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
+ DataSource dataSource =
+ sourceFactory.createDataSource(
+ new FactoryHelper.DefaultContext(
+ sourceDef.getConfig(),
+ pipelineConfig,
+
Thread.currentThread().getContextClassLoader()));
+ return dataSource;
+ }
+
private String generateDefaultSourceName(SourceDef sourceDef) {
return String.format("Flink CDC Event Source: %s",
sourceDef.getType());
}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
index 4f7649d9e..1fd9e1692 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.cdc.common.factories.Factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -42,6 +44,7 @@ public class FactoryDiscoveryUtils {
private FactoryDiscoveryUtils() {}
/** Returns the {@link Factory} for the given identifier. */
+ @Nonnull
@SuppressWarnings("unchecked")
public static <T extends Factory> T getFactoryByIdentifier(
String identifier, Class<T> factoryClass) {
@@ -88,10 +91,8 @@ public class FactoryDiscoveryUtils {
/**
* Return the path of the jar file that contains the {@link Factory} for
the given identifier.
*/
- public static <T extends Factory> Optional<URL> getJarPathByIdentifier(
- String identifier, Class<T> factoryClass) {
+ public static <T extends Factory> Optional<URL> getJarPathByIdentifier(T
factory) {
try {
- T factory = getFactoryByIdentifier(identifier, factoryClass);
URL url =
factory.getClass().getProtectionDomain().getCodeSource().getLocation();
String urlString = url.toString();
if (urlString.contains("usrlib")) {
@@ -110,7 +111,9 @@ public class FactoryDiscoveryUtils {
return Optional.of(url);
} catch (Exception e) {
throw new RuntimeException(
- String.format("Failed to search JAR by factory identifier
\"%s\"", identifier),
+ String.format(
+ "Failed to search JAR by factory identifier
\"%s\"",
+ factory.identifier()),
e);
}
}
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
index 5f6a9e62e..497b100ae 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
@@ -48,12 +48,4 @@ class FactoryDiscoveryUtilsTest {
.hasMessageStartingWith(
"Cannot find factory with identifier
\"data-sink-factory-3\" in the classpath");
}
-
- @Test
- void getJarPathByIdentifier() {
- assertThat(
- FactoryDiscoveryUtils.getJarPathByIdentifier(
- "data-source-factory-1", Factory.class))
- .isNotPresent();
- }
}