[
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Khanh Vu updated FLINK-34076:
-----------------------------
Description:
The
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
which stops bundling `flink-connector-base` with `flink-connector-kinesis` has
caused kinesis sink failing to create when using Table API as required classes
from `flink-connector-base` are not loaded in runtime.
E.g. with following depenency only in pom.xml
{code:java}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis</artifactId>
<version>${flink.connector.kinesis.version}</version>
</dependency>
{code}
and a minimal job definition:
{code:java}
public static void main(String[] args) throws Exception {
// create data stream environment
StreamExecutionEnvironment sEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(sEnv);
Schema a = Schema.newBuilder().column("a",
DataTypes.STRING()).build();
TableDescriptor descriptor =
TableDescriptor.forConnector("kinesis")
.schema(a)
.format("json")
.option("stream", "abc")
.option("aws.region",
"eu-central-1")
.build();
tEnv.createTemporaryTable("sinkTable", descriptor);
tEnv.executeSql("CREATE TABLE sinkTable " +
descriptor.toString()).print();
}
{code}
following exception will be thrown:
{code:java}
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
at
jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
~[?:?]
at
jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
... 28 more
{code}
The fix is to explicitly specify `flink-connector-base` as dependency of the
project:
{code:java}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis</artifactId>
<version>${flink.connector.kinesis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
{code}
In general, `flink-connector-base` should be pulled in by default when pulling
in the kinesis connector, the current separation adds unnecessary hassle to use
the connector.
was:
The
[commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
which stops bundling `flink-connector-base` with `flink-connector-kinesis` has
caused kinesis sink failing to create when using Table API as required classes
from `flink-connector-base` are not loaded in runtime.
E.g. with following depenency only in pom.xml
{code:java}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis</artifactId>
<version>${flink.connector.kinesis.version}</version>
</dependency>
{code}
following exception will be thrown:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis'
can only be used as a source. It cannot be used as a sink.
at
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
at
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
{code}
The fix is to explicitly specify `flink-connector-base` as dependency of the
project:
{code:java}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis</artifactId>
<version>${flink.connector.kinesis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
{code}
In general, `flink-connector-base` should be pulled in by default when pulling
in the connector, the current separation adds unnecessary hassle to use the
connector.
> flink-connector-base missing fails kinesis table sink to create
> ---------------------------------------------------------------
>
> Key: FLINK-34076
> URL: https://issues.apache.org/jira/browse/FLINK-34076
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: aws-connector-4.2.0
> Reporter: Khanh Vu
> Priority: Major
>
> The
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
> which stops bundling `flink-connector-base` with `flink-connector-kinesis`
> has caused kinesis sink failing to create when using Table API as required
> classes from `flink-connector-base` are not loaded in runtime.
> E.g. with following depenency only in pom.xml
> {code:java}
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kinesis</artifactId>
> <version>${flink.connector.kinesis.version}</version>
> </dependency>
> {code}
> and a minimal job definition:
> {code:java}
> public static void main(String[] args) throws Exception {
> // create data stream environment
> StreamExecutionEnvironment sEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(sEnv);
> Schema a = Schema.newBuilder().column("a",
> DataTypes.STRING()).build();
> TableDescriptor descriptor =
> TableDescriptor.forConnector("kinesis")
> .schema(a)
> .format("json")
> .option("stream", "abc")
> .option("aws.region",
> "eu-central-1")
> .build();
> tEnv.createTemporaryTable("sinkTable", descriptor);
> tEnv.executeSql("CREATE TABLE sinkTable " +
> descriptor.toString()).print();
> }
> {code}
> following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
> at
> jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
> ~[?:?]
> at
> jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
> ~[?:?]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
> ... 28 more
> {code}
> The fix is to explicitly specify `flink-connector-base` as dependency of the
> project:
> {code:java}
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kinesis</artifactId>
> <version>${flink.connector.kinesis.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-base</artifactId>
> <version>${flink.version}</version>
> <scope>provided</scope>
> </dependency>
> {code}
> In general, `flink-connector-base` should be pulled in by default when
> pulling in the kinesis connector, the current separation adds unnecessary
> hassle to use the connector.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)