starslink opened a new issue, #7577:
URL: https://github.com/apache/iceberg/issues/7577
### Query engine
Flink: 1.16.0
Iceberg: 1.2.0
Jar version: iceberg-flink-runtime-1.16-1.2.0.jar
### Question
Below is a simple Flink SQL snippet that writes data into the lake (Iceberg).
```sql
SET execution.checkpointing.interval = 3s;

CREATE CATALOG `hadoop_catalog` WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://192.168.2.236:8020/warehouse/hadoop_catalog',
  'property-version'='1'
);

INSERT INTO `hadoop_catalog`.`test001`.`sample`
  /*+ OPTIONS('upsert-enabled'='true') */
VALUES (99, '101');
```
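The INSERT assumes the target table already exists; it is created from the Java code below. For readability, the same DDL expressed directly in Flink SQL looks like this:

```sql
-- Same table definition as issued from the Java code below:
-- a format-version 2 table with a primary key, which upsert writes require.
CREATE TABLE IF NOT EXISTS `hadoop_catalog`.`test001`.`sample` (
  `id` INT COMMENT 'unique id',
  `data` STRING NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) WITH ('format-version'='2', 'write.upsert.enabled'='true');
```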
The following Java code reads the Iceberg table in real time (streaming) with Flink SQL.
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.setString("pipeline.name", "GetCustomerToIceBerg");
// Needed so the /*+ OPTIONS(...) */ hint below is honored.
configuration.setBoolean("table.dynamic-table-options.enabled", true);

// Checkpointing settings.
env.enableCheckpointing(3000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// State backend: Flink provides HashMapStateBackend and EmbeddedRocksDBStateBackend.
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs:///user/flink/checkpoints");

tEnv.executeSql("CREATE CATALOG `hadoop_catalog` WITH (\n"
    + "  'type'='iceberg',\n"
    + "  'catalog-type'='hadoop',\n"
    + "  'warehouse'='hdfs://192.168.2.236:8020/warehouse/hadoop_catalog',\n"
    + "  'property-version'='1'\n"
    + ")");
tEnv.useCatalog("hadoop_catalog");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS `test001`");
tEnv.useDatabase("test001");

// Format-version 2 table with a primary key, upsert mode enabled.
tEnv.executeSql("CREATE TABLE IF NOT EXISTS `hadoop_catalog`.`test001`.`sample` (\n"
    + "  `id` INT COMMENT 'unique id',\n"
    + "  `data` STRING NOT NULL,\n"
    + "  PRIMARY KEY(`id`) NOT ENFORCED\n"
    + ") WITH ('format-version'='2', 'write.upsert.enabled'='true')");

// Streaming read: monitor the table for new snapshots every 3 seconds.
tEnv.executeSql("SELECT * FROM `hadoop_catalog`.`test001`.`sample` "
        + "/*+ OPTIONS('streaming'='true', 'monitor-interval'='3s') */")
    .print();
```
The code above is very simple: the table has only two fields. I have
carefully checked the data files in HDFS and they look fine. The data can
be read with Spark SQL, but not with Flink SQL, even though checkpointing
is already set.
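For reference, the Spark SQL check that does return the row is a plain batch read; a minimal sketch, assuming a Spark catalog also named `hadoop_catalog` is configured against the same warehouse path:

```sql
-- Hypothetical spark-sql session (catalog name and configuration assumed):
-- this batch read returns the row, while the Flink streaming read above
-- keeps running but prints nothing.
SELECT * FROM hadoop_catalog.test001.sample;
```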
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]