kingeasternsun edited a comment on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1064887971
Hello everyone, I think we may be able to solve this with the Iceberg
connector alone, with no need to modify code. @hameizi @yittg
First, use Flink to create an Iceberg table without computed columns
or a watermark, like this:
```java
tenv.executeSql("CREATE CATALOG iceberg_catalog WITH (\n" +
" 'type'='iceberg',\n" +
" 'catalog-type'='hive'," +
" 'uri'='thrift://host:9083'," +
" 'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
")");
tenv.useCatalog("iceberg_catalog");
tenv.useDatabase("iceberg_db");
String tableName = "iceberg_origin";
tenv.executeSql("create table if not exists " + tableName + "(\n" +
" order_id STRING,\n" +
" price DECIMAL(32,2),\n" +
" currency STRING,\n" +
" origin_order_time TIMESTAMP(6)\n" +
") WITH (\n" +
" 'format.version' = '2'\n" +
")"
);
```
Then use the Iceberg connector to create a table source with computed columns
and watermark support, like this: @stevenzwu
```java
tenv.executeSql("create table if not exists default_catalog.default_database.iceberg_table (\n" +
" order_id STRING,\n" +
" price DECIMAL(32,2),\n" +
" currency STRING,\n" +
" origin_order_time TIMESTAMP(6),\n" +
" order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
" WATERMARK FOR order_time AS order_time - INTERVAL '5' MINUTE\n" +
") WITH (\n" +
" 'connector' = 'iceberg',\n" +
" 'catalog-name' = 'iceberg_catalog',\n" +
" 'catalog-database' = 'iceberg_db',\n" +
" 'catalog-table' = 'iceberg_origin',\n" +
" 'uri'='thrift://host:9083'," +
" 'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
")"
);
```
If a different watermark strategy is needed, we just need to create another
table source, for example with a 10-minute watermark:
```java
tenv.executeSql("create table if not exists default_catalog.default_database.iceberg_table (\n" +
" order_id STRING,\n" +
" price DECIMAL(32,2),\n" +
" currency STRING,\n" +
" origin_order_time TIMESTAMP(6),\n" +
" order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
" WATERMARK FOR order_time AS order_time - INTERVAL '10' MINUTE\n" +
") WITH (\n" +
" 'connector' = 'iceberg',\n" +
" 'catalog-name' = 'iceberg_catalog',\n" +
" 'catalog-database' = 'iceberg_db',\n" +
" 'catalog-table' = 'iceberg_origin',\n" +
" 'uri'='thrift://host:9083'," +
" 'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
")"
);
```
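Since the two table sources above differ only in the watermark delay, the DDL could be generated from one template. The following is a minimal sketch, not part of Iceberg or Flink: the class `WatermarkDdl`, the method `build`, and the source name `iceberg_table_10m` are hypothetical names made up for illustration, while the columns and connector options are copied from the examples above.

```java
/** Hypothetical helper (not an Iceberg or Flink API) that builds the table-source DDL shown above. */
final class WatermarkDdl {

    private WatermarkDdl() {
    }

    /**
     * Returns the CREATE TABLE statement for a watermarked source over iceberg_origin.
     *
     * @param sourceName   fully qualified name of the table source to create (assumed trusted input)
     * @param delayMinutes watermark delay in minutes
     */
    static String build(String sourceName, int delayMinutes) {
        // Assumption: INTERVAL '<n>' MINUTE uses the default two-digit precision,
        // so this sketch only accepts delays from 0 to 99 minutes.
        if (delayMinutes < 0 || delayMinutes > 99) {
            throw new IllegalArgumentException("delayMinutes must be between 0 and 99");
        }
        return "create table if not exists " + sourceName + " (\n" +
                "  order_id STRING,\n" +
                "  price DECIMAL(32,2),\n" +
                "  currency STRING,\n" +
                "  origin_order_time TIMESTAMP(6),\n" +
                "  order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
                "  WATERMARK FOR order_time AS order_time - INTERVAL '" + delayMinutes + "' MINUTE\n" +
                ") WITH (\n" +
                "  'connector' = 'iceberg',\n" +
                "  'catalog-name' = 'iceberg_catalog',\n" +
                "  'catalog-database' = 'iceberg_db',\n" +
                "  'catalog-table' = 'iceberg_origin',\n" +
                "  'uri'='thrift://host:9083',\n" +
                "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'\n" +
                ")";
    }
}
```

It would be used as `tenv.executeSql(WatermarkDdl.build("default_catalog.default_database.iceberg_table_10m", 10));`. Each strategy needs its own source name, because `create table if not exists` leaves an existing table of the same name unchanged.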