starslink opened a new issue, #7577:
URL: https://github.com/apache/iceberg/issues/7577

   ### Query engine
   
   Flink: 1.16.0
   Iceberg: 1.2.0
   Jar: iceberg-flink-runtime-1.16-1.2.0.jar
   
   ### Question
   
   Below is a simple example that writes data to the lake (Iceberg) with Flink SQL.
   ```sql
   SET execution.checkpointing.interval = 3s;
   
   CREATE CATALOG `hadoop_catalog` WITH (
     'type'='iceberg',
     'catalog-type'='hadoop',
     'warehouse'='hdfs://192.168.2.236:8020/warehouse/hadoop_catalog',
     'property-version'='1'
   );
   
   INSERT INTO `hadoop_catalog`.`test001`.`sample` /*+ OPTIONS('upsert-enabled'='true') */ VALUES (99, '101');
   ```
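
   (For reference, the `INSERT` above assumes the `sample` table already exists as a format-v2 upsert table; the standalone DDL below matches the `CREATE TABLE` statement in the Java program further down.)
   ```sql
   -- Same DDL as in the Java program below, shown standalone so the write example is self-contained.
   CREATE TABLE IF NOT EXISTS `hadoop_catalog`.`test001`.`sample` (
     `id`   INT COMMENT 'unique id',
     `data` STRING NOT NULL,
     PRIMARY KEY (`id`) NOT ENFORCED
   ) WITH ('format-version'='2', 'write.upsert.enabled'='true');
   ```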
   The following is the code that reads the Iceberg data in real time (streaming) with Flink SQL.
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
   import org.apache.flink.streaming.api.CheckpointingMode;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
   StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
   Configuration configuration = tEnv.getConfig().getConfiguration();
   configuration.setString("pipeline.name", "GetCustomerToIceBerg");
   configuration.setBoolean("table.dynamic-table-options.enabled", true);
   // Configure checkpointing
   env.enableCheckpointing(3000);
   env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
   env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
   env.getCheckpointConfig().setCheckpointTimeout(60000);
   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
   // State backend: Flink provides HashMapStateBackend and EmbeddedRocksDBStateBackend
   env.setStateBackend(new HashMapStateBackend());
   env.getCheckpointConfig().setCheckpointStorage("hdfs:///user/flink/checkpoints");

   tEnv.executeSql("CREATE CATALOG `hadoop_catalog` WITH (\n"
           + "  'type'='iceberg',\n"
           + "  'catalog-type'='hadoop',\n"
           + "  'warehouse'='hdfs://192.168.2.236:8020/warehouse/hadoop_catalog',\n"
           + "  'property-version'='1'\n"
           + ")");

   tEnv.useCatalog("hadoop_catalog");

   tEnv.executeSql("CREATE DATABASE IF NOT EXISTS `test001`");

   tEnv.useDatabase("test001");

   tEnv.executeSql("CREATE TABLE IF NOT EXISTS `hadoop_catalog`.`test001`.`sample` (\n"
           + "  `id`   INT COMMENT 'unique id',\n"
           + "  `data` STRING NOT NULL,\n"
           + "  PRIMARY KEY (`id`) NOT ENFORCED\n"
           + ") WITH ('format-version'='2', 'write.upsert.enabled'='true')");

   // Streaming (incremental) read via dynamic table options
   tEnv.executeSql("SELECT * FROM `hadoop_catalog`.`test001`.`sample` "
           + "/*+ OPTIONS('streaming'='true', 'monitor-interval'='3s') */").print();
   ```
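
   One way to narrow this down is to drop the streaming hint and run the same query as a bounded batch scan of the current snapshot; if that returns rows while the streaming query prints nothing, the issue is specific to the incremental read:
   ```sql
   -- Bounded scan of the current snapshot (no streaming hint).
   SELECT * FROM `hadoop_catalog`.`test001`.`sample`;
   ```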
   The above code is very simple: the table has only two fields. I have carefully checked the data files in HDFS and they look fine. The data can be read with Spark SQL, but not with Flink SQL, even though checkpointing is enabled.
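
   For comparison, the Spark SQL check looked roughly like the following (a sketch: the catalog class and session settings are assumptions that mirror the Flink catalog definition above):
   ```sql
   -- Assumed spark-sql session setup mirroring the Flink catalog above.
   SET spark.sql.catalog.hadoop_catalog = org.apache.iceberg.spark.SparkCatalog;
   SET spark.sql.catalog.hadoop_catalog.type = hadoop;
   SET spark.sql.catalog.hadoop_catalog.warehouse = hdfs://192.168.2.236:8020/warehouse/hadoop_catalog;

   -- This batch read returns the rows, while the Flink streaming read above stays silent.
   SELECT * FROM hadoop_catalog.test001.sample;
   ```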
   
   

