This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 6ecb23b [Fix] Fixed the problem that retry may get stuck when fe
hangs up (#331)
6ecb23b is described below
commit 6ecb23b69a2e39d9a8fc0556b0716bdc6fec7851
Author: wudi <[email protected]>
AuthorDate: Tue Jul 22 17:57:37 2025 +0800
[Fix] Fixed the problem that retry may get stuck when fe hangs up (#331)
---
.../doris/spark/client/DorisBackendHttpClient.java | 10 +++-
.../doris/spark/client/DorisFrontendClient.java | 8 +++-
.../apache/doris/spark/client/entity/Backend.java | 4 ++
.../client/write/AbstractStreamLoadProcessor.java | 3 +-
.../doris/spark/client/write/DorisWriter.java | 7 ++-
.../java/org/apache/doris/spark/util/HttpUtil.java | 56 ++++++++++++++++++++++
.../apache/doris/spark/write/DorisDataWriter.scala | 4 +-
7 files changed, 84 insertions(+), 8 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
index 54b5659..1f28ae5 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
@@ -18,6 +18,7 @@
package org.apache.doris.spark.client;
import org.apache.doris.spark.client.entity.Backend;
+import org.apache.doris.spark.util.HttpUtil;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
@@ -46,15 +47,22 @@ public class DorisBackendHttpClient implements Serializable
{
Exception ex = null;
for (Backend backend : backends) {
try {
- return reqFunc.apply(backend, httpClient);
+ if(HttpUtil.tryHttpConnection(backend.hostHttpPortString())){
+ return reqFunc.apply(backend, httpClient);
+ }
} catch (Exception e) {
log.warn("Failed to execute request on backend: {}:{}",
backend.getHost(), backend.getHttpPort(), e);
ex = e;
}
}
+
+ if (ex == null) {
+ ex = new Exception("All backends failed to execute request.");
+ }
throw ex;
}
+
public void close() {
if (httpClient != null) {
try {
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index cefb063..b0d1dc1 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -26,6 +26,7 @@ import
org.apache.doris.spark.exception.OptionRequiredException;
import org.apache.doris.spark.rest.models.Field;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
+import org.apache.doris.spark.util.HttpUtil;
import org.apache.doris.spark.util.HttpUtils;
import org.apache.doris.spark.util.URLs;
@@ -157,12 +158,17 @@ public class DorisFrontendClient implements Serializable {
Exception ex = null;
for (Frontend frontEnd : frontEnds) {
try {
- return reqFunc.apply(frontEnd, httpClient);
+ if(HttpUtil.tryHttpConnection(frontEnd.hostHttpPortString())){
+ return reqFunc.apply(frontEnd, httpClient);
+ }
} catch (Exception e) {
LOG.warn("fe http request on {} failed, err: {}",
frontEnd.hostHttpPortString(), e.getMessage());
ex = e;
}
}
+ if (ex == null) {
+ ex = new Exception("All frontends failed to execute request.");
+ }
throw ex;
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Backend.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Backend.java
index 36b8e73..5a4ce6c 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Backend.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Backend.java
@@ -77,4 +77,8 @@ public class Backend implements Serializable {
return String.format("%s:%d", host, rpcPort);
}
+ public String hostHttpPortString() {
+ return host + ":" + httpPort;
+ }
+
}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index f64d201..c9a7bb9 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -90,8 +90,6 @@ public abstract class AbstractStreamLoadProcessor<R> extends
DorisWriter<R> impl
private byte[] lineDelimiter;
private String groupCommit;
private PipedOutputStream output;
- private boolean createNewBatch = true;
- private boolean isFirstRecordOfBatch = true;
private transient ExecutorService executor;
private Future<StreamLoadResponse> requestFuture = null;
@@ -426,6 +424,7 @@ public abstract class AbstractStreamLoadProcessor<R>
extends DorisWriter<R> impl
logger.error("stream load exception", e);
unexpectedException = e;
currentThread.interrupt();
+ throw e;
}
return streamLoadResponse;
});
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
index ada89c7..7ba7f9b 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/DorisWriter.java
@@ -25,8 +25,9 @@ import java.io.Serializable;
public abstract class DorisWriter<R> implements Serializable {
protected int batchSize;
-
protected int currentBatchCount = 0;
+ protected boolean createNewBatch = true;
+ protected boolean isFirstRecordOfBatch = true;
public DorisWriter(int batchSize) {
if (batchSize <= 0) {
@@ -50,7 +51,9 @@ public abstract class DorisWriter<R> implements Serializable {
}
public void resetBatchCount() {
- currentBatchCount = 0;
+ this.currentBatchCount = 0;
+ this.createNewBatch = true;
+ this.isFirstRecordOfBatch = true;
}
}
\ No newline at end of file
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/HttpUtil.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/HttpUtil.java
new file mode 100644
index 0000000..fe3228c
--- /dev/null
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/HttpUtil.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class HttpUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(HttpUtil.class);
+
+ public static boolean tryHttpConnection(String host) {
+ try {
+ LOG.debug("try to connect host {}", host);
+ host = "http://" + host;
+ URL url = new URL(host);
+ HttpURLConnection connection = (HttpURLConnection)
url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.setConnectTimeout(60000);
+ connection.setReadTimeout(60000);
+ int responseCode = connection.getResponseCode();
+ String responseMessage = connection.getResponseMessage();
+ connection.disconnect();
+ if (responseCode < 500) {
+ // only a code of 500 or greater means a server-side exception; anything below counts as reachable.
+ return true;
+ }
+ LOG.warn(
+ "Failed to connect host {}, responseCode={}, msg={}",
+ host,
+ responseCode,
+ responseMessage);
+ return false;
+ } catch (Exception ex) {
+ LOG.warn("Failed to connect to host:{}, cause {}", host,
ex.getMessage());
+ return false;
+ }
+ }
+}
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index bc92a93..0020c3d 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -84,10 +84,10 @@ class DorisDataWriter(config: DorisConfig, schema:
StructType, partitionId: Int,
Retry.exec[Unit, Exception](retries,
Duration.ofMillis(retryIntervalMs.toLong), log) {
if (isRetrying) {
// retrying, reload data from buffer
- do {
+ while (writer.getBatchCount < recordBuffer.size){
val idx = writer.getBatchCount
writer.load(recordBuffer(idx))
- } while (writer.getBatchCount < recordBuffer.size)
+ }
isRetrying = false
}
if (writer.endOfBatch()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]