bvinayakumar opened a new issue #3124:
URL: https://github.com/apache/iceberg/issues/3124
I am trying to write data to an Iceberg table using the Flink engine; a code
snippet is provided below for reference (preceded by a rough sketch of the build
dependencies). Please note that I am presently testing this code from the IDE.
The Iceberg table was created with the Flink SQL client. I am using Flink 1.13.1
and the Iceberg 0.12.0 runtime.
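For context, the relevant build dependencies look roughly like this; the AWS SDK version below is an illustrative placeholder (the Glue catalog, S3FileIO and the DynamoDB lock manager need the AWS SDK v2 on the classpath):
```
// build.sbt (sketch; only the Flink and Iceberg versions are exact, the rest is illustrative)
val flinkVersion = "1.13.1"
val icebergVersion = "0.12.0"
val awsSdkVersion = "2.17.131" // illustrative; any recent AWS SDK v2 release

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-clients" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  // Iceberg Flink runtime; the AWS module classes are bundled in the runtime jar
  "org.apache.iceberg" % "iceberg-flink-runtime" % icebergVersion,
  // AWS SDK v2, required by GlueCatalog / S3FileIO / DynamoLockManager
  "software.amazon.awssdk" % "bundle" % awsSdkVersion,
  "software.amazon.awssdk" % "url-connection-client" % awsSdkVersion
)
```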
```
// Imports added here for readability; Constants, env, kafkaConsumer and
// DataMapper are defined elsewhere in my application code.
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.CatalogProperties
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.flink.sink.FlinkSink

// Catalog configuration: Glue catalog, S3 warehouse, DynamoDB lock table
val catalogProperties = mutable.Map[String, String]()
catalogProperties.put(Constants.CatalogType, "iceberg")
catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://my-iceberg-bucket")
catalogProperties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.aws.glue.GlueCatalog")
catalogProperties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO")
catalogProperties.put(CatalogProperties.LOCK_IMPL, "org.apache.iceberg.aws.glue.DynamoLockManager")
catalogProperties.put(CatalogProperties.LOCK_TABLE, "myLockTable")

val hadoopConf = new Configuration
val catalogLoader = CatalogLoader.custom(
  "my_catalog", catalogProperties.asJava, hadoopConf, "org.apache.iceberg.aws.glue.GlueCatalog")
val tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("my_ns", "data"))

// DataMapper converts each JSON message from the Kafka source into GenericRowData
val dataStream = env.addSource(kafkaConsumer).setParallelism(4).flatMap(new DataMapper)

FlinkSink.forRowData(dataStream.javaStream)
  .tableLoader(tableLoader)
  .build()
```
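For completeness, the surrounding job setup in my test looks roughly like the following; the checkpoint interval, Kafka settings and job name are illustrative placeholders:
```
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L) // illustrative 60s checkpoint interval

// Kafka source; topic, servers and group id are placeholders
val kafkaProps = new Properties()
kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
kafkaProps.setProperty("group.id", "iceberg-writer")
val kafkaConsumer =
  new FlinkKafkaConsumer[String]("my_topic", new SimpleStringSchema(), kafkaProps)

// ... pipeline and FlinkSink built as in the snippet above ...

env.execute("kafka-to-iceberg")
```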
The data is written successfully (i.e. I can see Parquet files being created
under the `data` folder within the specified S3 bucket). However,
`current-snapshot-id` is never updated in the metadata JSON file; it stays at -1.
There is only one JSON file in the `metadata` folder, while there are several
Parquet files under the `data` folder. The metadata file looks like this:
```
{
  "format-version" : 1,
  "table-uuid" : "3a1efd9d-a3ce-47b4-9f5f-d2a61ff6efd9",
  "location" : "s3://...",
  "last-updated-ms" : 1631709058366,
  "last-column-id" : 10,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ ... ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ ... ]
  } ],
  "partition-spec" : [ ... ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ... ]
  } ],
  "last-partition-id" : 1003,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : { },
  "current-snapshot-id" : -1,
  "snapshots" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}
```
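To double-check this outside the metadata file, I can also load the table through the same catalog loader and inspect its snapshot state; a small sketch reusing `catalogLoader` and the table identifier from the code above:
```
import org.apache.iceberg.catalog.TableIdentifier

// Load the table through the same custom (Glue) catalog
val catalog = catalogLoader.loadCatalog()
val table = catalog.loadTable(TableIdentifier.of("my_ns", "data"))

// currentSnapshot() returns null when nothing has been committed yet
println(s"current snapshot: ${table.currentSnapshot()}")
table.snapshots().forEach(s => println(s"snapshot ${s.snapshotId()} at ${s.timestampMillis()}"))
```
Given the metadata above, I expect this to report a null current snapshot and an empty snapshot list.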
I have read an excellent blog post [1] about the Apache Iceberg architecture and
was expecting the current snapshot ID to be updated after a successful write.
Any comments on why the current snapshot ID may not be getting updated in the
metadata JSON file?
[1] Apache Iceberg: An Architectural Look Under the Covers,
https://www.dremio.com/apache-iceberg-an-architectural-look-under-the-covers/