gemini-code-assist[bot] commented on code in PR #38930:
URL: https://github.com/apache/beam/pull/38930#discussion_r3399242836
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java:
##########
@@ -130,24 +139,32 @@ public void processElement(
destination
.getFileFormat()
.addExtension(String.format("%s-%s", filePrefix,
UUID.randomUUID()));
-
- RecordWriter writer =
- new RecordWriter(table, destination.getFileFormat(), fileName,
partitionData);
- try {
+ System.out.println(partitionData + fileName);
Review Comment:

Remove the leftover debug `System.out.println` statement; it prints
`partitionData + fileName` to stdout each time this code path runs.
```suggestion
.addExtension(String.format("%s-%s", filePrefix, UUID.randomUUID()));
```
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java:
##########
@@ -130,24 +139,32 @@ public void processElement(
destination
.getFileFormat()
.addExtension(String.format("%s-%s", filePrefix,
UUID.randomUUID()));
-
- RecordWriter writer =
- new RecordWriter(table, destination.getFileFormat(), fileName,
partitionData);
- try {
+ System.out.println(partitionData + fileName);
+
+ long maxFileSize =
+ PropertyUtil.propertyAsLong(
+ table.properties(),
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ WindowedValue<IcebergDestination> windowedDestination =
+ WindowedValues.of(destination, window.maxTimestamp(), window,
paneInfo);
+ RecordWriterManager writer =
+ new RecordWriterManager(catalogConfig, filePrefix, maxFileSize,
Integer.MAX_VALUE);
+ try (writer) {
for (Row row : element.getValue()) {
- Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(),
row);
- writer.write(record);
+ writer.write(windowedDestination, row);
}
- } finally {
- writer.close();
}
- SerializableDataFile sdf =
SerializableDataFile.from(writer.getDataFile(), partitionPath);
- out.output(
- FileWriteResult.builder()
- .setTableIdentifier(destination.getTableIdentifier())
- .setSerializableDataFile(sdf)
- .build());
+ List<SerializableDataFile> serializableDataFiles =
+
checkStateNotNull(writer.getSerializableDataFiles().get(windowedDestination));
+ for (SerializableDataFile dataFile : serializableDataFiles) {
+ out.output(
+ FileWriteResult.builder()
+ .setTableIdentifier(destination.getTableIdentifier())
+ .setSerializableDataFile(dataFile)
+ .build());
+ }
Review Comment:

If `element.getValue()` is empty, no files are written for this
destination, and `writer.getSerializableDataFiles().get(windowedDestination)`
returns `null`. `checkStateNotNull` then throws an `IllegalStateException`,
which fails the pipeline. It is safer to check for `null` and skip the output
when no files were written.
```java
List<SerializableDataFile> serializableDataFiles =
    writer.getSerializableDataFiles().get(windowedDestination);
if (serializableDataFiles != null) {
  for (SerializableDataFile dataFile : serializableDataFiles) {
    out.output(
        FileWriteResult.builder()
            .setTableIdentifier(destination.getTableIdentifier())
            .setSerializableDataFile(dataFile)
            .build());
  }
}
```
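The failure mode described above can be reproduced without Beam or Iceberg. The following is a minimal sketch, not the actual `RecordWriterManager` behavior: `EmptyBatchSketch`, `writeAll`, and `filesFor` are hypothetical names, and a plain `HashMap` stands in for the map returned by `getSerializableDataFiles()`. It assumes, as the comment does, that a destination with no rows never gets a map entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a writer that tracks written files per destination.
class EmptyBatchSketch {

  // Records one "file" per row. A destination that receives no rows
  // never gets an entry in the returned map.
  static Map<String, List<String>> writeAll(String destination, List<String> rows) {
    Map<String, List<String>> filesByDestination = new HashMap<>();
    for (String row : rows) {
      filesByDestination
          .computeIfAbsent(destination, d -> new ArrayList<>())
          .add("file-for-" + row);
    }
    return filesByDestination;
  }

  // Null-safe lookup: a missing entry means "nothing was written",
  // so return an empty list instead of failing.
  static List<String> filesFor(Map<String, List<String>> filesByDestination, String destination) {
    List<String> files = filesByDestination.get(destination);
    return files == null ? Collections.<String>emptyList() : files;
  }

  public static void main(String[] args) {
    Map<String, List<String>> empty = writeAll("db.table", Collections.<String>emptyList());
    // The raw lookup yields null for an empty batch.
    System.out.println(empty.get("db.table"));
    // The guarded lookup yields an empty list, so the output loop is simply skipped.
    for (String file : filesFor(empty, "db.table")) {
      System.out.println(file);
    }
    Map<String, List<String>> nonEmpty = writeAll("db.table", Arrays.asList("a", "b"));
    System.out.println(filesFor(nonEmpty, "db.table"));
  }
}
```

Returning an empty list rather than `null` keeps the caller's loop unconditional; the explicit `if (... != null)` guard in the review suggestion is equivalent.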
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java:
##########
@@ -130,24 +139,32 @@ public void processElement(
destination
.getFileFormat()
.addExtension(String.format("%s-%s", filePrefix,
UUID.randomUUID()));
-
- RecordWriter writer =
- new RecordWriter(table, destination.getFileFormat(), fileName,
partitionData);
- try {
+ System.out.println(partitionData + fileName);
+
+ long maxFileSize =
+ PropertyUtil.propertyAsLong(
+ table.properties(),
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ WindowedValue<IcebergDestination> windowedDestination =
+ WindowedValues.of(destination, window.maxTimestamp(), window,
paneInfo);
+ RecordWriterManager writer =
+ new RecordWriterManager(catalogConfig, filePrefix, maxFileSize,
Integer.MAX_VALUE);
+ try (writer) {
for (Row row : element.getValue()) {
- Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(),
row);
- writer.write(record);
+ writer.write(windowedDestination, row);
}
- } finally {
- writer.close();
}
Review Comment:

Passing an existing variable to try-with-resources, as in `try (writer)`,
requires Java 9 or later. If this module must still compile at the Java 8
language level, this will cause compilation failures. Use a standard
`try`/`finally` block instead to ensure compatibility and consistency with the
rest of the codebase.
```suggestion
RecordWriterManager writer =
    new RecordWriterManager(catalogConfig, filePrefix, maxFileSize, Integer.MAX_VALUE);
try {
  for (Row row : element.getValue()) {
    writer.write(windowedDestination, row);
  }
} finally {
  writer.close();
}
```
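As a standalone illustration of the pattern suggested above, here is a minimal sketch that compiles at the Java 8 language level. `TryFinallySketch` and `BufferingWriter` are hypothetical names, not Beam classes. The sketch assumes a writer whose results may only be read after `close()`, which is why the variable is declared outside the `try` block and read after it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TryFinallySketch {

  // Hypothetical writer: rows become visible as "completed" only once closed.
  static class BufferingWriter implements AutoCloseable {
    private final List<String> pending = new ArrayList<>();
    private final List<String> completed = new ArrayList<>();
    private boolean closed = false;

    void write(String row) {
      if (closed) {
        throw new IllegalStateException("writer is already closed");
      }
      pending.add(row);
    }

    @Override
    public void close() {
      completed.addAll(pending);
      pending.clear();
      closed = true;
    }

    List<String> completedRows() {
      if (!closed) {
        throw new IllegalStateException("close the writer before reading results");
      }
      return completed;
    }
  }

  static List<String> writeAll(List<String> rows) {
    // Declared outside the try block so it stays in scope afterwards.
    BufferingWriter writer = new BufferingWriter();
    try {
      for (String row : rows) {
        writer.write(row);
      }
    } finally {
      // Runs whether or not a write threw, like try-with-resources would.
      writer.close();
    }
    // Safe to read here: the writer is guaranteed to be closed.
    return writer.completedRows();
  }

  public static void main(String[] args) {
    System.out.println(writeAll(Arrays.asList("a", "b")));
  }
}
```

One difference from try-with-resources is worth keeping in mind: if both the loop body and `close()` throw, the exception from `close()` replaces the original one instead of being attached to it as a suppressed exception.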