HI,everyone: Now, when I use Iceberg native Java API to write data to Iceberg table, only one piece of data can be written into a file,The next piece of data doesn't fit into the file. Therefore, I need a streaming writing scene to write one piece of data into a file,I also did not find the corresponding test class in the source code,Thank you. The following code:
Configuration conf = new Configuration(); String warehousePath = "hdfs://192.168.88.110:8020/warehouse_path_iceberg"; HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath); // catalog.createNamespace(Namespace.of("seatunnel")); catalog.setConf(conf); Schema schema = new Schema( Types.NestedField.required(1, "level", Types.StringType.get()), Types.NestedField.required(2, "message", Types.IntegerType.get()) ); /* PartitionSpec spec = PartitionSpec.builderFor(schema) .hour("event_time") .build();*/ TableIdentifier name = TableIdentifier.of("seatunnel5", "firsttable5"); Table table = catalog.loadTable(name); // Transaction t = table.newTransaction(); // commit operations to the transaction // t.newDelete().deleteFromRowFilter(filter).commit(); String location = "hdfs://192.168.88.110:8020/warehouse_path_iceberg/lyhoutput"; String filename = "9_8_lyh.orc"; Path path = new Path(location, filename); FileFormat fileFormat = FileFormat.fromFileName(filename); Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename); FileAppenderFactory<Record> appenderFactory = new GenericAppenderFactory(schema); ArrayList<GenericRecord> genericRecords = new ArrayList<>(); GenericRecord genericRecord = GenericRecord.create(schema); GenericRecord genericRecord2 = GenericRecord.create(schema); genericRecord.set(0, "ddddff"); genericRecord.set(1, 5555); genericRecord2.set(0, "hhhhh"); genericRecord2.set(1, 4444); genericRecords.add(genericRecord2); genericRecords.add(genericRecord); FileAppender<Record> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat); for (GenericRecord record : genericRecords) { try (FileAppender<Record> closeableAppender = appender) { closeableAppender.add(record); // closeableAppender.addAll((Iterable<Record>) genericRecord); } } DataFile data = DataFiles.builder(PartitionSpec.unpartitioned()) .withInputFile(HadoopInputFile.fromPath(path, conf)) .withMetrics(appender.metrics()) .build(); table.newAppend().appendFile(data).commit(); Best, YeHAN Liu (2013650523) | | liuyehan | | lyh1067341...@163.com |