This is an automated email from the ASF dual-hosted git repository.

bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c9f47d208 Kafka-connect: Handle namespace creation for auto table creation (#10186)
4c9f47d208 is described below

commit 4c9f47d208b16921f825a66e24d0693f2b76b03b
Author: Ajantha Bhat <[email protected]>
AuthorDate: Thu May 16 00:44:20 2024 +0530

    Kafka-connect: Handle namespace creation for auto table creation (#10186)
---
 .../iceberg/connect/data/IcebergWriterFactory.java | 24 ++++++++++++++++++++++
 .../connect/data/IcebergWriterFactoryTest.java     | 19 ++++++++++++++---
 2 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
index 67d0e850e6..47dcddcb99 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
@@ -18,14 +18,18 @@
  */
 package org.apache.iceberg.connect.data;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.connect.IcebergSinkConfig;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.types.Type;
@@ -83,6 +87,8 @@ public class IcebergWriterFactory {
     org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
     TableIdentifier identifier = TableIdentifier.parse(tableName);
 
+    createNamespaceIfNotExist(catalog, identifier.namespace());
+
     List<String> partitionBy = config.tableConfig(tableName).partitionBy();
     PartitionSpec spec;
     try {
@@ -112,4 +118,22 @@ public class IcebergWriterFactory {
             });
     return result.get();
   }
+
+  @VisibleForTesting
+  static void createNamespaceIfNotExist(Catalog catalog, Namespace identifierNamespace) {
+    if (!(catalog instanceof SupportsNamespaces)) {
+      return;
+    }
+
+    String[] levels = identifierNamespace.levels();
+    for (int index = 0; index < levels.length; index++) {
+      Namespace namespace = Namespace.of(Arrays.copyOfRange(levels, 0, index + 1));
+      try {
+        ((SupportsNamespaces) catalog).createNamespace(namespace);
+      } catch (AlreadyExistsException | ForbiddenException ex) {
+        // Ignoring the error as forcefully creating the namespace even if it exists
+        // to avoid double namespaceExists() check.
+      }
+    }
+  }
 }
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
index 93d1d2fa6b..ab8bbdd02c 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
@@ -21,13 +21,18 @@ package org.apache.iceberg.connect.data;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.connect.IcebergSinkConfig;
 import org.apache.iceberg.connect.TableSinkConfig;
@@ -47,7 +52,7 @@ public class IcebergWriterFactoryTest {
   @ValueSource(booleans = {true, false})
   @SuppressWarnings("unchecked")
   public void testAutoCreateTable(boolean partitioned) {
-    Catalog catalog = mock(Catalog.class);
+    Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class));
     when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such table"));
 
     TableSinkConfig tableConfig = mock(TableSinkConfig.class);
@@ -63,7 +68,7 @@ public class IcebergWriterFactoryTest {
     when(record.value()).thenReturn(ImmutableMap.of("id", 123, "data", "foo2"));
 
     IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
-    factory.autoCreateTable("db.tbl", record);
+    factory.autoCreateTable("foo1.foo2.foo3.bar", record);
 
     ArgumentCaptor<TableIdentifier> identCaptor = ArgumentCaptor.forClass(TableIdentifier.class);
     ArgumentCaptor<Schema> schemaCaptor = ArgumentCaptor.forClass(Schema.class);
@@ -77,10 +82,18 @@ public class IcebergWriterFactoryTest {
             specCaptor.capture(),
             propsCaptor.capture());
 
-    assertThat(identCaptor.getValue()).isEqualTo(TableIdentifier.of("db", "tbl"));
+    assertThat(identCaptor.getValue())
+        .isEqualTo(TableIdentifier.of(Namespace.of("foo1", "foo2", "foo3"), "bar"));
     assertThat(schemaCaptor.getValue().findField("id").type()).isEqualTo(LongType.get());
     assertThat(schemaCaptor.getValue().findField("data").type()).isEqualTo(StringType.get());
     assertThat(specCaptor.getValue().isPartitioned()).isEqualTo(partitioned);
     assertThat(propsCaptor.getValue()).containsKey("test-prop");
+
+    ArgumentCaptor<Namespace> namespaceCaptor = ArgumentCaptor.forClass(Namespace.class);
+    verify((SupportsNamespaces) catalog, times(3)).createNamespace(namespaceCaptor.capture());
+    List<Namespace> capturedArguments = namespaceCaptor.getAllValues();
+    assertThat(capturedArguments.get(0)).isEqualTo(Namespace.of("foo1"));
+    assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", "foo2"));
+    assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", "foo2", "foo3"));
   }
 }

Reply via email to