This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 11188387fd Flink: Dynamic Sink: Handle NoSuchNamespaceException 
properly (#14812)
11188387fd is described below

commit 11188387fd8e9b8f6944b760d294f262bc7bff32
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed Dec 10 14:10:08 2025 +0100

    Flink: Dynamic Sink: Handle NoSuchNamespaceException properly (#14812)
---
 .../flink/sink/dynamic/TableMetadataCache.java     |  5 ++-
 .../flink/sink/dynamic/TestTableMetadataCache.java | 24 +++++++++++
 .../flink/sink/dynamic/TestTableUpdater.java       | 47 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 2 deletions(-)

diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 8a8362a419..e790d9a929 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.slf4j.Logger;
@@ -198,8 +199,8 @@ class TableMetadataCache {
       Table table = catalog.loadTable(identifier);
       update(identifier, table);
       return EXISTS;
-    } catch (NoSuchTableException e) {
-      LOG.debug("Table doesn't exist {}", identifier, e);
+    } catch (NoSuchTableException | NoSuchNamespaceException e) {
+      LOG.debug("Table or namespace doesn't exist {}", identifier, e);
       tableCache.put(
           identifier, new CacheItem(cacheRefreshClock.millis(), false, null, 
null, null, 1));
       return Tuple2.of(false, e);
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index 7f91d2f8d5..72b420c345 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -130,4 +130,28 @@ public class TestTableMetadataCache extends 
TestFlinkIcebergSinkBase {
     assertThat(cacheItem).isNotNull();
     assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
   }
+
+  @Test
+  void testNoSuchNamespaceExceptionHandling() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = 
TableIdentifier.of("nonexistent_namespace", "myTable");
+    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
+
+    TableMetadataCache.ResolvedSchemaInfo result = 
cache.schema(tableIdentifier, SCHEMA, false);
+
+    assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
+    assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
+  }
+
+  @Test
+  void testNoSuchTableExceptionHandling() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = 
TableIdentifier.parse("default.nonexistent_table");
+    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
+
+    TableMetadataCache.ResolvedSchemaInfo result = 
cache.schema(tableIdentifier, SCHEMA, false);
+
+    assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
+    assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
+  }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
index bcc5d80645..a49624a65e 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
 import org.apache.iceberg.inmemory.InMemoryCatalog;
@@ -211,4 +212,50 @@ public class TestTableUpdater extends 
TestFlinkIcebergSinkBase {
     assertThat(tableSchema.findField("data")).isNotNull();
     assertThat(tableSchema.findField("extra")).isNull();
   }
+
+  @Test
+  void testNamespaceAndTableCreation() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
+    TableIdentifier tableIdentifier = TableIdentifier.of("new_namespace", 
"myTable");
+    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+
+    
assertThat(namespaceCatalog.namespaceExists(Namespace.of("new_namespace"))).isFalse();
+    assertThat(catalog.tableExists(tableIdentifier)).isFalse();
+
+    Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> result =
+        tableUpdater.update(
+            tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), 
TableCreator.DEFAULT);
+
+    
assertThat(namespaceCatalog.namespaceExists(Namespace.of("new_namespace"))).isTrue();
+
+    assertThat(catalog.tableExists(tableIdentifier)).isTrue();
+    assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
+    
assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
+  }
+
+  @Test
+  void testTableCreationWithExistingNamespace() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
+    Namespace namespace = Namespace.of("existing_namespace");
+    namespaceCatalog.createNamespace(namespace);
+
+    TableIdentifier tableIdentifier = TableIdentifier.of("existing_namespace", 
"myTable");
+    TableMetadataCache cache = new TableMetadataCache(catalog, 10, 
Long.MAX_VALUE, 10);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+
+    assertThat(namespaceCatalog.namespaceExists(namespace)).isTrue();
+    assertThat(catalog.tableExists(tableIdentifier)).isFalse();
+
+    Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> result =
+        tableUpdater.update(
+            tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), 
TableCreator.DEFAULT);
+
+    assertThat(namespaceCatalog.namespaceExists(namespace)).isTrue();
+    assertThat(catalog.tableExists(tableIdentifier)).isTrue();
+    assertThat(result.f0.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
+    
assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
+  }
 }

Reply via email to