kingeasternsun edited a comment on pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#issuecomment-1064887971


   Hello everyone, I think maybe we can just use the Iceberg connector to solve 
this, with no need to modify the code. @hameizi  @yittg 
   
   First, use Flink to create an Iceberg table without computed columns 
or a watermark, like this:
   ```java
       tenv.executeSql("CREATE CATALOG iceberg_catalog WITH (\n" +
           "  'type'='iceberg',\n" +
           "  'catalog-type'='hive'," +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")");
   
       tenv.useCatalog("iceberg_catalog");
       tenv.useDatabase("iceberg_db");
   
       String tableName = "iceberg_origin";
   
       tenv.executeSql("create table if not exists  " + tableName + "(\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6)\n" +
           ") WITH (\n" +
           " 'format.version' = '2'\n" +
           ")"
       );
   ```
   
   Then use the Iceberg connector to create a table source with computed columns 
and watermark support, like this:
   ```java
       tenv.executeSql("create table if not exists default_catalog.default_database.iceberg_table (\n" +
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6),\n" +
           " order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
           " WATERMARK FOR order_time AS order_time - INTERVAL '5' MINUTE\n" +
           ") WITH (\n" +
           " 'connector' = 'iceberg',\n" +
           " 'catalog-name' = 'iceberg_catalog',\n" +
           " 'catalog-database' = 'iceberg_db',\n" +
           " 'catalog-table' = 'iceberg_origin',\n" +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")"
       );
   
   ```
   
   If a different watermark strategy is needed, we just need to create another 
table source over the same Iceberg table. For example, with a 10-minute watermark:
   ```java
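       // Use a distinct table name here: with "if not exists", reusing
       // iceberg_table would be a no-op if that table already exists.
       tenv.executeSql("create table if not exists default_catalog.default_database.iceberg_table_10min (\n" +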
           " order_id    STRING,\n" +
           " price       DECIMAL(32,2),\n" +
           " currency    STRING,\n" +
           " origin_order_time TIMESTAMP(6),\n" +
           " order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
           " WATERMARK FOR order_time AS order_time - INTERVAL '10' MINUTE\n" +
           ") WITH (\n" +
           " 'connector' = 'iceberg',\n" +
           " 'catalog-name' = 'iceberg_catalog',\n" +
           " 'catalog-database' = 'iceberg_db',\n" +
           " 'catalog-table' = 'iceberg_origin',\n" +
           "  'uri'='thrift://host:9083'," +
           "  'warehouse'='hdfs://host:9009/user/hive/warehouse/'" +
           ")"
       );
   
   ```
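
   Since the source definitions differ only in the Flink-side table name and the watermark delay, the DDL could be generated by a small helper. This is only a rough sketch: the class `WatermarkSourceDdl`, the method `build`, and the table name `iceberg_table_10min` are hypothetical names I made up for illustration, not part of Iceberg or Flink. I also limit the delay to 0..99 minutes on the assumption that the interval's leading field defaults to a precision of 2 digits.

   ```java
   public class WatermarkSourceDdl {

       // Hypothetical helper: builds the "create table" statement for a
       // connector table over iceberg_origin, varying only the Flink-side
       // table name and the watermark delay in minutes.
       public static String build(String flinkTableName, int delayMinutes) {
           // Assumption: keep the delay within two digits so that
           // INTERVAL '<n>' MINUTE needs no explicit precision.
           if (delayMinutes < 0 || delayMinutes > 99) {
               throw new IllegalArgumentException("delayMinutes must be in [0, 99]");
           }
           return "create table if not exists default_catalog.default_database."
               + flinkTableName + " (\n" +
               " order_id    STRING,\n" +
               " price       DECIMAL(32,2),\n" +
               " currency    STRING,\n" +
               " origin_order_time TIMESTAMP(6),\n" +
               " order_time as cast(origin_order_time as TIMESTAMP(3)),\n" +
               " WATERMARK FOR order_time AS order_time - INTERVAL '"
               + delayMinutes + "' MINUTE\n" +
               ") WITH (\n" +
               " 'connector' = 'iceberg',\n" +
               " 'catalog-name' = 'iceberg_catalog',\n" +
               " 'catalog-database' = 'iceberg_db',\n" +
               " 'catalog-table' = 'iceberg_origin',\n" +
               " 'uri'='thrift://host:9083',\n" +
               " 'warehouse'='hdfs://host:9009/user/hive/warehouse/'\n" +
               ")";
       }

       public static void main(String[] args) {
           // Print the statement; in the job it would be passed to tenv.executeSql(...).
           System.out.println(build("iceberg_table_10min", 10));
       }
   }
   ```

   In the job this would be used as `tenv.executeSql(WatermarkSourceDdl.build("iceberg_table_10min", 10))`, with one call per watermark strategy and a different table name each time.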


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


