chaoyli closed pull request #484: Add more detail logs to debug streaming load
URL: https://github.com/apache/incubator-doris/pull/484
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index a2902daa..5397bc1b 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -137,9 +137,12 @@ struct StreamLoadContext {
 
     std::string to_json() const;
 
+    std::string brief() const;
+
     void ref() { _refs.fetch_add(1); }
     // If unref() returns true, this object should be delete
     bool unref() { return _refs.fetch_sub(1) == 1; }
+
 private:
     std::atomic<int> _refs;
 };
@@ -156,6 +159,14 @@ std::string StreamLoadContext::to_json() const {
     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
 
     writer.StartObject();
+    // txn id
+    writer.Key("TxnId");
+    writer.Int64(txn_id);
+
+    // label
+    writer.Key("Label");
+    writer.String(label.c_str());
+
     // status
     writer.Key("Status");
     switch (status.code()) {
@@ -196,6 +207,12 @@ std::string StreamLoadContext::to_json() const {
     return s.GetString();
 }
 
+std::string StreamLoadContext::brief() const {
+    std::stringstream ss;
+    ss << " id=" << id << ", txn id=" << txn_id << ", label=" << label;
+    return ss.str();
+}
+
 StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {
     DorisMetrics::metrics()->register_metric("streaming_load_requests_total",
                                             &k_streaming_load_requests_total);
@@ -315,8 +332,8 @@ int StreamLoadAction::on_header(HttpRequest* req) {
         ctx->label = generate_uuid_string();
     }
 
-    LOG(INFO) << "new income streaming load request, id=" << ctx->id
-        << ", db=" << ctx->db << ", table=" << ctx->table << ", label=" << 
ctx->label;
+    LOG(INFO) << "new income streaming load request." << ctx->brief()
+              << ", db: " << ctx->db << ", tbl: " << ctx->table;
 
     auto st = _on_header(req, ctx);
     if (!st.ok()) {
@@ -339,7 +356,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
 Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* 
ctx) {
     // auth information
     if (!parse_basic_auth(*http_req, &ctx->auth)) {
-        LOG(WARNING) << "parse basic authorization failed, id=" << ctx->id;
+        LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
         return Status("no valid Basic authorization");
     }
     // check content length
@@ -348,7 +365,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, 
StreamLoadContext* ct
     if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
         ctx->body_bytes = 
std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
         if (ctx->body_bytes > max_body_bytes) {
-            LOG(WARNING) << "body exceed max size, id=" << ctx->id;
+            LOG(WARNING) << "body exceed max size." << ctx->brief();
 
             std::stringstream ss;
             ss << "body exceed max size, max_body_bytes=" << max_body_bytes;
@@ -367,7 +384,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, 
StreamLoadContext* ct
     } else {
         ctx->format = parse_format(http_req->header(HTTP_FORMAT_KEY));
         if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
-            LOG(WARNING) << "unknown data format, id=" << ctx->id;
+            LOG(WARNING) << "unknown data format." << ctx->brief();
             std::stringstream ss;
             ss << "unknown data format, format=" << 
http_req->header(HTTP_FORMAT_KEY);
             return Status(ss.str());
@@ -396,8 +413,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, 
StreamLoadContext* ct
 #endif
         Status status(result.status);
         if (!status.ok()) {
-            LOG(WARNING) << "begin transaction failed, id=" << ctx->id
-                << "errmsg=" << status.get_error_msg();
+            LOG(WARNING) << "begin transaction failed, errmsg=" << 
status.get_error_msg()
+                    << ctx->brief();
             return status;
         }
         ctx->txn_id = result.txnId;
@@ -424,8 +441,8 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
         bb->flip();
         auto st = ctx->body_sink->append(bb);
         if (!st.ok()) {
-            LOG(WARNING) << "append body content failed, id=" << ctx->id
-                << ", errmsg=" << st.get_error_msg();
+            LOG(WARNING) << "append body content failed. errmsg=" << 
st.get_error_msg()
+                    << ctx->brief();
             ctx->status = st;
             return;
         }
@@ -502,9 +519,8 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req, StreamLoadContext*
 #endif
     Status plan_status(ctx->put_result.status);
     if (!plan_status.ok()) {
-        LOG(WARNING) << "plan streaming load failed, id=" << ctx->id
-            << ", txn_id=" << ctx->txn_id
-            << ", errmsg=" << plan_status.get_error_msg();
+        LOG(WARNING) << "plan streaming load failed. errmsg=" << 
plan_status.get_error_msg()
+                << ctx->brief();
         return plan_status;
     }
     VLOG(3) << "params is " << 
apache::thrift::ThriftDebugString(ctx->put_result.params);
@@ -539,10 +555,10 @@ Status 
StreamLoadAction::_execute_plan_fragment(StreamLoadContext* ctx) {
                         executor->runtime_state()->get_error_log_file_path());
                 }
             } else {
-                LOG(WARNING) << "fragment execute failed, load_id=" << ctx->id
-                    << ", txn_id=" << ctx->txn_id
+                LOG(WARNING) << "fragment execute failed"
                     << ", query_id=" << 
UniqueId(ctx->put_result.params.params.query_id)
-                    << ", errmsg=" << status.get_error_msg();
+                    << ", errmsg=" << status.get_error_msg()
+                    << ctx->brief();
                 // cancel body_sink, make sender known it
                 if (ctx->body_sink != nullptr) {
                     ctx->body_sink->cancel();
@@ -580,9 +596,8 @@ void StreamLoadAction::rollback(StreamLoadContext* ctx) {
             client->loadTxnRollback(result, request);
         });
     if (!rpc_st.ok()) {
-        LOG(WARNING) << "transaction rollback failed, id=" << ctx->id
-            << ", txn_id=" << ctx->txn_id
-            << ", errmsg=" << rpc_st.get_error_msg();
+        LOG(WARNING) << "transaction rollback failed. errmsg=" << 
rpc_st.get_error_msg()
+                << ctx->brief();
     }
 #else
     result = k_stream_load_rollback_result;
diff --git a/fe/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/src/main/java/org/apache/doris/catalog/Tablet.java
index a8885724..bddc234e 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -20,6 +20,7 @@
 import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.common.io.Writable;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import org.apache.logging.log4j.LogManager;
@@ -141,6 +142,14 @@ public void addReplica(Replica replica) {
         return beIds;
     }
 
+    public List<Long> getBackendIdsList() {
+        List<Long> beIds = Lists.newArrayList();
+        for (Replica replica : replicas) {
+            beIds.add(replica.getBackendId());
+        }
+        return beIds;
+    }
+
     // for query
     public void getQueryableReplicas(List<Replica> allQuerableReplica, 
List<Replica> localReplicas,
             long committedVersion, long committedVersionHash, long localBeId) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index d6e15899..6c8db6cb 100644
--- a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -17,13 +17,6 @@
 
 package org.apache.doris.catalog;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Table;
-
 import org.apache.doris.task.RecoverTabletTask;
 import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TStorageMedium;
@@ -34,6 +27,14 @@
 import org.apache.doris.transaction.TableCommitInfo;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
diff --git a/fe/src/main/java/org/apache/doris/http/rest/LoadAction.java 
b/fe/src/main/java/org/apache/doris/http/rest/LoadAction.java
index ccc670c2..0037c304 100644
--- a/fe/src/main/java/org/apache/doris/http/rest/LoadAction.java
+++ b/fe/src/main/java/org/apache/doris/http/rest/LoadAction.java
@@ -136,7 +136,8 @@ public void executeWithoutPassword(AuthorizationInfo 
authInfo,
             }
         }
 
-        LOG.info("redirect load action to destination={}", 
redirectAddr.toString());
+        LOG.info("redirect load action to destination={}, stream: {}, db: {}, 
tbl: {}, label: {}",
+                redirectAddr.toString(), isStreamLoad, dbName, tableName, 
label);
         redirectTo(request, response, redirectAddr);
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
index c2620771..1398bf3b 100644
--- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -17,13 +17,6 @@
 
 package org.apache.doris.master;
 
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
@@ -65,6 +58,14 @@
 import org.apache.doris.thrift.TTablet;
 import org.apache.doris.thrift.TTabletInfo;
 import org.apache.doris.thrift.TTaskType;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -326,6 +327,7 @@ private static void sync(Map<Long, TTablet> backendTablets, 
ListMultimap<Long, L
             }
             db.writeLock();
             try {
+
                 int syncCounter = 0;
                 List<Long> tabletIds = tabletSyncMap.get(dbId);
                 LOG.info("before sync tablets in db[{}]. report num: {}. 
backend[{}]",
@@ -461,7 +463,8 @@ private static void deleteFromMeta(ListMultimap<Long, Long> 
tabletDeleteFromMeta
                     }
                     
                     // check report version again
-                    if (backendReportVersion < 
Catalog.getCurrentSystemInfo().getBackendReportVersion(backendId)) {
+                    long currentBackendReportVersion = 
Catalog.getCurrentSystemInfo().getBackendReportVersion(backendId);
+                    if (backendReportVersion < currentBackendReportVersion) {
                         continue;
                     }
 
@@ -508,8 +511,10 @@ private static void deleteFromMeta(ListMultimap<Long, 
Long> tabletDeleteFromMeta
                                                                                
      indexId, tabletId, backendId);
 
                         
Catalog.getInstance().getEditLog().logDeleteReplica(info);
-                        LOG.warn("delete replica[{}] in tablet[{}] from meta. 
backend[{}]",
-                                 replica.getId(), tabletId, backendId);
+                        LOG.warn("delete replica[{}] in tablet[{}] from meta. 
backend[{}], report version: {}"
+                                + ", current report version: {}",
+                                replica.getId(), tabletId, backendId, 
backendReportVersion,
+                                currentBackendReportVersion);
                         
                         // check for clone
                         replicas = tablet.getReplicas();
diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
index e9131987..14a91586 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -52,10 +52,10 @@
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -279,11 +279,16 @@ private TOlapTablePartitionParam createPartition(long 
dbId, OlapTable table) thr
         return partitionParam;
     }
 
-    private TOlapTableLocationParam createLocation(OlapTable table) {
+    private TOlapTableLocationParam createLocation(OlapTable table) throws 
UserException {
         TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
         for (Partition partition : table.getPartitions()) {
+            int quorum = 
table.getPartitionInfo().getReplicationNum(partition.getId()) / 2 + 1;          
  
             for (MaterializedIndex index : partition.getMaterializedIndices()) 
{
                 for (Tablet tablet : index.getTablets()) {
+                    List<Long> beIds = tablet.getBackendIdsList();
+                    if (beIds.size() < quorum) {
+                        throw new UserException("tablet " + tablet.getId() + " 
has few replicas: " + beIds.size());
+                    }
                     locationParam.addToTablets(
                             new TTabletLocation(tablet.getId(), 
Lists.newArrayList(tablet.getBackendIds())));
                 }
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 79f4bfe8..e4fc1842 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -39,6 +39,7 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -138,7 +139,7 @@ public TExecPlanFragmentParams plan() throws UserException {
         queryGlobals.setNow_string(DATE_FORMAT.format(new Date()));
         params.setQuery_globals(queryGlobals);
 
-        LOG.info("params is {}", params);
+        LOG.debug("stream load txn id: {}, plan: {}", request.txnId, params);
         return params;
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 739f5cf6..18e9c055 100644
--- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -570,7 +570,9 @@ public TFeResult loadCheck(TLoadCheckRequest request) 
throws TException {
 
     @Override
     public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) 
throws TException {
-        LOG.info("receive loadTxnBegin request, request={}", request);
+        LOG.info("receive loadTxnBegin request, db: {}, tbl: {}, label: {}",
+                request.getDb(), request.getTbl(), request.getLabel());
+        LOG.debug("txn begin request: {}", request);
         TLoadTxnBeginResult result = new TLoadTxnBeginResult();
         TStatus status = new TStatus(TStatusCode.OK);
         result.setStatus(status);
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java 
b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index a6da0b04..9ca75c0c 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -133,7 +133,7 @@ public long beginTransaction(long dbId, String label, 
String coordinator, LoadJo
                                                             + 
runningTxnNums.get(dbId) + ", larger than limit " + 
Config.max_running_txn_num_per_db);
             }
             long tid = idGenerator.getNextTransactionId();
-            LOG.debug("beginTransaction: tid {} with label {} from coordinator 
{}", tid, label, coordinator);
+            LOG.info("begin transaction: txn id {} with label {} from 
coordinator {}", tid, label, coordinator);
             TransactionState transactionState = new TransactionState(dbId, 
tid, label, sourceType,
                                                                      
coordinator, txnStateChangeListener);
             transactionState.setPrepareTime(System.currentTimeMillis());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to