Thank you for your response, Ryan. We will evaluate your suggestion of sticking
with a query engine, and I will also try the code you shared with me.



On Thu, 25 Aug, 2022, 2:25 am Ryan Blue, <b...@tabular.io> wrote:

> Hi Taher,
>
> It looks like you’re writing something in Java to work with the data
> directly. That’s well supported, but you may want to consider using a
> compute engine to make this process a bit easier. Most of the issues that
> you’re hitting would probably be solved automatically because those engines
> will translate correctly to and from Iceberg and other formats.
>
> Assuming that you want to move forward with Java, I think the issue you’re
> hitting is that you’re not using the same in-memory object model to read
> and write and are then trying to translate values by hand. Instead, I
> recommend using Iceberg readers for both reading and writing so that you
> get consistent in-memory records.
>
> To do that, you should use the Parquet class to instantiate both the
> reader and writer. You’ll need to use an Iceberg schema from the table
> you’re writing into (which has assigned the field IDs). For the reader,
> you’ll also need to pass a name mapping (withNameMapping) that you can
> generate from the schema with MappingUtil.create(icebergSchema). That
> name mapping enables you to read Parquet files without field IDs.
>
> Once you have it set up, it should look something like this:
>
> Schema schema = ParquetSchemaUtil.convert(parquetSchema);
> Table table = catalog.createTable(identifier, schema);
> NameMapping nameMapping = MappingUtil.create(table.schema());
>
> // Read the existing file as Iceberg generic Records and rewrite it with an
> // Iceberg writer, so both sides use the same in-memory object model.
> try (CloseableIterable<Record> reader = Parquet.read(io.newInputFile("file.parquet"))
>     .project(table.schema())
>     .withNameMapping(nameMapping)
>     .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema))
>     .build()) {
>   try (FileAppender<Record> writer = Parquet.write(io.newOutputFile("new_file.parquet"))
>       .forTable(table)
>       .createWriterFunc(GenericParquetWriter::buildWriter)
>       .build()) {
>     for (Record record : reader) {
>       writer.add(record);
>     }
>   }
> }
>
> Ryan
>
> On Wed, Aug 24, 2022 at 6:49 AM Taher Koitawala <taher...@gmail.com>
> wrote:
>
>> Hi All,
>>          Please can someone guide me regarding the above email?
>>
>> Regards,
>> Taher Koitawala
>>
>> On Tue, Aug 23, 2022 at 5:46 PM Taher Koitawala <taher...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>         I am creating an Iceberg writer on top of a Temporal service that
>>> converts CDC Parquet files to Iceberg format. That means each file will
>>> have records with corresponding timestamp flags like `inserted_at`,
>>> `deleted_at`, and `updated_at`, each of which holds a value defining the
>>> action.
>>>
>>> Initially, when there is no table in the Iceberg catalog, the plan is to
>>> use the Parquet footer schema and map it directly to the Iceberg schema
>>> using *org.apache.iceberg.parquet.ParquetSchemaUtil.convert(MessageType
>>> parquetSchema)*. However, the issue I am facing is that I also have to
>>> convert Parquet datatypes to Iceberg datatypes, specifically the
>>> timestamp types, when inserting into the table.
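>>>
>>> For concreteness, the footer-to-schema step looks roughly like the sketch
>>> below (a minimal sketch only; the path and Hadoop Configuration are
>>> placeholders, not our real setup):
>>>
>>> import org.apache.hadoop.conf.Configuration;
>>> import org.apache.hadoop.fs.Path;
>>> import org.apache.iceberg.Schema;
>>> import org.apache.iceberg.parquet.ParquetSchemaUtil;
>>> import org.apache.parquet.hadoop.ParquetFileReader;
>>> import org.apache.parquet.hadoop.util.HadoopInputFile;
>>> import org.apache.parquet.schema.MessageType;
>>>
>>> Configuration conf = new Configuration();
>>> try (ParquetFileReader fileReader = ParquetFileReader.open(
>>>     HadoopInputFile.fromPath(new Path("/tmp/cdc/file.parquet"), conf))) {
>>>   // Footer schema of the incoming CDC file
>>>   MessageType parquetSchema = fileReader.getFooter().getFileMetaData().getSchema();
>>>   // Map it directly to an Iceberg schema
>>>   Schema icebergSchema = ParquetSchemaUtil.convert(parquetSchema);
>>> }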
>>>
>>> When using the Parquet reader with the simple group, I see the timestamp
>>> as a long, but when inserting into Iceberg it is expected to be
>>> *java.time.OffsetDateTime*. The specific error I get is `Long cannot be
>>> cast to OffsetDateTime`.
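>>>
>>> To illustrate the mismatch, the by-hand conversion I end up doing looks
>>> roughly like this (assuming the column is annotated as timestamp in
>>> microseconds, adjusted to UTC; `group` is the parquet-mr Group I read and
>>> `record` is the Iceberg generic Record I am building):
>>>
>>> import java.time.Instant;
>>> import java.time.OffsetDateTime;
>>> import java.time.ZoneOffset;
>>> import java.time.temporal.ChronoUnit;
>>>
>>> long micros = group.getLong("updated_at", 0);   // raw Parquet value (epoch microseconds)
>>> OffsetDateTime ts = Instant.EPOCH
>>>     .plus(micros, ChronoUnit.MICROS)
>>>     .atOffset(ZoneOffset.UTC);
>>> record.setField("updated_at", ts);              // what the Iceberg generic Record expects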
>>>
>>> I have 2 questions about this use case:
>>> 1. Is there an easy way to write Parquet records into Iceberg directly,
>>> without me having to do a type conversion, since the goal is to make it all
>>> happen within Temporal?
>>> 2. I need suggestions on handling updates. For updates, I am currently
>>> having to commit inserts, then commit deletes, and then create a new writer
>>> again to proceed.
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>
>
> --
> Ryan Blue
> Tabular
>
