massquantity opened a new issue #3022:
URL: https://github.com/apache/iceberg/issues/3022


   I'm trying to use PyFlink to read data from Kafka, transform it, and append it to existing Iceberg tables in the AWS Glue Catalog.
   The catalog and tables were created earlier with Iceberg on Spark 3.
   
   This works in the Flink SQL Client, but sinking a large amount of data through the interactive SQL Client seems impractical.
   
   I've tried Flink 1.11.1 and 1.12.4. Here are the code and error message for Flink 1.11.1:
   
   ```python
   from pyflink.datastream import StreamExecutionEnvironment
   from pyflink.table import StreamTableEnvironment
   
   env = StreamExecutionEnvironment.get_execution_environment()
   
   table_env = StreamTableEnvironment.create(env)
   table_env.get_config().get_configuration().set_string(
       "pipeline.jars",
       "file:///home/iceberg-flink-runtime-0.12.0.jar;file:///home/bundle-2.15.40.jar;file:///home/url-connection-client-2.15.40.jar")
   
   table_env.execute_sql("""CREATE CATALOG lake_catalog WITH (
     'type'='iceberg',
     'warehouse'='s3://bucket',
     'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
     'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')""")
   ```
   
   ```shell
   py4j.protocol.Py4JJavaError: An error occurred while calling o30.executeSql.
   : org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in
   the classpath.
   
   Reason: Required context properties mismatch.
   
   The following properties are requested:
   catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
   io-impl=org.apache.iceberg.aws.s3.S3FileIO
   type=iceberg
   warehouse=s3://bucket
   
   The following factories have been considered:
   org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
        at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
        at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1080)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1019)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
   ```
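
In the 1.11.1 trace, `TableFactoryService` considered only `GenericInMemoryCatalogFactory`, so Iceberg's catalog factory apparently never reached the classloader Flink searched. `TableFactoryService` discovers factories through Java's `ServiceLoader`, which reads `META-INF/services/<interface>` files inside each jar. A minimal standard-library sketch for listing what a jar registers; `service_providers` is a hypothetical helper, not a Flink or Iceberg API, and it assumes the Iceberg factory is registered under `org.apache.flink.table.factories.TableFactory`:

```python
import zipfile


def service_providers(jar_path, service_interface):
    """List provider classes registered in a jar for a ServiceLoader interface.

    Hypothetical diagnostic helper (not part of Flink or Iceberg). Reads
    META-INF/services/<service_interface> from the jar and returns the class
    names it lists, or an empty list if the jar has no such file.
    """
    entry = "META-INF/services/" + service_interface
    with zipfile.ZipFile(jar_path) as jar:
        if entry not in jar.namelist():
            return []
        text = jar.read(entry).decode("utf-8")
    providers = []
    for line in text.splitlines():
        # ServiceLoader files allow '#' comments and blank lines; skip both.
        name = line.split("#", 1)[0].strip()
        if name:
            providers.append(name)
    return providers
```

For example, `service_providers("/home/iceberg-flink-runtime-0.12.0.jar", "org.apache.flink.table.factories.TableFactory")` should include `org.apache.iceberg.flink.FlinkCatalogFactory` if that assumption holds. If it does, the jar itself is probably fine, and the more likely problem is that `pipeline.jars` did not put it on the classpath used when the catalog is created.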
   
   And here are the code and error message for Flink 1.12.4:
   
   ```python
   from pyflink.datastream import StreamExecutionEnvironment
   from pyflink.table import StreamTableEnvironment
   
   env = StreamExecutionEnvironment.get_execution_environment()
   env.add_jars("file:///home/iceberg-flink-runtime-0.12.0.jar")
   env.add_jars("file:///home/bundle-2.15.40.jar")
   env.add_jars("file:///home/url-connection-client-2.15.40.jar")
   
   table_env = StreamTableEnvironment.create(env)
   table_env.execute_sql("""CREATE CATALOG lake_catalog WITH (
     'type'='iceberg',
     'warehouse'='s3://bucket',
     'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
     'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')""")
   ```
   
   ```shell
   Py4JJavaError: An error occurred while calling o39.executeSql.
   : java.lang.IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.aws.glue.GlueCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
        Missing org.apache.iceberg.aws.glue.GlueCatalog [java.lang.NoClassDefFoundError: software/amazon/awssdk/services/glue/model/EntityNotFoundException]
        at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:176)
        at org.apache.iceberg.flink.CatalogLoader$CustomCatalogLoader.loadCatalog(CatalogLoader.java:146)
        at org.apache.iceberg.flink.FlinkCatalog.<init>(FlinkCatalog.java:110)
        at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:131)
        at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:118)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1121)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1019)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Unknown Source)
   Caused by: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
        Missing org.apache.iceberg.aws.glue.GlueCatalog [java.lang.NoClassDefFoundError: software/amazon/awssdk/services/glue/model/EntityNotFoundException]
        at org.apache.iceberg.common.DynConstructors$Builder.buildChecked(DynConstructors.java:227)
        at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:174)
        ... 18 more
   ```
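
In 1.12.4 the Iceberg factory is found, but `GlueCatalog` fails with `NoClassDefFoundError`, which the JVM throws when a class that existed at compile time cannot be found at runtime. A quick sanity check is whether any of the jars passed to `add_jars` actually contains `software/amazon/awssdk/services/glue/model/EntityNotFoundException.class`, assuming the AWS SDK `bundle` jar is the one expected to provide it. A minimal standard-library sketch; `jars_containing_class` is a hypothetical helper, not a Flink or Iceberg API:

```python
import zipfile


def jars_containing_class(class_name, jar_paths):
    """Return the jars from jar_paths that contain the given class file.

    Hypothetical diagnostic helper (not part of Flink or Iceberg).
    class_name may use dots or slashes, e.g.
    'software.amazon.awssdk.services.glue.model.EntityNotFoundException'.
    """
    # Class files live in the jar under their package path.
    entry = class_name.replace(".", "/") + ".class"
    found = []
    for path in jar_paths:
        with zipfile.ZipFile(path) as jar:
            try:
                jar.getinfo(entry)  # raises KeyError if the entry is absent
            except KeyError:
                continue
        found.append(path)
    return found
```

Passing the three jars from the 1.12.4 snippet and the class name from the error should return `/home/bundle-2.15.40.jar` if that jar really contains the Glue model classes. If it does, the class is on disk but not visible to the classloader that loads `GlueCatalog`, which would most likely point to a classloading issue rather than a missing dependency.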


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]