gemini-code-assist[bot] commented on code in PR #38149:
URL: https://github.com/apache/beam/pull/38149#discussion_r3091878336
##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -1201,4 +1202,44 @@ public void testGetOrCreateTable_refreshLogic() {
     // Verify that refresh() WAS called exactly once because the entry was stale.
     verify(mockTable, times(1)).refresh();
   }
+
+  /**
+   * Verifies that the shared FileIO survives across multiple bundles. This is the core regression
+   * test: if RecordWriterManager.close() closed the FileIO, the second bundle would fail with
+   * "Connection pool shut down".
+   */
+  @Test
+  public void testFileIOSurvivesAcrossBundles() throws IOException {
+    String tableName =
+        "table_survive_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
+    TableIdentifier tableId = TableIdentifier.of("default", tableName);
+
+    Table realTable = warehouse.createTable(tableId, ICEBERG_SCHEMA);
+
+    CloseTrackingFileIO sharedIO = new CloseTrackingFileIO(realTable.io());
+    Table spyTable = Mockito.spy(realTable);
+    Mockito.doReturn(sharedIO).when(spyTable).io();
+
+    Catalog spyCatalog = Mockito.spy(catalog);
+    Mockito.doReturn(spyTable).when(spyCatalog).loadTable(tableId);
+
+    WindowedValue<IcebergDestination> dest = getWindowedDestination(tableName, null);
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+
+    // Bundle 1: write and close
+    RecordWriterManager bundle1 = new RecordWriterManager(spyCatalog, "file_b1", 1000, 3);
+    assertTrue(bundle1.write(dest, row));
+    bundle1.close();
+    assertFalse("FileIO must survive after bundle 1 close", sharedIO.closed);
+    assertTrue(
+        "Bundle 1 should produce data files", bundle1.getSerializableDataFiles().containsKey(dest));
+
+    // Bundle 2: write and close using the same catalog (simulates DoFn reuse)
+    RecordWriterManager bundle2 = new RecordWriterManager(spyCatalog, "file_b2", 1000, 3);
+    assertTrue(bundle2.write(dest, row));
+    bundle2.close();
+    assertFalse("FileIO must survive after bundle 2 close", sharedIO.closed);
+    assertTrue(
+        "Bundle 2 should produce data files", bundle2.getSerializableDataFiles().containsKey(dest));
+  }
Review Comment:

The new test `testFileIOSurvivesAcrossBundles` verifies that
`RecordWriterManager` no longer closes the `FileIO`, but it does not exercise
the intended new lifecycle, in which the catalog is closed in `@Teardown`.
Consider adding a test that simulates the full `DoFn` lifecycle: invoke a
teardown method that closes the catalog, then verify that the `FileIO` is
closed at that point, ensuring the resource is eventually released.
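A sketch of such a test (illustrative only: it reuses the spy-catalog and
`CloseTrackingFileIO` setup from `testFileIOSurvivesAcrossBundles`, the test
name is hypothetical, and whether closing the catalog cascades to the `FileIO`
depends on the catalog implementation):
```java
@Test
public void testFileIOClosedAfterCatalogTeardown() throws Exception {
  // ... same spy catalog / CloseTrackingFileIO setup as testFileIOSurvivesAcrossBundles ...

  // Bundle: write and close. The shared FileIO must stay open.
  RecordWriterManager bundle = new RecordWriterManager(spyCatalog, "file_b1", 1000, 3);
  assertTrue(bundle.write(dest, row));
  bundle.close();
  assertFalse("FileIO must survive bundle close", sharedIO.closed);

  // Simulate the DoFn's @Teardown: closing the catalog should release the FileIO.
  // Assumes a Closeable catalog implementation such as RESTCatalog or HiveCatalog.
  if (spyCatalog instanceof AutoCloseable) {
    ((AutoCloseable) spyCatalog).close();
  }
  assertTrue("FileIO must be closed once the catalog is torn down", sharedIO.closed);
}
```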
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java:
##########
@@ -54,7 +54,8 @@
 @AutoValue
 public abstract class IcebergCatalogConfig implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogConfig.class);
-  private transient @MonotonicNonNull Catalog cachedCatalog;
+  private static final ConcurrentHashMap<IcebergCatalogConfig, Catalog> CATALOG_CACHE =
+      new ConcurrentHashMap<>();
Review Comment:

The introduction of a static `CATALOG_CACHE` without an eviction policy or
lifecycle management creates a potential memory and resource leak. In a
long-running Beam pipeline, especially one with dynamic destinations or varying
catalog properties, this map will grow indefinitely. Since `Catalog` objects
often hold open network connections or file handles, these resources will not
be released until the worker JVM exits. Consider using a cache with a size
limit or expiration policy, or a reference-counting mechanism if sharing across
`DoFn` instances is required.
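One possible shape, sketched with Guava's `CacheBuilder` (Beam code would
normally use the vendored Guava; the size and expiry bounds here are
placeholders, and `buildCatalog` is assumed to be the no-arg instance factory
referenced by this PR):
```java
private static final Cache<IcebergCatalogConfig, Catalog> CATALOG_CACHE =
    CacheBuilder.newBuilder()
        .maximumSize(100) // placeholder bound
        .expireAfterAccess(java.time.Duration.ofMinutes(30)) // placeholder expiry
        .removalListener(
            (RemovalNotification<IcebergCatalogConfig, Catalog> n) -> {
              // Best-effort close of evicted catalogs; many implementations
              // (e.g. RESTCatalog, HiveCatalog) are Closeable.
              if (n.getValue() instanceof AutoCloseable) {
                try {
                  ((AutoCloseable) n.getValue()).close();
                } catch (Exception e) {
                  LOG.warn("Failed to close evicted catalog", e);
                }
              }
            })
        .build();

public Catalog catalog() {
  try {
    return CATALOG_CACHE.get(this, this::buildCatalog);
  } catch (ExecutionException e) {
    throw new RuntimeException("Failed to build Iceberg catalog", e);
  }
}
```
Note that eviction-with-close carries the same hazard raised in the next
comment: an in-flight `DoFn` may still hold the evicted catalog, so reference
counting may be needed regardless.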
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java:
##########
@@ -75,27 +76,28 @@ public static Builder builder() {
   public abstract Builder toBuilder();
-  public org.apache.iceberg.catalog.Catalog catalog() {
-    if (cachedCatalog == null) {
-      String catalogName = getCatalogName();
-      if (catalogName == null) {
-        catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
-      }
-      Map<String, String> catalogProps = getCatalogProperties();
-      if (catalogProps == null) {
-        catalogProps = Maps.newHashMap();
-      }
-      Map<String, String> confProps = getConfigProperties();
-      if (confProps == null) {
-        confProps = Maps.newHashMap();
-      }
-      Configuration config = new Configuration();
-      for (Map.Entry<String, String> prop : confProps.entrySet()) {
-        config.set(prop.getKey(), prop.getValue());
-      }
-      cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
+  public Catalog catalog() {
+    return CATALOG_CACHE.computeIfAbsent(this, IcebergCatalogConfig::buildCatalog);
Review Comment:

Using a static cache for `Catalog` instances is risky when combined with the
`@Teardown` logic mentioned in the PR description. If multiple `DoFn` instances
share the same cached `Catalog`, the first `DoFn` to be torn down will call
`catalog.close()`, which will break other active `DoFn` instances in the same
worker that are still using that shared catalog. To safely share a `Catalog`
across `DoFn` instances, you should implement reference counting or keep the
catalog instance-scoped to the `DoFn` (e.g., by reverting to the `transient`
field approach and ensuring the `DoFn` manages its own catalog instance).
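A minimal reference-counting sketch (hypothetical class and method names; each
`DoFn` would call `acquire()` in `@Setup` and `release()` in `@Teardown`, so
only the last teardown on a worker actually closes the catalog):
```java
/** Hypothetical wrapper: closes the underlying catalog when the last user releases it. */
final class RefCountedCatalog {
  private final Catalog catalog;
  private int refCount = 0;

  RefCountedCatalog(Catalog catalog) {
    this.catalog = catalog;
  }

  synchronized Catalog acquire() {
    refCount++;
    return catalog;
  }

  synchronized void release() {
    if (--refCount == 0 && catalog instanceof AutoCloseable) {
      try {
        ((AutoCloseable) catalog).close();
      } catch (Exception e) {
        throw new RuntimeException("Failed to close catalog", e);
      }
    }
  }
}
```
A complete implementation would also evict the zero-count entry from
`CATALOG_CACHE`, so that a later `catalog()` call rebuilds the catalog rather
than returning a closed instance.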
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java:
##########
@@ -190,17 +190,15 @@ private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWrit
       int specId = entry.getKey();
       List<DataFile> files = entry.getValue();
       PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId));
-      ManifestWriter<DataFile> writer;
-      try (FileIO io = table.io()) {
-        writer = createManifestWriter(table.location(), uuid, spec, io);
-        for (DataFile file : files) {
-          writer.add(file);
-          committedDataFileByteSize.update(file.fileSizeInBytes());
-          committedDataFileRecordCount.update(file.recordCount());
-        }
-        writer.close();
-        update.appendManifest(writer.toManifestFile());
+      FileIO io = table.io();
+      ManifestWriter<DataFile> writer = createManifestWriter(table.location(), uuid, spec, io);
+      for (DataFile file : files) {
+        writer.add(file);
+        committedDataFileByteSize.update(file.fileSizeInBytes());
+        committedDataFileRecordCount.update(file.recordCount());
       }
+      writer.close();
+      update.appendManifest(writer.toManifestFile());
Review Comment:

The removal of the `try-with-resources` block for `FileIO` is consistent
with the goal of managing its lifecycle at the catalog level. However, the
`ManifestWriter` should still be closed reliably even if an exception occurs
during the loop. The current implementation skips `writer.close()` if an
exception is thrown, which could lead to incomplete files or resource leaks.
Consider using a `try-finally` block to ensure the writer is closed.
```java
FileIO io = table.io();
ManifestWriter<DataFile> writer = createManifestWriter(table.location(), uuid, spec, io);
try {
  for (DataFile file : files) {
    writer.add(file);
    committedDataFileByteSize.update(file.fileSizeInBytes());
    committedDataFileRecordCount.update(file.recordCount());
  }
} finally {
  writer.close();
}
update.appendManifest(writer.toManifestFile());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]