Hi Taher,

It looks like you’re writing something in Java to work with the data directly. That’s well supported, but you may want to consider using a compute engine to make this process a bit easier. Most of the issues that you’re hitting would probably be solved automatically because those engines will translate correctly to and from Iceberg and other formats.

Assuming that you want to move forward with Java, I think the issue you’re hitting is that you’re not using the same in-memory object model to read and write, and are then trying to translate values by hand. Instead, I recommend using Iceberg’s readers and writers so that you get consistent in-memory records on both sides.

To do that, you should use the Parquet class to instantiate both the reader and writer. You’ll need to use an Iceberg schema from the table you’re writing into (which has assigned the field IDs). For the reader, you’ll also need to pass a name mapping (withNameMapping) that you can generate from the schema with MappingUtil.create(icebergSchema). That name mapping enables you to read Parquet files without field IDs.

Once you have it set up, it should look something like this:

    Schema schema = ParquetSchemaUtil.convert(parquetSchema);
    Table table = catalog.createTable(identifier, schema);
    NameMapping nameMapping = MappingUtil.create(table.schema());

    try (CloseableIterable<Record> reader = Parquet.read(io.newInputFile("file.parquet"))
        .project(table.schema())
        .withNameMapping(nameMapping)
        .build()) {
      try (FileAppender<Record> writer = Parquet.writeData(io.newOutputFile("new_file.parquet"))
          .forTable(table)
          .build()) {
        for (Record record : reader) {
          writer.add(record);
        }
      }
    }

Ryan

On Wed, Aug 24, 2022 at 6:49 AM Taher Koitawala <taher...@gmail.com> wrote:

> Hi All,
> Please can someone guide me regarding the above email?
>
> Regards,
> Taher Koitawala
>
> On Tue, Aug 23, 2022 at 5:46 PM Taher Koitawala <taher...@gmail.com>
> wrote:
>
>> Hi All,
>> I am creating an iceberg writer over temporal service that
>> converts CDC parquet files to Iceberg format. That means that the file will
>> have a record and corresponding timestamp flags like `inserted_at`,
>> `deleted_at` and `updated_at`, each of which will have a value defining the
>> action.
>>
>> Initially, when there is no table in the iceberg catalog, the plan is to
>> use the Parquet footer schema and map that directly to the Iceberg schema
>> using *org.apache.iceberg.parquet.ParquetSchemaUtil.convert(MessageType
>> parquetSchema).* However, the issue that I am facing is that I am also
>> having to convert Parquet datatypes to Iceberg datatypes, specifically the
>> timestamp types when inserting into the table.
>>
>> When using the Parquet reader with the simple group, I see the timestamp
>> as long and when inserted to iceberg, it expects it to be
>> *java.time.OffsetDateTime*, specific error I get is `Long cannot be cast
>> to OffsetDateTime`
>>
>> I have 2 questions on this use case:
>> 1. Is there an easy way to insert parquet to iceberg records directly
>> without me having to do a type conversion since the goal is to make it all
>> happen within temporal?
>> 2. Need suggestions to handle updates. As for updates I'm having to
>> commit inserts and then commit deletes and then create a new writer again
>> to proceed.
>>
>> Regards,
>> Taher Koitawala
>>
>

--
Ryan Blue
Tabular
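
For context on the `Long cannot be cast to OffsetDateTime` error in the quoted message: Parquet stores a timestamp as an int64 count of time units since the Unix epoch, while the record model the table expects holds it as a java.time object. The sketch below shows roughly what a conversion by hand would involve, using only java.time. It assumes the column is in microseconds, which depends on how the file was annotated and should be checked; the class and method names are hypothetical and are not part of Iceberg. Reading with Iceberg's own Parquet reader, as recommended in the reply, should make this step unnecessary.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not an Iceberg class: converts raw epoch-microsecond
// longs (assumed unit) into the java.time values a record would carry.
final class TimestampSketch {
    private static final long MICROS_PER_SECOND = 1_000_000L;

    private TimestampSketch() {}

    // Timestamp with zone: epoch micros -> OffsetDateTime at UTC.
    static OffsetDateTime microsToOffsetDateTime(long micros) {
        // floorDiv/floorMod keep the fraction non-negative for pre-1970 values.
        long seconds = Math.floorDiv(micros, MICROS_PER_SECOND);
        long nanos = Math.floorMod(micros, MICROS_PER_SECOND) * 1_000L;
        return Instant.ofEpochSecond(seconds, nanos).atOffset(ZoneOffset.UTC);
    }

    // Timestamp without zone: epoch micros -> LocalDateTime (interpreted at UTC).
    static LocalDateTime microsToLocalDateTime(long micros) {
        return microsToOffsetDateTime(micros).toLocalDateTime();
    }
}
```

A call such as `TimestampSketch.microsToOffsetDateTime(value)` would be applied per timestamp column before building each record; if the file uses milliseconds instead, the divisor and multiplier would need to change accordingly.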