gemini-code-assist[bot] commented on code in PR #39041:
URL: https://github.com/apache/beam/pull/39041#discussion_r3445369367
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java:
##########
@@ -428,24 +429,24 @@ private Callable<ProcessResult> createProcessTask(
paneInfo);
}
- // Synchronize table initialization
- if (table == null) {
- synchronized (this) {
- if (table == null) {
- try {
- table = getOrCreateTable(filePath, format);
- } catch (FileNotFoundException e) {
- return new ProcessResult(
- null,
- Row.withSchema(ERROR_SCHEMA)
- .addValues(filePath, checkStateNotNull(e.getMessage()))
- .build(),
- timestamp,
- window,
- paneInfo);
- }
- }
- }
+ try {
+ table =
+ TableCache.get(
+ catalogConfig,
+ TableIdentifier.parse(identifier),
+ () -> loadOrCreateTable(filePath, format));
Review Comment:

Since we are now using `TableCache.get` (which is thread-safe and cached),
we no longer need to store the table in a shared instance field. Storing it in
a shared field without synchronization (as done here) introduces a data race
when multiple threads execute this concurrently. Declaring `table` as a local
variable completely avoids this issue. Please also remove the `table` field
from the enclosing class if it is no longer used elsewhere.
```java
Table table;
try {
  table =
      TableCache.get(
          catalogConfig,
          TableIdentifier.parse(identifier),
          () -> loadOrCreateTable(filePath, format));
```
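A minimal, self-contained sketch of the suggested pattern follows. It assumes only that `TableCache.get` behaves like a per-key, load-once cache. `SketchTableCache` and `LocalTableDemo` are hypothetical names, strings stand in for Iceberg `Table` objects, and `ConcurrentHashMap.computeIfAbsent` stands in for whatever `TableCache` does internally. Each task holds the cached result in a local variable, so there is no shared field to race on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for TableCache: at most one loaded value per key, safe to share.
final class SketchTableCache {
  private static final ConcurrentMap<String, Object> CACHE = new ConcurrentHashMap<>();

  private SketchTableCache() {}

  @SuppressWarnings("unchecked")
  static <T> T get(String key, Supplier<T> loader) {
    // computeIfAbsent applies the loader atomically, at most once per key.
    return (T) CACHE.computeIfAbsent(key, k -> loader.get());
  }
}

final class LocalTableDemo {
  // Counts how many times the (hypothetical) loader actually ran.
  static final AtomicInteger LOADS = new AtomicInteger();

  private LocalTableDemo() {}

  // Each task keeps its table in a local variable; no instance field is written.
  static Callable<String> task(String identifier) {
    return () -> {
      String table =
          SketchTableCache.get(
              identifier,
              () -> {
                LOADS.incrementAndGet();
                return "table:" + identifier;
              });
      return table;
    };
  }

  // Runs `tasks` copies of the task concurrently and collects what each one saw.
  static List<String> runConcurrently(String identifier, int tasks) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (int i = 0; i < tasks; i++) {
        futures.add(pool.submit(task(identifier)));
      }
      List<String> results = new ArrayList<>();
      for (Future<String> f : futures) {
        results.add(f.get());
      }
      return results;
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```

Calling `LocalTableDemo.runConcurrently("db.events", 16)` should return 16 identical results while the loader runs only once, which is the property that makes a shared `table` field unnecessary.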
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java:
##########
@@ -428,24 +429,24 @@ private Callable<ProcessResult> createProcessTask(
paneInfo);
}
- // Synchronize table initialization
- if (table == null) {
- synchronized (this) {
- if (table == null) {
- try {
- table = getOrCreateTable(filePath, format);
- } catch (FileNotFoundException e) {
- return new ProcessResult(
- null,
- Row.withSchema(ERROR_SCHEMA)
- .addValues(filePath, checkStateNotNull(e.getMessage()))
- .build(),
- timestamp,
- window,
- paneInfo);
- }
- }
- }
+ try {
+ table =
+ TableCache.get(
+ catalogConfig,
+ TableIdentifier.parse(identifier),
+ () -> loadOrCreateTable(filePath, format));
+ } catch (Exception e) {
+ Throwable cause = e.getCause();
+ return new ProcessResult(
+ null,
+ Row.withSchema(ERROR_SCHEMA)
+ .addValues(
+ filePath,
+ checkStateNotNull(cause != null ? cause.getMessage() : e.getMessage()))
+ .build(),
+ timestamp,
+ window,
+ paneInfo);
}
Review Comment:

If `cause.getMessage()` or `e.getMessage()` is null (which happens for any
exception created without a detail message), `checkStateNotNull` will throw
an `IllegalStateException` from inside the `catch` block. This will crash the
pipeline instead of returning a proper `ProcessResult` with the error. We should
fall back to `toString()`, which never returns null, when the message is null.
```suggestion
} catch (Exception e) {
  Throwable cause = e.getCause();
  String errorMsg = cause != null ? cause.getMessage() : e.getMessage();
  if (errorMsg == null) {
    errorMsg = cause != null ? cause.toString() : e.toString();
  }
  return new ProcessResult(
      null,
      Row.withSchema(ERROR_SCHEMA)
          .addValues(filePath, errorMsg)
          .build(),
      timestamp,
      window,
      paneInfo);
}
```
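The same fallback can be isolated in a small helper and checked against a few exception shapes. `ErrorMessages.describe` is a hypothetical name, not part of Beam. The PR code reads `e.getCause()`, which suggests the cache wraps the loader's exception; the helper assumes that and prefers the cause's message, falling back to `toString()`, which always returns a non-null string.

```java
// Hypothetical helper mirroring the suggested fallback logic.
final class ErrorMessages {
  private ErrorMessages() {}

  // Prefer the cause's message (assumed to be the loader's original exception),
  // then the outer message, then toString(), which is never null.
  static String describe(Exception e) {
    Throwable cause = e.getCause();
    String msg = cause != null ? cause.getMessage() : e.getMessage();
    if (msg == null) {
      msg = cause != null ? cause.toString() : e.toString();
    }
    return msg;
  }
}
```

With such a helper, the error row could be built with `.addValues(filePath, ErrorMessages.describe(e))`, and no `checkStateNotNull` call is needed in the `catch` block.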
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java:
##########
@@ -522,30 +523,56 @@ static <W, T> T transformValue(Transform<W, T> transform, Type type, ByteBuffer
return transform.bind(type).apply(Conversions.fromByteBuffer(type, bytes));
}
- private Table getOrCreateTable(String filePath, FileFormat format) throws IOException {
+ private Table loadOrCreateTable(String filePath, FileFormat format) throws IOException {
+ Catalog catalog = catalogConfig.catalog();
TableIdentifier tableId = TableIdentifier.parse(identifier);
- try {
- return catalogConfig.catalog().loadTable(tableId);
- } catch (NoSuchTableException e) {
- try {
- org.apache.iceberg.Schema schema = getSchema(filePath, format);
- PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields,
schema);
- SortOrder sortOrder = SortOrderUtils.toSortOrder(sortFields, schema);
-
- Catalog.TableBuilder builder =
- catalogConfig
- .catalog()
- .buildTable(tableId, schema)
- .withPartitionSpec(spec)
- .withSortOrder(sortOrder);
- if (tableProps != null) {
- builder.withProperties(tableProps);
+ Namespace namespace = tableId.namespace();
+
+ // Create namespace if it does not exist yet
+ if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) {
+ SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
+ if (!supportsNamespaces.namespaceExists(namespace)) {
+ try {
+ supportsNamespaces.createNamespace(namespace);
+ LOG.info("Created new namespace '{}'.", namespace);
+ } catch (AlreadyExistsException ignored) {
+ // race condition: another worker already created this namespace
}
- return builder.create();
- } catch (AlreadyExistsException e2) { // if table already exists, just load it
- return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
}
}
+
+ // If table exists, just load it
+ // Note: the implementation of catalog.tableExists() will load the table to check its
+ // existence. We don't use it here to avoid double loadTable() calls.
+ try {
+ return catalog.loadTable(tableId);
+ } catch (NoSuchTableException e) { // Otherwise, create the table
+ org.apache.iceberg.Schema tableSchema = getSchema(filePath, format);
+ PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
+ SortOrder sortOrder = SortOrderUtils.toSortOrder(sortFields, tableSchema);
+
+ Catalog.TableBuilder builder =
+ catalogConfig
+ .catalog()
+ .buildTable(tableId, tableSchema)
+ .withPartitionSpec(spec)
+ .withSortOrder(sortOrder);
+ if (tableProps != null) {
+ builder.withProperties(tableProps);
+ }
+ Table table = builder.create();
+ LOG.info(
+ "Created Iceberg table '{}' with schema: {}\n"
+ + ", partition spec: {}, sort order: {}, table properties: {}",
+ tableId,
+ tableSchema,
+ spec,
+ sortOrder,
+ tableProps);
+ return table;
+ } catch (AlreadyExistsException e2) { // if table already exists, just load it
+ return catalogConfig.catalog().loadTable(TableIdentifier.parse(identifier));
+ }
Review Comment:

In Java, an exception thrown inside a `catch` block is not caught by sibling
`catch` blocks of the same `try` statement. Here, if
`catalog.loadTable(tableId)` throws `NoSuchTableException`, we enter the first
`catch` block. If `builder.create()` then throws `AlreadyExistsException` (due
to concurrent table creation by another worker), it will not be caught by the
sibling `catch (AlreadyExistsException e2)` block, so the exception propagates
out of `loadOrCreateTable` instead of falling back to loading the existing
table. We should nest the table creation in its own `try-catch` block to
properly handle this race condition. Additionally, we can use the already
resolved `catalog` variable instead of calling `catalogConfig.catalog()` again.
```java
try {
  return catalog.loadTable(tableId);
} catch (NoSuchTableException e) { // Otherwise, create the table
  org.apache.iceberg.Schema tableSchema = getSchema(filePath, format);
  PartitionSpec spec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
  SortOrder sortOrder = SortOrderUtils.toSortOrder(sortFields, tableSchema);
  Catalog.TableBuilder builder =
      catalog
          .buildTable(tableId, tableSchema)
          .withPartitionSpec(spec)
          .withSortOrder(sortOrder);
  if (tableProps != null) {
    builder.withProperties(tableProps);
  }
  try {
    Table table = builder.create();
    LOG.info(
        "Created Iceberg table '{}' with schema: {}\n"
            + ", partition spec: {}, sort order: {}, table properties: {}",
        tableId,
        tableSchema,
        spec,
        sortOrder,
        tableProps);
    return table;
  } catch (AlreadyExistsException e2) { // if table already exists, just load it
    return catalog.loadTable(tableId);
  }
}
```
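The catch-scoping rule can be demonstrated without Iceberg. In this sketch, `NoSuchTable` and `AlreadyExists` are hypothetical stand-ins for Iceberg's `NoSuchTableException` and `AlreadyExistsException`, and `CatchScopeDemo.TABLES` is a hypothetical in-memory "catalog" whose `create` can simulate another worker winning the race. `siblingCatch` mirrors the shape of the PR code and `nestedCatch` mirrors the suggested fix.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-ins for Iceberg's NoSuchTableException and AlreadyExistsException.
class NoSuchTable extends RuntimeException {}

class AlreadyExists extends RuntimeException {}

final class CatchScopeDemo {
  // Hypothetical in-memory "catalog" keyed by table identifier.
  static final ConcurrentMap<String, String> TABLES = new ConcurrentHashMap<>();

  private CatchScopeDemo() {}

  static String load(String id) {
    String table = TABLES.get(id);
    if (table == null) {
      throw new NoSuchTable();
    }
    return table;
  }

  // When simulateRace is true, "another worker" creates the table first.
  static String create(String id, boolean simulateRace) {
    if (simulateRace) {
      TABLES.putIfAbsent(id, "created-by-other-worker");
    }
    if (TABLES.putIfAbsent(id, "created-by-me") != null) {
      throw new AlreadyExists();
    }
    return "created-by-me";
  }

  // Shape of the PR code: the second catch never sees exceptions thrown in the first.
  static String siblingCatch(String id, boolean simulateRace) {
    try {
      return load(id);
    } catch (NoSuchTable e) {
      return create(id, simulateRace); // AlreadyExists escapes from here
    } catch (AlreadyExists e2) {
      return load(id);
    }
  }

  // Shape of the suggested fix: creation has its own try, so the race is handled.
  static String nestedCatch(String id, boolean simulateRace) {
    try {
      return load(id);
    } catch (NoSuchTable e) {
      try {
        return create(id, simulateRace);
      } catch (AlreadyExists e2) {
        return load(id);
      }
    }
  }
}
```

With a simulated race, `siblingCatch` lets `AlreadyExists` escape to the caller, while `nestedCatch` returns the table the other worker created.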
--
This is an automated message from the Apache Git Service.