[ 
https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khanh Vu updated FLINK-34076:
-----------------------------
    Description: 
The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
that stopped bundling `flink-connector-base` with `flink-connector-kinesis` 
causes creation of the Kinesis table sink to fail when using the Table API, 
because required classes from `flink-connector-base` are not available at runtime.

For example, with only the following dependency in pom.xml:
{code:xml}
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>${flink.connector.kinesis.version}</version>
        </dependency>
{code}

and a minimal job definition:

{code:java}
        public static void main(String[] args) throws Exception {
                // create data stream environment
                StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
                sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);

                Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
                TableDescriptor descriptor =
                                TableDescriptor.forConnector("kinesis")
                                                .schema(a)
                                                .format("json")
                                                .option("stream", "abc")
                                                .option("aws.region", "eu-central-1")
                                                .build();
                tEnv.createTemporaryTable("sinkTable", descriptor);

                tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()).print();
        }
{code}

the following exception is thrown:
{code:java}
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
        at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
        at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
        ... 28 more
{code}

The fix is to explicitly declare `flink-connector-base` as a dependency of the 
project:

{code:xml}
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kinesis</artifactId>
                        <version>${flink.connector.kinesis.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-base</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
                </dependency>
{code}
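
One way to check whether the extra dependency actually resolved the problem is to probe the classpath for the class named in the stack trace. The following is a minimal sketch, not part of the connector: the `ConnectorBaseCheck` class and its `isLoadable` helper are hypothetical names introduced here, and the check relies only on the JDK's `Class.forName`.

```java
// Hypothetical diagnostic helper (not part of Flink or the Kinesis connector).
class ConnectorBaseCheck {

    // Class reported as missing in the ClassNotFoundException above.
    static final String REQUIRED_CLASS =
            "org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory";

    /** Returns true if the named class is visible to the given class loader. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the class, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ConnectorBaseCheck.class.getClassLoader();
        boolean present = isLoadable(REQUIRED_CLASS, loader);
        System.out.println(REQUIRED_CLASS + (present ? " is available" : " is MISSING"));
    }
}
```

Run with the same classpath as the job, this should report the class as available once `flink-connector-base` is present. It only checks visibility from the application class loader and does not exercise Flink's own factory discovery.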

In general, `flink-connector-base` should be pulled in by default together with 
the Kinesis connector; the current separation makes the connector unnecessarily 
cumbersome to use.

  was:
The [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
that stopped bundling `flink-connector-base` with `flink-connector-kinesis` 
causes creation of the Kinesis table sink to fail when using the Table API, 
because required classes from `flink-connector-base` are not available at runtime.

For example, with only the following dependency in pom.xml:
{code:xml}
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis</artifactId>
            <version>${flink.connector.kinesis.version}</version>
        </dependency>
{code}

the following exception is thrown:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Connector 'kinesis' can only be used as a source. It cannot be used as a sink.
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:756)
        at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
{code}

The fix is to explicitly declare `flink-connector-base` as a dependency of the 
project:

{code:xml}
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kinesis</artifactId>
                        <version>${flink.connector.kinesis.version}</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-base</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
                </dependency>
{code}

In general, `flink-connector-base` should be pulled in by default together with 
the connector; the current separation makes the connector unnecessarily 
cumbersome to use.


> flink-connector-base missing fails kinesis table sink to create
> ---------------------------------------------------------------
>
>                 Key: FLINK-34076
>                 URL: https://issues.apache.org/jira/browse/FLINK-34076
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: aws-connector-4.2.0
>            Reporter: Khanh Vu
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
