a8356555 opened a new issue, #8916: URL: https://github.com/apache/iceberg/issues/8916
### Apache Iceberg version

1.4.0

### Query engine

Flink

### Please describe the bug 🐞

I'm using the following Dockerfile as my environment:

```Dockerfile
FROM alpine:3.17.0 AS builder

# Download required jars
WORKDIR /tmp/download/my-jars
RUN wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.4.0/iceberg-flink-runtime-1.16-1.4.0.jar
RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.263/bundle-2.17.263.jar
RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.263/url-connection-client-2.17.263.jar

FROM apache/flink:1.16.2-scala_2.12-java8 AS runtime

# Install Python 3.8 & git & PyFlink
RUN apt-get update && apt-get install -y software-properties-common && \
    add-apt-repository -y ppa:deadsnakes/ppa && \
    apt-get remove -y software-properties-common && apt-get autoremove -y && apt-get clean
RUN apt-get update && apt-get install -y python3.8 python3-pip python3.8-distutils git && apt-get clean
RUN python3.8 -m pip install --upgrade pip
RUN python3.8 -m pip install apache-flink==1.16.2 --no-cache-dir

# Install Hadoop & export Hadoop classpath
WORKDIR /tmp/download/my-hadoop
RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz && \
    tar xzf hadoop-3.3.4.tar.gz && \
    mv hadoop-3.3.4 /opt/hadoop-3.3.4 && \
    rm hadoop-3.3.4.tar.gz
ENV HADOOP_HOME=/opt/hadoop-3.3.4
ENV HADOOP_CLASSPATH=/opt/hadoop-3.3.4/etc/hadoop:/opt/hadoop-3.3.4/share/hadoop/common/lib/*:/opt/hadoop-3.3.4/share/hadoop/common/*:/opt/hadoop-3.3.4/share/hadoop/hdfs:/opt/hadoop-3.3.4/share/hadoop/hdfs/lib/*:/opt/hadoop-3.3.4/share/hadoop/hdfs/*:/opt/hadoop-3.3.4/share/hadoop/mapreduce/*:/opt/hadoop-3.3.4/share/hadoop/yarn:/opt/hadoop-3.3.4/share/hadoop/yarn/lib/*:/opt/hadoop-3.3.4/share/hadoop/yarn/*

# Copy jars from builder stage
COPY --from=builder /tmp/download/my-jars/. /opt/flink/lib/.
```
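As far as I understand, the job only sees these jars because they're copied into `/opt/flink/lib` at image build time. For reference, the same jars could also be attached per job from PyFlink (a minimal sketch, assuming the paths produced by the Dockerfile above):

```python
# Minimal sketch (assumption): attach the same jars per job instead of
# relying on them being picked up from /opt/flink/lib at cluster startup.
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(
    'file:///opt/flink/lib/iceberg-flink-runtime-1.16-1.4.0.jar',
    'file:///opt/flink/lib/bundle-2.17.263.jar',
    'file:///opt/flink/lib/url-connection-client-2.17.263.jar',
)
```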
Here is my PyFlink code (`job.py`):

```python
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.common import Types

ICEBERG_GLUE_WAREHOUSE = os.environ['ICEBERG_GLUE_WAREHOUSE']
ICEBERG_GLUE_DATABASE_NAME_SRC = os.environ['ICEBERG_GLUE_DATABASE_NAME_SRC']
ICEBERG_GLUE_DATABASE_NAME_DST = os.environ['ICEBERG_GLUE_DATABASE_NAME_DST']
ICEBERG_GLUE_TABLE_NAME_SRC = os.environ['ICEBERG_GLUE_TABLE_NAME_SRC']
ICEBERG_GLUE_TABLE_NAME_DST = os.environ['ICEBERG_GLUE_TABLE_NAME_DST']

env = StreamExecutionEnvironment.get_execution_environment()
env.disable_operator_chaining()
env.set_parallelism(1)
env.enable_checkpointing(60000)  # 60s
checkpoint_config = env.get_checkpoint_config()
checkpoint_config.set_checkpoint_storage_dir('file:///tmp/checkpoint')

stenv = StreamTableEnvironment.create(env)
stenv.get_config().get_configuration().set_string('table.local-time-zone', 'UTC')

stenv.execute_sql(f'''
    CREATE TEMPORARY TABLE `mytable` (
        `t` TIMESTAMP,
        `table` STRING,
        `op` STRING,
        `before` MAP<STRING, STRING>,
        `after` MAP<STRING, STRING>,
        `_kc_tx` TIMESTAMP,
        `_kc_source` MAP<STRING, STRING>,
        `_kafka` ROW<`topic` STRING, `partition` INT, `offset` BIGINT, `timestamp` TIMESTAMP>
    ) WITH (
        'connector' = 'iceberg',
        'warehouse' = '{ICEBERG_GLUE_WAREHOUSE}',
        'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
        'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
        'catalog-name' = 'mycatalog',
        'catalog-database' = '{ICEBERG_GLUE_DATABASE_NAME_SRC}',
        'catalog-table' = '{ICEBERG_GLUE_TABLE_NAME_SRC}'
    );
''')

stenv.execute_sql(f'''
    CREATE CATALOG `mycatalog` WITH (
        'type' = 'iceberg',
        'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
        'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
        'warehouse' = '{ICEBERG_GLUE_WAREHOUSE}'
    );
''')

stenv.execute_sql(f'''
    CREATE TABLE IF NOT EXISTS `mycatalog`.`{ICEBERG_GLUE_DATABASE_NAME_DST}`.`{ICEBERG_GLUE_TABLE_NAME_DST}` (
        `t` TIMESTAMP,
        `table` STRING,
        `op` STRING,
        PRIMARY KEY (`table`) NOT ENFORCED
    ) PARTITIONED BY (`table`) WITH (
        'format-version' = '2',
        'write.upsert.enabled' = 'true'
    );
''')

type_info_datom = Types.ROW_NAMED(
    field_names=['t', 'table', 'op'],
    field_types=[Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]
)

sql = f'''
    SELECT `t`, `table`, `op`
    FROM `table_src` /*+ OPTIONS('streaming'='true', 'monitor-interval'='300s') */
'''
cdc_datom_stream = stenv.to_append_stream(
    stenv.sql_query(sql),
    type_info_datom
)
stenv.create_temporary_view('event', cdc_datom_stream)

stenv.execute_sql(f'''
    INSERT INTO `mycatalog`.`{ICEBERG_GLUE_DATABASE_NAME_DST}`.`{ICEBERG_GLUE_TABLE_NAME_DST}`
    /*+ OPTIONS('upsert-enabled'='true', 'write-parallelism'='1') */
    SELECT `t`, `table`, `op`
    FROM mytable;
''')
```

I'm running this code using

```
flink run -pyexec /usr/bin/python3.8 -pyclientexec /usr/bin/python3.8 -py job.py
```

However, the job never writes any data into my Iceberg table in S3. Here is the dashboard:

[Flink dashboard screenshot from the original issue]

Why is the IcebergStreamWriter stuck? There's no error in the JobManager's log.
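Since, as far as I understand, the Iceberg sink only commits data files when a checkpoint completes, it seems worth ruling out checkpoints that never finish. A small sketch of checking that through the Flink REST API (the host, port, and job id below are assumptions; 8081 is the default JobManager REST port):

```python
# Hypothetical helper: poll the JobManager REST API for checkpoint statistics.
import json
import urllib.request

def completed_checkpoints(job_id, host='localhost', port=8081):
    url = f'http://{host}:{port}/jobs/{job_id}/checkpoints'
    with urllib.request.urlopen(url) as resp:
        stats = json.load(resp)
    # 'counts' holds the number of completed / failed / in-progress checkpoints
    return stats['counts']['completed']

# Usage: print(completed_checkpoints('<job-id>'))
```

If this count stays at 0, nothing would ever be committed to the table even though the writer operator is running.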

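Separately, a small query I could run from the same environment to check whether anything was ever committed to the destination table (a sketch reusing `stenv` and the environment variables from `job.py` above):

```python
# Hypothetical check: count rows in the destination table; a result of 0
# would mean no snapshot was ever committed by the streaming job.
stenv.execute_sql(f'''
    SELECT COUNT(*)
    FROM `mycatalog`.`{ICEBERG_GLUE_DATABASE_NAME_DST}`.`{ICEBERG_GLUE_TABLE_NAME_DST}`
''').print()
```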