a8356555 opened a new issue, #8916:
URL: https://github.com/apache/iceberg/issues/8916

   ### Apache Iceberg version
   
   1.4.0
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   I'm using the following Dockerfile as my environment:
   ```Dockerfile
   FROM alpine:3.17.0 AS builder
   
   # Download required jars
   WORKDIR /tmp/download/my-jars
   RUN wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.4.0/iceberg-flink-runtime-1.16-1.4.0.jar
   RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.263/bundle-2.17.263.jar
   RUN wget https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.263/url-connection-client-2.17.263.jar
   
   FROM apache/flink:1.16.2-scala_2.12-java8 AS runtime
   
   # Install Python 3.8 & git & PyFlink
   RUN apt-get update && apt-get install -y software-properties-common && \
       add-apt-repository -y ppa:deadsnakes/ppa && \
       apt-get remove -y software-properties-common && apt-get autoremove -y && apt-get clean
   RUN apt-get update && apt-get install -y python3.8 python3-pip python3.8-distutils git && apt-get clean
   RUN python3.8 -m pip install --upgrade pip
   RUN python3.8 -m pip install apache-flink==1.16.2 --no-cache-dir
   
   # Install Hadoop & export Hadoop classpath
   WORKDIR /tmp/download/my-hadoop
   RUN wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz && \
       tar xzf hadoop-3.3.4.tar.gz && \
       mv hadoop-3.3.4 /opt/hadoop-3.3.4 && \
       rm hadoop-3.3.4.tar.gz
   ENV HADOOP_HOME=/opt/hadoop-3.3.4
   ENV HADOOP_CLASSPATH=/opt/hadoop-3.3.4/etc/hadoop:/opt/hadoop-3.3.4/share/hadoop/common/lib/*:/opt/hadoop-3.3.4/share/hadoop/common/*:/opt/hadoop-3.3.4/share/hadoop/hdfs:/opt/hadoop-3.3.4/share/hadoop/hdfs/lib/*:/opt/hadoop-3.3.4/share/hadoop/hdfs/*:/opt/hadoop-3.3.4/share/hadoop/mapreduce/*:/opt/hadoop-3.3.4/share/hadoop/yarn:/opt/hadoop-3.3.4/share/hadoop/yarn/lib/*:/opt/hadoop-3.3.4/share/hadoop/yarn/*
   
   # Copy jars from builder stage
   COPY --from=builder /tmp/download/my-jars/. /opt/flink/lib/.
   ```
   Here is my PyFlink code (job.py):
   ```python
   
   import os
   
   from pyflink.datastream import StreamExecutionEnvironment
   from pyflink.table import StreamTableEnvironment
   from pyflink.common import Types
   
   ICEBERG_GLUE_WAREHOUSE = os.environ['ICEBERG_GLUE_WAREHOUSE']
   ICEBERG_GLUE_DATABASE_NAME_SRC = os.environ['ICEBERG_GLUE_DATABASE_NAME_SRC']
   ICEBERG_GLUE_DATABASE_NAME_DST = os.environ['ICEBERG_GLUE_DATABASE_NAME_DST']
   ICEBERG_GLUE_TABLE_NAME_SRC = os.environ['ICEBERG_GLUE_TABLE_NAME_SRC']
   ICEBERG_GLUE_TABLE_NAME_DST = os.environ['ICEBERG_GLUE_TABLE_NAME_DST']
   
   env = StreamExecutionEnvironment.get_execution_environment()
   env.disable_operator_chaining()
   env.set_parallelism(1)
   env.enable_checkpointing(60000) # 60s
   checkpoint_config = env.get_checkpoint_config()
   checkpoint_config.set_checkpoint_storage_dir('file:///tmp/checkpoint')
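    # Note: the Iceberg Flink sink only commits data files when a checkpoint
    # completes, so commits happen at most once per checkpoint interval (60s here).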
   
   stenv = StreamTableEnvironment.create(env)
    stenv.get_config().get_configuration().set_string('table.local-time-zone', 'UTC')
   
   stenv.execute_sql(f'''
   CREATE TEMPORARY TABLE `mytable` (
                   `t`             TIMESTAMP,
                   `table`         STRING,
                   `op`            STRING,
                   `before`        MAP<STRING, STRING>,
                   `after`         MAP<STRING, STRING>,
                   `_kc_tx`        TIMESTAMP,
                   `_kc_source`    MAP<STRING, STRING>,
                    `_kafka`        ROW<`topic` STRING, `partition` INT, `offset` BIGINT, `timestamp` TIMESTAMP>
               ) WITH (
                   'connector' = 'iceberg',
                   'warehouse' = '{ICEBERG_GLUE_WAREHOUSE}',
                   'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
                   'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',  
                   'catalog-name' = 'mycatalog',
                   'catalog-database' = '{ICEBERG_GLUE_DATABASE_NAME_SRC}',
                   'catalog-table' = '{ICEBERG_GLUE_TABLE_NAME_SRC}'
                 );
   ''')
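    # This temporary table binds directly to the Glue-backed Iceberg table via
    # the 'iceberg' connector options, without registering a Flink catalog.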
   
   
   stenv.execute_sql(f'''
   CREATE CATALOG `mycatalog` 
           WITH (
               'type' = 'iceberg',
               'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
               'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',  
               'warehouse' = '{ICEBERG_GLUE_WAREHOUSE}'
           );
   ''')
   
   stenv.execute_sql(f'''
    CREATE TABLE IF NOT EXISTS `mycatalog`.`{ICEBERG_GLUE_DATABASE_NAME_DST}`.`{ICEBERG_GLUE_TABLE_NAME_DST}`
           (
               `t` TIMESTAMP,
               `table` STRING,
               `op` STRING,
               PRIMARY KEY (`table`) NOT ENFORCED
           ) 
           PARTITIONED BY (`table`)
           WITH (
               'format-version'='2',
               'write.upsert.enabled'='true'
           );
   ''')
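    # 'write.upsert.enabled' requires format-version 2 and a primary key; for a
    # partitioned table, the partition column (`table`) must be part of that key.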
   
   type_info_datom = Types.ROW_NAMED(
                                       field_names=[
                                           't',
                                           'table',
                                           'op'
                                       ],
                                       field_types=[
                                           Types.SQL_TIMESTAMP(),
                                           Types.STRING(),
                                           Types.STRING()
                                       ]
                                   )
   
   
   sql = f'''
       select 
           `t`,
           `table`,
           `op`
        from `mytable`
       /*+ OPTIONS('streaming'='true', 'monitor-interval'='300s')*/
   '''
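    # The OPTIONS hint above makes the Iceberg source unbounded:
    # 'streaming'='true' keeps it running, and 'monitor-interval' sets how often
    # it polls the catalog for newly committed snapshots.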
   
   
   cdc_datom_stream = stenv.to_append_stream(
       stenv.sql_query(sql),
       type_info_datom
   )
   
   
   stenv.create_temporary_view('event', cdc_datom_stream)
   
   stenv.execute_sql(f'''
    INSERT INTO `mycatalog`.`{ICEBERG_GLUE_DATABASE_NAME_DST}`.`{ICEBERG_GLUE_TABLE_NAME_DST}` /*+ OPTIONS('upsert-enabled'='true', 'write-parallelism'='1') */
        SELECT
            `t`, `table`, `op`
        FROM event;
   
   ''')
   ```
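   For reference, a quick way to exercise the read side in isolation (a sketch reusing `stenv` and `sql` from above; `TableResult.print()` blocks and prints rows as they arrive):
   ```python
   # Read-side sanity check (sketch): run the streaming source query on its own.
   # If no rows show up within one monitor-interval, the problem is upstream of
   # the IcebergStreamWriter rather than in the writer itself.
   result = stenv.sql_query(sql).execute()
   result.print()
   ```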
   I'm running this code with:
   ```
   flink run -pyexec /usr/bin/python3.8 -pyclientexec /usr/bin/python3.8 -py job.py
   ```
   
   However, the job never writes any data into my Iceberg table in S3.
   Here is the dashboard:
   
   ![Screenshot 2023-10-24 7:20:03 PM](https://github.com/apache/iceberg/assets/59360204/fe396cc0-38a1-4407-a475-c0e0fa233f6e)
   
   Why is the IcebergStreamWriter stuck? There's no error in the jobmanager's log.
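
   Since the Iceberg sink only publishes data files when a checkpoint completes, checkpoint progress seems relevant here. A sketch for checking it through Flink's REST API (the host/port and job id are placeholders):
   ```python
   # Sketch: fetch checkpoint statistics for the job. If 'completed' stays at 0,
   # the committer never receives notifyCheckpointComplete, so nothing is
   # committed to the Iceberg table even though the writer runs without errors.
   import json
   import urllib.request

   JOB_ID = '<job-id>'  # placeholder: copy it from the dashboard
   with urllib.request.urlopen(f'http://localhost:8081/jobs/{JOB_ID}/checkpoints') as resp:
       stats = json.load(resp)
   print(stats['counts'])  # completed / failed / in_progress counters
   ```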

