Thanks for the clarification.

On Tue, Apr 6, 2021 at 10:25 PM OpenInx <open...@gmail.com> wrote:
> Hi Chen Song,
>
> If you want to test format v2 in your environment, you can follow this
> comment https://github.com/apache/iceberg/pull/2410#issuecomment-812463051
> to upgrade your Iceberg table to format v2.
>
> TableProperties.FORMAT_VERSION was introduced in a separate PoC PR, so you
> will not find that static variable in the current Apache Iceberg master
> branch.
>
> On Wed, Apr 7, 2021 at 3:28 AM Chen Song <chen.song...@gmail.com> wrote:
>
>> Hey, I want to quickly follow up on this thread.
>>
>> I cannot seem to find any pull request that exposes the v2 format version
>> on table creation, specifically for the line below referenced in your
>> previous email:
>>
>> TableProperties.FORMAT_VERSION
>>
>> Can you suggest how? I want to create a v2 table to test some row-level
>> upserts/deletes.
>>
>> Chen
>>
>> On Sun, Dec 27, 2020 at 9:33 PM OpenInx <open...@gmail.com> wrote:
>>
>>> > you can apply this patch in your own repository
>>>
>>> The patch is: https://github.com/apache/iceberg/pull/1978
>>>
>>> On Mon, Dec 28, 2020 at 10:32 AM OpenInx <open...@gmail.com> wrote:
>>>
>>>> Hi liubo07199,
>>>>
>>>> Thanks for testing the Iceberg row-level delete. I skimmed the code; it
>>>> seems you were trying the equality-delete feature. Iceberg users should
>>>> not have to write this kind of internal code to make it work; that is
>>>> not user-friendly. Instead, the equality-delete feature (CDC event
>>>> ingestion or Flink aggregation upsert streams) is normally used through
>>>> the compute-engine integrations. Currently we support the Flink
>>>> CDC-events integration: the Flink DataStream integration has been
>>>> merged [1], while the Flink SQL integration depends on when we are
>>>> ready to expose Iceberg format v2 [2].
>>>>
>>>> For the timeline on exposing format v2 to users, you may want to read
>>>> this mail [3].
>>>>
>>>> If you just want a basic test of writing CDC with Flink, you can apply
>>>> this patch in your own repository and then create an Iceberg table with
>>>> an extra option like the following:
>>>>
>>>> public static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
>>>>   PartitionSpec spec;
>>>>   if (partitioned) {
>>>>     spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
>>>>   } else {
>>>>     spec = PartitionSpec.unpartitioned();
>>>>   }
>>>>   properties.put(TableProperties.FORMAT_VERSION, "2");
>>>>   return new HadoopTables().create(SCHEMA, spec, properties, path);
>>>> }
>>>>
>>>> Then use the Flink DataStream API or Flink SQL to write the CDC events
>>>> into an Apache Iceberg table. For a DataStream job sinking CDC events,
>>>> I suggest following the approach in [4].
>>>>
>>>> I'd like to help if you have further feedback.
>>>>
>>>> Thanks.
>>>>
>>>> [1]. https://github.com/apache/iceberg/pull/1974
>>>> [2]. https://github.com/apache/iceberg/pull/1978
>>>> [3]. https://mail-archives.apache.org/mod_mbox/iceberg-dev/202012.mbox/%3CCACc8XkGt%2B5kxr-XRMgY1eUKjd70mej38KFbhDuV2MH3AVMON2g%40mail.gmail.com%3E
>>>> [4]. https://github.com/apache/iceberg/pull/1974/files#diff-13e2e5b52d0effe51e1b470df77cb08b5ec8cc4f3a7f0fd4e51ee212fc83f76aR143
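As a rough illustration of the DataStream approach pointed to by [4] above (this sketch is not part of the original messages), the FlinkSink builder from the CDC patch could be wired up along the following lines. The table location, sample rows, and job name are placeholders, and the equality-delete support is assumed to be present in your build (i.e. the patches from [1] and [2] applied, or a later release that includes them):

    import java.util.Arrays;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    public class CdcSinkSketch {

      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each RowData carries a RowKind, so inserts and deletes coming from a
        // CDC source are preserved end-to-end. These two rows are hard-coded samples.
        RowData insert = GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("aaa"));
        RowData delete = GenericRowData.ofKind(RowKind.DELETE, 1, StringData.fromString("aaa"));
        DataStream<RowData> cdcStream = env.fromElements(insert, delete);

        // Placeholder path to a format v2 table created as in createTable(...) above.
        TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg_warehouse/test_table");

        FlinkSink.forRowData(cdcStream)
            .tableLoader(tableLoader)
            // Columns used to match rows when DELETE events are written as equality-delete files.
            .equalityFieldColumns(Arrays.asList("id"))
            .append();

        env.execute("write-cdc-events-to-iceberg");
      }
    }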
>>>> On Sat, Dec 26, 2020 at 7:14 PM 1 <liubo1022...@126.com> wrote:
>>>>
>>>>> Hi, all:
>>>>>
>>>>> I want to try row-level delete, but I get the exception:
>>>>> IllegalArgumentException: Cannot write delete files in a v1 table.
>>>>>
>>>>> I looked at https://iceberg.apache.org/spec/#table-metadata for
>>>>> format-version, and it says: "An integer version number for the format.
>>>>> Currently, this is always 1. Implementations must throw an exception if
>>>>> a table's version is higher than the supported version." So what can I
>>>>> do to try row-level deletes? How do I create a v2 table?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> The code is:
>>>>>
>>>>> private static void deleteRead() throws IOException {
>>>>>   Schema deleteRowSchema = table.schema().select("id");
>>>>>   Record dataDelete = GenericRecord.create(deleteRowSchema);
>>>>>   List<Record> dataDeletes = Lists.newArrayList(
>>>>>       dataDelete.copy("id", 11), // delete rows with id = 11
>>>>>       dataDelete.copy("id", 12), // delete rows with id = 12
>>>>>       dataDelete.copy("id", 13)  // delete rows with id = 13
>>>>>   );
>>>>>
>>>>>   DeleteFile eqDeletes = writeDeleteFile(table, Files.localOutput(tmpFile),
>>>>>       dataDeletes, deleteRowSchema);
>>>>>
>>>>>   table.newRowDelta()
>>>>>       .addDeletes(eqDeletes)
>>>>>       .commit();
>>>>> }
>>>>>
>>>>> private static DeleteFile writeDeleteFile(Table table, OutputFile out,
>>>>>                                           List<Record> deletes,
>>>>>                                           Schema deleteRowSchema) throws IOException {
>>>>>   EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
>>>>>       .forTable(table)
>>>>>       .withPartition(Row.of("20201221"))
>>>>>       .rowSchema(deleteRowSchema)
>>>>>       .createWriterFunc(GenericParquetWriter::buildWriter)
>>>>>       .overwrite()
>>>>>       .equalityFieldIds(deleteRowSchema.columns().stream()
>>>>>           .mapToInt(Types.NestedField::fieldId).toArray())
>>>>>       .buildEqualityWriter();
>>>>>
>>>>>   try (Closeable toClose = writer) {
>>>>>     writer.deleteAll(deletes);
>>>>>   }
>>>>>
>>>>>   return writer.toDeleteFile();
>>>>> }
>>>>>
>>>>> liubo07199
>>>>> liubo07...@hellobike.com
>>>>>
>>
>> --
>> Chen Song

--
Chen Song
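Since TableProperties.FORMAT_VERSION is not available in the master branch discussed above, a minimal sketch of creating a v2 table (not taken from the thread) could pass the literal property key instead, assuming a build where v2 table creation is honored at create time, e.g. with the PoC patch [2] applied. The schema, class name, and warehouse path below are placeholders:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.types.Types;

    public class CreateV2TableSketch {

      // Placeholder schema matching the examples in the thread: an int id and a string data column.
      private static final Schema SCHEMA = new Schema(
          Types.NestedField.required(1, "id", Types.IntegerType.get()),
          Types.NestedField.optional(2, "data", Types.StringType.get()));

      public static Table createV2Table(String path) {
        Map<String, String> properties = new HashMap<>();
        // Literal key behind TableProperties.FORMAT_VERSION; it only takes effect
        // on builds where format v2 creation is wired up (e.g. with the PoC patch applied).
        properties.put("format-version", "2");

        PartitionSpec spec = PartitionSpec.unpartitioned();
        return new HadoopTables().create(SCHEMA, spec, properties, path);
      }
    }

A table created this way should then accept the newRowDelta().addDeletes(...) commit from the code above instead of failing with "Cannot write delete files in a v1 table".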