ldwnt opened a new issue, #5852:
URL: https://github.com/apache/iceberg/issues/5852
### Apache Iceberg version
0.13.0
### Query engine
Flink
### Please describe the bug 🐞
I use the Iceberg Flink sink in upsert mode, with equality field columns set
on the Flink connector. When a row is modified on the source side, reading the
Iceberg target table is expected to return the new value. However, when a
`WHERE` clause filters on the modified column, the old value is returned instead.
Steps to reproduce this bug:
1. Create the Iceberg table:
```
CREATE TABLE IF NOT EXISTS ice_catalog.test.timequery_ice1 (
  `id` int NOT NULL,
  `name` int,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'format-version'='2',
  'engine.hive.enabled'='true',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='1'
);
```
2. Produce a record to the Kafka topic:
```
1,1
```
3. Read from Kafka and write to Iceberg using the Flink sink:
```
DataStream<RowData> ds = env
    .fromSource(kafka, WatermarkStrategy.forMonotonousTimestamps(), "src")
    .map((MapFunction<Tuple3<String, String, String>, RowData>) kafkaMessage -> {
      String[] sts = kafkaMessage.f2.split(",");
      GenericRowData rowData = new GenericRowData(2);
      rowData.setField(0, Integer.parseInt(sts[0]));
      rowData.setField(1, Integer.parseInt(sts[1]));
      return rowData;
    })
    .setParallelism(1).uid("@dsm").name("@dsm");

TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader,
    TableIdentifier.of("test.timequery_ice1"));

FlinkSink.forRowData(ds)
    .tableLoader(tableLoader)
    .equalityFieldColumns(Arrays.asList("id"))
    .upsert(true)
    .uidPrefix("@s-test.timequery_ice2")
    .append()
    .setParallelism(1).uid("@s-test.timequery_ice2").name("@s-test.timequery_ice2");
```
4. Wait for the checkpoint to complete, then read the Iceberg table using Spark SQL (OK):
```
select * from test.timequery_ice2 where name < 2;
1 1
```
5. Produce an updated record for the same `id` to the Kafka topic:
```
1,2
```
6. Wait for the checkpoint to complete, then read the Iceberg table using Spark SQL
(not OK: an empty result set is expected, but the old row is returned):
```
select * from test.timequery_ice2 where name < 2;
1 1
```
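To make the expected behavior concrete, here is a minimal in-memory sketch of the upsert semantics I am assuming for the steps above. It is plain Java, not Iceberg or Flink code, and the `UpsertModel` class and its method names are hypothetical, used only for illustration: a second write with the same `id` replaces the earlier row, so the filter on `name` should no longer match.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the expected upsert semantics; not Iceberg code.
public class UpsertModel {
    // Holds the current row per equality key (id -> name).
    private final Map<Integer, Integer> rows = new LinkedHashMap<>();

    // An upsert replaces any existing row that has the same id.
    public void upsert(int id, int name) {
        rows.put(id, name);
    }

    // Models: select * from t where name < bound
    public List<int[]> selectWhereNameLessThan(int bound) {
        List<int[]> result = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : rows.entrySet()) {
            if (e.getValue() < bound) {
                result.add(new int[] {e.getKey(), e.getValue()});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        UpsertModel table = new UpsertModel();

        table.upsert(1, 1); // step 2: record "1,1"
        System.out.println(table.selectWhereNameLessThan(2).size()); // step 4: one row

        table.upsert(1, 2); // step 5: same id, new name
        System.out.println(table.selectWhereNameLessThan(2).size()); // step 6: no rows expected
    }
}
```

Under this model the query in step 6 returns nothing, which is the result I expect from the Iceberg table as well.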
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]