bvinayakumar opened a new issue #3124:
URL: https://github.com/apache/iceberg/issues/3124


   I am trying to write data to an Iceberg table using the Flink engine. A code 
snippet is provided below for reference. Please note that I am currently running 
this code from my IDE. The Iceberg table was created using the Flink SQL client. 
I am using Flink 1.13.1 and the Iceberg 0.12.0 runtime.
   
   ```
   // Catalog properties for an AWS Glue catalog with S3 file IO and a DynamoDB lock table.
   val catalogProperties = mutable.Map.empty[String, String]
   catalogProperties.put(Constants.CatalogType, "iceberg")
   catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://my-iceberg-bucket")
   catalogProperties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.aws.glue.GlueCatalog")
   catalogProperties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO")
   catalogProperties.put(CatalogProperties.LOCK_IMPL, "org.apache.iceberg.aws.glue.DynamoLockManager")
   catalogProperties.put(CatalogProperties.LOCK_TABLE, "myLockTable")
   
   val hadoopConf = new Configuration
   val catalogLoader = CatalogLoader.custom(
     "my_catalog", catalogProperties.asJava, hadoopConf, "org.apache.iceberg.aws.glue.GlueCatalog")
   val tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("my_ns", "data"))
   
   // DataMapper converts each JSON message from the Kafka source to GenericRowData.
   val dataStream = env.addSource(kafkaConsumer).setParallelism(4).flatMap(new DataMapper)
   
   FlinkSink.forRowData(dataStream.javaStream)
     .tableLoader(tableLoader)
     .build()
   ```
   
   The data is written successfully (i.e., I can see Parquet files being created 
under the `data` folder within the specified S3 bucket). However, 
`current-snapshot-id` in the metadata JSON file is never updated and remains 
-1. Only one JSON file is generated in the `metadata` folder, even though there 
are several Parquet files under the `data` folder.
   
   ```
   {
     "format-version" : 1,
     "table-uuid" : "3a1efd9d-a3ce-47b4-9f5f-d2a61ff6efd9",
     "location" : "s3://...",
     "last-updated-ms" : 1631709058366,
     "last-column-id" : 10,
     "schema" : {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ ... ]
     },
     "current-schema-id" : 0,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ ... ]
     } ],
     "partition-spec" : [ ... ],
     "default-spec-id" : 0,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ ... ]
     } ],
     "last-partition-id" : 1003,
     "default-sort-order-id" : 0,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     } ],
     "properties" : { },
     "current-snapshot-id" : -1,
     "snapshots" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ ]
   }
   ```
   
   I read an excellent blog post [1] about the Apache Iceberg architecture and 
expected the current snapshot ID to be updated after each successful write.
   
   Any comments on why the current snapshot ID may not be updated in the 
metadata JSON file?
   
   [1] Apache Iceberg: An Architectural Look Under the Covers
   https://www.dremio.com/apache-iceberg-an-architectural-look-under-the-covers/
   




