Thank you, Ryan and the Iceberg community. The suggestions really helped move development forward. On the same use case, I have hit another blocker around doing CDC updates and deletes.
I see two options for managing deletes for now, EqualityDeletes and PositionalDeletes:

1. EqualityDeletes require me to write a delete for a particular key, which Iceberg then matches at scan time to skip those records.
   1. The problem here is that when a record with delete key 1 is inserted, then deleted, then inserted again with key 1, Iceberg shows no records. I guess that is the intended behaviour, but it means I need to be more careful when writing to Iceberg.
2. PositionalDeletes are amazing because I can give the file name and the offset of the record I want deleted, and I can handle updates as a delete plus an insert, which also covers the case above.
   1. What I am stuck on here is: after records have been written to a data file, when a delete or update request comes in, how do I find the offset of the record that needs to be deleted in that inserts file?
   2. Do I need to do a table scan every time I get a delete request? That would mean a lot of IO, and the CDC implementation would be crazy slow.

Please can you suggest the correct way of applying CDC log files with a JVM task.

Regards,
Taher Koitawala
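
For illustration, here is a minimal, hedged sketch of how the two kinds of delete files described above can be produced and committed through Iceberg's generic Java API. The file paths, the single `id` equality field, and row position 42 are made-up placeholders; the table is assumed to be an unpartitioned format-version-2 table; and the exact builder and writer method names shift between Iceberg releases, so check them against the javadoc of the version in use.

    import java.io.IOException;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.parquet.GenericParquetWriter;
    import org.apache.iceberg.deletes.EqualityDeleteWriter;
    import org.apache.iceberg.deletes.PositionDeleteWriter;
    import org.apache.iceberg.parquet.Parquet;

    public class CdcDeleteSketch {

      // Sketch only: paths, the "id" delete key, and the row position are placeholders,
      // and the table is assumed to be unpartitioned and on format version 2.
      static void writeAndCommitDeletes(Table table) throws IOException {
        // Option 1: equality delete -- "drop every live row whose id = 1".
        Schema deleteKeySchema = table.schema().select("id");
        EqualityDeleteWriter<Record> eqWriter =
            Parquet.writeDeletes(table.io().newOutputFile("s3://bucket/deletes/eq-1.parquet"))
                .forTable(table)
                .rowSchema(deleteKeySchema)
                .equalityFieldIds(deleteKeySchema.findField("id").fieldId())
                .createWriterFunc(GenericParquetWriter::buildWriter)
                .overwrite()
                .buildEqualityWriter();

        Record deleteKey = GenericRecord.create(deleteKeySchema);
        deleteKey.setField("id", 1L);
        eqWriter.write(deleteKey);   // some older releases expose this as delete(...)
        eqWriter.close();

        // Option 2: positional delete -- "drop row 42 of one specific data file".
        PositionDeleteWriter<Record> posWriter =
            Parquet.writeDeletes(table.io().newOutputFile("s3://bucket/deletes/pos-1.parquet"))
                .forTable(table)
                .createWriterFunc(GenericParquetWriter::buildWriter)
                .overwrite()
                .buildPositionWriter();
        posWriter.delete("s3://bucket/data/inserts-0001.parquet", 42L);
        posWriter.close();

        // Commit both delete files as a single row-level delta against the table.
        DeleteFile eqDeletes = eqWriter.toDeleteFile();
        DeleteFile posDeletes = posWriter.toDeleteFile();
        table.newRowDelta()
            .addDeletes(eqDeletes)
            .addDeletes(posDeletes)
            .commit();
      }
    }

Either kind of delete file is committed through the RowDelta API, which requires a format-version-2 table; equality deletes only need the key, while position deletes need the data file path and row ordinal, which is exactly the lookup being asked about above.
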
On Thu, Aug 25, 2022 at 9:39 AM Taher Koitawala <taher...@gmail.com> wrote:

> Thank you for your response, Ryan. We will evaluate your suggestion of
> sticking with a query engine, and I will also try the code you shared
> with me.
>
> On Thu, 25 Aug, 2022, 2:25 am Ryan Blue, <b...@tabular.io> wrote:
>
>> Hi Taher,
>>
>> It looks like you're writing something in Java to work with the data
>> directly. That's well supported, but you may want to consider using a
>> compute engine to make this process a bit easier. Most of the issues that
>> you're hitting would probably be solved automatically because those
>> engines will translate correctly to and from Iceberg and other formats.
>>
>> Assuming that you want to move forward with Java, I think the issue
>> you're hitting is that you're not using the same in-memory object model
>> to read and write and are then trying to translate values by hand.
>> Instead, I recommend using Iceberg readers for both reading and writing
>> so that you get consistent in-memory records.
>>
>> To do that, you should use the Parquet class to instantiate both the
>> reader and writer. You'll need to use an Iceberg schema from the table
>> you're writing into (which has assigned the field IDs). For the reader,
>> you'll also need to pass a name mapping (withNameMapping) that you can
>> generate from the schema with MappingUtil.create(icebergSchema). That
>> name mapping enables you to read Parquet files without field IDs.
>>
>> Once you have it set up, it should look something like this:
>>
>> schema = ParquetSchemaUtil.convert(parquetSchema)
>> table = catalog.createTable(identifier, schema)
>> nameMapping = MappingUtil.create(table.schema())
>>
>> try (CloseableIterable<Record> reader =
>>     Parquet.read(io.newInputFile("file.parquet"))
>>         .project(table.schema())
>>         .withNameMapping(nameMapping)
>>         .build()) {
>>   try (FileAppender<Record> writer =
>>       Parquet.writeData(io.newOutputFile("new_file.parquet"))
>>           .forTable(table)
>>           .build()) {
>>     for (Record record : reader) {
>>       writer.add(record);
>>     }
>>   }
>> }
>>
>> Ryan
>>
>> On Wed, Aug 24, 2022 at 6:49 AM Taher Koitawala <taher...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>       Please can someone guide me regarding the above email?
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>> On Tue, Aug 23, 2022 at 5:46 PM Taher Koitawala <taher...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>       I am creating an Iceberg writer over a Temporal service that
>>>> converts CDC parquet files to Iceberg format. That means the files will
>>>> have records and corresponding timestamp flags like `inserted_at`,
>>>> `deleted_at` and `updated_at`, each of which will have a value defining
>>>> the action.
>>>>
>>>> Initially, when there is no table in the Iceberg catalog, the plan is
>>>> to use the Parquet footer schema and map that directly to the Iceberg
>>>> schema using
>>>> *org.apache.iceberg.parquet.ParquetSchemaUtil.convert(MessageType
>>>> parquetSchema)*. However, the issue I am facing is that I am also
>>>> having to convert Parquet datatypes to Iceberg datatypes, specifically
>>>> the timestamp types, when inserting into the table.
>>>>
>>>> When using the Parquet reader with the simple group, I see the
>>>> timestamp as a long, but Iceberg expects it to be a
>>>> *java.time.OffsetDateTime* on insert; the specific error I get is
>>>> `Long cannot be cast to OffsetDateTime`.
>>>>
>>>> I have 2 questions on this use case:
>>>> 1. Is there an easy way to write Parquet records to Iceberg directly,
>>>> without me having to do a type conversion, since the goal is to make it
>>>> all happen within Temporal?
>>>> 2. I need suggestions on handling updates. For updates I am having to
>>>> commit inserts, then commit deletes, and then create a new writer again
>>>> to proceed.
>>>>
>>>> Regards,
>>>> Taher Koitawala
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Tabular
>>
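
As a footnote to the original `Long cannot be cast to OffsetDateTime` question at the bottom of the thread: if the values are converted by hand rather than read through Iceberg's Parquet reader as Ryan suggests, a microsecond-precision, UTC-adjusted Parquet timestamp maps to the OffsetDateTime that Iceberg's generic records expect roughly as sketched below. This assumes the column really is epoch microseconds; Parquet timestamps can also be millisecond or nanosecond precision, and adjusted to UTC or not, so check the logical type annotation first.

    import java.time.Instant;
    import java.time.OffsetDateTime;
    import java.time.ZoneOffset;
    import java.time.temporal.ChronoUnit;

    public class TimestampConversionSketch {
      // A Parquet TIMESTAMP(MICROS, isAdjustedToUTC = true) value arrives from the
      // low-level reader as a long count of microseconds since the Unix epoch;
      // Iceberg's generic records expect java.time.OffsetDateTime for timestamptz.
      static OffsetDateTime timestamptzFromMicros(long micros) {
        return Instant.EPOCH.plus(micros, ChronoUnit.MICROS).atOffset(ZoneOffset.UTC);
      }
    }

Iceberg also ships internal date/time helpers for this kind of conversion, but the plain java.time version above avoids depending on internal classes.
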