This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-0.15 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 2522df04b8ad9acc84090e45774dcf3f213e1c31 Author: Mingyu Chen <[email protected]> AuthorDate: Wed Oct 13 11:39:01 2021 +0800 [Optimize][RoutineLoad] Avoid sending tasks if there is no data to be consumed (#6805) 1 Avoid sending tasks if there is no data to be consumed By fetching latest offset of partition before sending tasks.(Fix [Optimize] Avoid too many abort task in routine load job #6803 ) 2 Add a preCheckNeedSchedule phase in update() of routine load. To avoid taking write lock of job for long time when getting all kafka partitions from kafka server. 3 Upgrade librdkafka's version to 1.7.0 to fix a bug of "Local: Unknown partition" See offsetsForTimes fails with 'Local: Unknown partition' edenhill/librdkafka#3295 4 Avoid unnecessary storage migration task if there is no that storage medium on BE. Fix [Bug] Too many unnecessary storage migration tasks #6804 --- be/src/runtime/routine_load/data_consumer.cpp | 29 +++++- be/src/runtime/routine_load/data_consumer.h | 3 + .../routine_load/routine_load_task_executor.cpp | 23 ++++- .../routine_load/routine_load_task_executor.h | 3 + be/src/service/internal_service.cpp | 18 +++- conf/fe.conf | 2 +- .../java/org/apache/doris/catalog/DiskInfo.java | 9 +- .../main/java/org/apache/doris/common/Config.java | 2 +- .../org/apache/doris/common/util/KafkaUtil.java | 70 +++++++++++++- .../apache/doris/journal/bdbje/BDBJEJournal.java | 2 +- .../load/routineload/KafkaRoutineLoadJob.java | 106 +++++++++++++++++---- .../doris/load/routineload/KafkaTaskInfo.java | 21 +++- .../doris/load/routineload/RoutineLoadJob.java | 23 +++-- .../doris/load/routineload/RoutineLoadManager.java | 20 ++-- .../load/routineload/RoutineLoadTaskInfo.java | 4 +- .../load/routineload/RoutineLoadTaskScheduler.java | 12 ++- .../org/apache/doris/master/ReportHandler.java | 10 ++ .../main/java/org/apache/doris/system/Backend.java | 4 + .../java/org/apache/doris/catalog/BackendTest.java | 11 ++- .../load/routineload/RoutineLoadManagerTest.java | 2 +- 
.../routineload/RoutineLoadTaskSchedulerTest.java | 2 +- gensrc/proto/internal_service.proto | 3 + thirdparty/vars.sh | 10 +- 23 files changed, 322 insertions(+), 67 deletions(-) diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 34cba02..c387b07 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -172,7 +172,6 @@ Status KafkaDataConsumer::assign_topic_partitions( Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms) { - _last_visit_time = time(nullptr); int64_t left_time = max_running_time_ms; LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id << ", max running time(ms): " << left_time; @@ -338,7 +337,30 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& PIntegerPair pair; pair.set_key(topic_partition->partition()); pair.set_val(topic_partition->offset()); - offsets->push_back(pair); + offsets->push_back(std::move(pair)); + } + + return Status::OK(); +} + +// get latest offsets for given partitions +Status KafkaDataConsumer::get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids, + std::vector<PIntegerPair>* offsets) { + for (int32_t partition_id : partition_ids) { + int64_t low = 0; + int64_t high = 0; + RdKafka::ErrorCode err = _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, 5000); + if (err != RdKafka::ERR_NO_ERROR) { + std::stringstream ss; + ss << "failed to get latest offset for partition: " << partition_id << ", err: " << RdKafka::err2str(err); + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + + PIntegerPair pair; + pair.set_key(partition_id); + pair.set_val(high); + offsets->push_back(std::move(pair)); } return Status::OK(); @@ -358,6 +380,9 @@ Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) { Status KafkaDataConsumer::reset() { 
std::unique_lock<std::mutex> l(_lock); _cancelled = false; + // reset will be called before this consumer being returned to the pool. + // so update _last_visit_time is reasonable. + _last_visit_time = time(nullptr); return Status::OK(); } diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index fcbd017..17b1810 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -145,6 +145,9 @@ public: // get offsets for times Status get_offsets_for_times(const std::vector<PIntegerPair>& times, std::vector<PIntegerPair>* offsets); + // get latest offsets for partitions + Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids, + std::vector<PIntegerPair>* offsets); private: std::string _brokers; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 0e7ae45..8cc68a6 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -39,7 +39,7 @@ RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env) : _exec_env(exec_env), _thread_pool(config::routine_load_thread_pool_size, config::routine_load_thread_pool_size), - _data_consumer_pool(10) { + _data_consumer_pool(config::routine_load_thread_pool_size) { REGISTER_HOOK_METRIC(routine_load_task_count, [this]() { std::lock_guard<std::mutex> l(_lock); return _task_map.size(); @@ -126,6 +126,27 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(const PKaf return st; } +Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(const PKafkaMetaProxyRequest& request, + std::vector<PIntegerPair>* partition_offsets) { + CHECK(request.has_kafka_info()); + + // This context is meaningless, just for unifing the interface + StreamLoadContext ctx(_exec_env); + RETURN_IF_ERROR(_prepare_ctx(request, &ctx)); + + 
std::shared_ptr<DataConsumer> consumer; + RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer)); + + Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_latest_offsets_for_partitions( + std::vector<int32_t>(request.partition_id_for_latest_offsets().begin(), + request.partition_id_for_latest_offsets().end()), + partition_offsets); + if (st.ok()) { + _data_consumer_pool.return_consumer(consumer); + } + return st; +} + Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { std::unique_lock<std::mutex> l(_lock); if (_task_map.find(task.id) != _task_map.end()) { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index e5c7939..41f20b1 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -55,6 +55,9 @@ public: Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets); + Status get_kafka_latest_offsets_for_partitions(const PKafkaMetaProxyRequest& request, + std::vector<PIntegerPair>* partition_offsets); + private: // execute the task void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool, ExecFinishCallback cb); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 89dd7ef..cd98ea0 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -208,7 +208,23 @@ void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll // 2. get all kafka partition offsets for given topic and timestamp. 
if (request->has_kafka_meta_request()) { const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); - if (!kafka_request.offset_times().empty()) { + if (!kafka_request.partition_id_for_latest_offsets().empty()) { + // get latest offsets for specified partition ids + std::vector<PIntegerPair> partition_offsets; + Status st = + _exec_env->routine_load_task_executor()->get_kafka_latest_offsets_for_partitions( + request->kafka_meta_request(), &partition_offsets); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } + } + st.to_protobuf(response->mutable_status()); + return; + } else if (!kafka_request.offset_times().empty()) { // if offset_times() has elements, which means this request is to get offset by timestamp. std::vector<PIntegerPair> partition_offsets; Status st = diff --git a/conf/fe.conf b/conf/fe.conf index 9298619..723221d 100644 --- a/conf/fe.conf +++ b/conf/fe.conf @@ -58,7 +58,7 @@ mysql_service_nio_enabled = true # log_roll_size_mb = 1024 # sys_log_dir = ${DORIS_HOME}/log # sys_log_roll_num = 10 -# sys_log_verbose_modules = +# sys_log_verbose_modules = org.apache.doris # audit_log_dir = ${DORIS_HOME}/log # audit_log_modules = slow_query, query # audit_log_roll_num = 10 diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java index ee08790..e18b07f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -24,6 +24,8 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TStorageMedium; +import com.google.gson.annotations.SerializedName; + import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; @@ -31,8 +33,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import com.google.gson.annotations.SerializedName; - public class DiskInfo implements Writable { private static final Logger LOG = LogManager.getLogger(DiskInfo.class); @@ -53,7 +53,6 @@ public class DiskInfo implements Writable { private long diskAvailableCapacityB; @SerializedName("state") private DiskState state; - // path hash and storage medium are reported from Backend and no need to persist private long pathHash = 0; private TStorageMedium storageMedium; @@ -129,6 +128,10 @@ public class DiskInfo implements Writable { return pathHash != 0; } + public boolean isStorageMediumMatch(TStorageMedium storageMedium) { + return this.storageMedium == storageMedium; + } + public TStorageMedium getStorageMedium() { return storageMedium; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 2b66c81..d319d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1168,7 +1168,7 @@ public class Config extends ConfigBase { * not work to avoid OOM. */ @ConfField(mutable = true, masterOnly = true) - public static long metadata_checkpoint_memory_threshold = 60; + public static long metadata_checkpoint_memory_threshold = 70; /** * If set to true, the checkpoint thread will make the checkpoint regardless of the jvm memory used percent. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java index 6691af6..ecbb897 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -144,7 +145,7 @@ public class KafkaUtil { InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { - throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList()); + throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList()); } else { List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); @@ -166,6 +167,73 @@ public class KafkaUtil { } } } + + public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic, + Map<String, String> convertedCustomProperties, + List<Integer> partitionIds) throws LoadException { + BackendService.Client client = null; + TNetworkAddress address = null; + LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}", + partitionIds, topic, taskId, jobId); + boolean ok = false; + try { + List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true); + if (backendIds.isEmpty()) { + throw new LoadException("Failed to get latest offsets. 
No alive backends"); + } + Collections.shuffle(backendIds); + Backend be = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); + address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + + // create request + InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder = + InternalService.PKafkaMetaProxyRequest.newBuilder() + .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() + .setBrokers(brokerList) + .setTopic(topic) + .addAllProperties( + convertedCustomProperties.entrySet().stream().map( + e -> InternalService.PStringPair.newBuilder() + .setKey(e.getKey()) + .setVal(e.getValue()) + .build() + ).collect(Collectors.toList()) + ) + ); + for (Integer partitionId : partitionIds) { + metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId); + } + InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( + metaRequestBuilder).build(); + + // get info + Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request); + InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList()); + } else { + List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList(); + List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList(); + for (InternalService.PIntegerPair pair : pairs) { + partitionOffsets.add(Pair.create(pair.getKey(), pair.getVal())); + } + LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}", + partitionOffsets, topic, taskId, jobId); + return partitionOffsets; + } + } catch (Exception e) { + LOG.warn("failed to get latest offsets.", e); + throw new LoadException( + "Failed to get latest offsets of kafka topic: " + topic + ". 
error: " + e.getMessage()); + } finally { + if (ok) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index 553aa22..d1c29fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -446,4 +446,4 @@ public class BDBJEJournal implements Journal { return dbNames; } -} \ No newline at end of file +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 7a3b87b..1be8030 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -91,6 +91,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private Map<String, String> customProperties = Maps.newHashMap(); private Map<String, String> convertedCustomProperties = Maps.newHashMap(); + // The latest offset of each partition fetched from kafka server. + // Will be updated periodically by calling hasMoreDataToConsume() + private Map<Integer, Long> cachedPartitionWithLatestOffsets = Maps.newConcurrentMap(); + + // The kafka partition fetch from kafka server. 
+ // Will be updated periodically by calling updateKafkaPartitions(); + private List<Integer> newCurrentKafkaPartition = Lists.newArrayList(); + public KafkaRoutineLoadJob() { // for serialization, id is dummy super(-1, LoadDataSourceType.KAFKA); @@ -281,6 +289,34 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { updateNewPartitionProgress(); } + @Override + protected void preCheckNeedSchedule() throws UserException { + // If user does not specify kafka partition, + // We will fetch partition from kafka server periodically + if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { + if (customKafkaPartitions == null && !customKafkaPartitions.isEmpty()) { + return; + } + updateKafkaPartitions(); + } + } + + private void updateKafkaPartitions() throws UserException { + try { + this.newCurrentKafkaPartition = getAllKafkaPartitions(); + } catch (Exception e) { + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage()) + .build(), e); + if (this.state == JobState.NEED_SCHEDULE) { + unprotectUpdateState(JobState.PAUSED, + new ErrorReason(InternalErrorCode.PARTITIONS_ERR, + "Job failed to fetch all current partition with error " + e.getMessage()), + false /* not replay */); + } + } + } + // if customKafkaPartition is not null, then return false immediately // else if kafka partitions of topic has been changed, return true. 
// else return false @@ -294,24 +330,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { currentKafkaPartitions = customKafkaPartitions; return false; } else { - List<Integer> newCurrentKafkaPartition; - try { - newCurrentKafkaPartition = getAllKafkaPartitions(); - } catch (Exception e) { - LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage()) - .build(), e); - if (this.state == JobState.NEED_SCHEDULE) { - unprotectUpdateState(JobState.PAUSED, - new ErrorReason(InternalErrorCode.PARTITIONS_ERR, - "Job failed to fetch all current partition with error " + e.getMessage()), - false /* not replay */); - } - return false; - } - if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) { - if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) { - currentKafkaPartitions = newCurrentKafkaPartition; + // the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler() + Preconditions.checkNotNull(this.newCurrentKafkaPartition); + if (currentKafkaPartitions.containsAll(this.newCurrentKafkaPartition)) { + if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) { + currentKafkaPartitions = this.newCurrentKafkaPartition; if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) @@ -323,7 +346,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { return false; } } else { - currentKafkaPartitions = newCurrentKafkaPartition; + currentKafkaPartitions = this.newCurrentKafkaPartition; if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) @@ -624,4 +647,49 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { LOG.error("failed to replay modify kafka routine load job: {}", id, e); } } + + // check if given 
partitions has more data to consume. + // 'partitionIdToOffset' to the offset to be consumed. + public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) { + for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) { + if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey()) + && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) { + // "entry.getValue()" is the offset to be consumed. + // "cachedPartitionWithLatestOffsets.get(entry.getKey())" is the "next" offset of this partition. + // (because librdkafa's query_watermark_offsets() will return the next offset. + // For example, there 4 msg in partition with offset 0,1,2,3, + // query_watermark_offsets() will return 4.) + LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}", + partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id); + return true; + } + } + + try { + // all offsets to be consumed are newer than offsets in cachedPartitionWithLatestOffsets, + // maybe the cached offset is out-of-date, fetch from kafka server again + List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, taskId, getBrokerList(), + getTopic(), getConvertedCustomProperties(), Lists.newArrayList(partitionIdToOffset.keySet())); + for (Pair<Integer, Long> pair : tmp) { + cachedPartitionWithLatestOffsets.put(pair.first, pair.second); + } + } catch (Exception e) { + LOG.warn("failed to get latest partition offset. {}", e.getMessage(), e); + return false; + } + + // check again + for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) { + if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey()) + && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) { + LOG.debug("has more data to consume. 
offsets to be consumed: {}, latest offsets: {}, task {}, job {}", + partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id); + return true; + } + } + + LOG.debug("no more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}", + partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id); + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index f4d4d6e..c7dfd60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -20,6 +20,7 @@ package org.apache.doris.load.routineload; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -31,20 +32,28 @@ import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import com.google.gson.Gson; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; public class KafkaTaskInfo extends RoutineLoadTaskInfo { + private static final Logger LOG = LogManager.getLogger(KafkaTaskInfo.class); private RoutineLoadManager routineLoadManager = Catalog.getCurrentCatalog().getRoutineLoadManager(); - // <partitionId, beginOffsetOfPartitionId> + // <partitionId, offset to be consumed> private Map<Integer, Long> partitionIdToOffset; + // Last fetched and cached latest partition offsets. 
+ private List<Pair<Integer, Long>> cachedPartitionWithLatestOffsets = Lists.newArrayList(); + public KafkaTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, Map<Integer, Long> partitionIdToOffset) { super(id, jobId, clusterName, timeoutMs); this.partitionIdToOffset = partitionIdToOffset; @@ -79,8 +88,8 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { tRoutineLoadTask.setLabel(label); tRoutineLoadTask.setAuthCode(routineLoadJob.getAuthCode()); TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo(); - tKafkaLoadInfo.setTopic((routineLoadJob).getTopic()); - tKafkaLoadInfo.setBrokers((routineLoadJob).getBrokerList()); + tKafkaLoadInfo.setTopic(routineLoadJob.getTopic()); + tKafkaLoadInfo.setBrokers(routineLoadJob.getBrokerList()); tKafkaLoadInfo.setPartitionBeginOffset(partitionIdToOffset); tKafkaLoadInfo.setProperties(routineLoadJob.getConvertedCustomProperties()); tRoutineLoadTask.setKafkaLoadInfo(tKafkaLoadInfo); @@ -103,6 +112,12 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { return gson.toJson(partitionIdToOffset); } + @Override + boolean hasMoreDataToConsume() { + KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId); + return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset); + } + private TExecPlanFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws UserException { TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits()); // plan for each task, in case table has change(rollup or schema change) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 734c8b3..b566029 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -637,11 +637,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl for (RoutineLoadTaskInfo routineLoadTaskInfo : routineLoadTaskInfoList) { if (routineLoadTaskInfo.getBeId() != -1L) { long beId = routineLoadTaskInfo.getBeId(); - if (beIdConcurrentTasksNum.containsKey(beId)) { - beIdConcurrentTasksNum.put(beId, beIdConcurrentTasksNum.get(beId) + 1); - } else { - beIdConcurrentTasksNum.put(beId, 1); - } + beIdConcurrentTasksNum.put(beId, beIdConcurrentTasksNum.getOrDefault(beId, 0) + 1); } } return beIdConcurrentTasksNum; @@ -1205,7 +1201,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl try { if (!state.isFinalState()) { unprotectUpdateState(JobState.CANCELLED, - new ErrorReason(InternalErrorCode.TABLE_ERR, "table not exist"), false /* not replay */); + new ErrorReason(InternalErrorCode.TABLE_ERR, "table does not exist"), false /* not replay */); } return; } finally { @@ -1213,13 +1209,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } } - // check if partition has been changed + preCheckNeedSchedule(); + writeLock(); try { if (unprotectNeedReschedule()) { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("msg", "Job need to be rescheduled") - .build()); + .add("msg", "Job need to be rescheduled") + .build()); unprotectUpdateProgress(); executeNeedSchedule(); } @@ -1228,6 +1225,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } } + // Call this before calling unprotectUpdateProgress(). + // Because unprotectUpdateProgress() is protected by writelock. + // So if there are time-consuming operations, they should be done in this method. 
+ // (Such as getAllKafkaPartitions() in KafkaRoutineLoad) + protected void preCheckNeedSchedule() throws UserException { + + } + protected void unprotectUpdateProgress() throws UserException { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index e6a5263..4d283e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -41,14 +41,14 @@ import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.qe.ConnectContext; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -58,7 +58,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -355,7 +354,6 @@ public class RoutineLoadManager implements Writable { readLock(); try { int result = 0; - updateBeIdToMaxConcurrentTasks(); Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap(); for (Map.Entry<Long, Integer> entry : beIdToMaxConcurrentTasks.entrySet()) { if (beIdToConcurrentTasks.containsKey(entry.getKey())) { @@ -439,7 +437,6 @@ public class RoutineLoadManager implements Writable { } // 2. 
The given BE id does not have available slots, find a BE with min tasks - updateBeIdToMaxConcurrentTasks(); int idleTaskNum = 0; long resultBeId = -1L; int maxIdleSlotNum = 0; @@ -558,13 +555,12 @@ public class RoutineLoadManager implements Writable { } } - public boolean checkTaskInJob(UUID taskId) { - for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { - if (routineLoadJob.containsTask(taskId)) { - return true; - } + public boolean checkTaskInJob(RoutineLoadTaskInfo task) { + RoutineLoadJob routineLoadJob = idToRoutineLoadJob.get(task.getJobId()); + if (routineLoadJob == null) { + return false; } - return false; + return routineLoadJob.containsTask(task.getId()); } public List<RoutineLoadJob> getRoutineLoadJobByState(Set<RoutineLoadJob.JobState> desiredStates) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 71bdb99..b535b94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -207,7 +207,9 @@ public abstract class RoutineLoadTaskInfo { } abstract String getTaskDataSourceProperties(); - + + abstract boolean hasMoreDataToConsume(); + @Override public boolean equals(Object obj) { if (obj instanceof RoutineLoadTaskInfo) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index e204155..b83fa13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -87,8 +87,10 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { } private void process() throws UserException, InterruptedException { + 
// update the max slot num of each backend periodically updateBackendSlotIfNecessary(); + long start = System.currentTimeMillis(); // if size of queue is zero, tasks will be submitted by batch int idleSlotNum = routineLoadManager.getClusterIdleSlotNum(); // scheduler will be blocked when there is no slot for task in cluster @@ -114,8 +116,9 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception { routineLoadTaskInfo.setLastScheduledTime(System.currentTimeMillis()); + LOG.debug("schedule routine load task info {} for job {}", routineLoadTaskInfo.id, routineLoadTaskInfo.getJobId()); // check if task has been abandoned - if (!routineLoadManager.checkTaskInJob(routineLoadTaskInfo.getId())) { + if (!routineLoadManager.checkTaskInJob(routineLoadTaskInfo)) { // task has been abandoned while renew task has been added in queue // or database has been deleted LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()) @@ -124,6 +127,12 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { return; } + // check if topic has more data to consume + if (!routineLoadTaskInfo.hasMoreDataToConsume()) { + needScheduleTasksQueue.put(routineLoadTaskInfo); + return; + } + // allocate BE slot for this task. // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated. 
try { @@ -294,3 +303,4 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { return true; } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index df0e599..8ee321e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -759,9 +759,19 @@ public class ReportHandler extends Daemon { private static void handleMigration(ListMultimap<TStorageMedium, Long> tabletMetaMigrationMap, long backendId) { TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); + SystemInfoService infoService = Catalog.getCurrentSystemInfo(); + Backend be = infoService.getBackend(backendId); + if (be == null) { + return; + } AgentBatchTask batchTask = new AgentBatchTask(); for (TStorageMedium storageMedium : tabletMetaMigrationMap.keySet()) { List<Long> tabletIds = tabletMetaMigrationMap.get(storageMedium); + if (!be.hasSpecifiedStorageMedium(storageMedium)) { + LOG.warn("no specified storage medium {} on backend {}, skip storage migration." + + " sample tablet id: {}", storageMedium, backendId, tabletIds.isEmpty() ? 
"-1" : tabletIds.get(0)); + continue; + } List<TabletMeta> tabletMetaList = invertedIndex.getTabletMetaList(tabletIds); for (int i = 0; i < tabletMetaList.size(); i++) { long tabletId = tabletIds.get(i); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 0c5d66d..03fd69a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -330,6 +330,10 @@ public class Backend implements Writable { return disksRef.values().stream().allMatch(DiskInfo::hasPathHash); } + public boolean hasSpecifiedStorageMedium(TStorageMedium storageMedium) { + return disksRef.values().stream().anyMatch(d -> d.isStorageMediumMatch(storageMedium)); + } + public long getTotalCapacityB() { ImmutableMap<String, DiskInfo> disks = disksRef; long totalCapacityB = 0L; diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java index 34c4fb4..c4f6913 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java @@ -21,9 +21,14 @@ import org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.common.FeConstants; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TDisk; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -34,10 +39,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - public class BackendTest { private Backend backend; private long backendId = 9999; @@ -107,6 +108,8 @@ public class BackendTest { 
backend.updateDisks(diskInfos); Assert.assertEquals(disk2.getDiskTotalCapacity(), backend.getTotalCapacityB()); Assert.assertEquals(disk2.getDiskAvailableCapacity() + 1, backend.getAvailableCapacityB()); + Assert.assertFalse(backend.hasSpecifiedStorageMedium(TStorageMedium.SSD)); + Assert.assertTrue(backend.hasSpecifiedStorageMedium(TStorageMedium.HDD)); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 0512e3e..95f4889 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -417,7 +417,7 @@ public class RoutineLoadManagerTest { }; Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); - + routineLoadManager.updateBeIdToMaxConcurrentTasks(); Assert.assertEquals(Config.max_routine_load_task_num_per_be * 2 - 1, routineLoadManager.getClusterIdleSlotNum()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java index 133dcb8..8cb98d6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -95,7 +95,7 @@ public class RoutineLoadTaskSchedulerTest { routineLoadManager.getClusterIdleSlotNum(); minTimes = 0; result = 1; - routineLoadManager.checkTaskInJob((UUID) any); + routineLoadManager.checkTaskInJob((RoutineLoadTaskInfo) any); minTimes = 0; result = true; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 016f4d1..5ecbb69 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ 
-239,7 +239,10 @@ message PKafkaLoadInfo { message PKafkaMetaProxyRequest { optional PKafkaLoadInfo kafka_info = 1; + // optional for getting offsets for times repeated PIntegerPair offset_times = 3; + // optional for getting latest offsets of partitions + repeated int32 partition_id_for_latest_offsets = 4; }; message PProxyRequest { diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh index f202d14..5aa6efa 100644 --- a/thirdparty/vars.sh +++ b/thirdparty/vars.sh @@ -202,11 +202,11 @@ ROCKSDB_NAME=rocksdb-5.14.2.tar.gz ROCKSDB_SOURCE=rocksdb-5.14.2 ROCKSDB_MD5SUM="b72720ea3b1e9ca9e4ed0febfef65b14" -# librdkafka-1.6.1 -LIBRDKAFKA_DOWNLOAD="https://github.com/edenhill/librdkafka/archive/v1.6.1-RC3.tar.gz" -LIBRDKAFKA_NAME=librdkafka-1.6.1-RC3.tar.gz -LIBRDKAFKA_SOURCE=librdkafka-1.6.1-RC3 -LIBRDKAFKA_MD5SUM="11b02507db989a0fb5220fcc4df8f79a" +# librdkafka-1.7.0 +LIBRDKAFKA_DOWNLOAD="https://github.com/edenhill/librdkafka/archive/refs/tags/v1.7.0.tar.gz" +LIBRDKAFKA_NAME=v1.7.0.tar.gz +LIBRDKAFKA_SOURCE=librdkafka-1.7.0 +LIBRDKAFKA_MD5SUM="fe3c45deb182bd9c644b6bc6375bffc3" # zstd ZSTD_DOWNLOAD="https://github.com/facebook/zstd/archive/v1.3.7.tar.gz" --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
