This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new dd18652861b [branch-2.1](routine-load) make get Kafka meta timeout configurable (#37399)
dd18652861b is described below
commit dd18652861bc2879f230404d0285788d75a82b41
Author: hui lai <[email protected]>
AuthorDate: Mon Jul 8 10:39:17 2024 +0800
[branch-2.1](routine-load) make get Kafka meta timeout configurable (#37399)
pick #36619
---
be/src/service/internal_service.cpp | 2 +-
.../main/java/org/apache/doris/common/Config.java | 6 ++++++
.../org/apache/doris/datasource/kafka/KafkaUtil.java | 20 ++++++++++----------
3 files changed, 17 insertions(+), 11 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index c6bedc630e8..f4831d08d29 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1190,7 +1190,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
// Currently it supports 2 kinds of requests:
// 1. get all kafka partition ids for given topic
// 2. get all kafka partition offsets for given topic and timestamp.
- int timeout_ms = request->has_timeout_secs() ? request->timeout_secs()
* 1000 : 5 * 1000;
+ int timeout_ms = request->has_timeout_secs() ? request->timeout_secs()
* 1000 : 60 * 1000;
if (request->has_kafka_meta_request()) {
const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
if (!kafka_request.offset_flags().empty()) {
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 94d5725c38a..1be7b871d68 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1181,6 +1181,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_routine_load_task_num_per_be = 1024;
+ /**
+ * the max timeout of get kafka meta.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int max_get_kafka_meta_timeout_second = 60;
+
/**
* The max number of files store in SmallFileMgr
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 656ebf65152..c0c932bb8ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -18,6 +18,7 @@
package org.apache.doris.datasource.kafka;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
@@ -42,8 +43,6 @@ import java.util.stream.Collectors;
public class KafkaUtil {
private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
- private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
- private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;
public static List<Integer> getAllKafkaPartitions(String brokerList,
String topic,
Map<String, String> convertedCustomProperties) throws
UserException {
@@ -59,7 +58,8 @@ public class KafkaUtil {
)
)
).build();
- return getInfoRequest(request,
MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
+ return getInfoRequest(request,
Config.max_get_kafka_meta_timeout_second)
+ .getKafkaMetaResult().getPartitionIdsList();
} catch (Exception e) {
throw new LoadException(
"Failed to get all partitions of kafka topic: " + topic +
" error: " + e.getMessage());
@@ -96,8 +96,8 @@ public class KafkaUtil {
}
InternalService.PProxyRequest request =
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
- InternalService.PProxyResult result = getInfoRequest(request,
MAX_GET_OFFSET_TIMEOUT_SECOND);
+
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+ InternalService.PProxyResult result = getInfoRequest(request,
Config.max_get_kafka_meta_timeout_second);
List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -141,8 +141,8 @@ public class KafkaUtil {
metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
}
InternalService.PProxyRequest request =
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
- InternalService.PProxyResult result = getInfoRequest(request,
MAX_GET_OFFSET_TIMEOUT_SECOND);
+
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+ InternalService.PProxyResult result = getInfoRequest(request,
Config.max_get_kafka_meta_timeout_second);
List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -201,8 +201,8 @@ public class KafkaUtil {
.setVal(pair.second).build());
}
InternalService.PProxyRequest request =
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
- InternalService.PProxyResult result = getInfoRequest(request,
MAX_GET_OFFSET_TIMEOUT_SECOND);
+
metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
+ InternalService.PProxyResult result = getInfoRequest(request,
Config.max_get_kafka_meta_timeout_second);
List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
@@ -236,7 +236,7 @@ public class KafkaUtil {
try {
future = BackendServiceProxy.getInstance().getInfo(address,
request);
- result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND,
TimeUnit.SECONDS);
+ result = future.get(Config.max_get_kafka_meta_timeout_second,
TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err "
+ e.getMessage());
retryTimes++;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]