This is an automated email from the ASF dual-hosted git repository.
bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4c9f47d208 Kafka-connect: Handle namespace creation for auto table
creation (#10186)
4c9f47d208 is described below
commit 4c9f47d208b16921f825a66e24d0693f2b76b03b
Author: Ajantha Bhat <[email protected]>
AuthorDate: Thu May 16 00:44:20 2024 +0530
Kafka-connect: Handle namespace creation for auto table creation (#10186)
---
.../iceberg/connect/data/IcebergWriterFactory.java | 24 ++++++++++++++++++++++
.../connect/data/IcebergWriterFactoryTest.java | 19 ++++++++++++++---
2 files changed, 40 insertions(+), 3 deletions(-)
diff --git
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
index 67d0e850e6..47dcddcb99 100644
---
a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
+++
b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
@@ -18,14 +18,18 @@
*/
package org.apache.iceberg.connect.data;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.types.Type;
@@ -83,6 +87,8 @@ public class IcebergWriterFactory {
org.apache.iceberg.Schema schema = new
org.apache.iceberg.Schema(structType.fields());
TableIdentifier identifier = TableIdentifier.parse(tableName);
+ createNamespaceIfNotExist(catalog, identifier.namespace());
+
List<String> partitionBy = config.tableConfig(tableName).partitionBy();
PartitionSpec spec;
try {
@@ -112,4 +118,22 @@ public class IcebergWriterFactory {
});
return result.get();
}
+
+ @VisibleForTesting
+ static void createNamespaceIfNotExist(Catalog catalog, Namespace
identifierNamespace) {
+ if (!(catalog instanceof SupportsNamespaces)) {
+ return;
+ }
+
+ String[] levels = identifierNamespace.levels();
+ for (int index = 0; index < levels.length; index++) {
+ Namespace namespace = Namespace.of(Arrays.copyOfRange(levels, 0, index +
1));
+ try {
+ ((SupportsNamespaces) catalog).createNamespace(namespace);
+ } catch (AlreadyExistsException | ForbiddenException ex) {
+ // Ignoring the error as forcefully creating the namespace even if it
exists
+ // to avoid double namespaceExists() check.
+ }
+ }
+ }
}
diff --git
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
index 93d1d2fa6b..ab8bbdd02c 100644
---
a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
+++
b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/IcebergWriterFactoryTest.java
@@ -21,13 +21,18 @@ package org.apache.iceberg.connect.data;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.TableSinkConfig;
@@ -47,7 +52,7 @@ public class IcebergWriterFactoryTest {
@ValueSource(booleans = {true, false})
@SuppressWarnings("unchecked")
public void testAutoCreateTable(boolean partitioned) {
- Catalog catalog = mock(Catalog.class);
+ Catalog catalog = mock(Catalog.class,
withSettings().extraInterfaces(SupportsNamespaces.class));
when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such
table"));
TableSinkConfig tableConfig = mock(TableSinkConfig.class);
@@ -63,7 +68,7 @@ public class IcebergWriterFactoryTest {
when(record.value()).thenReturn(ImmutableMap.of("id", 123, "data",
"foo2"));
IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
- factory.autoCreateTable("db.tbl", record);
+ factory.autoCreateTable("foo1.foo2.foo3.bar", record);
ArgumentCaptor<TableIdentifier> identCaptor =
ArgumentCaptor.forClass(TableIdentifier.class);
ArgumentCaptor<Schema> schemaCaptor =
ArgumentCaptor.forClass(Schema.class);
@@ -77,10 +82,18 @@ public class IcebergWriterFactoryTest {
specCaptor.capture(),
propsCaptor.capture());
- assertThat(identCaptor.getValue()).isEqualTo(TableIdentifier.of("db",
"tbl"));
+ assertThat(identCaptor.getValue())
+ .isEqualTo(TableIdentifier.of(Namespace.of("foo1", "foo2", "foo3"),
"bar"));
assertThat(schemaCaptor.getValue().findField("id").type()).isEqualTo(LongType.get());
assertThat(schemaCaptor.getValue().findField("data").type()).isEqualTo(StringType.get());
assertThat(specCaptor.getValue().isPartitioned()).isEqualTo(partitioned);
assertThat(propsCaptor.getValue()).containsKey("test-prop");
+
+ ArgumentCaptor<Namespace> namespaceCaptor =
ArgumentCaptor.forClass(Namespace.class);
+ verify((SupportsNamespaces) catalog,
times(3)).createNamespace(namespaceCaptor.capture());
+ List<Namespace> capturedArguments = namespaceCaptor.getAllValues();
+ assertThat(capturedArguments.get(0)).isEqualTo(Namespace.of("foo1"));
+ assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1",
"foo2"));
+ assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1",
"foo2", "foo3"));
}
}