This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 328f3c6  [improve] Check the status of fe and be before connect (#311)
328f3c6 is described below

commit 328f3c6d5ec2a9b906b2858d73528ae3bb9f3298
Author: Hong Liu <[email protected]>
AuthorDate: Tue Jan 30 16:40:35 2024 +0800

    [improve] Check the status of fe and be before connect (#311)
---
 .../java/org/apache/doris/flink/rest/RestService.java     |  9 ++++++++-
 .../java/org/apache/doris/flink/sink/BackendUtil.java     |  9 +++++----
 .../apache/doris/flink/sink/schema/SchemaManagerTest.java | 15 ++++++++++++++-
 3 files changed, 27 insertions(+), 6 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 0fcc01d..fcf31c3 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -38,6 +38,7 @@ import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.rest.models.QueryPlan;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.rest.models.Tablet;
+import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
@@ -280,7 +281,13 @@ public class RestService implements Serializable {
         }
         List<String> nodes = Arrays.asList(feNodes.split(","));
         Collections.shuffle(nodes);
-        return nodes.get(0).trim();
+        for (String feNode : nodes) {
+            if (BackendUtil.tryHttpConnection(feNode)) {
+                return feNode;
+            }
+        }
+        throw new DorisRuntimeException(
+                "No Doris FE is available, please check configuration or 
cluster status.");
     }
 
     /**
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index f909bb6..9a45ff0 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -87,17 +87,18 @@ public class BackendUtil {
         throw new DorisRuntimeException("no available backend.");
     }
 
-    public static boolean tryHttpConnection(String backend) {
+    public static boolean tryHttpConnection(String host) {
         try {
-            backend = "http://"; + backend;
-            URL url = new URL(backend);
+            LOG.info("try to connect host {}", host);
+            host = "http://"; + host;
+            URL url = new URL(host);
             HttpURLConnection co = (HttpURLConnection) url.openConnection();
             co.setConnectTimeout(60000);
             co.connect();
             co.disconnect();
             return true;
         } catch (Exception ex) {
-            LOG.warn("Failed to connect to backend:{}", backend, ex);
+            LOG.warn("Failed to connect to host:{}", host, ex);
             return false;
         }
     }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 5b553fc..4563876 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.sink.schema;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.HttpEntityMock;
 import org.apache.doris.flink.sink.OptionUtils;
 import org.apache.http.ProtocolVersion;
@@ -28,6 +29,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.message.BasicStatusLine;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -70,7 +72,8 @@ public class SchemaManagerTest {
 
     HttpEntityMock entityMock;
     SchemaChangeManager schemaChangeManager;
-    static MockedStatic<HttpClients> httpClientMockedStatic = 
mockStatic(HttpClients.class);
+    static MockedStatic<HttpClients> httpClientMockedStatic;
+    static MockedStatic<BackendUtil> backendUtilMockedStatic;
 
     @Before
     public void setUp() throws IOException {
@@ -89,7 +92,11 @@ public class SchemaManagerTest {
         when(httpResponse.getStatusLine()).thenReturn(normalLine);
         when(httpResponse.getEntity()).thenReturn(entityMock);
 
+        httpClientMockedStatic = mockStatic(HttpClients.class);
         httpClientMockedStatic.when(() -> 
HttpClients.createDefault()).thenReturn(httpClient);
+
+        backendUtilMockedStatic = mockStatic(BackendUtil.class);
+        backendUtilMockedStatic.when(() -> 
BackendUtil.tryHttpConnection(any())).thenReturn(true);
     }
 
     @Test
@@ -140,4 +147,10 @@ public class SchemaManagerTest {
         Assert.assertEquals(
                 "ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` 
`col_new`", renameColumnDDL);
     }
+
+    @After
+    public void after() {
+        httpClientMockedStatic.close();
+        backendUtilMockedStatic.close();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to