This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ae9f51b8289 Add embedded-tests for iceberg extension (#19143)
ae9f51b8289 is described below

commit ae9f51b8289da5d64a887cd49c8207b5bc41ea74
Author: Ben Smithgall <[email protected]>
AuthorDate: Mon Mar 16 00:20:03 2026 -0400

    Add embedded-tests for iceberg extension (#19143)
    
    Changes:
    - Add a new `IcebergRestCatalogResource` which uses the `iceberg-rest` 
docker image.
    - Add a new test `IcebergRestCatalogIngestionTest`
---
 embedded-tests/pom.xml                             | 186 ++++++++++++--------
 .../iceberg/IcebergRestCatalogIngestionTest.java   | 180 ++++++++++++++++++++
 .../iceberg/IcebergRestCatalogResource.java        | 187 +++++++++++++++++++++
 3 files changed, 486 insertions(+), 67 deletions(-)

diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index dc5ca44937d..4ae8a5a293e 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -29,6 +29,8 @@
 
   <properties>
     <surefire.rerunFailingTestsCount>0</surefire.rerunFailingTestsCount>
+    <!-- Must match iceberg.core.version in 
extensions-contrib/druid-iceberg-extensions/pom.xml -->
+    <iceberg.version>1.6.1</iceberg.version>
   </properties>
 
   <parent>
@@ -43,12 +45,6 @@
       <artifactId>druid-processing</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.druid</groupId>
-      <artifactId>druid-services</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-indexing-service</artifactId>
@@ -79,42 +75,6 @@
       <artifactId>druid-sql</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.druid.integration-tests</groupId>
-      <artifactId>druid-it-tools</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.druid.extensions</groupId>
-      <artifactId>mysql-metadata-storage</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.druid.extensions</groupId>
-      <artifactId>postgresql-metadata-storage</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.druid.extensions</groupId>
-      <artifactId>druid-s3-extensions</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.druid.extensions</groupId>
-      <artifactId>druid-protobuf-extensions</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.druid.extensions</groupId>
-      <artifactId>druid-avro-extensions</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.druid.extensions</groupId>
       <artifactId>druid-testcontainers</artifactId>
@@ -125,31 +85,6 @@
       <artifactId>druid-testing-tools</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.druid.extensions</groupId>
-      <artifactId>druid-basic-security</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.druid.extensions.contrib</groupId>
-      <artifactId>druid-exact-count-bitmap</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.druid.extensions.contrib</groupId>
-      <artifactId>druid-thrift-extensions</artifactId>
-      <version>${project.parent.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>com.googlecode.json-simple</groupId>
-          <artifactId>json-simple</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
@@ -249,12 +184,24 @@
       <version>${project.parent.version}</version>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-services</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-indexing-service</artifactId>
       <version>${project.parent.version}</version>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid.integration-tests</groupId>
+      <artifactId>druid-it-tools</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid.extensions</groupId>
       <artifactId>druid-kafka-indexing-service</artifactId>
@@ -268,6 +215,91 @@
       <version>${project.parent.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>mysql-metadata-storage</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>postgresql-metadata-storage</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-s3-extensions</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-protobuf-extensions</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-avro-extensions</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions</groupId>
+      <artifactId>druid-basic-security</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions.contrib</groupId>
+      <artifactId>druid-exact-count-bitmap</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions.contrib</groupId>
+      <artifactId>druid-thrift-extensions</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.googlecode.json-simple</groupId>
+          <artifactId>json-simple</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid.extensions.contrib</groupId>
+      <artifactId>druid-iceberg-extensions</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-api</artifactId>
+      <version>${iceberg.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-core</artifactId>
+      <version>${iceberg.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-parquet</artifactId>
+      <version>${iceberg.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <!-- Let parquet version be driven by druid-parquet-extensions 
(1.15.2) -->
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.druid.extensions.contrib</groupId>
       <artifactId>kafka-emitter</artifactId>
@@ -334,6 +366,14 @@
       <version>${parent.version}</version>
       <scope>test</scope>
     </dependency>
+    <!-- Pin protobuf-java-util to the version from druid-protobuf-extensions 
to satisfy
+         the enforcer's RequireUpperBoundDeps rule (google-cloud-storage 
brings a lower version). -->
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+      <version>${protobuf.version}</version>
+      <scope>test</scope>
+    </dependency>
 
     <!-- Some embedded tests use the router and the web-console for debugging 
-->
     <dependency>
@@ -642,6 +682,18 @@
 
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <configuration>
+          <ignoredUsedUndeclaredDependencies>
+            <!-- Configuration class comes from hadoop-common, which is 
re-exported via hadoop-client-api.
+                 Declaring hadoop-common directly pulls in banned transitive 
dependencies. -->
+            
<ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common</ignoredUsedUndeclaredDependency>
+            
<ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-client-api</ignoredUsedUndeclaredDependency>
+          </ignoredUsedUndeclaredDependencies>
+        </configuration>
+      </plugin>
       <!-- Skip this module from jacoco coverage as it contains only tests -->
       <plugin>
         <groupId>org.jacoco</groupId>
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogIngestionTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogIngestionTest.java
new file mode 100644
index 00000000000..e8c61a197d4
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogIngestionTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.parquet.ParquetExtensionsModule;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Ingestion test for Iceberg tables via a REST catalog.
+ * Exercises the classloader-sensitive {@code DynConstructors} path in
+ * {@code RestIcebergCatalog.setupCatalog()} (see #18015, #18017).
+ */
+public class IcebergRestCatalogIngestionTest extends EmbeddedClusterTestBase
+{
+  private static final String ICEBERG_NAMESPACE = "default";
+  private static final String ICEBERG_TABLE_NAME = "test_events";
+
+  private static final Schema ICEBERG_SCHEMA = new Schema(
+      Types.NestedField.required(1, "event_time", Types.StringType.get()),
+      Types.NestedField.required(2, "name", Types.StringType.get()),
+      Types.NestedField.required(3, "value", Types.LongType.get())
+  );
+
+  private final IcebergRestCatalogResource icebergCatalog = new 
IcebergRestCatalogResource();
+
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer()
+      .setServerMemory(300_000_000L)
+      .addProperty("druid.worker.capacity", "2");
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+
+  private EmbeddedMSQApis msqApis;
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .addResource(icebergCatalog)
+        .addExtension(ParquetExtensionsModule.class)
+        .addServer(overlord)
+        .addServer(coordinator)
+        .addServer(indexer)
+        .addServer(broker)
+        .addServer(new EmbeddedHistorical());
+  }
+
+  @BeforeAll
+  public void setupIcebergTable() throws IOException
+  {
+    msqApis = new EmbeddedMSQApis(cluster, overlord);
+
+    icebergCatalog.createNamespace(ICEBERG_NAMESPACE);
+    final Table table = icebergCatalog.createTable(ICEBERG_NAMESPACE, 
ICEBERG_TABLE_NAME, ICEBERG_SCHEMA);
+
+    final ImmutableList<ImmutableMap<String, Object>> rows = ImmutableList.of(
+        ImmutableMap.of("event_time", "2024-01-01T00:00:00.000Z", "name", 
"alice", "value", 100L),
+        ImmutableMap.of("event_time", "2024-01-01T01:00:00.000Z", "name", 
"bob", "value", 200L),
+        ImmutableMap.of("event_time", "2024-01-01T02:00:00.000Z", "name", 
"charlie", "value", 300L)
+    );
+
+    final String filepath = table.location() + "/data/" + UUID.randomUUID() + 
".parquet";
+    final OutputFile file = table.io().newOutputFile(filepath);
+
+    final DataWriter<GenericRecord> writer;
+    try (DataWriter<GenericRecord> w = Parquet.writeData(file)
+                                              .schema(ICEBERG_SCHEMA)
+                                              
.createWriterFunc(GenericParquetWriter::buildWriter)
+                                              .overwrite()
+                                              .withSpec(table.spec())
+                                              .build()) {
+      for (final ImmutableMap<String, Object> row : rows) {
+        final GenericRecord record = GenericRecord.create(ICEBERG_SCHEMA);
+        record.set(0, row.get("event_time"));
+        record.set(1, row.get("name"));
+        record.set(2, row.get("value"));
+        w.write(record);
+      }
+      writer = w;
+    }
+
+    table.newAppend().appendFile(writer.toDataFile()).commit();
+  }
+
+  @AfterAll
+  public void tearDownIceberg()
+  {
+    icebergCatalog.dropTable(ICEBERG_NAMESPACE, ICEBERG_TABLE_NAME);
+    icebergCatalog.dropNamespace(ICEBERG_NAMESPACE);
+  }
+
+  @Test
+  public void testIngestFromIcebergRestCatalog()
+  {
+    final String catalogUri = icebergCatalog.getCatalogUri();
+
+    final String sql = StringUtils.format(
+        "INSERT INTO %s\n"
+        + "SELECT\n"
+        + "  TIME_PARSE(\"event_time\") AS __time,\n"
+        + "  \"name\",\n"
+        + "  \"value\"\n"
+        + "FROM TABLE(\n"
+        + "  EXTERN(\n"
+        + "    '{\"type\":\"iceberg\","
+        + "\"tableName\":\"%s\","
+        + "\"namespace\":\"%s\","
+        + "\"icebergCatalog\":{\"type\":\"rest\",\"catalogUri\":\"%s\","
+        + 
"\"catalogProperties\":{\"io-impl\":\"org.apache.iceberg.hadoop.HadoopFileIO\"}},"
+        + "\"warehouseSource\":{\"type\":\"local\"}}',\n"
+        + "    '{\"type\":\"parquet\"}',\n"
+        + "    '[{\"type\":\"string\",\"name\":\"event_time\"},"
+        + "{\"type\":\"string\",\"name\":\"name\"},"
+        + "{\"type\":\"long\",\"name\":\"value\"}]'\n"
+        + "  )\n"
+        + ")\n"
+        + "PARTITIONED BY ALL TIME",
+        dataSource,
+        ICEBERG_TABLE_NAME,
+        ICEBERG_NAMESPACE,
+        catalogUri
+    );
+
+    final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
+    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
+
+    cluster.callApi().verifySqlQuery(
+        "SELECT __time, \"name\", \"value\" FROM %s ORDER BY __time",
+        dataSource,
+        "2024-01-01T00:00:00.000Z,alice,100\n"
+        + "2024-01-01T01:00:00.000Z,bob,200\n"
+        + "2024-01-01T02:00:00.000Z,charlie,300"
+    );
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogResource.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogResource.java
new file mode 100644
index 00000000000..caad4a7894b
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/iceberg/IcebergRestCatalogResource.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.iceberg;
+
+import org.apache.druid.iceberg.common.IcebergDruidModule;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.TestcontainerResource;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Testcontainer resource wrapping {@code tabulario/iceberg-rest} with a local
+ * filesystem warehouse shared via bind mount. The warehouse directory is
+ * obtained from the cluster's {@link 
org.apache.druid.testing.embedded.TestFolder}.
+ */
+public class IcebergRestCatalogResource extends 
TestcontainerResource<GenericContainer<?>>
+{
+  // 1.6.0 is the latest available tag; the REST protocol is compatible with 
the 1.6.1 client libraries
+  private static final String ICEBERG_REST_IMAGE = 
"tabulario/iceberg-rest:1.6.0";
+  private static final int REST_CATALOG_PORT = 8181;
+  private static final String CONTAINER_WAREHOUSE_PATH = 
"/tmp/iceberg-warehouse";
+
+  @Nullable
+  private File warehouseDir;
+
+  @Nullable
+  private RESTCatalog clientCatalog;
+
+  @Override
+  public void beforeStart(EmbeddedDruidCluster cluster)
+  {
+    warehouseDir = 
cluster.getTestFolder().getOrCreateFolder("iceberg-warehouse");
+  }
+
+  @Override
+  protected GenericContainer<?> createContainer()
+  {
+    if (warehouseDir == null) {
+      throw new IllegalStateException("warehouseDir has not been initialized; 
beforeStart() must be called first");
+    }
+    // Run as root so the container can write to the bind-mounted warehouse 
directory
+    // regardless of host directory ownership (the default image user is 
iceberg:iceberg).
+    return new GenericContainer<>(ICEBERG_REST_IMAGE)
+        .withCreateContainerCmdModifier(cmd -> cmd.withUser("root"))
+        .withExposedPorts(REST_CATALOG_PORT)
+        .withFileSystemBind(warehouseDir.getAbsolutePath(), 
CONTAINER_WAREHOUSE_PATH, BindMode.READ_WRITE)
+        .withEnv("CATALOG_WAREHOUSE", CONTAINER_WAREHOUSE_PATH)
+        .withEnv("CATALOG_IO__IMPL", "org.apache.iceberg.hadoop.HadoopFileIO")
+        .waitingFor(Wait.forHttp("/v1/config").forPort(REST_CATALOG_PORT));
+  }
+
+  @Override
+  public void onStarted(EmbeddedDruidCluster cluster)
+  {
+    cluster.addExtension(IcebergDruidModule.class);
+  }
+
+  @Override
+  public void stop()
+  {
+    if (clientCatalog != null) {
+      try {
+        clientCatalog.close();
+      }
+      catch (Exception e) {
+        // Best-effort cleanup
+      }
+      clientCatalog = null;
+    }
+    super.stop();
+  }
+
+  /**
+   * Returns the REST catalog URI accessible from the host (via mapped port).
+   */
+  public String getCatalogUri()
+  {
+    ensureRunning();
+    return StringUtils.format(
+        "http://%s:%d";,
+        getContainer().getHost(),
+        getContainer().getMappedPort(REST_CATALOG_PORT)
+    );
+  }
+
+  /**
+   * Returns the host-side warehouse directory.
+   */
+  public File getWarehouseDir()
+  {
+    if (warehouseDir == null) {
+      throw new IllegalStateException("warehouseDir has not been initialized; 
beforeStart() must be called first");
+    }
+    return warehouseDir;
+  }
+
+  /**
+   * Creates a namespace in the REST catalog.
+   */
+  public void createNamespace(String namespace)
+  {
+    getClientCatalog().createNamespace(Namespace.of(namespace));
+  }
+
+  /**
+   * Drops a namespace from the REST catalog. 
+   */
+  public void dropNamespace(String namespace)
+  {
+    getClientCatalog().dropNamespace(Namespace.of(namespace));
+  }
+
+  /**
+   * Creates a table in the given namespace and returns it.
+   */
+  public Table createTable(String namespace, String tableName, Schema schema)
+  {
+    final TableIdentifier tableId = TableIdentifier.of(namespace, tableName);
+    return getClientCatalog().createTable(tableId, schema);
+  }
+
+  /**
+   * Drops a table from the REST catalog. Best-effort; ignores errors.
+   */
+  public void dropTable(String namespace, String tableName)
+  {
+    try {
+      getClientCatalog().dropTable(TableIdentifier.of(namespace, tableName));
+    }
+    catch (Exception e) {
+      // Best-effort cleanup
+    }
+  }
+
+  private RESTCatalog getClientCatalog()
+  {
+    ensureRunning();
+    if (clientCatalog == null) {
+      clientCatalog = createClientCatalog();
+    }
+    return clientCatalog;
+  }
+
+  private RESTCatalog createClientCatalog()
+  {
+    // RESTCatalog.initialize() may mutate the properties map, so a mutable 
map is required
+    final Map<String, String> properties = new HashMap<>();
+    properties.put(CatalogProperties.URI, getCatalogUri());
+    properties.put(CatalogProperties.WAREHOUSE_LOCATION, 
warehouseDir.getAbsolutePath());
+    properties.put(CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.hadoop.HadoopFileIO");
+
+    final RESTCatalog catalog = new RESTCatalog();
+    catalog.setConf(new org.apache.hadoop.conf.Configuration());
+    catalog.initialize("test-client", properties);
+    return catalog;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to