tuziling opened a new issue #3277:
URL: https://github.com/apache/iceberg/issues/3277
### While researching real-time reading and writing with Flink 1.12.1 + Iceberg 0.12.0 + S3, I have run into some questions. Can anyone help me answer them?
What I have implemented so far is real-time reading and writing in append mode, i.e. two separate programs, one continuously writing and the other continuously reading. That works without problems. When using upsert mode for real-time reading and writing, however, real-time writing is still fine, but as soon as a delete operation happens, the real-time read reports an error.
I then replaced the iceberg-flink-runtime-0.12.0.jar in the program with a jar I compiled myself from the Iceberg master branch, and created the table with format version v2. In upsert mode, real-time writing still works, but once a delete operation happens the real-time read still fails. The error message is as follows:
```
Caused by: java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (4132625316523965953, 3766923260337214346)
    at org.apache.iceberg.IncrementalDataTableScan.snapshotsWithin(IncrementalDataTableScan.java:123)
    at org.apache.iceberg.IncrementalDataTableScan.planFiles(IncrementalDataTableScan.java:73)
    at org.apache.iceberg.BaseTableScan.planTasks(BaseTableScan.java:241)
    at org.apache.iceberg.DataTableScan.planTasks(DataTableScan.java:28)
    at org.apache.iceberg.flink.source.FlinkSplitGenerator.tasks(FlinkSplitGenerator.java:86)
    at org.apache.iceberg.flink.source.FlinkSplitGenerator.createInputSplits(FlinkSplitGenerator.java:38)
    at org.apache.iceberg.flink.source.StreamingMonitorFunction.monitorAndForwardSplits(StreamingMonitorFunction.java:143)
    at org.apache.iceberg.flink.source.StreamingMonitorFunction.run(StreamingMonitorFunction.java:121)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
```
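
From the stack trace, the streaming read fails because `IncrementalDataTableScan` only accepts snapshots whose operation is `append`; the row-level delete/upsert commits from the Flink sink are recorded as `overwrite` snapshots, which the incremental scan refuses to plan. As a quick way to confirm this (my own sketch, not code from the issue), the operation of each snapshot can be listed through the Java API; the catalog properties below simply mirror the ones used in the jobs:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;

public class SnapshotOperationCheck {
    public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put("type", "iceberg");
        props.put("warehouse", "s3://tuziling-bucket/tuziling");
        props.put("catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog");
        props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        CatalogLoader catalogLoader = CatalogLoader.custom(
                "tuziling_catalog", props, new Configuration(),
                "org.apache.iceberg.aws.glue.GlueCatalog");
        try (TableLoader tableLoader = TableLoader.fromCatalog(
                catalogLoader, TableIdentifier.of("tuzl", "tzling_test1"))) {
            tableLoader.open();
            Table table = tableLoader.loadTable();
            for (Snapshot snapshot : table.snapshots()) {
                // Append-only commits show "append"; the delete/upsert commits show
                // "overwrite", which is what the incremental streaming scan rejects.
                System.out.printf("snapshot %d -> operation=%s%n",
                        snapshot.snapshotId(), snapshot.operation());
            }
        }
    }
}
```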
The table creation statement is as follows:
```sql
CREATE TABLE tuziling_catalog.tuzl.tzling_test1 (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  data STRING
) COMMENT 'iceberg test table'
WITH (
  'type'='iceberg',
  'write.format.default'='parquet',
  'format-version'='2',
  'write.distribution-mode'='hash',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='10'
);
```
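
Reusing the `table` handle from the sketch above, the v2 format version can also be double-checked programmatically (again just my own verification sketch; note the `BaseTable` cast goes through an internal API):

```java
// The format version is stored in the table metadata, not in table.properties();
// reading it requires going through the table operations (internal API).
int formatVersion = ((org.apache.iceberg.BaseTable) table).operations().current().formatVersion();
System.out.println("format-version = " + formatVersion);  // expected to print 2 for a v2 table
```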
After the table is created, the table metadata on S3 looks like this:
```
{
"format-version": 2,
"table-uuid": "07f59d0b-3b6a-4f4a-b006-883ec3d429a7",
"location": "s3://tuziling-bucket/tuziling/tuzl.db/tzling_test1",
"last-sequence-number": 0,
"last-updated-ms": 1633745889980,
"last-column-id": 2,
"current-schema-id": 0,
"schemas": [{
"type": "struct",
"schema-id": 0,
"fields": [{
"id": 1,
"name": "id",
"required": false,
"type": "long"
}, {
"id": 2,
"name": "data",
"required": false,
"type": "string"
}]
} ],
"default-spec-id": 0,
"partition-specs": [{
"spec-id": 0,
"fields": []
} ],
"last-partition-id": 999,
"default-sort-order-id": 0,
"sort-orders": [{
"order-id": 0,
"fields": []
} ],
"properties": {
"connector": "iceberg",
"write.metadata.previous-versions-max": "10",
"write.metadata.delete-after-commit.enabled": "true",
"write.distribution-mode": "hash"
},
"current-snapshot-id": -1,
"snapshots": [],
"snapshot-log": [],
"metadata-log": []
}
```
The real-time data comes from Kafka; the main code on the write side is as follows:
```java
DataStream<RowData> input = transformData.flatMap(new FlatMapFunction<Tuple2<Long, String>, RowData>() {
    @Override
    public void flatMap(Tuple2<Long, String> tuple2, Collector<RowData> out) throws Exception {
        GenericRowData rowData = new GenericRowData(2);
        rowData.setField(0, tuple2.f0);
        rowData.setField(1, StringData.fromString(tuple2.f1));
        rowData.setRowKind(RowKind.UPDATE_BEFORE);
        log.info("flatMap function --> rowData={}", rowData);
        out.collect(rowData);

        GenericRowData genericRowData = new GenericRowData(2);
        genericRowData.setField(0, tuple2.f0);
        genericRowData.setField(1, StringData.fromString(tuple2.f1));
        genericRowData.setRowKind(RowKind.UPDATE_AFTER);
        log.info("flatMap function --> genericRowData={}", genericRowData);
        out.collect(genericRowData);
    }
});

String catalogName = "tuziling_catalog";
Configuration hadoopConf = new Configuration();
Map<String, String> props = new HashMap<>();
props.put("type", Common.CATALOG_TYPE);
props.put("warehouse", "s3://tuziling-bucket/tuziling");
props.put("catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog");
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
props.put("lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager");
props.put("lock.table", "myGlueLockTable");
CatalogLoader tzlCatalog = CatalogLoader.custom(catalogName, props, hadoopConf, Common.CATALOG_IMPL);
TableIdentifier identifier = TableIdentifier.of("tuzl", "tzling_test1");
TableLoader tableLoader = TableLoader.fromCatalog(tzlCatalog, identifier);

// define the TableSchema
TableSchema tableSchema = TableSchema.builder()
        .field("id", DataTypes.BIGINT().notNull())
        .field("data", DataTypes.STRING())
        .primaryKey("id")
        .build();

FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .tableSchema(tableSchema)
        .equalityFieldColumns(Arrays.asList("id"))
        // .overwrite(true)
        .build();
```
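
The snippet above only shows the UPDATE_BEFORE/UPDATE_AFTER pair. For completeness, here is what I assume the delete path looks like (my own illustration, not the code that actually triggered the failure): a row-level delete is emitted as a `RowKind.DELETE` record keyed by the equality column `id`, and the resulting equality-delete commit is what shows up as an "overwrite" snapshot to the streaming reader.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

// Hypothetical delete emitter (assumption for illustration): turns an incoming key/value
// pair into a DELETE row so the Iceberg sink writes an equality delete for that id.
public class DeleteEmitter implements FlatMapFunction<Tuple2<Long, String>, RowData> {
    @Override
    public void flatMap(Tuple2<Long, String> tuple2, Collector<RowData> out) {
        GenericRowData delete = new GenericRowData(2);
        delete.setField(0, tuple2.f0);                          // equality field "id"
        delete.setField(1, StringData.fromString(tuple2.f1));
        delete.setRowKind(RowKind.DELETE);
        out.collect(delete);
    }
}
```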
The main code for real-time reading is as follows:
```java
String catalogName = "tuziling_catalog";
Map<String, String> map = new HashMap<>();
map.put("type", "iceberg");
map.put("warehouse", "s3://tuziling-bucket/tuziling");
map.put("catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog");
map.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
map.put("lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager");
map.put("lock.table", "myGlueLockTable");
Configuration hadoopConf = new Configuration();
String impl = "org.apache.iceberg.aws.glue.GlueCatalog";
CatalogLoader tzlCatalog = CatalogLoader.custom(catalogName, map, hadoopConf, impl);
TableIdentifier identifier = TableIdentifier.of("tuzl", "tzling_test1");
TableLoader tableLoader = TableLoader.fromCatalog(tzlCatalog, identifier);
DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(true)
        // .startSnapshotId(5868847536461701834L)
        .build();
```
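
As a possible stop-gap while the streaming path cannot handle the "overwrite" snapshots, a plain batch read of the current snapshot does not hit this error, because it uses a regular table scan rather than the incremental one. A minimal sketch, reusing `env` and `tableLoader` from the snippet above (my suggestion, not an official recommendation):

```java
// Batch (bounded) read of the table's current snapshot; unlike the streaming read,
// this does not walk snapshots incrementally, so overwrite snapshots are not an issue.
DataStream<RowData> batchStream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(false)
        .build();
```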
### Based on the verification above, is real-time reading and writing in upsert mode with Flink + Iceberg + S3 currently not fully supported? If this mode is not officially supported, is there another solution that could be used instead?