This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new e7056c52ac4 [branch-2.0](routine-load) add retry when getting Kafka meta info and make the Kafka meta timeout configurable (#37458)
e7056c52ac4 is described below
commit e7056c52ac4d7c58b159810a7d94e4ba3bf2efae
Author: hui lai <[email protected]>
AuthorDate: Mon Jul 8 16:59:32 2024 +0800
    [branch-2.0](routine-load) add retry when getting Kafka meta info and make the Kafka meta timeout configurable (#37458)
---
be/src/service/internal_service.cpp | 2 +-
.../main/java/org/apache/doris/common/Config.java | 6 +
.../org/apache/doris/common/util/KafkaUtil.java | 144 ++++++++++-----------
3 files changed, 72 insertions(+), 80 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 3677c1210a5..9591c1928ee 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -933,7 +933,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
     // Currently it supports 2 kinds of requests:
     // 1. get all kafka partition ids for given topic
     // 2. get all kafka partition offsets for given topic and timestamp.
-    int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000;
+    int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 60 * 1000;
     if (request->has_kafka_meta_request()) {
         const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
         if (!kafka_request.partition_id_for_latest_offsets().empty()) {
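
For context on the hunk above: the BE still honors a caller-supplied timeout_secs, and only the fallback default grows from 5 to 60 seconds. A minimal sketch of that resolution logic, written in Java for consistency with the rest of the patch (the real code is the C++ ternary in internal_service.cpp; the class and method names here are illustrative):

    // Sketch only: mirrors the BE's timeout fallback, not actual Doris code.
    public final class TimeoutFallback {
        static int resolveTimeoutMs(boolean hasTimeoutSecs, int timeoutSecs) {
            // Prefer the timeout the FE sent; otherwise use the new 60s default.
            return hasTimeoutSecs ? timeoutSecs * 1000 : 60 * 1000;
        }

        public static void main(String[] args) {
            System.out.println(resolveTimeoutMs(false, 0));   // 60000: new default (was 5000)
            System.out.println(resolveTimeoutMs(true, 120));  // 120000: caller-supplied value wins
        }
    }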
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index dcdcc7dd035..0b4aa1cfd3a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1141,6 +1141,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int max_routine_load_task_num_per_be = 5;
 
+    /**
+     * The max timeout, in seconds, for fetching Kafka meta info.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int max_get_kafka_meta_timeout_second = 60;
+
     /**
      * The max number of files store in SmallFileMgr
      */
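
Because the new field is declared mutable = true and masterOnly = true, the master FE re-reads it on each use, so the timeout should be adjustable at runtime (e.g. via ADMIN SET FRONTEND CONFIG ("max_get_kafka_meta_timeout_second" = "120");) without a restart. A self-contained sketch of how such a mutable config bounds a client-side RPC wait; TimeoutDemo and its field are stand-ins for the Doris Config machinery, not code from this patch:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    public final class TimeoutDemo {
        // Stand-in for Config.max_get_kafka_meta_timeout_second (default 60).
        static volatile int maxGetKafkaMetaTimeoutSecond = 60;

        public static void main(String[] args) throws Exception {
            CompletableFuture<String> rpc = CompletableFuture.supplyAsync(() -> "partition metadata");
            // The field is read at call time, so a runtime config change takes
            // effect on the very next Kafka meta fetch.
            System.out.println(rpc.get(maxGetKafkaMetaTimeoutSecond, TimeUnit.SECONDS));
        }
    }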
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index 60f423773e7..f6342e1a6fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -18,6 +18,7 @@
package org.apache.doris.common.util;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
@@ -41,25 +42,10 @@ import java.util.stream.Collectors;
public class KafkaUtil {
private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
- private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
- private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
     public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
             Map<String, String> convertedCustomProperties) throws UserException {
-        TNetworkAddress address = null;
-        Backend be = null;
-        long beId = -1L;
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get all partitions. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            beId = backendIds.get(0);
-            be = Env.getCurrentSystemInfo().getBackend(beId);
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -71,21 +57,11 @@ public class KafkaUtil {
                             )
                     )
             ).build();
-
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());
-            } else {
-                return result.getKafkaMetaResult().getPartitionIdsList();
-            }
+            return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second)
+                    .getKafkaMetaResult().getPartitionIdsList();
         } catch (Exception e) {
-            LOG.warn("failed to get partitions from backend[{}].", beId, e);
             throw new LoadException(
-                    "Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId
-                            + "]. error: " + e.getMessage());
+                    "Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
         }
     }
@@ -95,18 +71,10 @@ public class KafkaUtil {
     public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
             Map<String, String> convertedCustomProperties, List<Pair<Integer, Long>> timestampOffsets)
             throws LoadException {
-        TNetworkAddress address = null;
-        LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
+        }
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get offset for times. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -127,23 +95,18 @@ public class KafkaUtil {
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
-                }
+            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+            }
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
-                return partitionOffsets;
             }
+            return partitionOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get offsets for times.", e);
             throw new LoadException(
@@ -154,19 +117,11 @@ public class KafkaUtil {
     public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
                                                               Map<String, String> convertedCustomProperties,
                                                               List<Integer> partitionIds) throws LoadException {
-        TNetworkAddress address = null;
-        LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
-                partitionIds, topic, taskId, jobId);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
+                    partitionIds, topic, taskId, jobId);
+        }
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get latest offsets. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -185,28 +140,59 @@ public class KafkaUtil {
                 metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);
 
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
-                }
+            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
+            }
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
                         partitionOffsets, topic, taskId, jobId);
-                return partitionOffsets;
             }
+            return partitionOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get latest offsets.", e);
             throw new LoadException(
                     "Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage());
         }
     }
+
+    private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
+            throws LoadException {
+        int retryTimes = 0;
+        TNetworkAddress address = null;
+        Future<InternalService.PProxyResult> future = null;
+        InternalService.PProxyResult result = null;
+        while (retryTimes < 3) {
+            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
+            if (backendIds.isEmpty()) {
+                throw new LoadException("Failed to get info. No alive backends");
+            }
+            Collections.shuffle(backendIds);
+            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+            try {
+                future = BackendServiceProxy.getInstance().getInfo(address, request);
+                // Honor the timeout passed in by the caller.
+                result = future.get(timeout, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
+                retryTimes++;
+                continue;
+            }
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                LOG.warn("failed to get info request to " + address + " err " + result.getStatus().getErrorMsgsList());
+                retryTimes++;
+            } else {
+                return result;
+            }
+        }
+
+        throw new LoadException("Failed to get info");
+    }
}
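
For reference, a self-contained sketch of the retry pattern the new getInfoRequest helper centralizes: re-pick a random alive backend on every attempt so one bad node cannot exhaust all three tries, return on the first OK result, and give up with an exception after the last failure. Everything here (RetrySketch, callWithRetry, the String backend stand-in) is illustrative, not Doris API:

    import java.util.List;
    import java.util.Random;
    import java.util.function.Function;

    public final class RetrySketch {
        private static final int MAX_RETRY_TIMES = 3;
        private static final Random RAND = new Random();

        // Shape of KafkaUtil.getInfoRequest: random backend per attempt,
        // first success wins, three failures and we throw.
        static <R> R callWithRetry(List<String> aliveBackends, Function<String, R> rpc) {
            if (aliveBackends.isEmpty()) {
                throw new IllegalStateException("No alive backends");
            }
            RuntimeException last = null;
            for (int attempt = 0; attempt < MAX_RETRY_TIMES; attempt++) {
                String backend = aliveBackends.get(RAND.nextInt(aliveBackends.size()));
                try {
                    return rpc.apply(backend);
                } catch (RuntimeException e) {
                    last = e;  // log in real code, then fall through and retry
                }
            }
            throw new IllegalStateException("Failed after " + MAX_RETRY_TIMES + " attempts", last);
        }
    }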
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]