ahmedabu98 commented on code in PR #37782:
URL: https://github.com/apache/beam/pull/37782#discussion_r2896013329
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java:
##########
@@ -74,11 +71,9 @@ class RecordWriter {
}
OutputFile outputFile;
EncryptionKeyMetadata keyMetadata;
- // Keep FileIO open for the lifetime of this writer to avoid
- // premature shutdown of underlying client pools (e.g., S3),
- // which manifests as "Connection pool shut down" (Issue #36438).
- this.io = table.io();
- OutputFile tmpFile = io.newOutputFile(absoluteFilename);
+ // table.io() returns the catalog's shared FileIO instance.
Review Comment:
Nit, to avoid confusion: drop "catalog's" from this comment.
```suggestion
// table.io() returns the shared FileIO instance.
```
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java:
##########
@@ -403,33 +406,50 @@ public boolean write(WindowedValue<IcebergDestination>
icebergDestination, Row r
*/
@Override
public void close() throws IOException {
- for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
- windowedDestinationAndState : destinations.entrySet()) {
- DestinationState state = windowedDestinationAndState.getValue();
+ try {
+ for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
+ windowedDestinationAndState : destinations.entrySet()) {
+ DestinationState state = windowedDestinationAndState.getValue();
- // removing writers from the state's cache will trigger the logic to
collect each writer's
- // data file.
- state.writers.invalidateAll();
- // first check for any exceptions swallowed by the cache
- if (!state.exceptions.isEmpty()) {
- IllegalStateException exception =
- new IllegalStateException(
- String.format("Encountered %s failed writer(s).",
state.exceptions.size()));
- for (Exception e : state.exceptions) {
- exception.addSuppressed(e);
+ // removing writers from the state's cache will trigger the logic to
collect each writer's
+ // data file.
+ state.writers.invalidateAll();
+ // first check for any exceptions swallowed by the cache
+ if (!state.exceptions.isEmpty()) {
+ IllegalStateException exception =
+ new IllegalStateException(
+ String.format("Encountered %s failed writer(s).",
state.exceptions.size()));
+ for (Exception e : state.exceptions) {
+ exception.addSuppressed(e);
+ }
+ throw exception;
}
- throw exception;
- }
- if (state.dataFiles.isEmpty()) {
- continue;
- }
+ if (state.dataFiles.isEmpty()) {
+ continue;
+ }
- totalSerializableDataFiles.put(
- windowedDestinationAndState.getKey(), new
ArrayList<>(state.dataFiles));
- state.dataFiles.clear();
+ totalSerializableDataFiles.put(
+ windowedDestinationAndState.getKey(), new
ArrayList<>(state.dataFiles));
+ state.dataFiles.clear();
+ }
+ } finally {
+ // Close unique FileIO instances now that all writers are done.
+ // table.io() returns the catalog's shared FileIO; we deduplicate by
identity
Review Comment:
Nit, to avoid confusion: drop "catalog's" from this comment.
```suggestion
// table.io() returns the shared FileIO; we deduplicate by identity
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]