Hi liubo07199,
Thanks for testing Iceberg row-level deletes. I skimmed the code, and it
seems you were trying the equality-delete feature. For Iceberg users, I
don't think we should have to write against those Iceberg internal APIs to
get this working; that isn't friendly for users. Instead, we usually use
the equality-delete feature (for CDC event ingestion or Flink aggregation
upsert streams) through the compute-engine integrations. Currently we
support the Flink CDC-events integration: the Flink DataStream integration
has been merged [1], while the Flink SQL integration depends on when we
are ready to expose Iceberg format v2 [2].
For the timing of exposing format v2 to users, you may want to read this
mail [3].
If you just want a basic test of writing CDC events with Flink, you can
apply this patch in your own repository and then create an Iceberg table
with an extra table property, like the following:
public static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
  PartitionSpec spec;
  if (partitioned) {
    spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
  } else {
    spec = PartitionSpec.unpartitioned();
  }
  // Setting format-version to 2 allows the table to accept row-level delete files.
  properties.put(TableProperties.FORMAT_VERSION, "2");
  return new HadoopTables().create(SCHEMA, spec, properties, path);
}
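
For example, you could then create the v2 table like this (the warehouse
path here is just a placeholder):

Map<String, String> props = new HashMap<>();  // java.util.HashMap / java.util.Map
Table table = createTable("hdfs://nn:8020/warehouse/path", props, true);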
Then use the Flink DataStream API or Flink SQL to write the CDC events
into the Apache Iceberg table. For a DataStream job that sinks CDC events,
I suggest following the approach here [4]; a minimal sketch is below.
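
For illustration, here is a rough sketch of such a job, assuming the patch
above has been applied. FlinkSink, TableLoader, and the
equalityFieldColumns option come from the DataStream integration in [1];
the warehouse path, row values, and job name are placeholders for your own
setup:

import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class WriteCdcEvents {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // CDC semantics ride on the RowKind of each RowData record
    // (INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE). In a real job this
    // stream would come from a CDC source rather than fromElements.
    DataStream<RowData> cdcStream = env.fromElements(
        (RowData) GenericRowData.ofKind(RowKind.INSERT, 11, StringData.fromString("aaa")),
        GenericRowData.ofKind(RowKind.DELETE, 11, StringData.fromString("aaa")),
        GenericRowData.ofKind(RowKind.INSERT, 12, StringData.fromString("bbb")));

    FlinkSink.forRowData(cdcStream)
        // Placeholder path: load the v2 table created by createTable() above.
        .tableLoader(TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path"))
        // The key columns used to generate equality-delete files.
        .equalityFieldColumns(Arrays.asList("id"))
        .build();

    env.execute("write-cdc-into-iceberg-v2");
  }
}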
I'd like to help if you have further feedback.
Thanks.
[1]. https://github.com/apache/iceberg/pull/1974
[2]. https://github.com/apache/iceberg/pull/1978
[3].
https://mail-archives.apache.org/mod_mbox/iceberg-dev/202012.mbox/%3CCACc8XkGt%2B5kxr-XRMgY1eUKjd70mej38KFbhDuV2MH3AVMON2g%40mail.gmail.com%3E
[4].
https://github.com/apache/iceberg/pull/1974/files#diff-13e2e5b52d0effe51e1b470df77cb08b5ec8cc4f3a7f0fd4e51ee212fc83f76aR143
On Sat, Dec 26, 2020 at 7:14 PM 1 <[email protected]> wrote:
> Hi, all:
>
> I want to try row-level deletes, but I get this exception:
> IllegalArgumentException: Cannot write delete files in a v1 table.
>
> I looked over https://iceberg.apache.org/spec/#table-metadata for
> format-version, and it says: "An integer version number for the format.
> Currently, this is always 1. Implementations must throw an exception if a
> table's version is higher than the supported version." So what can I do
> to test row-level deletes? How do I create a v2 table?
>
> Thanks
>
> The code is:
>
> private static void deleteRead() throws IOException {
>   Schema deleteRowSchema = table.schema().select("id");
>   Record dataDelete = GenericRecord.create(deleteRowSchema);
>   List<Record> dataDeletes = Lists.newArrayList(
>       dataDelete.copy("id", 11), // delete rows where id = 11
>       dataDelete.copy("id", 12), // delete rows where id = 12
>       dataDelete.copy("id", 13)  // delete rows where id = 13
>   );
>
>   DeleteFile eqDeletes = writeDeleteFile(table, Files.localOutput(tmpFile),
>       dataDeletes, deleteRowSchema);
>
>   table.newRowDelta()
>       .addDeletes(eqDeletes)
>       .commit();
> }
>
> private static DeleteFile writeDeleteFile(Table table, OutputFile out,
>     List<Record> deletes, Schema deleteRowSchema) throws IOException {
>   EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
>       .forTable(table)
>       .withPartition(Row.of("20201221"))
>       .rowSchema(deleteRowSchema)
>       .createWriterFunc(GenericParquetWriter::buildWriter)
>       .overwrite()
>       .equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
>       .buildEqualityWriter();
>
>   try (Closeable toClose = writer) {
>     writer.deleteAll(deletes);
>   }
>
>   return writer.toDeleteFile();
> }
>
> liubo07199
> [email protected]