This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 10e9697d8a8 [fix](routine-load) fix get kafka offset timeout may too long (#33502) (#33760)
10e9697d8a8 is described below
commit 10e9697d8a8e1845619260c5fec6ebb82cf61c31
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed Apr 17 16:15:30 2024 +0800
[fix](routine-load) fix get kafka offset timeout may too long (#33502) (#33760)
---
be/src/runtime/routine_load/data_consumer.cpp | 20 ++++++++++++++------
be/src/runtime/routine_load/data_consumer.h | 4 ++--
.../routine_load/routine_load_task_executor.cpp | 10 ++++++----
.../routine_load/routine_load_task_executor.h | 6 ++++--
be/src/service/internal_service.cpp | 7 +++++--
.../java/org/apache/doris/common/util/KafkaUtil.java | 9 +++++----
gensrc/proto/internal_service.proto | 1 +
7 files changed, 37 insertions(+), 20 deletions(-)
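Context for the diff below: before this change, every librdkafka metadata call used a hard-coded 5000 ms timeout, so a request covering N partitions could block the BE for roughly 5 * N seconds while the FE gave up after a single 5-second future.get. A minimal runnable sketch of that arithmetic (the partition count is hypothetical; the 5000 ms per-call timeout and the 5 s FE deadline come from the hunks below):

    // Sketch of the failure mode this patch fixes; only the partition
    // count is invented, the two timeout values appear in the diff below.
    #include <cstdio>

    int main() {
        const int per_call_timeout_ms = 5000; // old hard-coded librdkafka timeout
        const int partitions = 8;             // hypothetical topic
        const int fe_deadline_ms = 5000;      // FE waited future.get(5, SECONDS)
        // Worst case before the fix: every per-partition call runs out its
        // own clock, far past the caller's overall deadline.
        std::printf("BE worst case %d ms vs FE deadline %d ms\n",
                    per_call_timeout_ms * partitions, fe_deadline_ms);
        return 0;
    }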
diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 3706e31fb49..4d751751003 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -336,7 +336,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
 // corresponding partition.
 // See librdkafka/rdkafkacpp.h##offsetsForTimes()
 Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
-                                                std::vector<PIntegerPair>* offsets) {
+                                                std::vector<PIntegerPair>* offsets, int timeout) {
     // create topic partition
     std::vector<RdKafka::TopicPartition*> topic_partitions;
     for (const auto& entry : times) {
@@ -351,8 +351,8 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
     }};
     // get offsets for times
-    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, 5000);
-    if (err != RdKafka::ERR_NO_ERROR) {
+    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout);
+    if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
         std::stringstream ss;
         ss << "failed to get offsets for times: " << RdKafka::err2str(err);
         LOG(WARNING) << ss.str();
@@ -371,13 +371,21 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
 // get latest offsets for given partitions
 Status KafkaDataConsumer::get_latest_offsets_for_partitions(
-        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets) {
+        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
+        int timeout) {
+    MonotonicStopWatch watch;
+    watch.start();
     for (int32_t partition_id : partition_ids) {
         int64_t low = 0;
         int64_t high = 0;
+        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
+        if (UNLIKELY(timeout_ms <= 0)) {
+            return Status::InternalError("get kafka latest offsets for partitions timeout");
+        }
+
         RdKafka::ErrorCode err =
-                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, 5000);
-        if (err != RdKafka::ERR_NO_ERROR) {
+                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms);
+        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
             std::stringstream ss;
             ss << "failed to get latest offset for partition: " << partition_id
                << ", err: " << RdKafka::err2str(err);
diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h
index 2a3f65f1993..596817b6b5d 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -151,10 +151,10 @@ public:
     Status get_partition_meta(std::vector<int32_t>* partition_ids);
     // get offsets for times
     Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
-                                 std::vector<PIntegerPair>* offsets);
+                                 std::vector<PIntegerPair>* offsets, int timeout);
     // get latest offsets for partitions
     Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids,
-                                             std::vector<PIntegerPair>* offsets);
+                                             std::vector<PIntegerPair>* offsets, int timeout);
 private:
     std::string _brokers;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 53260252f4d..7226dcfa484 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -128,7 +128,8 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
 }
 Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
-        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets) {
+        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets,
+        int timeout) {
     CHECK(request.has_kafka_info());
     // This context is meaningless, just for unifing the interface
@@ -140,7 +141,7 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
     Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_offsets_for_times(
             std::vector<PIntegerPair>(request.offset_times().begin(), request.offset_times().end()),
-            partition_offsets);
+            partition_offsets, timeout);
     if (st.ok()) {
         _data_consumer_pool.return_consumer(consumer);
     }
@@ -148,7 +149,8 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
 }
 Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
-        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets) {
+        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets,
+        int timeout) {
     CHECK(request.has_kafka_info());
     // This context is meaningless, just for unifing the interface
@@ -163,7 +165,7 @@ Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
                     ->get_latest_offsets_for_partitions(
                             std::vector<int32_t>(request.partition_id_for_latest_offsets().begin(),
                                                  request.partition_id_for_latest_offsets().end()),
-                            partition_offsets);
+                            partition_offsets, timeout);
     if (st.ok()) {
         _data_consumer_pool.return_consumer(consumer);
     }
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index 90c1a06400f..cc10f4a3bff 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -58,10 +58,12 @@ public:
                                      std::vector<int32_t>* partition_ids);
     Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request,
-                                                 std::vector<PIntegerPair>* partition_offsets);
+                                                 std::vector<PIntegerPair>* partition_offsets,
+                                                 int timeout);
     Status get_kafka_latest_offsets_for_partitions(const PKafkaMetaProxyRequest& request,
-                                                   std::vector<PIntegerPair>* partition_offsets);
+                                                   std::vector<PIntegerPair>* partition_offsets,
+                                                   int timeout);
 private:
     // execute the task
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 6d19a278a68..1aa5771afc0 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -850,6 +850,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
     // Currently it supports 2 kinds of requests:
     // 1. get all kafka partition ids for given topic
     // 2. get all kafka partition offsets for given topic and timestamp.
+    int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000;
     if (request->has_kafka_meta_request()) {
         const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
         if (!kafka_request.partition_id_for_latest_offsets().empty()) {
@@ -857,7 +858,8 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
             std::vector<PIntegerPair> partition_offsets;
             Status st = _exec_env->routine_load_task_executor()
                                 ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), &partition_offsets);
+                                        request->kafka_meta_request(), &partition_offsets,
+                                        timeout_ms);
             if (st.ok()) {
                 PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
                 for (const auto& entry : partition_offsets) {
@@ -873,7 +875,8 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
             std::vector<PIntegerPair> partition_offsets;
             Status st = _exec_env->routine_load_task_executor()
                                 ->get_kafka_partition_offsets_for_times(
-                                        request->kafka_meta_request(), &partition_offsets);
+                                        request->kafka_meta_request(), &partition_offsets,
+                                        timeout_ms);
             if (st.ok()) {
                 PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
                 for (const auto& entry : partition_offsets) {
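The handler above derives one millisecond budget that both branches share: it uses the FE-supplied timeout_secs when present and keeps the historical 5 s default for older FEs that never set the field. A sketch of that fallback, with a plain struct standing in for the generated PProxyRequest protobuf class:

    #include <cstdint>
    #include <optional>

    // Stand-in for the generated PProxyRequest; the field mirrors the
    // `optional int64 timeout_secs = 2;` added to internal_service.proto below.
    struct ProxyRequest {
        std::optional<int64_t> timeout_secs;
    };

    int derive_timeout_ms(const ProxyRequest& request) {
        // Older FEs omit the field, so the BE falls back to the old 5 s value.
        return request.timeout_secs.has_value()
                       ? static_cast<int>(*request.timeout_secs) * 1000
                       : 5 * 1000;
    }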
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index 40041502ca8..6538c1fb040 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
     private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
+    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
     public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
             Map<String, String> convertedCustomProperties) throws UserException {
@@ -124,11 +125,11 @@ public class KafkaUtil {
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).build();
+                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
             // get info
             Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS);
+            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
                 throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
@@ -182,11 +183,11 @@ public class KafkaUtil {
                 metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).build();
+                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
             // get info
             Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS);
+            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
                 throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
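On the FE side, the same constant now serves two purposes: it is stamped into the request via setTimeoutSecs and it bounds the local future.get, so the FE never waits longer than the budget it granted the BE. The same idea, sketched in C++ with hypothetical RPC plumbing standing in for BackendServiceProxy and its Future:

    #include <chrono>
    #include <future>
    #include <stdexcept>

    constexpr std::chrono::seconds kGetOffsetTimeout{5}; // one shared constant

    // Hypothetical proxy: the request carries the same budget the caller enforces.
    struct Proxy {
        std::future<int> get_info(int timeout_secs) {
            (void)timeout_secs; // stub: a real proxy would send the RPC
            std::promise<int> done;
            done.set_value(0);
            return done.get_future();
        }
    };

    int call_with_deadline(Proxy& proxy) {
        auto result = proxy.get_info(static_cast<int>(kGetOffsetTimeout.count()));
        // Wait exactly as long as the server was told it may take.
        if (result.wait_for(kGetOffsetTimeout) != std::future_status::ready) {
            throw std::runtime_error("get kafka offsets timed out");
        }
        return result.get();
    }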
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index a1fd0e42a70..737362a96fa 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -369,6 +369,7 @@ message PKafkaMetaProxyRequest {
 message PProxyRequest {
     optional PKafkaMetaProxyRequest kafka_meta_request = 1;
+    optional int64 timeout_secs = 2;
 };
 message PKafkaMetaProxyResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]