My question is not about planning a scan; it is about the CDC log implementation. Suppose a writer is open and I receive an insert and a delete for the same record. If I apply an equality (EQ) delete, that record is gone.

However, if I insert, delete, and then insert the exact same record again while the writer is still open, I will not see any records for that key, because the equality delete kicks in. That case is resolved with a positional delete. My follow-up question: suppose I get a positional delete for a record that has already been flushed to some file. How do I tell the positional delete writer which file and offset need to go into the delete file? Do I have to do a table scan for that, or is there a more efficient way around this?
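To make the question concrete: as far as I can tell, the position delete writer only accepts a (data file path, row ordinal) pair that the caller already knows. Here is a rough sketch of what I mean, using what I believe is the 0.14-era generic Parquet delete builder (unpartitioned table assumed, file names made up, and method names may differ between releases):

    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.deletes.PositionDeleteWriter;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.iceberg.parquet.Parquet;

    // 'table' is an already-loaded org.apache.iceberg.Table; the file name is made up
    OutputFile out = table.io().newOutputFile("pos-deletes-00000.parquet");

    PositionDeleteWriter<Record> posWriter =
        Parquet.writeDeletes(out)
            .forTable(table)        // pulls schema, spec and properties from the table
            .overwrite()
            .buildPositionWriter();

    try (PositionDeleteWriter<Record> writer = posWriter) {
      // The caller must already know the data file path and the row ordinal;
      // the writer does not look them up.
      writer.delete("data/data-00000.parquet", 42L);
    }

    DeleteFile deleteFile = posWriter.toDeleteFile();
    table.newRowDelta().addDeletes(deleteFile).commit();

So the open question is how to obtain that path and ordinal cheaply for rows that have already been flushed.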
On Thu, 1 Sep, 2022, 7:00 pm Zoltán Borók-Nagy, <borokna...@cloudera.com.invalid> wrote:

> Hi Taher,
>
> I think most of your questions are answered in the Scan Planning section
> of the Iceberg spec page: https://iceberg.apache.org/spec/#scan-planning
>
> To give you some specific answers as well:
> Equality Deletes: data and delete files have sequence numbers from which
> readers can infer the relative age of the data. Delete files are only
> applied to older data files. This means that if you insert data again with
> a key that was deleted earlier, then Iceberg should show the newly
> inserted record.
>
> Position Deletes: when reading data files, the reader must keep track of
> the file position and only return rows that do not have a record in the
> delete files. Alternatively, you can do a big ANTI JOIN between data files
> and delete files. The latter was our approach in Impala:
> https://docs.google.com/document/d/1WF_UOanQ61RUuQlM4LaiRWI0YXpPKZ2VEJ8gyJdDyoY/edit#heading=h.5bmfhbmb4qdk
>
> Cheers,
> Zoltan
>
> On Thu, Sep 1, 2022 at 7:34 AM Taher Koitawala <taher...@gmail.com> wrote:
>
>> Thank you, Ryan and the Iceberg community; the suggestions really helped
>> progress a lot of development. On the same use case, I hit another block
>> with CDC updates and deletes.
>>
>> I see two options for managing deletes for now, EqualityDeletes and
>> PositionalDeletes:
>>
>> 1. EqualityDeletes need me to delete a particular key that Iceberg then
>>    matches at scan time to skip those records.
>>    1. The problem here is that when a record with delete key 1 is
>>       inserted, then deleted, and then inserted again with key 1, Iceberg
>>       shows no records. I guess that is the intended way it has to work,
>>       but it means I need to be more careful when writing to Iceberg.
>> 2. PositionalDeletes are amazing because I can give an offset and the
>>    name of the file where I want the record to be deleted, and I can
>>    handle updates here by delete and insert, so the above case is handled.
>>    1. What I am stuck on here is: after records have been inserted into a
>>       file, if I see a delete request or an update, how do I find the
>>       offset of the record that needs to be deleted in the inserts file?
>>    2. Do I need to do a table scan every time I get a delete request?
>>       That would mean a lot of IO, and the CDC implementation would be
>>       crazy slow.
>>
>> Please can you suggest the correct way of applying CDC log files with a
>> JVM task.
>>
>> Regards,
>> Taher Koitawala
>>
>> On Thu, Aug 25, 2022 at 9:39 AM Taher Koitawala <taher...@gmail.com> wrote:
>>
>>> Thank you for your response, Ryan. We will evaluate your suggestion of
>>> sticking with a query engine, and I will also try the code you shared
>>> with me.
>>>
>>> On Thu, 25 Aug, 2022, 2:25 am Ryan Blue, <b...@tabular.io> wrote:
>>>
>>>> Hi Taher,
>>>>
>>>> It looks like you’re writing something in Java to work with the data
>>>> directly.
>>>> That’s well supported, but you may want to consider using a compute
>>>> engine to make this process a bit easier. Most of the issues you’re
>>>> hitting would probably be solved automatically, because those engines
>>>> will translate correctly to and from Iceberg and other formats.
>>>>
>>>> Assuming that you want to move forward with Java, I think the issue
>>>> you’re hitting is that you’re not using the same in-memory object model
>>>> to read and write, and are then trying to translate values by hand.
>>>> Instead, I recommend using Iceberg readers for both reading and writing
>>>> so that you get consistent in-memory records.
>>>>
>>>> To do that, you should use the Parquet class to instantiate both the
>>>> reader and writer. You’ll need to use an Iceberg schema from the table
>>>> you’re writing into (which has assigned the field IDs). For the reader,
>>>> you’ll also need to pass a name mapping (withNameMapping) that you can
>>>> generate from the schema with MappingUtil.create(icebergSchema). That
>>>> name mapping enables you to read Parquet files without field IDs.
>>>>
>>>> Once you have it set up, it should look something like this:
>>>>
>>>> schema = ParquetSchemaUtil.convert(parquetSchema)
>>>> table = catalog.createTable(identifier, schema)
>>>> nameMapping = MappingUtil.create(table.schema())
>>>>
>>>> try (CloseableIterable<Record> reader =
>>>>          Parquet.read(io.newInputFile("file.parquet"))
>>>>              .project(table.schema())
>>>>              .withNameMapping(nameMapping)
>>>>              .build()) {
>>>>   try (FileAppender<Record> writer =
>>>>            Parquet.writeData(io.newOutputFile("new_file.parquet"))
>>>>                .forTable(table)
>>>>                .build()) {
>>>>     for (Record record : reader) {
>>>>       writer.add(record);
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> Ryan
>>>>
>>>> On Wed, Aug 24, 2022 at 6:49 AM Taher Koitawala <taher...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>> Please can someone guide me regarding the above email?
>>>>>
>>>>> Regards,
>>>>> Taher Koitawala
>>>>>
>>>>> On Tue, Aug 23, 2022 at 5:46 PM Taher Koitawala <taher...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>> I am creating an Iceberg writer over a Temporal service that converts
>>>>>> CDC Parquet files to Iceberg format. That means the file will have a
>>>>>> record and corresponding timestamp flags like `inserted_at`,
>>>>>> `deleted_at` and `updated_at`, each of which will have a value
>>>>>> defining the action.
>>>>>>
>>>>>> Initially, when there is no table in the Iceberg catalog, the plan is
>>>>>> to use the Parquet footer schema and map that directly to the Iceberg
>>>>>> schema using
>>>>>> *org.apache.iceberg.parquet.ParquetSchemaUtil.convert(MessageType
>>>>>> parquetSchema)*. However, the issue I am facing is that I also have to
>>>>>> convert Parquet datatypes to Iceberg datatypes, specifically the
>>>>>> timestamp types, when inserting into the table.
>>>>>>
>>>>>> When using the Parquet reader with the simple group, I see the
>>>>>> timestamp as a long, but when inserting into Iceberg it is expected to
>>>>>> be a *java.time.OffsetDateTime*; the specific error I get is
>>>>>> `Long cannot be cast to OffsetDateTime`.
>>>>>>
>>>>>> I have two questions on this use case:
>>>>>> 1. Is there an easy way to insert Parquet records into Iceberg
>>>>>> directly, without me having to do a type conversion, since the goal is
>>>>>> to make it all happen within Temporal?
>>>>>> 2. I need suggestions on handling updates. For updates I'm having to
>>>>>> commit inserts, then commit deletes, and then create a new writer
>>>>>> again to proceed.
>>>>>>
>>>>>> Regards,
>>>>>> Taher Koitawala
>>>>
>>>> --
>>>> Ryan Blue
>>>> Tabular
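One alternative to a per-delete table scan that I can think of is to have the writer task keep its own index from CDC key to the (data file, row ordinal) it last wrote, so that a delete for a recently written key can be turned into a position delete directly, and only older keys fall back to an equality delete or a scan. A rough sketch of that bookkeeping, using a hypothetical helper class that is not part of any Iceberg API:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical per-task bookkeeping; names are illustrative only.
    class RowPositionIndex {
      static class RowLocation {
        final String dataFilePath;
        final long rowOrdinal;

        RowLocation(String dataFilePath, long rowOrdinal) {
          this.dataFilePath = dataFilePath;
          this.rowOrdinal = rowOrdinal;
        }
      }

      // CDC key -> location of the last row this task wrote for that key
      private final Map<String, RowLocation> index = new HashMap<>();

      // Call for every record appended to a data file, passing the record's
      // ordinal within that file (0, 1, 2, ...).
      void recordInsert(String key, String dataFilePath, long rowOrdinal) {
        index.put(key, new RowLocation(dataFilePath, rowOrdinal));
      }

      // Returns the location to use for a position delete, or null when the
      // row was written earlier and an equality delete (or a scan) is needed.
      RowLocation lookup(String key) {
        return index.get(key);
      }
    }

Whether something like this is the intended pattern, or whether engines do something smarter, is part of what I am trying to confirm.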