This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 399200af7cb HIVE-29391: Enable HMS REST Catalog to scale independently
of HMS (#6270)
399200af7cb is described below
commit 399200af7cb11cf6ee3329ebdabe17792e5e7e85
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Sun Feb 15 08:50:05 2026 -0500
HIVE-29391: Enable HMS REST Catalog to scale independently of HMS (#6270)
---
.../hive/cli/TestStandaloneRESTCatalogServer.java | 227 +++++++++++++++++++++
.../org/apache/iceberg/rest/HMSCatalogFactory.java | 11 +-
.../standalone/StandaloneRESTCatalogServer.java | 208 +++++++++++++++++++
.../hadoop/hive/metastore/HiveMetaStore.java | 12 +-
4 files changed, 455 insertions(+), 3 deletions(-)
diff --git
a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestStandaloneRESTCatalogServer.java
b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestStandaloneRESTCatalogServer.java
new file mode 100644
index 00000000000..a5ec398d4b2
--- /dev/null
+++
b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestStandaloneRESTCatalogServer.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import java.io.File;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.iceberg.rest.standalone.StandaloneRESTCatalogServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for Standalone REST Catalog Server.
+ *
+ * Tests that the standalone server can:
+ * 1. Start independently of HMS
+ * 2. Connect to an external HMS instance
+ * 3. Serve REST Catalog requests
+ * 4. Provide a health check endpoint
+ */
+public class TestStandaloneRESTCatalogServer {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestStandaloneRESTCatalogServer.class);
+
+ private Configuration hmsConf;
+ private Configuration restCatalogConf;
+ private int hmsPort;
+ private StandaloneRESTCatalogServer restCatalogServer;
+ private File warehouseDir;
+ private File hmsTempDir;
+
+ @Before
+ public void setup() throws Exception {
+ // Scratch area under java.io.tmpdir; the millisecond timestamp keeps runs apart. mkdirs() results are not checked.
+ hmsTempDir = new File(System.getProperty("java.io.tmpdir"), "test-hms-" +
System.currentTimeMillis());
+ hmsTempDir.mkdirs();
+ warehouseDir = new File(hmsTempDir, "warehouse");
+ warehouseDir.mkdirs();
+
+ // Embedded HMS configuration: standalone-mode defaults, a Derby URL named after the scratch dir, warehouse paths in the scratch dir
+ hmsConf = MetastoreConf.newMetastoreConf();
+ MetaStoreTestUtils.setConfForStandloneMode(hmsConf);
+
+ String jdbcUrl = String.format("jdbc:derby:memory:%s;create=true",
+ new File(hmsTempDir, "metastore_db").getAbsolutePath());
+ MetastoreConf.setVar(hmsConf, ConfVars.CONNECT_URL_KEY, jdbcUrl);
+ MetastoreConf.setVar(hmsConf, ConfVars.WAREHOUSE,
warehouseDir.getAbsolutePath());
+ MetastoreConf.setVar(hmsConf, ConfVars.WAREHOUSE_EXTERNAL,
warehouseDir.getAbsolutePath());
+
+ // Start HMS through the retrying test helper and keep the returned port; teardown() closes HMS by that port
+ hmsPort = MetaStoreTestUtils.startMetaStoreWithRetry(
+ HadoopThriftAuthBridge.getBridge(), hmsConf, true, false, false,
false);
+ LOG.info("Started embedded HMS on port: {}", hmsPort);
+
+ // The REST catalog gets a separate Configuration: it reaches the HMS above through the thrift:// URI and reuses the warehouse paths
+ restCatalogConf = MetastoreConf.newMetastoreConf();
+ String hmsUri = "thrift://localhost:" + hmsPort;
+ MetastoreConf.setVar(restCatalogConf, ConfVars.THRIFT_URIS, hmsUri);
+ MetastoreConf.setVar(restCatalogConf, ConfVars.WAREHOUSE,
warehouseDir.getAbsolutePath());
+ MetastoreConf.setVar(restCatalogConf, ConfVars.WAREHOUSE_EXTERNAL,
warehouseDir.getAbsolutePath());
+
+ // Servlet settings: a free port chosen up front, the "iceberg" servlet path, and auth set to "none"
+ int restPort = MetaStoreTestUtils.findFreePort();
+ MetastoreConf.setLongVar(restCatalogConf, ConfVars.CATALOG_SERVLET_PORT,
restPort);
+ MetastoreConf.setVar(restCatalogConf,
ConfVars.ICEBERG_CATALOG_SERVLET_PATH, "iceberg");
+ MetastoreConf.setVar(restCatalogConf, ConfVars.CATALOG_SERVLET_AUTH,
"none");
+
+ // Start the server under test; start() throws if the catalog servlet or the HTTP server cannot be brought up
+ restCatalogServer = new StandaloneRESTCatalogServer(restCatalogConf);
+ restCatalogServer.start();
+ LOG.info("Started standalone REST Catalog server on port: {}",
restCatalogServer.getPort());
+ }
+
+ /**
+  * Releases everything {@link #setup()} created: the REST Catalog server, the embedded HMS
+  * and the scratch directory.
+  *
+  * <p>The steps are chained with {@code finally} so that a failure in an earlier step cannot
+  * leave the HMS running or the scratch directory behind for the next test.
+  */
+ @After
+ public void teardown() {
+   try {
+     if (restCatalogServer != null) {
+       restCatalogServer.stop();
+     }
+   } finally {
+     try {
+       // hmsPort stays 0 when setup() failed before the HMS was started
+       if (hmsPort > 0) {
+         MetaStoreTestUtils.close(hmsPort);
+       }
+     } finally {
+       if (hmsTempDir != null && hmsTempDir.exists()) {
+         deleteDirectory(hmsTempDir);
+       }
+     }
+   }
+ }
+
+ /**
+  * The {@code /health} endpoint of the running server must answer a plain GET with HTTP 200.
+  */
+ @Test(timeout = 60000)
+ public void testHealthCheck() throws Exception {
+   LOG.info("=== Test: Health Check ===");
+
+   HttpGet healthProbe = new HttpGet("http://localhost:" + restCatalogServer.getPort() + "/health");
+   try (CloseableHttpClient client = HttpClients.createDefault();
+        CloseableHttpResponse probeResult = client.execute(healthProbe)) {
+     int status = probeResult.getStatusLine().getStatusCode();
+     assertEquals("Health check should return 200", 200, status);
+     LOG.info("Health check passed");
+   }
+ }
+
+ /**
+  * The {@code /v1/config} endpoint must answer with HTTP 200 and a JSON object that mentions
+  * {@code endpoints}.
+  */
+ @Test(timeout = 60000)
+ public void testRESTCatalogConfig() throws Exception {
+   LOG.info("=== Test: REST Catalog Config Endpoint ===");
+
+   String configUrl = restCatalogServer.getRestEndpoint() + "/v1/config";
+   try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+     HttpGet request = new HttpGet(configUrl);
+     try (CloseableHttpResponse response = httpClient.execute(request)) {
+       assertEquals("Config endpoint should return 200", 200,
+           response.getStatusLine().getStatusCode());
+
+       // Decode as UTF-8 when the response declares no charset, and trim so that a trailing
+       // newline written by the server does not break the shape check below.
+       String responseBody = EntityUtils.toString(response.getEntity(), "UTF-8").trim();
+       LOG.info("Config response: {}", responseBody);
+       // ConfigResponse should contain endpoints, defaults, and overrides
+       assertTrue("Response should contain endpoints", responseBody.contains("endpoints"));
+       assertTrue("Response should be valid JSON",
+           responseBody.startsWith("{") && responseBody.endsWith("}"));
+     }
+   }
+ }
+
+ /**
+  * Walks the namespace endpoints in order: list, create {@code testdb}, list again and look
+  * for the new name, then fetch the namespace directly.
+  */
+ @Test(timeout = 60000)
+ public void testRESTCatalogNamespaceOperations() throws Exception {
+   LOG.info("=== Test: REST Catalog Namespace Operations ===");
+
+   final String collectionUrl = restCatalogServer.getRestEndpoint() + "/v1/namespaces";
+   final String namespaceName = "testdb";
+
+   try (CloseableHttpClient client = HttpClients.createDefault()) {
+     // Step 1: the listing endpoint answers before anything has been created
+     HttpGet initialListing = new HttpGet(collectionUrl);
+     initialListing.setHeader("Content-Type", "application/json");
+     try (CloseableHttpResponse reply = client.execute(initialListing)) {
+       int status = reply.getStatusLine().getStatusCode();
+       assertEquals("List namespaces should return 200", 200, status);
+     }
+
+     // Step 2: create the namespace; the REST Catalog API takes the name as a JSON array
+     String payload = "{\"namespace\":[\"" + namespaceName + "\"]}";
+     HttpPost creation = new HttpPost(collectionUrl);
+     creation.setHeader("Content-Type", "application/json");
+     creation.setEntity(new StringEntity(payload, "UTF-8"));
+     try (CloseableHttpResponse reply = client.execute(creation)) {
+       int status = reply.getStatusLine().getStatusCode();
+       assertEquals("Create namespace should return 200", 200, status);
+     }
+
+     // Step 3: the new namespace shows up in the listing
+     HttpGet laterListing = new HttpGet(collectionUrl);
+     laterListing.setHeader("Content-Type", "application/json");
+     try (CloseableHttpResponse reply = client.execute(laterListing)) {
+       int status = reply.getStatusLine().getStatusCode();
+       assertEquals("List namespaces after creation should return 200", 200, status);
+
+       String listing = EntityUtils.toString(reply.getEntity());
+       LOG.info("Namespaces list response: {}", listing);
+       assertTrue("Response should contain created namespace", listing.contains(namespaceName));
+     }
+
+     // Step 4: the namespace can be fetched by name
+     String singleUrl = restCatalogServer.getRestEndpoint() + "/v1/namespaces/" + namespaceName;
+     HttpGet lookup = new HttpGet(singleUrl);
+     lookup.setHeader("Content-Type", "application/json");
+     try (CloseableHttpResponse reply = client.execute(lookup)) {
+       int status = reply.getStatusLine().getStatusCode();
+       assertEquals("Get namespace should return 200", 200, status);
+
+       String details = EntityUtils.toString(reply.getEntity());
+       LOG.info("Get namespace response: {}", details);
+       assertTrue("Response should contain namespace", details.contains(namespaceName));
+     }
+   }
+
+   LOG.info("Namespace operations passed");
+ }
+
+ /**
+  * After startup the server must report a positive port and a non-null REST endpoint.
+  */
+ @Test(timeout = 60000)
+ public void testServerPort() {
+   LOG.info("=== Test: Server Port ===");
+
+   int boundPort = restCatalogServer.getPort();
+   assertTrue("Server port should be > 0", boundPort > 0);
+
+   String endpoint = restCatalogServer.getRestEndpoint();
+   assertNotNull("REST endpoint should not be null", endpoint);
+
+   LOG.info("Server port: {}, Endpoint: {}", boundPort, endpoint);
+ }
+
+ /**
+  * Recursively deletes {@code directory} and everything below it.
+  *
+  * <p>Deletion stays best effort: a file or directory that cannot be removed is logged and
+  * skipped rather than failing the test teardown.
+  *
+  * @param directory root of the tree to remove; a non-existent path is a no-op
+  */
+ private void deleteDirectory(File directory) {
+   if (!directory.exists()) {
+     return;
+   }
+   // listFiles() returns null when the path is not a directory or cannot be read
+   File[] files = directory.listFiles();
+   if (files != null) {
+     for (File file : files) {
+       if (file.isDirectory()) {
+         deleteDirectory(file);
+       } else if (!file.delete()) {
+         LOG.warn("Could not delete temporary file {}", file);
+       }
+     }
+   }
+   if (!directory.delete()) {
+     LOG.warn("Could not delete temporary directory {}", directory);
+   }
+ }
+}
diff --git
a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
index 4b085e9d34c..d21f239f341 100644
---
a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
+++
b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
@@ -69,9 +69,14 @@ public String getPath() {
*/
private Catalog createCatalog() {
final Map<String, String> properties = new TreeMap<>();
- MetastoreConf.setVar(configuration, MetastoreConf.ConfVars.THRIFT_URIS,
"");
final String configUri = MetastoreConf.getVar(configuration,
MetastoreConf.ConfVars.THRIFT_URIS);
- if (configUri != null) {
+ // Clear THRIFT_URIS so HiveCatalog doesn't accidentally use Thrift
connection
+ // when REST Catalog is embedded in HMS (same JVM). HiveCatalog reads from
Configuration
+ // as fallback, so clearing it ensures it uses embedded connection when
"uri" is not set.
+ MetastoreConf.setVar(configuration, MetastoreConf.ConfVars.THRIFT_URIS,
"");
+ // Only set "uri" property if THRIFT_URIS was configured (standalone mode)
+ // This tells HiveCatalog to use Thrift connection to external HMS
+ if (configUri != null && !configUri.isEmpty()) {
properties.put("uri", configUri);
}
final String configWarehouse = MetastoreConf.getVar(configuration,
MetastoreConf.ConfVars.WAREHOUSE);
@@ -81,6 +86,8 @@ private Catalog createCatalog() {
final String configExtWarehouse = MetastoreConf.getVar(configuration,
MetastoreConf.ConfVars.WAREHOUSE_EXTERNAL);
if (configExtWarehouse != null) {
properties.put("external-warehouse", configExtWarehouse);
+ // HiveCatalog reads this property directly from Configuration, not from
properties map
+
configuration.set(MetastoreConf.ConfVars.WAREHOUSE_EXTERNAL.getHiveName(),
configExtWarehouse);
}
if (configuration.get(SERVLET_ID_KEY) != null) {
// For the testing purpose. HiveCatalog caches a metastore client in a
static field. As our tests can spin up
diff --git
a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java
b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java
new file mode 100644
index 00000000000..79c89b2cae8
--- /dev/null
+++
b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.rest.standalone;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.ServletServerBuilder;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.iceberg.rest.HMSCatalogFactory;
+import org.eclipse.jetty.server.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Standalone REST Catalog Server.
+ *
+ * <p>This server runs independently of HMS and provides a REST API for
Iceberg catalog operations.
+ * It connects to an external HMS instance via Thrift.
+ *
+ * <p>Designed for Kubernetes deployment with load balancer/API gateway in
front:
+ * <pre>
+ * Client → Load Balancer/API Gateway → StandaloneRESTCatalogServer → HMS
+ * </pre>
+ *
+ * <p>Multiple instances can run behind a Kubernetes Service for load
balancing.
+ */
+public class StandaloneRESTCatalogServer {
+ private static final Logger LOG =
LoggerFactory.getLogger(StandaloneRESTCatalogServer.class);
+
+ private final Configuration conf;
+ private Server server;
+ private int port;
+
+ /**
+  * Creates a server bound to the given configuration. Nothing is started until
+  * {@link #start()} is called.
+  *
+  * @param conf configuration read (and, for the default servlet path, updated) by {@link #start()}
+  * @throws IllegalArgumentException if {@code conf} is null
+  */
+ public StandaloneRESTCatalogServer(Configuration conf) {
+   // Fail here rather than with a NullPointerException deep inside start()
+   if (conf == null) {
+     throw new IllegalArgumentException("conf must not be null");
+   }
+   this.conf = conf;
+ }
+
+ /**
+  * Starts the standalone REST Catalog server.
+  *
+  * <p>Reads the HMS Thrift URIs, the servlet port and the servlet path from the configuration
+  * (an unset path defaults to {@code iceberg} and is written back to the configuration),
+  * registers the catalog servlet plus a {@code health} servlet, and starts the HTTP server.
+  *
+  * @throws IllegalArgumentException if {@code metastore.thrift.uris} is not configured
+  * @throws IllegalStateException if the catalog servlet cannot be created or the HTTP server
+  *     does not come up
+  */
+ public void start() {
+   // Validate required configuration
+   String thriftUris = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS);
+   if (thriftUris == null || thriftUris.isEmpty()) {
+     throw new IllegalArgumentException("metastore.thrift.uris must be configured to connect to HMS");
+   }
+
+   int servletPort = MetastoreConf.getIntVar(conf, ConfVars.CATALOG_SERVLET_PORT);
+   String servletPath = MetastoreConf.getVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH);
+
+   if (servletPath == null || servletPath.isEmpty()) {
+     servletPath = "iceberg"; // Default path
+     MetastoreConf.setVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH, servletPath);
+   }
+
+   LOG.info("Starting Standalone REST Catalog Server");
+   LOG.info(" HMS Thrift URIs: {}", thriftUris);
+   LOG.info(" Servlet Port: {}", servletPort);
+   LOG.info(" Servlet Path: /{}", servletPath);
+
+   // Create servlet using factory
+   ServletServerBuilder.Descriptor catalogDescriptor = HMSCatalogFactory.createServlet(conf);
+   if (catalogDescriptor == null) {
+     throw new IllegalStateException("Failed to create REST Catalog servlet. " +
+         "Check that metastore.catalog.servlet.port and metastore.iceberg.catalog.servlet.path are configured.");
+   }
+
+   // Create health check servlet
+   HealthCheckServlet healthServlet = new HealthCheckServlet();
+
+   // Build and start server
+   ServletServerBuilder builder = new ServletServerBuilder(conf);
+   builder.addServlet(catalogDescriptor);
+   builder.addServlet(servletPort, "health", healthServlet);
+
+   Server candidate = builder.start(LOG);
+   if (candidate == null || !candidate.isStarted()) {
+     // Server failed to start - likely a port conflict.
+     // stop() skips servers that are not started, so a half-started instance has to be
+     // released here or it would never be stopped.
+     if (candidate != null) {
+       try {
+         candidate.stop();
+       } catch (Exception e) {
+         LOG.warn("Error releasing REST Catalog server that failed to start", e);
+       }
+     }
+     throw new IllegalStateException(String.format(
+         "Failed to start REST Catalog server on port %d. Port may already be in use. ", servletPort));
+   }
+   // Publish the server only once it is known to be running
+   server = candidate;
+
+   // Get actual port (may be auto-assigned)
+   port = catalogDescriptor.getPort();
+   LOG.info("Standalone REST Catalog Server started successfully on port {}", port);
+   LOG.info(" REST Catalog endpoint: http://localhost:{}/{}", port, servletPath);
+   LOG.info(" Health check endpoint: http://localhost:{}/health", port);
+ }
+
+ /**
+  * Stops the server and waits for it to finish. Does nothing when the server was never
+  * created or is not currently started; failures while stopping are logged, not rethrown.
+  */
+ public void stop() {
+   boolean running = server != null && server.isStarted();
+   if (!running) {
+     return;
+   }
+   try {
+     LOG.info("Stopping Standalone REST Catalog Server");
+     server.stop();
+     server.join();
+     LOG.info("Standalone REST Catalog Server stopped");
+   } catch (InterruptedException interrupted) {
+     // Keep the interrupt visible to whoever called us
+     Thread.currentThread().interrupt();
+     LOG.warn("Server stop interrupted", interrupted);
+   } catch (Exception failure) {
+     LOG.error("Error stopping server", failure);
+   }
+ }
+
+ /**
+ * Gets the port the server is listening on.
+ *
+ * <p>The value is assigned by {@link #start()} from the catalog servlet descriptor once the
+ * server has started; until then it is the field default of 0.
+ * @return the port number
+ */
+ @VisibleForTesting
+ public int getPort() {
+ return port;
+ }
+
+ /**
+  * Builds the REST Catalog endpoint URL from the current port and the configured servlet
+  * path, falling back to {@code iceberg} when no path is configured. The host part is
+  * always {@code localhost}.
+  * @return the endpoint URL
+  */
+ public String getRestEndpoint() {
+   String configuredPath = MetastoreConf.getVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH);
+   boolean pathMissing = configuredPath == null || configuredPath.isEmpty();
+   String effectivePath = pathMissing ? "iceberg" : configuredPath;
+   return "http://localhost:" + port + "/" + effectivePath;
+ }
+
+ /**
+  * Minimal health endpoint intended for Kubernetes readiness/liveness probes: every GET is
+  * answered with HTTP 200 and a fixed JSON body.
+  */
+ private static final class HealthCheckServlet extends HttpServlet {
+   /** Body sent on every successful probe. */
+   private static final String HEALTHY_BODY = "{\"status\":\"healthy\"}";
+
+   @Override
+   protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
+     resp.setStatus(HttpServletResponse.SC_OK);
+     resp.setContentType("application/json");
+     try {
+       resp.getWriter().println(HEALTHY_BODY);
+     } catch (IOException writeFailure) {
+       // A probe that went away mid-response is not worth more than a warning
+       LOG.warn("Failed to write health check response", writeFailure);
+     }
+   }
+ }
+
+ /**
+  * Main method for running as a standalone application.
+  *
+  * <p>Starts from {@link MetastoreConf#newMetastoreConf()} and applies every
+  * {@code -Dkey=value} argument on top of it. Arguments in any other form are ignored with a
+  * warning. The process exits with status 1 if the server cannot be started.
+  *
+  * @param args command line arguments, each expected in the form {@code -Dkey=value}
+  */
+ public static void main(String[] args) {
+   Configuration conf = MetastoreConf.newMetastoreConf();
+
+   // Apply -Dkey=value overrides from the command line
+   for (String arg : args) {
+     if (!arg.startsWith("-D")) {
+       LOG.warn("Ignoring unrecognized argument '{}'; expected -Dkey=value", arg);
+       continue;
+     }
+     String[] kv = arg.substring(2).split("=", 2);
+     if (kv.length != 2 || kv[0].isEmpty()) {
+       // Only the key part is logged so that a stray value never reaches the log
+       LOG.warn("Ignoring malformed -D argument for key '{}'; expected -Dkey=value", kv[0]);
+       continue;
+     }
+     conf.set(kv[0], kv[1]);
+   }
+
+   StandaloneRESTCatalogServer server = new StandaloneRESTCatalogServer(conf);
+
+   // Add shutdown hook
+   Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+     LOG.info("Shutdown hook triggered");
+     server.stop();
+   }));
+
+   try {
+     server.start();
+     LOG.info("Server running. Press Ctrl+C to stop.");
+
+     // Keep server running
+     server.server.join();
+   } catch (InterruptedException e) {
+     Thread.currentThread().interrupt();
+     LOG.warn("Server stop interrupted", e);
+   } catch (Exception e) {
+     LOG.error("Failed to start server", e);
+     System.exit(1);
+   }
+ }
+}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 46c20ce985c..2ffe2351dee 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -727,11 +727,21 @@ public static void startMetaStore(int port,
HadoopThriftAuthBridge bridge,
// optionally create and start the property and Iceberg REST server
ServletServerBuilder builder = new ServletServerBuilder(conf);
ServletServerBuilder.Descriptor properties =
builder.addServlet(PropertyServlet.createServlet(conf));
- builder.addServlet(createCatalogServlet(conf));
+ ServletServerBuilder.Descriptor catalogDescriptor =
builder.addServlet(createCatalogServlet(conf));
+
servletServer = builder.start(LOG);
if (servletServer != null && properties != null) {
propertyServletPort = properties.getPort();
}
+
+ // If REST Catalog server was required but failed to start, fail HMS
startup
+ int servletPort = MetastoreConf.getIntVar(conf,
ConfVars.CATALOG_SERVLET_PORT);
+ boolean restCatalogRequired = catalogDescriptor != null && servletPort >=
0;
+
+ if (restCatalogRequired && servletServer == null) {
+ throw new IllegalStateException(String.format(
+ "Failed to start embedded REST Catalog server on port %d. Port may
already be in use.", servletPort));
+ }
// main server
thriftServer.start();