Thank you, Ryan and the Iceberg community; the suggestions really helped
move development along. On the same use case, I have hit another blocker
around applying CDC updates and deletes.

I see two options for managing deletes right now, equality deletes and
positional deletes (rough sketches of how I understand each are included
below the list):

   1. Equality deletes need me to write a particular delete key that Iceberg
   then matches at scan time to skip those records.
      1. The problem here is that when a record with delete key 1 is
      inserted, then deleted, and then inserted again with key 1, Iceberg
      shows no records. I guess that is the intended behaviour, but it means
      I need to be more careful about how I sequence writes to Iceberg.
   2. Positional deletes are great because I can give the file name and
   offset of the record I want deleted, and I can handle updates as a delete
   followed by an insert, which also covers the case above.
      1. What I am stuck on here is: after records have been written to a
      data file, when a delete or update request comes in, how do I find the
      offset of the record that needs to be deleted in that data file?
      2. Do I need to do a table scan every time I get a delete request?
      That would mean a lot of I/O, and the CDC implementation would be
      extremely slow.
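To make the equality-delete option concrete, here is roughly what I
understand the write path to look like with the generic writers (only a
sketch; the `id` delete key, the output path, and the unpartitioned table
are placeholders, and the exact writer method names may differ between
Iceberg versions):

// Sketch: write an equality delete file keyed on "id" and commit it.
// Classes come from org.apache.iceberg.data, .deletes, .encryption and .io.
Schema deleteSchema = table.schema().select("id");
int idFieldId = table.schema().findField("id").fieldId();

GenericAppenderFactory factory = new GenericAppenderFactory(
    table.schema(), table.spec(), new int[] {idFieldId}, deleteSchema, null);

EncryptedOutputFile out = table.encryption()
    .encrypt(table.io().newOutputFile("s3://bucket/deletes/del-00001.parquet"));

EqualityDeleteWriter<Record> deleteWriter =
    factory.newEqDeleteWriter(out, FileFormat.PARQUET, null /* unpartitioned */);
try (EqualityDeleteWriter<Record> writer = deleteWriter) {
  Record key = GenericRecord.create(deleteSchema);
  key.setField("id", 1L);
  writer.write(key);  // earlier rows with id = 1 are skipped at scan time
}

table.newRowDelta()
    .addDeletes(deleteWriter.toDeleteFile())
    .commit();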
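And for the positional-delete option, the only way I can see to find the
offsets today is a lookup like the one below for every delete key, which is
exactly the I/O I am worried about (again only a sketch; the `id` column and
the key value 1 are placeholders):

// Sketch: collect (file path, row position) pairs for rows with id = 1 so
// they can be written out as positional deletes. The scan filter only prunes
// data files; every matching file still has to be re-read to count positions.
// (Pair is org.apache.iceberg.util.Pair; the reader comes from iceberg-data.)
Expression matchKey = Expressions.equal("id", 1L);
List<Pair<CharSequence, Long>> toDelete = new ArrayList<>();

try (CloseableIterable<FileScanTask> tasks =
    table.newScan().filter(matchKey).planFiles()) {
  for (FileScanTask task : tasks) {
    long pos = 0;
    try (CloseableIterable<Record> rows =
        Parquet.read(table.io().newInputFile(task.file().path().toString()))
            .project(table.schema())
            .createReaderFunc(
                fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema))
            .build()) {
      for (Record row : rows) {
        if (Objects.equals(row.getField("id"), 1L)) {
          toDelete.add(Pair.of(task.file().path(), pos));
        }
        pos++;
      }
    }
  }
}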

Please can you suggest the correct way of applying CDC log files to an
Iceberg table from a plain JVM task?

Regards,
Taher Koitawala





On Thu, Aug 25, 2022 at 9:39 AM Taher Koitawala <taher...@gmail.com> wrote:

> Thank you for your response, Ryan. We will evaluate your suggestion of
> sticking with a query engine, and I will also try the code you shared with me.
>
>
>
> On Thu, 25 Aug, 2022, 2:25 am Ryan Blue, <b...@tabular.io> wrote:
>
>> Hi Taher,
>>
>> It looks like you’re writing something in Java to work with the data
>> directly. That’s well supported, but you may want to consider using a
>> compute engine to make this process a bit easier. Most of the issues that
>> you’re hitting would probably be solved automatically because those engines
>> will translate correctly to and from Iceberg and other formats.
>>
>> Assuming that you want to move forward with Java, I think the issue
>> you’re hitting is that you’re not using the same in-memory object model to
>> read and write and are then trying to translate values by hand. Instead, I
>> recommend using Iceberg readers for both reading and writing so that you
>> get consistent in-memory records.
>>
>> To do that, you should use the Parquet class to instantiate both the
>> reader and writer. You’ll need to use an Iceberg schema from the table
>> you’re writing into (which has assigned the field IDs). For the reader,
>> you’ll also need to pass a name mapping (withNameMapping) that you can
>> generate from the schema with MappingUtil.create(icebergSchema). That
>> name mapping enables you to read Parquet files without field IDs.
>>
>> Once you have it set up, it should look something like this:
>>
>> schema  = ParquetSchemaUtil.convert(parquetSchema)
>> table = catalog.createTable(identifier, schema)
>> nameMapping = MappingUtil.create(table.schema())
>>
>> try (CloseableIterable<Record> reader =
>>     Parquet.read(io.newInputFile("file.parquet"))
>>         .project(table.schema())
>>         .withNameMapping(nameMapping)
>>         .build()) {
>>   try (FileAppender<Record> writer =
>>       Parquet.writeData(io.newOutputFile("new_file.parquet"))
>>           .forTable(table)
>>           .build()) {
>>     for (Record record : reader) {
>>       writer.add(record);
>>     }
>>   }
>> }
>>
>> Ryan
>>
>> On Wed, Aug 24, 2022 at 6:49 AM Taher Koitawala <taher...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>          Please can someone guide me regarding the above email?
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>> On Tue, Aug 23, 2022 at 5:46 PM Taher Koitawala <taher...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>         I am creating an Iceberg writer on top of a Temporal service that
>>>> converts CDC Parquet files to Iceberg format. That means each file will
>>>> have a record and corresponding timestamp flags like `inserted_at`,
>>>> `deleted_at` and `updated_at`, each of which will have a value defining the
>>>> action.
>>>>
>>>> Initially, when there is no table in the iceberg catalog, the plan is
>>>> to use the Parquet footer schema and map that directly to the Iceberg
>>>> schema using 
>>>> *org.apache.iceberg.parquet.ParquetSchemaUtil.convert(MessageType
>>>> parquetSchema).* However, the issue that I am facing is that I am also
>>>> having to convert Parquet datatypes to Iceberg datatypes, specifically the
>>>> timestamp types when inserting into the table.
>>>>
>>>> When using the Parquet reader with SimpleGroup, I see the timestamp as
>>>> a long, but on insert Iceberg expects a *java.time.OffsetDateTime*; the
>>>> specific error I get is `Long cannot be cast to OffsetDateTime`.
>>>>
>>>> I have 2 questions on this use case:
>>>> 1. Is there an easy way to write Parquet records into Iceberg directly,
>>>> without having to do the type conversion myself, since the goal is to
>>>> make it all happen within Temporal?
>>>> 2. I also need suggestions on handling updates. For updates I currently
>>>> have to commit the inserts, then commit the deletes, and then create a
>>>> new writer again to proceed.
>>>>
>>>> Regards,
>>>> Taher Koitawala
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Tabular
>>
>
