This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2439452e1d [rest] Make http client reusable to reduce connection cost (#5145)
2439452e1d is described below

commit 2439452e1d97ae5e5ab1efa66280ef48cf8d0bd9
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Feb 25 14:08:08 2025 +0800

    [rest] Make http client reusable to reduce connection cost (#5145)
---
 docs/content/concepts/rest-catalog.md              | 44 ++++++------
 .../java/org/apache/paimon/rest/HttpClient.java    | 78 +++++-----------------
 .../org/apache/paimon/rest/HttpClientOptions.java  | 78 ----------------------
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  2 +-
 .../org/apache/paimon/rest/RESTCatalogOptions.java | 30 ---------
 .../org/apache/paimon/rest/HttpClientTest.java     | 10 +--
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  3 -
 .../org/apache/paimon/flink/RESTCatalogITCase.java |  1 -
 8 files changed, 43 insertions(+), 203 deletions(-)

diff --git a/docs/content/concepts/rest-catalog.md b/docs/content/concepts/rest-catalog.md
index 55613f5107..7092b44941 100644
--- a/docs/content/concepts/rest-catalog.md
+++ b/docs/content/concepts/rest-catalog.md
@@ -51,23 +51,23 @@ Paimon REST Catalog provides a lightweight implementation to access the catalog
 ```sql
 CREATE CATALOG `paimon-rest-catalog`
 WITH (
-'type' = 'paimon',
-'uri' = '<catalog server url>',
-'metastore' = 'rest',
-'token.provider' = 'bear'
-'token' = '<token>'
+    'type' = 'paimon',
+    'uri' = '<catalog server url>',
+    'metastore' = 'rest',
+    'token.provider' = 'bear',
+    'token' = '<token>'
 );
 ```
 - DLF ak
 ```sql
 CREATE CATALOG `paimon-rest-catalog`
 WITH (
-'type' = 'paimon',
-'uri' = '<catalog server url>',
-'metastore' = 'rest',
-'token.provider' = 'dlf',
-'dlf.accessKeyId'='<accessKeyId>',
-'dlf.accessKeySecret'='<accessKeySecret>',
+    'type' = 'paimon',
+    'uri' = '<catalog server url>',
+    'metastore' = 'rest',
+    'token.provider' = 'dlf',
+    'dlf.accessKeyId'='<accessKeyId>',
+    'dlf.accessKeySecret'='<accessKeySecret>'
 );
 ```
 
@@ -75,13 +75,13 @@ WITH (
 ```sql
 CREATE CATALOG `paimon-rest-catalog`
 WITH (
-'type' = 'paimon',
-'uri' = '<catalog server url>',
-'metastore' = 'rest',
-'token.provider' = 'dlf',
-'dlf.accessKeyId'='<accessKeyId>',
-'dlf.accessKeySecret'='<accessKeySecret>',
-'dlf.securityToken'='<securityToken>'
+    'type' = 'paimon',
+    'uri' = '<catalog server url>',
+    'metastore' = 'rest',
+    'token.provider' = 'dlf',
+    'dlf.accessKeyId'='<accessKeyId>',
+    'dlf.accessKeySecret'='<accessKeySecret>',
+    'dlf.securityToken'='<securityToken>'
 );
 ```
 
@@ -89,10 +89,10 @@ WITH (
 ```sql
 CREATE CATALOG `paimon-rest-catalog`
 WITH (
-'type' = 'paimon',
-'uri' = '<catalog server url>',
-'metastore' = 'rest',
-'token.provider' = 'dlf'
+    'type' = 'paimon',
+    'uri' = '<catalog server url>',
+    'metastore' = 'rest',
+    'token.provider' = 'dlf'
 );
 ```
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index 8aa3da86d8..bc708d2d5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.rest;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.rest.auth.RESTAuthFunction;
 import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.RESTException;
@@ -29,8 +28,6 @@ import org.apache.paimon.utils.StringUtils;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 
-import okhttp3.ConnectionPool;
-import okhttp3.Dispatcher;
 import okhttp3.Headers;
 import okhttp3.MediaType;
 import okhttp3.OkHttpClient;
@@ -38,16 +35,12 @@ import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
 
-import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -55,31 +48,31 @@ import static okhttp3.ConnectionSpec.CLEARTEXT;
 import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
 import static okhttp3.ConnectionSpec.MODERN_TLS;
 import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
-import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
 /** HTTP client for REST catalog. */
 public class HttpClient implements RESTClient {
 
-    private static final String THREAD_NAME = 
"REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
+    private static final OkHttpClient HTTP_CLIENT =
+            new OkHttpClient.Builder()
+                    .retryOnConnectionFailure(true)
+                    .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, 
CLEARTEXT))
+                    .addInterceptor(new ExponentialHttpRetryInterceptor(5))
+                    .connectTimeout(Duration.ofMinutes(3))
+                    .readTimeout(Duration.ofMinutes(3))
+                    .build();
+
     private static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json");
-    private static final int CONNECTION_KEEP_ALIVE_DURATION_MS = 300_000;
 
-    private final OkHttpClient okHttpClient;
     private final String uri;
 
     private ErrorHandler errorHandler;
 
-    public HttpClient(Options options) {
-        this(HttpClientOptions.create(options));
-    }
-
-    public HttpClient(HttpClientOptions httpClientOptions) {
-        if (httpClientOptions.uri() != null && 
httpClientOptions.uri().endsWith("/")) {
-            this.uri = httpClientOptions.uri().substring(0, 
httpClientOptions.uri().length() - 1);
+    public HttpClient(String uri) {
+        if (uri != null && uri.endsWith("/")) {
+            this.uri = uri.substring(0, uri.length() - 1);
         } else {
-            this.uri = httpClientOptions.uri();
+            this.uri = uri;
         }
-        this.okHttpClient = createHttpClient(httpClientOptions);
         this.errorHandler = DefaultErrorHandler.getInstance();
     }
 
@@ -160,14 +153,8 @@ public class HttpClient implements RESTClient {
         }
     }
 
-    @Override
-    public void close() throws IOException {
-        okHttpClient.dispatcher().cancelAll();
-        okHttpClient.connectionPool().evictAll();
-    }
-
     private <T extends RESTResponse> T exec(Request request, Class<T> 
responseType) {
-        try (Response response = okHttpClient.newCall(request).execute()) {
+        try (Response response = HTTP_CLIENT.newCall(request).execute()) {
             String responseBodyStr = response.body() != null ? 
response.body().string() : null;
             if (!response.isSuccessful()) {
                 ErrorResponse error;
@@ -203,38 +190,6 @@ public class HttpClient implements RESTClient {
         return RequestBody.create(body.getBytes(StandardCharsets.UTF_8), 
MEDIA_TYPE);
     }
 
-    private static OkHttpClient createHttpClient(HttpClientOptions 
httpClientOptions) {
-        BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
-        ExecutorService executorService =
-                createCachedThreadPool(httpClientOptions.threadPoolSize(), 
THREAD_NAME, workQueue);
-        ConnectionPool connectionPool =
-                new ConnectionPool(
-                        httpClientOptions.maxConnections(),
-                        CONNECTION_KEEP_ALIVE_DURATION_MS,
-                        TimeUnit.MILLISECONDS);
-        Dispatcher dispatcher = new Dispatcher(executorService);
-        // set max requests per host use max connections
-        dispatcher.setMaxRequestsPerHost(httpClientOptions.maxConnections());
-        OkHttpClient.Builder builder =
-                new OkHttpClient.Builder()
-                        .dispatcher(dispatcher)
-                        .retryOnConnectionFailure(true)
-                        .connectionPool(connectionPool)
-                        .connectionSpecs(Arrays.asList(MODERN_TLS, 
COMPATIBLE_TLS, CLEARTEXT))
-                        .addInterceptor(
-                                new ExponentialHttpRetryInterceptor(
-                                        httpClientOptions.maxRetries()));
-        httpClientOptions
-                .connectTimeout()
-                .ifPresent(
-                        timeoutDuration -> {
-                            builder.connectTimeout(timeoutDuration);
-                            builder.readTimeout(timeoutDuration);
-                        });
-
-        return builder.build();
-    }
-
     private String getRequestUrl(String path) {
         return StringUtils.isNullOrWhitespaceOnly(path) ? uri : uri + path;
     }
@@ -274,4 +229,7 @@ public class HttpClient implements RESTClient {
                                         ));
         return Pair.of(resourcePath, parameters);
     }
+
+    @Override
+    public void close() {}
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
deleted file mode 100644
index 548a989568..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest;
-
-import org.apache.paimon.options.Options;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-import java.util.Optional;
-
-/** Options for Http Client. */
-public class HttpClientOptions {
-
-    private final String uri;
-    @Nullable private final Duration connectTimeout;
-    private final int threadPoolSize;
-    private final int maxConnections;
-    private final int maxRetries;
-
-    public HttpClientOptions(
-            String uri,
-            @Nullable Duration connectTimeout,
-            int threadPoolSize,
-            int maxConnections,
-            int maxRetries) {
-        this.uri = uri;
-        this.connectTimeout = connectTimeout;
-        this.threadPoolSize = threadPoolSize;
-        this.maxConnections = maxConnections;
-        this.maxRetries = maxRetries;
-    }
-
-    public static HttpClientOptions create(Options options) {
-        return new HttpClientOptions(
-                options.get(RESTCatalogOptions.URI),
-                options.get(RESTCatalogOptions.CONNECTION_TIMEOUT),
-                options.get(RESTCatalogOptions.THREAD_POOL_SIZE),
-                options.get(RESTCatalogOptions.MAX_CONNECTIONS),
-                options.get(RESTCatalogOptions.MAX_RETIES));
-    }
-
-    public String uri() {
-        return uri;
-    }
-
-    public Optional<Duration> connectTimeout() {
-        return Optional.ofNullable(connectTimeout);
-    }
-
-    public int threadPoolSize() {
-        return threadPoolSize;
-    }
-
-    public int maxConnections() {
-        return maxConnections;
-    }
-
-    public int maxRetries() {
-        return Math.max(maxRetries, 0);
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 5e43762ac8..fedaa7251d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -118,7 +118,7 @@ public class RESTCatalog implements Catalog {
     }
 
     public RESTCatalog(CatalogContext context, boolean configRequired) {
-        this.client = new HttpClient(context.options());
+        this.client = new 
HttpClient(context.options().get(RESTCatalogOptions.URI));
         AuthSession catalogAuth = createAuthSession(context.options(), 
tokenRefreshExecutor());
         Options options = context.options();
         Map<String, String> baseHeaders = Collections.emptyMap();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 310e3335ec..c9dd0fcfac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -32,30 +32,6 @@ public class RESTCatalogOptions {
                     .noDefaultValue()
                     .withDescription("REST Catalog server's uri.");
 
-    public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
-            ConfigOptions.key("rest.client.connection-timeout")
-                    .durationType()
-                    .defaultValue(Duration.ofSeconds(180))
-                    .withDescription("REST Catalog http client connect 
timeout.");
-
-    public static final ConfigOption<Integer> MAX_CONNECTIONS =
-            ConfigOptions.key("rest.client.max-connections")
-                    .intType()
-                    .defaultValue(100)
-                    .withDescription("REST Catalog http client's max 
connections.");
-
-    public static final ConfigOption<Integer> MAX_RETIES =
-            ConfigOptions.key("rest.client.max-retries")
-                    .intType()
-                    .defaultValue(5)
-                    .withDescription("REST Catalog http client's max retry 
times.");
-
-    public static final ConfigOption<Integer> THREAD_POOL_SIZE =
-            ConfigOptions.key("rest.client.num-threads")
-                    .intType()
-                    .defaultValue(1)
-                    .withDescription("REST Catalog http client thread num.");
-
     public static final ConfigOption<String> TOKEN =
             ConfigOptions.key("token")
                     .stringType()
@@ -98,12 +74,6 @@ public class RESTCatalogOptions {
                     .noDefaultValue()
                     .withDescription("REST Catalog auth DLF security token");
 
-    public static final ConfigOption<String> DLF_ROLE_SESSION_NAME =
-            ConfigOptions.key("dlf.roleSessionName")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("REST Catalog auth DLF role session 
name");
-
     public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED =
             ConfigOptions.key("data-token.enabled")
                     .booleanType()
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index 8600637c88..9fdbd529ea 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -34,7 +34,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -63,14 +62,12 @@ public class HttpClientTest {
         server = new TestHttpWebServer(MOCK_PATH);
         server.start();
         errorHandler = DefaultErrorHandler.getInstance();
-        HttpClientOptions httpClientOptions =
-                new HttpClientOptions(server.getBaseUrl(), 
Duration.ofSeconds(3), 1, 10, 2);
         mockResponseData = new MockRESTData(MOCK_PATH);
         mockResponseDataStr = server.createResponseBody(mockResponseData);
         errorResponseStr =
                 server.createResponseBody(
                         new ErrorResponse(ErrorResponseResourceType.DATABASE, 
"test", "test", 400));
-        httpClient = new HttpClient(httpClientOptions);
+        httpClient = new HttpClient(server.getBaseUrl());
         httpClient.setErrorHandler(errorHandler);
         AuthProvider authProvider = new BearTokenAuthProvider(TOKEN);
         AuthSession authSession = new AuthSession(authProvider);
@@ -134,10 +131,7 @@ public class HttpClientTest {
 
     @Test
     public void testRetry() {
-        HttpClient httpClient =
-                new HttpClient(
-                        new HttpClientOptions(
-                                server.getBaseUrl(), Duration.ofSeconds(30), 
1, 10, 2));
+        HttpClient httpClient = new HttpClient(server.getBaseUrl());
         server.enqueueResponse(mockResponseDataStr, 429);
         server.enqueueResponse(mockResponseDataStr, 200);
         assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class, 
restAuthFunction));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 64389378bf..a0d17057a2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -66,7 +66,6 @@ class RESTCatalogTest extends CatalogTestBase {
         options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
         options.set(RESTCatalogOptions.TOKEN, initToken);
         options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
-        options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
         this.catalog = new RESTCatalog(CatalogContext.create(options));
     }
 
@@ -90,7 +89,6 @@ class RESTCatalogTest extends CatalogTestBase {
         options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
         options.set(RESTCatalogOptions.TOKEN, "aaaaa");
         options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
-        options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
         options.set(CatalogOptions.METASTORE, RESTCatalogFactory.IDENTIFIER);
         assertThatThrownBy(() -> new 
RESTCatalog(CatalogContext.create(options)))
                 .isInstanceOf(NotAuthorizedException.class);
@@ -134,7 +132,6 @@ class RESTCatalogTest extends CatalogTestBase {
         Options options = new Options();
         options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
         options.set(RESTCatalogOptions.TOKEN, initToken);
-        options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
         options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
         options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
         this.catalog = new RESTCatalog(CatalogContext.create(options));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 145fcd0ba3..dc202ec872 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -101,7 +101,6 @@ class RESTCatalogITCase extends CatalogITCaseBase {
         options.put("metastore", "rest");
         options.put(RESTCatalogOptions.URI.key(), serverUrl);
         options.put(RESTCatalogOptions.TOKEN.key(), initToken);
-        options.put(RESTCatalogOptions.THREAD_POOL_SIZE.key(), "" + 1);
         options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(), 
AuthProviderEnum.BEAR.identifier());
         return options;
     }

Reply via email to