My question is not about planning a scan; it is about the CDC log
implementation. Suppose a writer is open and I get an insert and then a delete
for a record. If I write an equality delete, that record is gone.

However, if I insert, delete, and then insert the exact same record again while
the writer is still open, I will not see any records for the key because the
equality delete kicked in.

The above is resolved via a positional delete. My question, then: say I get a
delete for a record that has already been flushed to some file. How do I tell
the positional delete writer which file and offset need to go into the delete
file? Do I have to do a table scan for that, or is there an efficient way
around this?
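
One pattern that avoids a table scan (and, if I understand it correctly, roughly
what the Flink Iceberg sink does for CDC) is to keep an in-memory index of the
keys written by the currently open writer: if a delete arrives for a key this
writer wrote, emit a position delete from the remembered file and offset;
otherwise fall back to an equality delete, which only applies to older sequence
numbers anyway. A minimal sketch, with hypothetical class and method names
rather than real Iceberg API:

import java.util.HashMap;
import java.util.Map;

// Sketch only: route CDC deletes to position deletes when the row was written
// by the currently open writer, and to equality deletes otherwise.
public class CdcDeleteRouter {

  // Where a key was written while the current data file is still open.
  record WrittenPosition(String dataFilePath, long rowPosition) {}

  private final Map<String, WrittenPosition> openFileIndex = new HashMap<>();

  // Call this for every row appended by the open writer.
  public void recordInsert(String key, String dataFilePath, long rowPosition) {
    openFileIndex.put(key, new WrittenPosition(dataFilePath, rowPosition));
  }

  // Decide how to encode a delete for the given key.
  public String routeDelete(String key) {
    WrittenPosition pos = openFileIndex.remove(key);
    if (pos != null) {
      // The row lives in a file this writer produced, so the path and offset
      // are already known and a position delete can be written directly.
      return "POSITION DELETE " + pos.dataFilePath() + " @ " + pos.rowPosition();
    }
    // The row was committed before this writer opened; an equality delete is
    // enough because it only applies to data with older sequence numbers.
    return "EQUALITY DELETE key=" + key;
  }

  public static void main(String[] args) {
    CdcDeleteRouter router = new CdcDeleteRouter();
    router.recordInsert("1", "data-00001.parquet", 0L);
    System.out.println(router.routeDelete("1")); // position delete
    System.out.println(router.routeDelete("2")); // equality delete
  }
}

With that split, an insert-delete-insert of the same key inside one open writer
removes only the first copy, and rows flushed in earlier commits never need to
be located by offset.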

On Thu, 1 Sep, 2022, 7:00 pm Zoltán Borók-Nagy,
<borokna...@cloudera.com.invalid> wrote:

> Hi Taher,
>
> I think most of your questions are answered in the Scan Planning section
> at the Iceberg spec page: https://iceberg.apache.org/spec/#scan-planning
>
> To give you some specific answers as well:
> Equality Deletes: data and delete files have sequence numbers from which
> readers can infer the relative age of the data. Delete files are only
> applied to older data files. This means if you insert data again with a key
> that was deleted earlier, then Iceberg should show the newly inserted record.
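>
> A minimal sketch of that rule, assuming the data sequence numbers have
> already been read from the manifest entries (method names are illustrative,
> not library API):
>
> static boolean equalityDeleteApplies(long dataFileSeq, long deleteFileSeq) {
>   // Equality deletes only remove rows from strictly older data files, which
>   // is why a key re-inserted at a newer sequence number stays visible.
>   return dataFileSeq < deleteFileSeq;
> }
>
> static boolean positionDeleteApplies(long dataFileSeq, long deleteFileSeq) {
>   // Position deletes also apply to data written at the same sequence number,
>   // e.g. deletes produced while the same data file was still being written.
>   return dataFileSeq <= deleteFileSeq;
> }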
>
> Position Deletes: When reading data files, the reader must keep track of
> the file position and only return rows that do not have a record in the
> delete files. Alternatively, you can do a big ANTI JOIN between the data
> files and the delete files. The latter was our approach in Impala:
> https://docs.google.com/document/d/1WF_UOanQ61RUuQlM4LaiRWI0YXpPKZ2VEJ8gyJdDyoY/edit#heading=h.5bmfhbmb4qdk
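>
> For the streaming variant, a minimal sketch of the per-file check, assuming
> the deleted positions for the current data file have already been collected
> into a set (names and types here are illustrative):
>
> static void readWithPositionDeletes(Iterable<Record> dataFileRows,
>                                     Set<Long> deletedPositions,
>                                     Consumer<Record> emit) {
>   long pos = 0;  // row position within this data file
>   for (Record row : dataFileRows) {
>     if (!deletedPositions.contains(pos)) {
>       emit.accept(row);  // no matching entry in the delete files
>     }
>     pos++;
>   }
> }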
>
> Cheers,
>     Zoltan
>
>
>
> On Thu, Sep 1, 2022 at 7:34 AM Taher Koitawala <taher...@gmail.com> wrote:
>
>> Thank you, Ryan and the Iceberg community; the suggestions really helped
>> move a lot of the development forward. On the same use case, I have hit
>> another block around doing CDC updates and deletes.
>>
>> For now I see two options for managing deletes, EqualityDeletes and
>> PositionalDeletes:
>>
>>    1. EqualityDeletes need me to delete a particular key that Iceberg
>>    then matches at scan time to skip those records.
>>       1. The problem here is that when a record with DeleteKey 1 is
>>       inserted, then deleted, and then inserted again with key 1, Iceberg
>>       shows no records. I guess that is the intended way it has to work,
>>       but it means I need to be more careful when writing to Iceberg.
>>    2. PositionalDeletes are amazing because I can give an offset and the
>>    file name of where I want the record to be deleted; I can handle
>>    updates here as a delete plus an insert, and the above case is handled.
>>       1. What I am stuck on here is: after records have been inserted
>>       into a file, if I see a delete or update request, how do I find the
>>       offset of the record that needs to be deleted in the inserts file?
>>       2. Do I need to do a table scan every time I get a delete request?
>>       That would mean a lot of IO, and the CDC implementation would be
>>       crazy slow.
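>>
>> One thing that can at least bound that IO: a scan filtered on the key is
>> planned from metadata only, so it prunes down to candidate data files before
>> any rows are read. A rough sketch, assuming a `table` handle and an `id` key
>> column (the row offset still has to be found by reading the file, or avoided
>> by tracking positions while the writer is still open):
>>
>> try (CloseableIterable<FileScanTask> tasks =
>>     table.newScan().filter(Expressions.equal("id", key)).planFiles()) {
>>   for (FileScanTask task : tasks) {
>>     // Candidate data files that may contain the key, per manifests/stats.
>>     System.out.println(task.file().path());
>>   }
>> }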
>>
>> Please can you suggest the correct way to apply CDC log files with a JVM
>> task?
>>
>> Regards,
>> Taher Koitawala
>>
>>
>>
>>
>>
>> On Thu, Aug 25, 2022 at 9:39 AM Taher Koitawala <taher...@gmail.com>
>> wrote:
>>
>>> Thank you for your response, Ryan. We will evaluate your suggestion of
>>> sticking with a query engine, and I will also try the code you shared with
>>> me.
>>>
>>>
>>>
>>> On Thu, 25 Aug, 2022, 2:25 am Ryan Blue, <b...@tabular.io> wrote:
>>>
>>>> Hi Taher,
>>>>
>>>> It looks like you’re writing something in Java to work with the data
>>>> directly. That’s well supported, but you may want to consider using a
>>>> compute engine to make this process a bit easier. Most of the issues that
>>>> you’re hitting would probably be solved automatically because those engines
>>>> will translate correctly to and from Iceberg and other formats.
>>>>
>>>> Assuming that you want to move forward with Java, I think the issue
>>>> you’re hitting is that you’re not using the same in-memory object model to
>>>> read and write and are then trying to translate values by hand. Instead, I
>>>> recommend using Iceberg readers for both reading and writing so that you
>>>> get consistent in-memory records.
>>>>
>>>> To do that, you should use the Parquet class to instantiate both the
>>>> reader and writer. You’ll need to use an Iceberg schema from the table
>>>> you’re writing into (which has assigned the field IDs). For the reader,
>>>> you’ll also need to pass a name mapping (withNameMapping) that you can
>>>> generate from the schema with MappingUtil.create(icebergSchema). That
>>>> name mapping enables you to read Parquet files without field IDs.
>>>>
>>>> Once you have it set up, it should look something like this:
>>>>
>>>> schema = ParquetSchemaUtil.convert(parquetSchema)
>>>> table = catalog.createTable(identifier, schema)
>>>> nameMapping = MappingUtil.create(table.schema())
>>>>
>>>> try (CloseableIterable<Record> reader = Parquet.read(io.newInputFile("file.parquet"))
>>>>     .project(table.schema())
>>>>     .withNameMapping(nameMapping)
>>>>     // read into Iceberg generic Records
>>>>     .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema))
>>>>     .build()) {
>>>>   try (FileAppender<Record> writer = Parquet.write(io.newOutputFile("new_file.parquet"))
>>>>       .forTable(table)
>>>>       // write Iceberg generic Records
>>>>       .createWriterFunc(GenericParquetWriter::buildWriter)
>>>>       .build()) {
>>>>     for (Record record : reader) {
>>>>       writer.add(record);
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> Ryan
>>>>
>>>> On Wed, Aug 24, 2022 at 6:49 AM Taher Koitawala <taher...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>          Please can someone guide me regarding the above email?
>>>>>
>>>>> Regards,
>>>>> Taher Koitawala
>>>>>
>>>>> On Tue, Aug 23, 2022 at 5:46 PM Taher Koitawala <taher...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>         I am creating an Iceberg writer on top of a Temporal service
>>>>>> that converts CDC Parquet files to Iceberg format. This means the file
>>>>>> will have a record and corresponding timestamp flags like `inserted_at`,
>>>>>> `deleted_at`, and `updated_at`, each of which will have a value defining
>>>>>> the action.
>>>>>>
>>>>>> Initially, when there is no table in the Iceberg catalog, the plan is
>>>>>> to use the Parquet footer schema and map that directly to the Iceberg
>>>>>> schema using
>>>>>> *org.apache.iceberg.parquet.ParquetSchemaUtil.convert(MessageType
>>>>>> parquetSchema).* However, the issue I am facing is that I am also
>>>>>> having to convert Parquet datatypes to Iceberg datatypes, specifically
>>>>>> the timestamp types, when inserting into the table.
>>>>>>
>>>>>> When using the Parquet reader with SimpleGroup, I see the timestamp as
>>>>>> a long, but when inserting into Iceberg it expects a
>>>>>> *java.time.OffsetDateTime*; the specific error I get is `Long cannot be
>>>>>> cast to OffsetDateTime`.
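>>>>>>
>>>>>> If the conversion does have to be done by hand, a minimal sketch for an
>>>>>> INT64 timestamp column, assuming it stores microseconds since the epoch
>>>>>> adjusted to UTC (check the column's Parquet logical type first, since
>>>>>> millis/nanos and non-UTC variants also exist):
>>>>>>
>>>>>> static OffsetDateTime toOffsetDateTime(long micros) {
>>>>>>   // java.time only: split micros into whole seconds plus microseconds,
>>>>>>   // then attach the UTC offset that Iceberg's timestamptz expects.
>>>>>>   Instant instant = Instant.ofEpochSecond(
>>>>>>       Math.floorDiv(micros, 1_000_000L),
>>>>>>       Math.floorMod(micros, 1_000_000L) * 1_000L);
>>>>>>   return instant.atOffset(ZoneOffset.UTC);
>>>>>> }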
>>>>>>
>>>>>> I have two questions on this use case:
>>>>>> 1. Is there an easy way to write Parquet records into Iceberg directly,
>>>>>> without my having to do the type conversion, since the goal is to make
>>>>>> it all happen within Temporal?
>>>>>> 2. I need suggestions on handling updates. For updates I am currently
>>>>>> having to commit the inserts, then commit the deletes, and then create
>>>>>> a new writer again to proceed.
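>>>>>>
>>>>>> On 2, one option I am looking at instead of separate commits is a single
>>>>>> row-delta commit; a rough sketch, assuming the data file and delete file
>>>>>> have already been written and closed:
>>>>>>
>>>>>> // One atomic commit containing both the inserts and the deletes,
>>>>>> // instead of two commits with a writer restart in between.
>>>>>> table.newRowDelta()
>>>>>>     .addRows(dataFile)       // org.apache.iceberg.DataFile
>>>>>>     .addDeletes(deleteFile)  // org.apache.iceberg.DeleteFile
>>>>>>     .commit();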
>>>>>>
>>>>>> Regards,
>>>>>> Taher Koitawala
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Tabular
>>>>
>>>
