Repository: impala
Updated Branches:
  refs/heads/master 474f64079 -> 8b7f27af8


Add fragment_instance_id/query_id to important log messages and make them grep friendly

Debugging by reading through log messages can get very tricky when
the fragment_instance_id and query_id are missing from certain
important log messages. This patch adds them in the places where I've
had issues in the past.

This patch also standardizes on 'fragment_instance_id' and 'query_id'
across all log messages so that it's easy to grep through the logs
while debugging.

There may be more places that I've missed, but we can fix them as we
come across them.

Change-Id: I4bb79c0cfa8cb8b99b8539e15e3c1adcb6854cf3
Reviewed-on: http://gerrit.cloudera.org:8080/9535
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8b7f27af
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8b7f27af
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8b7f27af

Branch: refs/heads/master
Commit: 8b7f27af8ffc63f1280d3c092d6949a767c477b1
Parents: 474f640
Author: Sailesh Mukil <sail...@apache.org>
Authored: Wed Mar 7 12:37:04 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Mar 8 02:14:08 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc             | 23 ++++++++++++-----------
 be/src/runtime/data-stream-mgr.cc         |  9 +++++----
 be/src/runtime/data-stream-sender.cc      | 12 +++++++-----
 be/src/runtime/krpc-data-stream-mgr.cc    | 13 +++++++------
 be/src/runtime/krpc-data-stream-recvr.cc  |  2 +-
 be/src/runtime/krpc-data-stream-sender.cc |  8 +++++---
 6 files changed, 37 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8b7f27af/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index b69b848..a5d53b2 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -354,7 +354,7 @@ void Coordinator::StartBackendExec() {
 
   DebugOptions debug_options(schedule_.query_options());
 
-  VLOG_QUERY << "starting execution on " << num_backends << " backends for 
query "
+  VLOG_QUERY << "starting execution on " << num_backends << " backends for 
query_id="
              << query_id();
   query_events_->MarkEvent(Substitute("Ready to start on $0 backends", num_backends));
 
@@ -367,7 +367,7 @@ void Coordinator::StartBackendExec() {
   }
 
   exec_complete_barrier_->Wait();
-  VLOG_QUERY << "started execution on " << num_backends << " backends for 
query "
+  VLOG_QUERY << "started execution on " << num_backends << " backends for 
query_id="
              << query_id();
   query_events_->MarkEvent(
       Substitute("All $0 execution backends ($1 fragment instances) started",
@@ -480,10 +480,10 @@ Status Coordinator::UpdateStatus(const Status& status, const string& backend_hos
 
   if (is_fragment_failure) {
     // Log the id of the fragment that first failed so we can track it down more easily.
-    VLOG_QUERY << "Query id=" << query_id() << " failed because instance id="
+    VLOG_QUERY << "query_id=" << query_id() << " failed because 
fragment_instance_id="
                << instance_id << " on host=" << backend_hostname << " failed.";
   } else {
-    VLOG_QUERY << "Query id=" << query_id() << " failed due to error on host="
+    VLOG_QUERY << "query_id=" << query_id() << " failed due to error on host="
                << backend_hostname;
   }
   return query_status_;
@@ -787,14 +787,14 @@ Status Coordinator::WaitForBackendCompletion() {
   unique_lock<mutex> l(lock_);
   while (num_remaining_backends_ > 0 && query_status_.ok()) {
     VLOG_QUERY << "Coordinator waiting for backends to finish, "
-               << num_remaining_backends_ << " remaining";
+               << num_remaining_backends_ << " remaining. query_id=" << query_id();
     backend_completion_cv_.Wait(l);
   }
   if (query_status_.ok()) {
-    VLOG_QUERY << "All backends finished successfully.";
+    VLOG_QUERY << "All backends finished successfully. query_id=" << 
query_id();
   } else {
-    VLOG_QUERY << "All backends finished due to one or more errors. "
-               << query_status_.GetDetail();
+    VLOG_QUERY << "All backends finished due to one or more errors. query_id="
+               << query_id() << ". " << query_status_.GetDetail();
   }
 
   return query_status_;
@@ -955,7 +955,8 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
     if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
       VLOG_QUERY << "Backend completed: "
           << " host=" << backend_state->impalad_address()
-          << " remaining=" << num_remaining_backends_ - 1;
+          << " remaining=" << num_remaining_backends_ - 1
+          << " query_id=" << query_id();
       BackendState::LogFirstInProgress(backend_states_);
     }
     if (--num_remaining_backends_ == 0 || !status.ok()) {
@@ -1088,7 +1089,7 @@ void Coordinator::ReleaseAdmissionControlResources() {
 
 void Coordinator::ReleaseAdmissionControlResourcesLocked() {
   if (released_admission_control_resources_) return;
-  LOG(INFO) << "Release admission control resources for query "
+  LOG(INFO) << "Release admission control resources for query_id="
             << PrintId(query_ctx_.query_id);
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
@@ -1200,7 +1201,7 @@ void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
       if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
         VLOG_QUERY << "Not enough memory to allocate filter: "
                    << PrettyPrinter::Print(heap_space, TUnit::BYTES)
-                   << " (query: " << coord->query_id() << ")";
+                   << " (query_id=" << coord->query_id() << ")";
         // Disable, as one missing update means a correct filter cannot be produced.
         Disable(coord->filter_mem_tracker_);
       } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/8b7f27af/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index 3a22130..ed1e29e 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -81,7 +81,7 @@ shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(const RowDescriptor*
     MemTracker* parent_tracker, BufferPool::ClientHandle* client) {
   DCHECK(profile != nullptr);
   DCHECK(parent_tracker != nullptr);
-  VLOG_FILE << "creating receiver for fragment="
+  VLOG_FILE << "creating receiver for fragment_instance_id="
             << fragment_instance_id << ", node=" << dest_node_id;
   shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(this, parent_tracker, row_desc,
       fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
@@ -126,7 +126,7 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::FindRecvrOrWait(
   num_senders_waiting_->Increment(-1L);
   const string& time_taken = PrettyPrinter::Print(sw.ElapsedTime(), TUnit::TIME_NS);
   if (timed_out) {
-    LOG(INFO) << "Datastream sender timed-out waiting for recvr for fragment 
instance: "
+    LOG(INFO) << "Datastream sender timed-out waiting for recvr for 
fragment_instance_id="
               << fragment_instance_id << " (time-out was: " << time_taken << 
"). "
               << "Increase --datastream_sender_timeout_ms if you see this 
message "
               << "frequently.";
@@ -275,7 +275,8 @@ Status DataStreamMgr::DeregisterRecvr(
 }
 
 void DataStreamMgr::Cancel(const TUniqueId& fragment_instance_id) {
-  VLOG_QUERY << "cancelling all streams for fragment=" << fragment_instance_id;
+  VLOG_QUERY << "cancelling all streams for fragment_instance_id="
+             << fragment_instance_id;
   lock_guard<mutex> l(lock_);
   FragmentRecvrSet::iterator i =
       fragment_recvr_set_.lower_bound(make_pair(fragment_instance_id, 0));
@@ -284,7 +285,7 @@ void DataStreamMgr::Cancel(const TUniqueId& fragment_instance_id) {
     if (recvr.get() == NULL) {
       // keep going but at least log it
       stringstream err;
-      err << "Cancel(): missing in stream_map: fragment=" << i->first
+      err << "Cancel(): missing in stream_map: fragment_instance_id=" << 
i->first
           << " node=" << i->second;
       LOG(ERROR) << err.str();
     } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/8b7f27af/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index 30bf5b6..a744929 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -165,7 +165,7 @@ Status DataStreamSender::Channel::Init(RuntimeState* state) {
 }
 
 Status DataStreamSender::Channel::SendBatch(TRowBatch* batch) {
-  VLOG_ROW << "Channel::SendBatch() instance_id=" << fragment_instance_id_
+  VLOG_ROW << "Channel::SendBatch() fragment_instance_id=" << 
fragment_instance_id_
            << " dest_node=" << dest_node_id_ << " #rows=" << batch->num_rows;
   // return if the previous batch saw an error
   RETURN_IF_ERROR(GetSendStatus());
@@ -193,7 +193,7 @@ void DataStreamSender::Channel::TransmitData(int thread_id, const TRowBatch* bat
 
 void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) {
   DCHECK(batch != NULL);
-  VLOG_ROW << "Channel::TransmitData() instance_id=" << fragment_instance_id_
+  VLOG_ROW << "Channel::TransmitData() fragment_instance_id=" << 
fragment_instance_id_
            << " dest_node=" << dest_node_id_
            << " #rows=" << batch->num_rows;
   TTransmitDataParams params;
@@ -275,14 +275,15 @@ Status DataStreamSender::Channel::SendCurrentBatch() {
 Status DataStreamSender::Channel::GetSendStatus() {
   WaitForRpc();
   if (!rpc_status_.ok()) {
-    LOG(ERROR) << "channel send to " << TNetworkAddressToString(address_) << " 
failed: "
+    LOG(ERROR) << "channel send to " << TNetworkAddressToString(address_) << " 
failed "
+               << "(fragment_instance_id=" << fragment_instance_id_ << "): "
                << rpc_status_.GetDetail();
   }
   return rpc_status_;
 }
 
 Status DataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
-  VLOG_RPC << "Channel::FlushAndSendEos() instance_id=" << 
fragment_instance_id_
+  VLOG_RPC << "Channel::FlushAndSendEos() fragment_instance_id=" << 
fragment_instance_id_
            << " dest_node=" << dest_node_id_
            << " #rows= " << batch_->num_rows();
 
@@ -313,7 +314,8 @@ Status DataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
   rpc_status_ = DoTransmitDataRpc(&client, params, &res);
   if (!rpc_status_.ok()) {
     LOG(ERROR) << "Failed to send EOS to " << TNetworkAddressToString(address_)
-               << " : " << rpc_status_.GetDetail();
+               << " (fragment_instance_id=" << fragment_instance_id_ << "): "
+               << rpc_status_.GetDetail();
     return rpc_status_;
   }
   return Status(res.status);

http://git-wip-us.apache.org/repos/asf/impala/blob/8b7f27af/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 8005183..b291fee 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -104,7 +104,7 @@ shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
   DCHECK(profile != nullptr);
   DCHECK(parent_tracker != nullptr);
   DCHECK(client != nullptr);
-  VLOG_FILE << "creating receiver for fragment="<< finst_id
+  VLOG_FILE << "creating receiver for fragment_instance_id="<< finst_id
             << ", node=" << dest_node_id;
   shared_ptr<KrpcDataStreamRecvr> recvr(new KrpcDataStreamRecvr(
       this, parent_tracker, row_desc, finst_id, dest_node_id, num_senders, is_merging,
@@ -202,7 +202,7 @@ void KrpcDataStreamMgr::AddData(const TransmitDataRequestPB* request,
   finst_id.__set_lo(request->dest_fragment_instance_id().lo());
   finst_id.__set_hi(request->dest_fragment_instance_id().hi());
   TPlanNodeId dest_node_id = request->dest_node_id();
-  VLOG_ROW << "AddData(): finst_id=" << PrintId(finst_id)
+  VLOG_ROW << "AddData(): fragment_instance_id=" << PrintId(finst_id)
            << " node_id=" << request->dest_node_id()
            << " #rows=" << request->row_batch_header().num_rows()
            << " sender_id=" << request->sender_id();
@@ -263,7 +263,7 @@ void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,
   TUniqueId finst_id;
   finst_id.__set_lo(request->dest_fragment_instance_id().lo());
   finst_id.__set_hi(request->dest_fragment_instance_id().hi());
-  VLOG_ROW << "CloseSender(): instance_id=" << PrintId(finst_id)
+  VLOG_ROW << "CloseSender(): fragment_instance_id=" << PrintId(finst_id)
            << " node_id=" << request->dest_node_id()
            << " sender_id=" << request->sender_id();
   shared_ptr<KrpcDataStreamRecvr> recvr;
@@ -321,7 +321,7 @@ Status KrpcDataStreamMgr::DeregisterRecvr(
 }
 
 void KrpcDataStreamMgr::Cancel(const TUniqueId& finst_id) {
-  VLOG_QUERY << "cancelling all streams for fragment=" << finst_id;
+  VLOG_QUERY << "cancelling all streams for fragment_instance_id=" << finst_id;
   lock_guard<mutex> l(lock_);
   FragmentRecvrSet::iterator iter =
       fragment_recvr_set_.lower_bound(make_pair(finst_id, 0));
@@ -332,8 +332,9 @@ void KrpcDataStreamMgr::Cancel(const TUniqueId& finst_id) {
       recvr->CancelStream();
     } else {
       // keep going but at least log it
-      LOG(ERROR) << Substitute("Cancel(): missing in stream_map: fragment=$0 node=$1",
-          PrintId(iter->first), iter->second);
+      LOG(ERROR) << Substitute(
+          "Cancel(): missing in stream_map: fragment_instance_id=$0 node=$1",
+              PrintId(iter->first), iter->second);
     }
     ++iter;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/8b7f27af/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 60aacce..85563bf 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -482,7 +482,7 @@ void KrpcDataStreamRecvr::SenderQueue::Cancel() {
       deferred_rpcs_.pop();
     }
   }
-  VLOG_QUERY << "cancelled stream: fragment_instance_id_="
+  VLOG_QUERY << "cancelled stream: fragment_instance_id="
              << recvr_->fragment_instance_id()
              << " node_id=" << recvr_->dest_node_id();
   // Wake up all threads waiting to produce/consume batches. They will all

http://git-wip-us.apache.org/repos/asf/impala/blob/8b7f27af/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index d758826..e6ff0a2 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -333,6 +333,7 @@ Status KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<SpinLock>* loc
   DCHECK(!rpc_in_flight_);
   if (UNLIKELY(!rpc_status_.ok())) {
     LOG(ERROR) << "channel send to " << TNetworkAddressToString(address_) << " 
failed: "
+               << "(fragment_instance_id=" << fragment_instance_id_ << "): "
                << rpc_status_.GetDetail();
     return rpc_status_;
   }
@@ -444,7 +445,7 @@ Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() {
 
 Status KrpcDataStreamSender::Channel::TransmitData(
     const OutboundRowBatch* outbound_batch) {
-  VLOG_ROW << "Channel::TransmitData() finst_id=" << fragment_instance_id_
+  VLOG_ROW << "Channel::TransmitData() fragment_instance_id=" << 
fragment_instance_id_
            << " dest_node=" << dest_node_id_
            << " #rows=" << outbound_batch->header()->num_rows();
   std::unique_lock<SpinLock> l(lock_);
@@ -524,7 +525,7 @@ Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
 }
 
 Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
-  VLOG_RPC << "Channel::FlushAndSendEos() instance_id=" << 
fragment_instance_id_
+  VLOG_RPC << "Channel::FlushAndSendEos() fragment_instance_id=" << 
fragment_instance_id_
            << " dest_node=" << dest_node_id_
            << " #rows= " << batch_->num_rows();
 
@@ -538,7 +539,8 @@ Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
     DCHECK(!rpc_in_flight_);
     DCHECK(rpc_status_.ok());
     if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
-    VLOG_RPC << "calling EndDataStream() to terminate channel.";
+    VLOG_RPC << "calling EndDataStream() to terminate channel. 
fragment_instance_id="
+             << fragment_instance_id_;
     rpc_in_flight_ = true;
     COUNTER_ADD(parent_->eos_sent_counter_, 1);
     RETURN_IF_ERROR(DoEndDataStreamRpc());
