This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 328f3c6 [improve] Check the status of fe and be before connect (#311)
328f3c6 is described below
commit 328f3c6d5ec2a9b906b2858d73528ae3bb9f3298
Author: Hong Liu <[email protected]>
AuthorDate: Tue Jan 30 16:40:35 2024 +0800
[improve] Check the status of fe and be before connect (#311)
---
.../java/org/apache/doris/flink/rest/RestService.java | 9 ++++++++-
.../java/org/apache/doris/flink/sink/BackendUtil.java | 9 +++++----
.../apache/doris/flink/sink/schema/SchemaManagerTest.java | 15 ++++++++++++++-
3 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 0fcc01d..fcf31c3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -38,6 +38,7 @@ import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.QueryPlan;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.rest.models.Tablet;
+import org.apache.doris.flink.sink.BackendUtil;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
@@ -280,7 +281,13 @@ public class RestService implements Serializable {
}
List<String> nodes = Arrays.asList(feNodes.split(","));
Collections.shuffle(nodes);
- return nodes.get(0).trim();
+ for (String feNode : nodes) {
+ if (BackendUtil.tryHttpConnection(feNode)) {
+ return feNode;
+ }
+ }
+ throw new DorisRuntimeException(
+ "No Doris FE is available, please check configuration or cluster status.");
}
/**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index f909bb6..9a45ff0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -87,17 +87,18 @@ public class BackendUtil {
throw new DorisRuntimeException("no available backend.");
}
- public static boolean tryHttpConnection(String backend) {
+ public static boolean tryHttpConnection(String host) {
try {
- backend = "http://" + backend;
- URL url = new URL(backend);
+ LOG.info("try to connect host {}", host);
+ host = "http://" + host;
+ URL url = new URL(host);
HttpURLConnection co = (HttpURLConnection) url.openConnection();
co.setConnectTimeout(60000);
co.connect();
co.disconnect();
return true;
} catch (Exception ex) {
- LOG.warn("Failed to connect to backend:{}", backend, ex);
+ LOG.warn("Failed to connect to host:{}", host, ex);
return false;
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 5b553fc..4563876 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.sink.schema;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.HttpEntityMock;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.http.ProtocolVersion;
@@ -28,6 +29,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicStatusLine;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -70,7 +72,8 @@ public class SchemaManagerTest {
HttpEntityMock entityMock;
SchemaChangeManager schemaChangeManager;
- static MockedStatic<HttpClients> httpClientMockedStatic = mockStatic(HttpClients.class);
+ static MockedStatic<HttpClients> httpClientMockedStatic;
+ static MockedStatic<BackendUtil> backendUtilMockedStatic;
@Before
public void setUp() throws IOException {
@@ -89,7 +92,11 @@ public class SchemaManagerTest {
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);
+ httpClientMockedStatic = mockStatic(HttpClients.class);
httpClientMockedStatic.when(() -> HttpClients.createDefault()).thenReturn(httpClient);
+
+ backendUtilMockedStatic = mockStatic(BackendUtil.class);
+ backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true);
}
@Test
@@ -140,4 +147,10 @@ public class SchemaManagerTest {
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` `col_new`", renameColumnDDL);
}
+
+ @After
+ public void after() {
+ httpClientMockedStatic.close();
+ backendUtilMockedStatic.close();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]