gemini-code-assist[bot] commented on code in PR #39158:
URL: https://github.com/apache/beam/pull/39158#discussion_r3493582486
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java:
##########
@@ -152,15 +152,38 @@ public void processElement(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)),
window.maxTimestamp());
}
- // This works only when all files are using the same partition spec.
- private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteResults) {
- AppendFiles update = table.newAppend();
+ private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
+ throws IOException {
+ Map<Integer, PartitionSpec> specs = table.specs();
+ FileIO io = table.io();
+ String uuid = UUID.randomUUID().toString();
+
+ Map<String, List<DataFile>> byPartition = new HashMap<>();
for (FileWriteResult result : fileWriteResults) {
- DataFile dataFile = result.getDataFile(table.specs());
- update.appendFile(dataFile);
+ DataFile dataFile = result.getDataFile(specs);
+ String partitionPath = result.getSerializableDataFile().getPartitionPath();
+ byPartition.computeIfAbsent(partitionPath, k -> new ArrayList<>()).add(dataFile);
committedDataFileByteSize.update(dataFile.fileSizeInBytes());
committedDataFileRecordCount.update(dataFile.recordCount());
}
+
+ AppendFiles update = table.newAppend();
+ int manifestIdx = 0;
+ for (Map.Entry<String, List<DataFile>> entry : byPartition.entrySet()) {
+ List<DataFile> files = entry.getValue();
+ PartitionSpec spec =
+ files.get(0).specId() >= 0
+ ? Preconditions.checkStateNotNull(specs.get(files.get(0).specId()))
+ : table.spec();
+ ManifestWriter<DataFile> writer =
+ createManifestWriter(
+ table.location(), uuid + "-" + manifestIdx++, spec, io);
+ for (DataFile file : files) {
+ writer.add(file);
+ }
+ writer.close();
+ update.appendManifest(writer.toManifestFile());
Review Comment:

There are two issues here:
1. `createManifestWriter` is not a standard Iceberg API and is not defined
in this class, so this code will not compile.
2. The `ManifestWriter` is not closed in a `try-finally` or
`try-with-resources` block. If an exception is thrown while adding files, the
writer is never closed and its resources leak.

Both issues can be resolved by using the standard `ManifestFiles.write` API
from Iceberg and wrapping the write loop in a `try-finally` block, so that the
writer is always closed before the manifest file is retrieved.

Note: You will need to import `org.apache.iceberg.ManifestFiles` and
`org.apache.iceberg.io.OutputFile`.
```java
// Write this group's manifest under the table's metadata directory.
OutputFile outputFile =
    io.newOutputFile(table.location() + "/metadata/" + uuid + "-" + manifestIdx++ + ".avro");
ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
try {
  for (DataFile file : files) {
    writer.add(file);
  }
} finally {
  // Close even if add() throws, and before toManifestFile() is called.
  writer.close();
}
update.appendManifest(writer.toManifestFile());
```
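
The same close-before-read guarantee can also be expressed with `try-with-resources`, which the comment above mentions as an alternative. The following is only a minimal, standalone sketch of that pattern: `FakeWriter`, `toManifest`, and `writeAll` are hypothetical stand-ins invented for illustration and are not Iceberg or Beam APIs. It shows that the writer is closed both on success and when `add` throws, and that the result is read only after the writer is closed.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ManifestWriteSketch {

  // Hypothetical stand-in for a manifest writer; it only tracks state in memory.
  static final class FakeWriter implements Closeable {
    final List<String> added = new ArrayList<>();
    boolean closed = false;

    void add(String file) throws IOException {
      if (file.isEmpty()) {
        throw new IOException("cannot add an empty file path");
      }
      added.add(file);
    }

    @Override
    public void close() {
      closed = true;
    }

    // Assumption for this sketch: the result may be read only once the writer is closed.
    String toManifest() {
      if (!closed) {
        throw new IllegalStateException("writer is still open");
      }
      return "manifest[" + added.size() + " files]";
    }
  }

  // Adds every file, closing the writer whether or not add() throws.
  static String writeAll(FakeWriter writer, List<String> files) throws IOException {
    try (FakeWriter w = writer) {
      for (String file : files) {
        w.add(file);
      }
    }
    // Reached only when every add() succeeded; the writer is already closed here.
    return writer.toManifest();
  }

  public static void main(String[] args) throws IOException {
    FakeWriter writer = new FakeWriter();
    System.out.println(writeAll(writer, Arrays.asList("a.parquet", "b.parquet")));
  }
}
```

Compared with the explicit `try-finally` form, this keeps the close call out of the method body, at the cost of needing a reference to the writer that outlives the `try` block.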