Thank you for your response, Ryan. We will evaluate your suggestion of sticking with a query engine, and I will also try the code you shared with me.
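In the meantime, here is roughly how I plan to wire it up based on your outline. This is a minimal, untested sketch: it assumes an unpartitioned table and a Catalog and FileIO that are already configured elsewhere, and the class name, method name, table identifier (db.cdc_table), and the sourcePath/targetPath arguments are all placeholders. Please correct me if I have any of the builder calls wrong.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class CdcParquetToIceberg {

  public static void copyFile(Catalog catalog, FileIO io, Configuration conf,
                              String sourcePath, String targetPath) throws IOException {
    // Read the Parquet footer schema and convert it to an Iceberg schema.
    MessageType parquetSchema;
    try (ParquetFileReader footerReader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(sourcePath), conf))) {
      parquetSchema = footerReader.getFileMetaData().getSchema();
    }
    Schema schema = ParquetSchemaUtil.convert(parquetSchema);

    // Create the table if needed; the table's schema has field IDs assigned.
    TableIdentifier identifier = TableIdentifier.of("db", "cdc_table"); // placeholder
    Table table = catalog.tableExists(identifier)
        ? catalog.loadTable(identifier)
        : catalog.createTable(identifier, schema);

    // The name mapping lets the reader resolve columns in files without field IDs.
    NameMapping nameMapping = MappingUtil.create(table.schema());

    // Read with Iceberg's generic Parquet reader so values come back in
    // Iceberg's in-memory representation (e.g. timestamps as java.time types).
    try (CloseableIterable<Record> reader =
        Parquet.read(io.newInputFile(sourcePath))
            .project(table.schema())
            .withNameMapping(nameMapping)
            .createReaderFunc(fileSchema ->
                GenericParquetReaders.buildReader(table.schema(), fileSchema))
            .build()) {

      // Write the same Record objects back out with the generic Parquet writer.
      FileAppender<Record> writer =
          Parquet.write(io.newOutputFile(targetPath))
              .schema(table.schema())
              .createWriterFunc(GenericParquetWriter::buildWriter)
              .overwrite()
              .build();
      try {
        for (Record record : reader) {
          writer.add(record);
        }
      } finally {
        writer.close();
      }

      // Register the new file and commit (unpartitioned spec assumed).
      DataFile dataFile = DataFiles.builder(table.spec())
          .withInputFile(io.newInputFile(targetPath))
          .withFormat(FileFormat.PARQUET)
          .withMetrics(writer.metrics())
          .build();
      table.newAppend().appendFile(dataFile).commit();
    }
  }
}

If I read your explanation correctly, using the generic reader and writer on both sides should also make the Long-to-OffsetDateTime conversion go away, since the reader already returns timestamp columns as java.time values.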
On Thu, 25 Aug, 2022, 2:25 am Ryan Blue, <b...@tabular.io> wrote:

> Hi Taher,
>
> It looks like you’re writing something in Java to work with the data
> directly. That’s well supported, but you may want to consider using a
> compute engine to make this process a bit easier. Most of the issues that
> you’re hitting would probably be solved automatically because those engines
> will translate correctly to and from Iceberg and other formats.
>
> Assuming that you want to move forward with Java, I think the issue you’re
> hitting is that you’re not using the same in-memory object model to read
> and write and are then trying to translate values by hand. Instead, I
> recommend using Iceberg readers for both reading and writing so that you
> get consistent in-memory records.
>
> To do that, you should use the Parquet class to instantiate both the
> reader and writer. You’ll need to use an Iceberg schema from the table
> you’re writing into (which has assigned the field IDs). For the reader,
> you’ll also need to pass a name mapping (withNameMapping) that you can
> generate from the schema with MappingUtil.create(icebergSchema). That
> name mapping enables you to read Parquet files without field IDs.
>
> Once you have it set up, it should look something like this:
>
> schema = ParquetSchemaUtil.convert(parquetSchema)
> table = catalog.createTable(identifier, schema)
> nameMapping = MappingUtil.create(table.schema())
>
> try (CloseableIterable<Record> reader =
>     Parquet.read(io.newInputFile("file.parquet")).project(table.schema()).withNameMapping(nameMapping).build()) {
>   try (FileAppender<Record> writer =
>       Parquet.writeData(io.newOutputFile("new_file.parquet")).forTable(table).build()) {
>     for (Record record : reader) {
>       writer.add(record);
>     }
>   }
> }
>
> Ryan
>
> On Wed, Aug 24, 2022 at 6:49 AM Taher Koitawala <taher...@gmail.com>
> wrote:
>
>> Hi All,
>> Please can someone guide me regarding the above email?
>>
>> Regards,
>> Taher Koitawala
>>
>> On Tue, Aug 23, 2022 at 5:46 PM Taher Koitawala <taher...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>> I am creating an iceberg writer over a temporal service that
>>> converts CDC parquet files to Iceberg format. That means that the file will
>>> have a record and corresponding timestamp flags like `inserted_at`,
>>> `deleted_at` and `updated_at`, each of which will have a value defining the
>>> action.
>>>
>>> Initially, when there is no table in the iceberg catalog, the plan is to
>>> use the Parquet footer schema and map that directly to the Iceberg schema
>>> using *org.apache.iceberg.parquet.ParquetSchemaUtil.convert(MessageType
>>> parquetSchema)*. However, the issue that I am facing is that I am also
>>> having to convert Parquet datatypes to Iceberg datatypes, specifically the
>>> timestamp types, when inserting into the table.
>>>
>>> When using the Parquet reader with the simple group, I see the timestamp
>>> as a long, and when inserting into Iceberg, it expects it to be
>>> *java.time.OffsetDateTime*; the specific error I get is `Long cannot be
>>> cast to OffsetDateTime`.
>>>
>>> I have 2 questions on this use case:
>>> 1. Is there an easy way to insert parquet to iceberg records directly
>>> without me having to do a type conversion, since the goal is to make it all
>>> happen within temporal?
>>> 2. I need suggestions on handling updates. For updates I'm having to
>>> commit inserts, then commit deletes, and then create a new writer again
>>> to proceed.
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>
>
> --
> Ryan Blue
> Tabular