This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 11188387fd Flink: Dynamic Sink: Handle NoSuchNamespaceException
properly (#14812)
11188387fd is described below
commit 11188387fd8e9b8f6944b760d294f262bc7bff32
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed Dec 10 14:10:08 2025 +0100
Flink: Dynamic Sink: Handle NoSuchNamespaceException properly (#14812)
---
.../flink/sink/dynamic/TableMetadataCache.java | 5 ++-
.../flink/sink/dynamic/TestTableMetadataCache.java | 24 +++++++++++
.../flink/sink/dynamic/TestTableUpdater.java | 47 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 2 deletions(-)
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 8a8362a419..e790d9a929 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.slf4j.Logger;
@@ -198,8 +199,8 @@ class TableMetadataCache {
Table table = catalog.loadTable(identifier);
update(identifier, table);
return EXISTS;
- } catch (NoSuchTableException e) {
- LOG.debug("Table doesn't exist {}", identifier, e);
+ } catch (NoSuchTableException | NoSuchNamespaceException e) {
+ LOG.debug("Table or namespace doesn't exist {}", identifier, e);
tableCache.put(
identifier, new CacheItem(cacheRefreshClock.millis(), false, null,
null, null, 1));
return Tuple2.of(false, e);
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 7f91d2f8d5..72b420c345 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -130,4 +130,28 @@ public class TestTableMetadataCache extends
TestFlinkIcebergSinkBase {
assertThat(cacheItem).isNotNull();
assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
}
+
+ @Test
+ void testNoSuchNamespaceExceptionHandling() {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ TableIdentifier tableIdentifier =
TableIdentifier.of("nonexistent_namespace", "myTable");
+ TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
+
+ TableMetadataCache.ResolvedSchemaInfo result =
cache.schema(tableIdentifier, SCHEMA, false);
+
+ assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
+ assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
+ }
+
+ @Test
+ void testNoSuchTableExceptionHandling() {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ TableIdentifier tableIdentifier =
TableIdentifier.parse("default.nonexistent_table");
+ TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
+
+ TableMetadataCache.ResolvedSchemaInfo result =
cache.schema(tableIdentifier, SCHEMA, false);
+
+ assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
+ assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
+ }
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
index bcc5d80645..a49624a65e 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
import org.apache.iceberg.inmemory.InMemoryCatalog;
@@ -211,4 +212,50 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
assertThat(tableSchema.findField("data")).isNotNull();
assertThat(tableSchema.findField("extra")).isNull();
}
+
+ @Test
+ void testNamespaceAndTableCreation() {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
+ TableIdentifier tableIdentifier = TableIdentifier.of("new_namespace",
"myTable");
+ TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+
+
assertThat(namespaceCatalog.namespaceExists(Namespace.of("new_namespace"))).isFalse();
+ assertThat(catalog.tableExists(tableIdentifier)).isFalse();
+
+ Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> result =
+ tableUpdater.update(
+ tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(),
TableCreator.DEFAULT);
+
+
assertThat(namespaceCatalog.namespaceExists(Namespace.of("new_namespace"))).isTrue();
+
+ assertThat(catalog.tableExists(tableIdentifier)).isTrue();
+ assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
+
assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
+ }
+
+ @Test
+ void testTableCreationWithExistingNamespace() {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
+ Namespace namespace = Namespace.of("existing_namespace");
+ namespaceCatalog.createNamespace(namespace);
+
+ TableIdentifier tableIdentifier = TableIdentifier.of("existing_namespace",
"myTable");
+ TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+
+ assertThat(namespaceCatalog.namespaceExists(namespace)).isTrue();
+ assertThat(catalog.tableExists(tableIdentifier)).isFalse();
+
+ Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> result =
+ tableUpdater.update(
+ tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(),
TableCreator.DEFAULT);
+
+ assertThat(namespaceCatalog.namespaceExists(namespace)).isTrue();
+ assertThat(catalog.tableExists(tableIdentifier)).isTrue();
+ assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
+
assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
+ }
}