Hi everyone,
When I use the Iceberg native Java API to write data to an Iceberg table, only one record ends up in the file; the next record cannot be written into the same file. What I need is a streaming-write scenario, where records are written into one file piece by piece, but I could not find a corresponding test class in the source code. Thank you.
Here is the code:

        Configuration conf = new Configuration();
        String warehousePath = "hdfs://192.168.88.110:8020/warehouse_path_iceberg";
        HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
        // catalog.createNamespace(Namespace.of("seatunnel"));
        catalog.setConf(conf);
        Schema schema = new Schema(
                Types.NestedField.required(1, "level", Types.StringType.get()),
                Types.NestedField.required(2, "message", Types.IntegerType.get())
        );


        /*
        PartitionSpec spec = PartitionSpec.builderFor(schema)
                .hour("event_time")
                .build();
        */

        TableIdentifier name = TableIdentifier.of("seatunnel5", "firsttable5");
        Table table = catalog.loadTable(name);
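        // (The table was created beforehand, roughly via
        //  catalog.createTable(name, schema); a sketch, not my exact code;
        //  without an explicit PartitionSpec the table is unpartitioned.)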
        // Transaction t = table.newTransaction();
        // commit operations to the transaction:
        // t.newDelete().deleteFromRowFilter(filter).commit();
        String location = "hdfs://192.168.88.110:8020/warehouse_path_iceberg/lyhoutput";
        String filename = "9_8_lyh.orc";
        Path path = new Path(location, filename);
        // FileFormat.fromFileName infers the format (ORC here) from the extension.
        FileFormat fileFormat = FileFormat.fromFileName(filename);
        Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);


        FileAppenderFactory<Record> appenderFactory = new GenericAppenderFactory(schema);
        ArrayList<GenericRecord> genericRecords = new ArrayList<>();
        GenericRecord genericRecord = GenericRecord.create(schema);
        GenericRecord genericRecord2 = GenericRecord.create(schema);
        genericRecord.set(0, "ddddff");
        genericRecord.set(1, 5555);
        genericRecord2.set(0, "hhhhh");
        genericRecord2.set(1, 4444);
        genericRecords.add(genericRecord2);
        genericRecords.add(genericRecord);
        FileAppender<Record> appender =
                appenderFactory.newAppender(fromPath(path, conf), fileFormat);
        for (GenericRecord record : genericRecords) {
            // try-with-resources closes the shared appender at the end of the
            // first iteration, so the next add() sees an already-closed file
            try (FileAppender<Record> closeableAppender = appender) {
                closeableAppender.add(record);
                // closeableAppender.addAll((Iterable<Record>) genericRecord);
            }
        }


        DataFile data = DataFiles.builder(PartitionSpec.unpartitioned())
                .withInputFile(HadoopInputFile.fromPath(path, conf))
                .withMetrics(appender.metrics())
                .build();
        table.newAppend().appendFile(data).commit();
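
For reference, this is roughly what I expected a streaming write to look like: the same appender kept open for all records and closed exactly once before the commit. It reuses the variables above (path, conf, appenderFactory, genericRecords, table) and is only a sketch based on my reading of the API; I have not found a test class that confirms this usage:

        FileAppender<Record> streamingAppender =
                appenderFactory.newAppender(fromPath(path, conf), fileFormat);
        try {
            for (GenericRecord record : genericRecords) {
                streamingAppender.add(record);  // every record goes into the same file
            }
        } finally {
            streamingAppender.close();  // close once, after all records are written
        }
        DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
                .withInputFile(HadoopInputFile.fromPath(path, conf))
                .withMetrics(streamingAppender.metrics())  // metrics are valid only after close()
                .build();
        table.newAppend().appendFile(dataFile).commit();

If there is a more idiomatic way (for example a writer that rolls over to a new file at a size threshold), a pointer to the right class or test would be great.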
Best,
YeHAN Liu (2013650523)

liuyehan
lyh1067341...@163.com