massquantity opened a new issue #3022:
URL: https://github.com/apache/iceberg/issues/3022
I'm trying to use PyFlink to read data from Kafka, transform it, and append
it to existing Iceberg tables in the AWS Glue Catalog.
The catalog and tables were created earlier with Iceberg on Spark 3.
The same setup works from the Flink SQL Client, but the interactive SQL Client
does not seem practical for sinking a large amount of data.
I've tried Flink 1.11.1 and 1.12.4. With Flink 1.11.1, this is the code and
the error message:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# Register the Iceberg runtime and AWS SDK jars (semicolon-separated URLs).
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///home/iceberg-flink-runtime-0.12.0.jar;"
    "file:///home/bundle-2.15.40.jar;"
    "file:///home/url-connection-client-2.15.40.jar",
)

# Create an Iceberg catalog backed by AWS Glue, with S3 as the file IO.
table_env.execute_sql("""CREATE CATALOG lake_catalog WITH (
    'type'='iceberg',
    'warehouse'='s3://bucket',
    'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')""")
```
```shell
py4j.protocol.Py4JJavaError: An error occurred while calling o30.executeSql.
: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
io-impl=org.apache.iceberg.aws.s3.S3FileIO
type=iceberg
warehouse=s3://bucket
The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1080)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1019)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```
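For anyone reproducing this: the `pipeline.jars` value in the Flink 1.11.1 snippet above is a single string of `file://` URLs joined with semicolons. A minimal sketch for building that string from a list of local paths is below. The helper name `build_pipeline_jars` is hypothetical (it is not part of PyFlink), and it assumes absolute POSIX paths. It only assembles the string and does not address the factory lookup error.

```python
import posixpath
from typing import Iterable
from urllib.parse import quote


def build_pipeline_jars(jar_paths: Iterable[str]) -> str:
    """Join absolute POSIX jar paths into a 'pipeline.jars'-style value.

    Hypothetical helper, not a PyFlink API: each path becomes a file:// URL
    and the URLs are joined with ';', matching the format used in the
    snippet above.
    """
    urls = []
    for path in jar_paths:
        if not posixpath.isabs(path):
            # A relative path cannot be written as an unambiguous file:// URL.
            raise ValueError(f"jar path must be absolute: {path!r}")
        # Percent-encode characters that are not URL-safe; '/' is kept as-is.
        urls.append("file://" + quote(path))
    return ";".join(urls)
```

The result can then be passed as the second argument of the `set_string("pipeline.jars", ...)` call shown above.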
With Flink 1.12.4, this is the code and the error message:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Register the Iceberg runtime and AWS SDK jars with the job.
env.add_jars("file:///home/iceberg-flink-runtime-0.12.0.jar")
env.add_jars("file:///home/bundle-2.15.40.jar")
env.add_jars("file:///home/url-connection-client-2.15.40.jar")

table_env = StreamTableEnvironment.create(env)

# Create an Iceberg catalog backed by AWS Glue, with S3 as the file IO.
table_env.execute_sql("""CREATE CATALOG lake_catalog WITH (
    'type'='iceberg',
    'warehouse'='s3://bucket',
    'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')""")
```
```shell
Py4JJavaError: An error occurred while calling o39.executeSql.
: java.lang.IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.aws.glue.GlueCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
    Missing org.apache.iceberg.aws.glue.GlueCatalog [java.lang.NoClassDefFoundError: software/amazon/awssdk/services/glue/model/EntityNotFoundException]
    at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:176)
    at org.apache.iceberg.flink.CatalogLoader$CustomCatalogLoader.loadCatalog(CatalogLoader.java:146)
    at org.apache.iceberg.flink.FlinkCatalog.<init>(FlinkCatalog.java:110)
    at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:131)
    at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:118)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1121)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1019)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
    Missing org.apache.iceberg.aws.glue.GlueCatalog [java.lang.NoClassDefFoundError: software/amazon/awssdk/services/glue/model/EntityNotFoundException]
    at org.apache.iceberg.common.DynConstructors$Builder.buildChecked(DynConstructors.java:227)
    at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:174)
    ... 18 more
```
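One way to narrow down the `NoClassDefFoundError` above is to check whether the class it names is physically present in any of the jars passed to `add_jars`. The following is a minimal diagnostic sketch using only the Python standard library. The function names `class_entry` and `jars_containing` are hypothetical, and the check only inspects jar contents. It says nothing about which class loader Flink uses, so a class can be present in a jar and still fail to load.

```python
import zipfile
from typing import Iterable, List


def class_entry(class_name: str) -> str:
    """Map a class name to its entry path inside a jar.

    Accepts dotted names (a.b.C), slash-separated names (a/b/C) as printed
    in NoClassDefFoundError messages, and either form ending in '.class'.
    """
    suffix = ".class"
    if class_name.endswith(suffix):
        class_name = class_name[: -len(suffix)]
    return class_name.replace(".", "/") + suffix


def jars_containing(class_name: str, jar_paths: Iterable[str]) -> List[str]:
    """Return the jar paths (local filesystem paths) that contain the class."""
    entry = class_entry(class_name)
    hits = []
    for path in jar_paths:
        # A jar is a zip archive, so zipfile can list its entries.
        with zipfile.ZipFile(path) as jar:
            if entry in jar.namelist():
                hits.append(path)
    return hits
```

For this report, the call would be `jars_containing("software/amazon/awssdk/services/glue/model/EntityNotFoundException", [...])` with the three local jar paths (without the `file://` prefix). An empty result would suggest the class is not shipped in any of them; a non-empty result would point toward a class loading or visibility problem instead.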