This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 2522df04b8ad9acc84090e45774dcf3f213e1c31
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Oct 13 11:39:01 2021 +0800

    [Optimize][RoutineLoad] Avoid sending tasks if there is no data to be 
consumed (#6805)
    
    1 Avoid sending tasks if there is no data to be consumed
    By fetching the latest offset of each partition before sending tasks. (Fix [Optimize] 
Avoid too many abort task in routine load job #6803)
    
    2 Add a preCheckNeedSchedule phase in update() of routine load.
    To avoid taking write lock of job for long time when getting all kafka 
partitions from kafka server.
    
    3 Upgrade librdkafka's version to 1.7.0 to fix a bug of "Local: Unknown 
partition"
    See offsetsForTimes fails with 'Local: Unknown partition' 
edenhill/librdkafka#3295
    
    4 Avoid unnecessary storage migration tasks if there is no such storage 
medium on BE.
    Fix [Bug] Too many unnecessary storage migration tasks #6804
---
 be/src/runtime/routine_load/data_consumer.cpp      |  29 +++++-
 be/src/runtime/routine_load/data_consumer.h        |   3 +
 .../routine_load/routine_load_task_executor.cpp    |  23 ++++-
 .../routine_load/routine_load_task_executor.h      |   3 +
 be/src/service/internal_service.cpp                |  18 +++-
 conf/fe.conf                                       |   2 +-
 .../java/org/apache/doris/catalog/DiskInfo.java    |   9 +-
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 .../org/apache/doris/common/util/KafkaUtil.java    |  70 +++++++++++++-
 .../apache/doris/journal/bdbje/BDBJEJournal.java   |   2 +-
 .../load/routineload/KafkaRoutineLoadJob.java      | 106 +++++++++++++++++----
 .../doris/load/routineload/KafkaTaskInfo.java      |  21 +++-
 .../doris/load/routineload/RoutineLoadJob.java     |  23 +++--
 .../doris/load/routineload/RoutineLoadManager.java |  20 ++--
 .../load/routineload/RoutineLoadTaskInfo.java      |   4 +-
 .../load/routineload/RoutineLoadTaskScheduler.java |  12 ++-
 .../org/apache/doris/master/ReportHandler.java     |  10 ++
 .../main/java/org/apache/doris/system/Backend.java |   4 +
 .../java/org/apache/doris/catalog/BackendTest.java |  11 ++-
 .../load/routineload/RoutineLoadManagerTest.java   |   2 +-
 .../routineload/RoutineLoadTaskSchedulerTest.java  |   2 +-
 gensrc/proto/internal_service.proto                |   3 +
 thirdparty/vars.sh                                 |  10 +-
 23 files changed, 322 insertions(+), 67 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp 
b/be/src/runtime/routine_load/data_consumer.cpp
index 34cba02..c387b07 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -172,7 +172,6 @@ Status KafkaDataConsumer::assign_topic_partitions(
 
 Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* 
queue,
                                         int64_t max_running_time_ms) {
-    _last_visit_time = time(nullptr);
     int64_t left_time = max_running_time_ms;
     LOG(INFO) << "start kafka consumer: " << _id << ", grp: " << _grp_id
               << ", max running time(ms): " << left_time;
@@ -338,7 +337,30 @@ Status KafkaDataConsumer::get_offsets_for_times(const 
std::vector<PIntegerPair>&
         PIntegerPair pair;
         pair.set_key(topic_partition->partition());
         pair.set_val(topic_partition->offset());
-        offsets->push_back(pair);
+        offsets->push_back(std::move(pair));
+    }
+
+    return Status::OK();
+}
+
+// get latest offsets for given partitions
+Status KafkaDataConsumer::get_latest_offsets_for_partitions(const 
std::vector<int32_t>& partition_ids,
+        std::vector<PIntegerPair>* offsets) {
+    for (int32_t partition_id : partition_ids) {
+        int64_t low = 0;
+        int64_t high = 0;
+        RdKafka::ErrorCode err = _k_consumer->query_watermark_offsets(_topic, 
partition_id, &low, &high, 5000);
+        if (err != RdKafka::ERR_NO_ERROR) {
+            std::stringstream ss;
+            ss << "failed to get latest offset for partition: " << 
partition_id << ", err: " << RdKafka::err2str(err);
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+        
+        PIntegerPair pair;
+        pair.set_key(partition_id);
+        pair.set_val(high);
+        offsets->push_back(std::move(pair));
     }
 
     return Status::OK();
@@ -358,6 +380,9 @@ Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
 Status KafkaDataConsumer::reset() {
     std::unique_lock<std::mutex> l(_lock);
     _cancelled = false;
+    // reset will be called before this consumer being returned to the pool.
+    // so update _last_visit_time is reasonable.
+    _last_visit_time = time(nullptr);
     return Status::OK();
 }
 
diff --git a/be/src/runtime/routine_load/data_consumer.h 
b/be/src/runtime/routine_load/data_consumer.h
index fcbd017..17b1810 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -145,6 +145,9 @@ public:
     // get offsets for times
     Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
             std::vector<PIntegerPair>* offsets);
+    // get latest offsets for partitions
+    Status get_latest_offsets_for_partitions(const std::vector<int32_t>& 
partition_ids,
+            std::vector<PIntegerPair>* offsets);
 
 private:
     std::string _brokers;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 0e7ae45..8cc68a6 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -39,7 +39,7 @@ RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* 
exec_env)
         : _exec_env(exec_env),
           _thread_pool(config::routine_load_thread_pool_size,
                        config::routine_load_thread_pool_size),
-          _data_consumer_pool(10) {
+          _data_consumer_pool(config::routine_load_thread_pool_size) {
     REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
         std::lock_guard<std::mutex> l(_lock);
         return _task_map.size();
@@ -126,6 +126,27 @@ Status 
RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(const PKaf
     return st;
 }
 
+Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(const 
PKafkaMetaProxyRequest& request,
+        std::vector<PIntegerPair>* partition_offsets) {
+    CHECK(request.has_kafka_info());
+
+    // This context is meaningless, just for unifing the interface
+    StreamLoadContext ctx(_exec_env);
+    RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
+
+    std::shared_ptr<DataConsumer> consumer;
+    RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
+
+    Status st = 
std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_latest_offsets_for_partitions(
+            
std::vector<int32_t>(request.partition_id_for_latest_offsets().begin(),
+                request.partition_id_for_latest_offsets().end()),
+            partition_offsets);
+    if (st.ok()) {
+        _data_consumer_pool.return_consumer(consumer);
+    }
+    return st;
+}
+
 Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     std::unique_lock<std::mutex> l(_lock);
     if (_task_map.find(task.id) != _task_map.end()) {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h 
b/be/src/runtime/routine_load/routine_load_task_executor.h
index e5c7939..41f20b1 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -55,6 +55,9 @@ public:
     Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& 
request,
         std::vector<PIntegerPair>* partition_offsets);
 
+    Status get_kafka_latest_offsets_for_partitions(const 
PKafkaMetaProxyRequest& request,
+        std::vector<PIntegerPair>* partition_offsets);
+
 private:
     // execute the task
     void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool, 
ExecFinishCallback cb);
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 89dd7ef..cd98ea0 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -208,7 +208,23 @@ void 
PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
     // 2. get all kafka partition offsets for given topic and timestamp.
     if (request->has_kafka_meta_request()) {
         const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
-        if (!kafka_request.offset_times().empty()) {
+        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+            // get latest offsets for specified partition ids
+            std::vector<PIntegerPair> partition_offsets;
+            Status st =
+                    
_exec_env->routine_load_task_executor()->get_kafka_latest_offsets_for_partitions(
+                            request->kafka_meta_request(), &partition_offsets);
+            if (st.ok()) {
+                PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                for (const auto& entry : partition_offsets) {
+                    PIntegerPair* res = part_offsets->add_offset_times();
+                    res->set_key(entry.key());
+                    res->set_val(entry.val());
+                }
+            }
+            st.to_protobuf(response->mutable_status());
+            return;
+        } else if (!kafka_request.offset_times().empty()) {
             // if offset_times() has elements, which means this request is to 
get offset by timestamp.
             std::vector<PIntegerPair> partition_offsets;
             Status st =
diff --git a/conf/fe.conf b/conf/fe.conf
index 9298619..723221d 100644
--- a/conf/fe.conf
+++ b/conf/fe.conf
@@ -58,7 +58,7 @@ mysql_service_nio_enabled = true
 # log_roll_size_mb = 1024
 # sys_log_dir = ${DORIS_HOME}/log
 # sys_log_roll_num = 10
-# sys_log_verbose_modules = 
+# sys_log_verbose_modules = org.apache.doris
 # audit_log_dir = ${DORIS_HOME}/log
 # audit_log_modules = slow_query, query
 # audit_log_roll_num = 10
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
index ee08790..e18b07f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
@@ -24,6 +24,8 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TStorageMedium;
 
+import com.google.gson.annotations.SerializedName;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -31,8 +33,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import com.google.gson.annotations.SerializedName;
-
 public class DiskInfo implements Writable {
     private static final Logger LOG = LogManager.getLogger(DiskInfo.class);
 
@@ -53,7 +53,6 @@ public class DiskInfo implements Writable {
     private long diskAvailableCapacityB;
     @SerializedName("state")
     private DiskState state;
-
     // path hash and storage medium are reported from Backend and no need to 
persist
     private long pathHash = 0;
     private TStorageMedium storageMedium;
@@ -129,6 +128,10 @@ public class DiskInfo implements Writable {
         return pathHash != 0;
     }
 
+    public boolean isStorageMediumMatch(TStorageMedium storageMedium) {
+        return this.storageMedium == storageMedium;
+    }
+
     public TStorageMedium getStorageMedium() {
         return storageMedium;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 2b66c81..d319d3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1168,7 +1168,7 @@ public class Config extends ConfigBase {
      * not work to avoid OOM.
      */
     @ConfField(mutable = true, masterOnly = true)
-    public static long metadata_checkpoint_memory_threshold = 60;
+    public static long metadata_checkpoint_memory_threshold = 70;
 
     /**
      * If set to true, the checkpoint thread will make the checkpoint 
regardless of the jvm memory used percent.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index 6691af6..ecbb897 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -37,6 +37,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -144,7 +145,7 @@ public class KafkaUtil {
             InternalService.PProxyResult result = future.get(5, 
TimeUnit.SECONDS);
             TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
-                throw new UserException("failed to get kafka partition info: " 
+ result.getStatus().getErrorMsgsList());
+                throw new UserException("failed to get offsets for times: " + 
result.getStatus().getErrorMsgsList());
             } else {
                 List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
                 List<Pair<Integer, Long>> partitionOffsets = 
Lists.newArrayList();
@@ -166,6 +167,73 @@ public class KafkaUtil {
             }
         }
     }
+
+    public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID 
taskId, String brokerList, String topic,
+                                                             Map<String, 
String> convertedCustomProperties,
+                                                             List<Integer> 
partitionIds) throws LoadException {
+        BackendService.Client client = null;
+        TNetworkAddress address = null;
+        LOG.debug("begin to get latest offsets for partitions {} in topic: {}, 
task {}, job {}",
+                partitionIds, topic, taskId, jobId);
+        boolean ok = false;
+        try {
+            List<Long> backendIds = 
Catalog.getCurrentSystemInfo().getBackendIds(true);
+            if (backendIds.isEmpty()) {
+                throw new LoadException("Failed to get latest offsets. No 
alive backends");
+            }
+            Collections.shuffle(backendIds);
+            Backend be = 
Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+            // create request
+            InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
+                    InternalService.PKafkaMetaProxyRequest.newBuilder()
+                            
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
+                                    .setBrokers(brokerList)
+                                    .setTopic(topic)
+                                    .addAllProperties(
+                                            
convertedCustomProperties.entrySet().stream().map(
+                                                    e -> 
InternalService.PStringPair.newBuilder()
+                                                            .setKey(e.getKey())
+                                                            
.setVal(e.getValue())
+                                                            .build()
+                                            ).collect(Collectors.toList())
+                                    )
+                            );
+            for (Integer partitionId : partitionIds) {
+                metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
+            }
+            InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
+                    metaRequestBuilder).build();
+
+            // get info
+            Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
+            InternalService.PProxyResult result = future.get(5, 
TimeUnit.SECONDS);
+            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                throw new UserException("failed to get latest offsets: " + 
result.getStatus().getErrorMsgsList());
+            } else {
+                List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
+                List<Pair<Integer, Long>> partitionOffsets = 
Lists.newArrayList();
+                for (InternalService.PIntegerPair pair : pairs) {
+                    partitionOffsets.add(Pair.create(pair.getKey(), 
pair.getVal()));
+                }
+                LOG.debug("finish to get latest offsets for partitions {} in 
topic: {}, task {}, job {}",
+                        partitionOffsets, topic, taskId, jobId);
+                return partitionOffsets;
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to get latest offsets.", e);
+            throw new LoadException(
+                    "Failed to get latest offsets of kafka topic: " + topic + 
". error: " + e.getMessage());
+        } finally {
+            if (ok) {
+                ClientPool.backendPool.returnObject(address, client);
+            } else {
+                ClientPool.backendPool.invalidateObject(address, client);
+            }
+        }
+    }
 }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 553aa22..d1c29fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -446,4 +446,4 @@ public class BDBJEJournal implements Journal {
 
         return dbNames;
     }
-}
\ No newline at end of file
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 7a3b87b..1be8030 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -91,6 +91,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     private Map<String, String> customProperties = Maps.newHashMap();
     private Map<String, String> convertedCustomProperties = Maps.newHashMap();
 
+    // The latest offset of each partition fetched from kafka server.
+    // Will be updated periodically by calling hasMoreDataToConsume()
+    private Map<Integer, Long> cachedPartitionWithLatestOffsets = 
Maps.newConcurrentMap();
+
+    // The kafka partition fetch from kafka server.
+    // Will be updated periodically by calling updateKafkaPartitions();
+    private List<Integer> newCurrentKafkaPartition = Lists.newArrayList();
+
     public KafkaRoutineLoadJob() {
         // for serialization, id is dummy
         super(-1, LoadDataSourceType.KAFKA);
@@ -281,6 +289,34 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         updateNewPartitionProgress();
     }
 
+    @Override
+    protected void preCheckNeedSchedule() throws UserException {
+        // If user does not specify kafka partition,
+        // We will fetch partition from kafka server periodically
+        if (this.state == JobState.RUNNING || this.state == 
JobState.NEED_SCHEDULE) {
+            if (customKafkaPartitions == null && 
!customKafkaPartitions.isEmpty()) {
+                return;
+            }
+            updateKafkaPartitions();
+        }
+    }
+
+    private void updateKafkaPartitions() throws UserException {
+        try {
+            this.newCurrentKafkaPartition = getAllKafkaPartitions();
+        } catch (Exception e) {
+            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                    .add("error_msg", "Job failed to fetch all current 
partition with error " + e.getMessage())
+                    .build(), e);
+            if (this.state == JobState.NEED_SCHEDULE) {
+                unprotectUpdateState(JobState.PAUSED,
+                        new ErrorReason(InternalErrorCode.PARTITIONS_ERR,
+                                "Job failed to fetch all current partition 
with error " + e.getMessage()),
+                        false /* not replay */);
+            }
+        }
+    }
+
     // if customKafkaPartition is not null, then return false immediately
     // else if kafka partitions of topic has been changed, return true.
     // else return false
@@ -294,24 +330,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 currentKafkaPartitions = customKafkaPartitions;
                 return false;
             } else {
-                List<Integer> newCurrentKafkaPartition;
-                try {
-                    newCurrentKafkaPartition = getAllKafkaPartitions();
-                } catch (Exception e) {
-                    LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                            .add("error_msg", "Job failed to fetch all current 
partition with error " + e.getMessage())
-                            .build(), e);
-                    if (this.state == JobState.NEED_SCHEDULE) {
-                        unprotectUpdateState(JobState.PAUSED,
-                                new 
ErrorReason(InternalErrorCode.PARTITIONS_ERR,
-                                        "Job failed to fetch all current 
partition with error " + e.getMessage()),
-                                false /* not replay */);
-                    }
-                    return false;
-                }
-                if 
(currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) {
-                    if (currentKafkaPartitions.size() > 
newCurrentKafkaPartition.size()) {
-                        currentKafkaPartitions = newCurrentKafkaPartition;
+                // the newCurrentKafkaPartition should be already updated in 
preCheckNeedScheduler()
+                Preconditions.checkNotNull(this.newCurrentKafkaPartition);
+                if 
(currentKafkaPartitions.containsAll(this.newCurrentKafkaPartition)) {
+                    if (currentKafkaPartitions.size() > 
this.newCurrentKafkaPartition.size()) {
+                        currentKafkaPartitions = this.newCurrentKafkaPartition;
                         if (LOG.isDebugEnabled()) {
                             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
id)
                                     .add("current_kafka_partitions", 
Joiner.on(",").join(currentKafkaPartitions))
@@ -323,7 +346,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                         return false;
                     }
                 } else {
-                    currentKafkaPartitions = newCurrentKafkaPartition;
+                    currentKafkaPartitions = this.newCurrentKafkaPartition;
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                                 .add("current_kafka_partitions", 
Joiner.on(",").join(currentKafkaPartitions))
@@ -624,4 +647,49 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             LOG.error("failed to replay modify kafka routine load job: {}", 
id, e);
         }
     }
+
+    // check if given partitions has more data to consume.
+    // 'partitionIdToOffset' to the offset to be consumed.
+    public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> 
partitionIdToOffset) {
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
+                    && entry.getValue() < 
cachedPartitionWithLatestOffsets.get(entry.getKey())) {
+                // "entry.getValue()" is the offset to be consumed.
+                // "cachedPartitionWithLatestOffsets.get(entry.getKey())" is 
the "next" offset of this partition.
+                // (because librdkafa's query_watermark_offsets() will return 
the next offset.
+                //  For example, there 4 msg in partition with offset 0,1,2,3,
+                //  query_watermark_offsets() will return 4.)
+                LOG.debug("has more data to consume. offsets to be consumed: 
{}, latest offsets: {}, task {}, job {}",
+                        partitionIdToOffset, cachedPartitionWithLatestOffsets, 
taskId, id);
+                return true;
+            }
+        }
+
+        try {
+            // all offsets to be consumed are newer than offsets in 
cachedPartitionWithLatestOffsets,
+            // maybe the cached offset is out-of-date, fetch from kafka server 
again
+            List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, 
taskId, getBrokerList(),
+                    getTopic(), getConvertedCustomProperties(), 
Lists.newArrayList(partitionIdToOffset.keySet()));
+            for (Pair<Integer, Long> pair : tmp) {
+                cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to get latest partition offset. {}", 
e.getMessage(), e);
+            return false;
+        }
+
+        // check again
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
+                    && entry.getValue() < 
cachedPartitionWithLatestOffsets.get(entry.getKey())) {
+                LOG.debug("has more data to consume. offsets to be consumed: 
{}, latest offsets: {}, task {}, job {}",
+                        partitionIdToOffset, cachedPartitionWithLatestOffsets, 
taskId, id);
+                return true;
+            }
+        }
+
+        LOG.debug("no more data to consume. offsets to be consumed: {}, latest 
offsets: {}, task {}, job {}",
+                partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, 
id);
+        return false;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index f4d4d6e..c7dfd60 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -20,6 +20,7 @@ package org.apache.doris.load.routineload;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -31,20 +32,28 @@ import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 public class KafkaTaskInfo extends RoutineLoadTaskInfo {
+    private static final Logger LOG = 
LogManager.getLogger(KafkaTaskInfo.class);
 
     private RoutineLoadManager routineLoadManager = 
Catalog.getCurrentCatalog().getRoutineLoadManager();
 
-    // <partitionId, beginOffsetOfPartitionId>
+    // <partitionId, offset to be consumed>
     private Map<Integer, Long> partitionIdToOffset;
 
+    // Last fetched and cached latest partition offsets.
+    private List<Pair<Integer, Long>> cachedPartitionWithLatestOffsets = 
Lists.newArrayList();
+
     public KafkaTaskInfo(UUID id, long jobId, String clusterName, long 
timeoutMs, Map<Integer, Long> partitionIdToOffset) {
         super(id, jobId, clusterName, timeoutMs);
         this.partitionIdToOffset = partitionIdToOffset;
@@ -79,8 +88,8 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         tRoutineLoadTask.setLabel(label);
         tRoutineLoadTask.setAuthCode(routineLoadJob.getAuthCode());
         TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo();
-        tKafkaLoadInfo.setTopic((routineLoadJob).getTopic());
-        tKafkaLoadInfo.setBrokers((routineLoadJob).getBrokerList());
+        tKafkaLoadInfo.setTopic(routineLoadJob.getTopic());
+        tKafkaLoadInfo.setBrokers(routineLoadJob.getBrokerList());
         tKafkaLoadInfo.setPartitionBeginOffset(partitionIdToOffset);
         
tKafkaLoadInfo.setProperties(routineLoadJob.getConvertedCustomProperties());
         tRoutineLoadTask.setKafkaLoadInfo(tKafkaLoadInfo);
@@ -103,6 +112,12 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         return gson.toJson(partitionIdToOffset);
     }
 
+    @Override
+    boolean hasMoreDataToConsume() {
+        KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) 
routineLoadManager.getJob(jobId);
+        return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
+    }
+
     private TExecPlanFragmentParams rePlan(RoutineLoadJob routineLoadJob) 
throws UserException {
         TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), 
id.getLeastSignificantBits());
         // plan for each task, in case table has change(rollup or schema 
change)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 734c8b3..b566029 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -637,11 +637,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
             for (RoutineLoadTaskInfo routineLoadTaskInfo : 
routineLoadTaskInfoList) {
                 if (routineLoadTaskInfo.getBeId() != -1L) {
                     long beId = routineLoadTaskInfo.getBeId();
-                    if (beIdConcurrentTasksNum.containsKey(beId)) {
-                        beIdConcurrentTasksNum.put(beId, 
beIdConcurrentTasksNum.get(beId) + 1);
-                    } else {
-                        beIdConcurrentTasksNum.put(beId, 1);
-                    }
+                    beIdConcurrentTasksNum.put(beId, 
beIdConcurrentTasksNum.getOrDefault(beId, 0) + 1);
                 }
             }
             return beIdConcurrentTasksNum;
@@ -1205,7 +1201,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
             try {
                 if (!state.isFinalState()) {
                     unprotectUpdateState(JobState.CANCELLED,
-                            new ErrorReason(InternalErrorCode.TABLE_ERR, 
"table not exist"), false /* not replay */);
+                            new ErrorReason(InternalErrorCode.TABLE_ERR, 
"table does not exist"), false /* not replay */);
                 }
                 return;
             } finally {
@@ -1213,13 +1209,14 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
             }
         }
 
-        // check if partition has been changed
+        preCheckNeedSchedule();
+
         writeLock();
         try {
             if (unprotectNeedReschedule()) {
                 LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                                  .add("msg", "Job need to be rescheduled")
-                                  .build());
+                        .add("msg", "Job need to be rescheduled")
+                        .build());
                 unprotectUpdateProgress();
                 executeNeedSchedule();
             }
@@ -1228,6 +1225,14 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         }
     }
 
+    // Call this before calling unprotectUpdateProgress().
+    // Because unprotectUpdateProgress() is protected by write lock.
+    // So if there are time-consuming operations, they should be done in this 
method.
+    // (Such as getAllKafkaPartitions() in KafkaRoutineLoad)
+    protected void preCheckNeedSchedule() throws UserException {
+
+    }
+
     protected void unprotectUpdateProgress() throws UserException {
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index e6a5263..4d283e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -41,14 +41,14 @@ import 
org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -58,7 +58,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
@@ -355,7 +354,6 @@ public class RoutineLoadManager implements Writable {
         readLock();
         try {
             int result = 0;
-            updateBeIdToMaxConcurrentTasks();
             Map<Long, Integer> beIdToConcurrentTasks = 
getBeCurrentTasksNumMap();
             for (Map.Entry<Long, Integer> entry : 
beIdToMaxConcurrentTasks.entrySet()) {
                 if (beIdToConcurrentTasks.containsKey(entry.getKey())) {
@@ -439,7 +437,6 @@ public class RoutineLoadManager implements Writable {
             }
 
             // 2. The given BE id does not have available slots, find a BE 
with min tasks
-            updateBeIdToMaxConcurrentTasks();
             int idleTaskNum = 0;
             long resultBeId = -1L;
             int maxIdleSlotNum = 0;
@@ -558,13 +555,12 @@ public class RoutineLoadManager implements Writable {
         }
     }
 
-    public boolean checkTaskInJob(UUID taskId) {
-        for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
-            if (routineLoadJob.containsTask(taskId)) {
-                return true;
-            }
+    public boolean checkTaskInJob(RoutineLoadTaskInfo task) {
+        RoutineLoadJob routineLoadJob = 
idToRoutineLoadJob.get(task.getJobId());
+        if (routineLoadJob == null) {
+            return false;
         }
-        return false;
+        return routineLoadJob.containsTask(task.getId());
     }
 
     public List<RoutineLoadJob> 
getRoutineLoadJobByState(Set<RoutineLoadJob.JobState> desiredStates) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 71bdb99..b535b94 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -207,7 +207,9 @@ public abstract class RoutineLoadTaskInfo {
     }
 
     abstract String getTaskDataSourceProperties();
-    
+
+    abstract boolean hasMoreDataToConsume();
+
     @Override
     public boolean equals(Object obj) {
         if (obj instanceof RoutineLoadTaskInfo) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index e204155..b83fa13 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -87,8 +87,10 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
     }
 
     private void process() throws UserException, InterruptedException {
+        // update the max slot num of each backend periodically
         updateBackendSlotIfNecessary();
 
+        long start = System.currentTimeMillis();
         // if size of queue is zero, tasks will be submitted by batch
         int idleSlotNum = routineLoadManager.getClusterIdleSlotNum();
         // scheduler will be blocked when there is no slot for task in cluster
@@ -114,8 +116,9 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
 
     private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) 
throws Exception {
         routineLoadTaskInfo.setLastScheduledTime(System.currentTimeMillis());
+        LOG.debug("schedule routine load task info {} for job {}", 
routineLoadTaskInfo.id, routineLoadTaskInfo.getJobId());
         // check if task has been abandoned
-        if (!routineLoadManager.checkTaskInJob(routineLoadTaskInfo.getId())) {
+        if (!routineLoadManager.checkTaskInJob(routineLoadTaskInfo)) {
             // task has been abandoned while renew task has been added in queue
             // or database has been deleted
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, 
routineLoadTaskInfo.getId())
@@ -124,6 +127,12 @@ public class RoutineLoadTaskScheduler extends MasterDaemon 
{
             return;
         }
 
+        // check if topic has more data to consume
+        if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
+            needScheduleTasksQueue.put(routineLoadTaskInfo);
+            return;
+        }
+
         // allocate BE slot for this task.
         // this should be done before txn begin, or the txn may be begun 
successfully but failed to be allocated.
         try {
@@ -294,3 +303,4 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
         return true;
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index df0e599..8ee321e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -759,9 +759,19 @@ public class ReportHandler extends Daemon {
     private static void handleMigration(ListMultimap<TStorageMedium, Long> 
tabletMetaMigrationMap,
                                         long backendId) {
         TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
+        SystemInfoService infoService = Catalog.getCurrentSystemInfo();
+        Backend be = infoService.getBackend(backendId);
+        if (be == null) {
+            return;
+        }
         AgentBatchTask batchTask = new AgentBatchTask();
         for (TStorageMedium storageMedium : tabletMetaMigrationMap.keySet()) {
             List<Long> tabletIds = tabletMetaMigrationMap.get(storageMedium);
+            if (!be.hasSpecifiedStorageMedium(storageMedium)) {
+                LOG.warn("no specified storage medium {} on backend {}, skip 
storage migration." +
+                        " sample tablet id: {}", storageMedium, backendId, 
tabletIds.isEmpty() ? "-1" : tabletIds.get(0));
+                continue;
+            }
             List<TabletMeta> tabletMetaList = 
invertedIndex.getTabletMetaList(tabletIds);
             for (int i = 0; i < tabletMetaList.size(); i++) {
                 long tabletId = tabletIds.get(i);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 0c5d66d..03fd69a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -330,6 +330,10 @@ public class Backend implements Writable {
         return disksRef.values().stream().allMatch(DiskInfo::hasPathHash);
     }
 
+    public boolean hasSpecifiedStorageMedium(TStorageMedium storageMedium) {
+        return disksRef.values().stream().anyMatch(d -> 
d.isStorageMediumMatch(storageMedium));
+    }
+
     public long getTotalCapacityB() {
         ImmutableMap<String, DiskInfo> disks = disksRef;
         long totalCapacityB = 0L;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java
index 34c4fb4..c4f6913 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java
@@ -21,9 +21,14 @@ import org.apache.doris.analysis.AccessTestUtil;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TDisk;
+import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.collect.ImmutableMap;
 
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -34,10 +39,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
 public class BackendTest {
     private Backend backend;
     private long backendId = 9999;
@@ -107,6 +108,8 @@ public class BackendTest {
         backend.updateDisks(diskInfos);
         Assert.assertEquals(disk2.getDiskTotalCapacity(), 
backend.getTotalCapacityB());
         Assert.assertEquals(disk2.getDiskAvailableCapacity() + 1, 
backend.getAvailableCapacityB());
+        
Assert.assertFalse(backend.hasSpecifiedStorageMedium(TStorageMedium.SSD));
+        
Assert.assertTrue(backend.hasSpecifiedStorageMedium(TStorageMedium.HDD));
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 0512e3e..95f4889 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -417,7 +417,7 @@ public class RoutineLoadManagerTest {
         };
 
         Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", 
idToRoutineLoadJob);
-
+        routineLoadManager.updateBeIdToMaxConcurrentTasks();
         Assert.assertEquals(Config.max_routine_load_task_num_per_be * 2 - 1,
                 routineLoadManager.getClusterIdleSlotNum());
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 133dcb8..8cb98d6 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -95,7 +95,7 @@ public class RoutineLoadTaskSchedulerTest {
                 routineLoadManager.getClusterIdleSlotNum();
                 minTimes = 0;
                 result = 1;
-                routineLoadManager.checkTaskInJob((UUID) any);
+                routineLoadManager.checkTaskInJob((RoutineLoadTaskInfo) any);
                 minTimes = 0;
                 result = true;
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 016f4d1..5ecbb69 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -239,7 +239,10 @@ message PKafkaLoadInfo {
 
 message PKafkaMetaProxyRequest {
     optional PKafkaLoadInfo kafka_info = 1;
+    // optional for getting offsets for times
     repeated PIntegerPair offset_times = 3;
+    // optional for getting latest offsets of partitions
+    repeated int32 partition_id_for_latest_offsets = 4;
 };
 
 message PProxyRequest {
diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh
index f202d14..5aa6efa 100644
--- a/thirdparty/vars.sh
+++ b/thirdparty/vars.sh
@@ -202,11 +202,11 @@ ROCKSDB_NAME=rocksdb-5.14.2.tar.gz
 ROCKSDB_SOURCE=rocksdb-5.14.2
 ROCKSDB_MD5SUM="b72720ea3b1e9ca9e4ed0febfef65b14"
 
-# librdkafka-1.6.1
-LIBRDKAFKA_DOWNLOAD="https://github.com/edenhill/librdkafka/archive/v1.6.1-RC3.tar.gz";
-LIBRDKAFKA_NAME=librdkafka-1.6.1-RC3.tar.gz
-LIBRDKAFKA_SOURCE=librdkafka-1.6.1-RC3
-LIBRDKAFKA_MD5SUM="11b02507db989a0fb5220fcc4df8f79a"
+# librdkafka-1.7.0
+LIBRDKAFKA_DOWNLOAD="https://github.com/edenhill/librdkafka/archive/refs/tags/v1.7.0.tar.gz";
+LIBRDKAFKA_NAME=v1.7.0.tar.gz
+LIBRDKAFKA_SOURCE=librdkafka-1.7.0
+LIBRDKAFKA_MD5SUM="fe3c45deb182bd9c644b6bc6375bffc3"
 
 # zstd
 ZSTD_DOWNLOAD="https://github.com/facebook/zstd/archive/v1.3.7.tar.gz";

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to