http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-recvr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc index 0731d45..68f00e3 100644 --- a/be/src/runtime/krpc-data-stream-recvr.cc +++ b/be/src/runtime/krpc-data-stream-recvr.cc @@ -17,46 +17,596 @@ #include "runtime/krpc-data-stream-recvr.h" -#include "common/logging.h" +#include <condition_variable> +#include <queue> + +#include <boost/thread/locks.hpp> +#include <boost/thread/mutex.hpp> + +#include "exec/kudu-util.h" +#include "kudu/rpc/rpc_context.h" +#include "runtime/krpc-data-stream-recvr.h" +#include "runtime/krpc-data-stream-mgr.h" +#include "runtime/mem-tracker.h" +#include "runtime/row-batch.h" +#include "runtime/sorted-run-merger.h" +#include "util/runtime-profile-counters.h" +#include "util/periodic-counter-updater.h" + +#include "gen-cpp/data_stream_service.pb.h" + +#include "common/names.h" DECLARE_bool(use_krpc); +DECLARE_int32(datastream_service_num_deserialization_threads); + +using kudu::rpc::RpcContext; +using std::condition_variable_any; namespace impala { -[[noreturn]] static void AbortUnsupportedFeature() { - // We should have gotten here only if the FLAGS_use_krpc is set to true. - CHECK(FLAGS_use_krpc) << "Shouldn't reach here unless startup flag 'use_krpc' " - "is true."; - // KRPC isn't supported yet, so abort. - ABORT_WITH_ERROR("KRPC is not supported yet. Please set the 'use_krpc' flag to " - "false and restart the cluster."); +// Implements a FIFO queue of row batches from one or more senders. One queue is +// maintained per sender if is_merging_ is true for the enclosing receiver; otherwise, +// rows from all senders are placed in the same queue. +// +// Batches are added by senders via AddBatch(), and removed by an enclosing +// KrpcDataStreamRecvr via GetBatch(). There is a soft limit for the total amount of +// memory consumed by buffered row batches in all sender queues of a receiver. If adding +// a batch will push the memory consumption beyond the limit, that RPC is added to the +// 'deferred batches' queue, which will be drained in FIFO order when space opens up. +// Senders in that state will not be replied to until their row batches are deserialized +// or the receiver is cancelled. This ensures that only one batch per sender is buffered +// in the deferred batches queue. +class KrpcDataStreamRecvr::SenderQueue { + public: + SenderQueue(KrpcDataStreamRecvr* parent_recvr, int num_senders); + + // Returns the next batch from this sender queue. Sets the returned batch in + // current_batch_. + // A returned batch that is not filled to capacity does *not* indicate end-of-stream. + // The call blocks until another batch arrives or all senders close their channels. + // The returned batch is owned by the sender queue. The caller must acquire the + // resources from the returned batch before the next call to GetBatch(). + Status GetBatch(RowBatch** next_batch); + + // Adds a new row batch to this sender queue if this stream has not been cancelled. + // If adding this batch causes us to exceed the receiver's buffer limit, the RPC state + // is copied into 'deferred_rpcs_' for deferred processing and this function returns + // immediately. The deferred RPCs are replied to later when space becomes available.
+ void AddBatch(const TransmitDataRequestPB* request, TransmitDataResponsePB* response, + RpcContext* context); + + // Tries inserting the front of the 'deferred_rpcs_' queue into 'batch_queue_' if possible. + // On success, the first entry of 'deferred_rpcs_' is removed and the sender of the RPC + // will be responded to. If the serialized row batch fails to be extracted from the + // entry, the error status will be sent as reply. + void DequeueDeferredRpc(); + + // Takes over the RPC state 'ctx' of an early sender for deferred processing and + // kicks off a deserialization task to process it asynchronously. The ownership of + // 'ctx' is transferred to this sender queue. + void TakeOverEarlySender(std::unique_ptr<TransmitDataCtx> ctx); + + // Decrements the number of remaining senders for this queue and signals any threads + // waiting on the arrival of a new batch if the count drops to 0. The number of senders + // will be 1 for a merging KrpcDataStreamRecvr. + void DecrementSenders(); + + // Sets the cancellation flag and signals cancellation to receiver and sender. Subsequent + // incoming batches will be dropped and senders in 'deferred_rpcs_' are replied to. + void Cancel(); + + // Must be called once to clean up any queued resources. + void Close(); + + // Returns the current batch from this queue being processed by a consumer. + RowBatch* current_batch() const { return current_batch_.get(); } + + private: + // Returns true if either (1) 'batch_queue' is empty and there is no pending insertion + // or (2) inserting a row batch of 'batch_size' into 'batch_queue' will not cause the + // soft limit of the receiver to be exceeded. Expected to be called with 'lock_' held. + bool CanEnqueue(int64_t batch_size) const; + + // Unpacks a serialized row batch from 'request' and 'rpc_context' and populates + // 'tuple_offsets' and 'tuple_data'. On success, the deserialized row batch size will + // be stored in 'batch_size'. On failure, the error status is returned. + static Status UnpackRequest(const TransmitDataRequestPB* request, + RpcContext* rpc_context, kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, + int64_t* batch_size); + + // The workhorse function for deserializing a row batch represented by ('header', + // 'tuple_offsets' and 'tuple_data') and inserting it into 'batch_queue'. Expects to be + // called with 'lock_' held and passed into this function via the argument 'lock'. This + // function may drop the lock when deserializing the row batch and re-acquire it after + // the row batch is deserialized. 'batch_size' is the size in bytes of the deserialized + // row batch. The caller is expected to have called CanEnqueue() to make sure the row + // batch can be inserted without exceeding the soft limit of the receiver. Also notifies + // a thread waiting on 'data_arrival_cv_'. + void AddBatchWork(int64_t batch_size, const RowBatchHeaderPB& header, + const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data, + unique_lock<SpinLock>* lock); + + // Receiver of which this queue is a member. + KrpcDataStreamRecvr* recvr_; + + // Protects all subsequent fields. + SpinLock lock_; + + // If true, the receiver fragment for this stream got cancelled. + bool is_cancelled_ = false; + + // Number of deserialization requests sent to deserialization threads to drain + // 'deferred_rpcs_' which are yet to be processed. Used to limit the number of + // requests queued.
+ int num_deserialize_tasks_pending_ = 0; + + // Number of senders which haven't closed the channel yet + // (if it drops to 0, end-of-stream is true). + int num_remaining_senders_; + + // Number of pending row batch insertions. AddBatchWork() may drop and reacquire 'lock_', + // causing races between multiple threads calling AddBatch() at the same time or races + // between threads calling AddBatch() and threads calling Close() concurrently. + // AddBatchWork() increments this counter before dropping 'lock_' for deserializing + // the row batch. The counter is decremented after 'lock_' is re-acquired and the row + // batch is inserted into 'batch_queue'. The races are as follows: + // + // 1. Multiple threads inserting into an empty 'batch_queue' concurrently may all see + // it as empty before the first thread manages to insert into batch_queue. This may + // cause the soft limit to be exceeded. A queue is truly empty iff this counter is 0. + // + // 2. Close() cannot proceed until this counter is 0 to make sure all pending inserts + // complete before the 'batch_queue' is cleared. + int num_pending_enqueue_ = 0; + + // Signals the arrival of a new batch or the eos/cancelled condition. + condition_variable_any data_arrival_cv_; + + // Queue of (batch length, batch) pairs. The SenderQueue owns the memory of these + // batches until they are handed off to the callers of GetBatch(). + typedef list<pair<int, std::unique_ptr<RowBatch>>> RowBatchQueue; + RowBatchQueue batch_queue_; + + // The batch that was most recently returned via GetBatch(), i.e. the current batch + // from this queue being processed by a consumer. It's destroyed when the next batch + // is retrieved. + scoped_ptr<RowBatch> current_batch_; + + // Set to true when the first batch has been received. + bool received_first_batch_ = false; + + // Queue of deferred RPCs - those that have a batch to deliver, but the queue was + // full when they last tried to do so. The senders wait here until there is space for + // their batches, allowing the receiver side to implement basic flow control. + std::queue<std::unique_ptr<TransmitDataCtx>> deferred_rpcs_; +}; + +KrpcDataStreamRecvr::SenderQueue::SenderQueue( + KrpcDataStreamRecvr* parent_recvr, int num_senders) + : recvr_(parent_recvr), num_remaining_senders_(num_senders) { } + +Status KrpcDataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) { + SCOPED_TIMER(recvr_->queue_get_batch_time_); + int num_to_dequeue = 0; + // The sender id is set below when we decide to dequeue entries from 'deferred_rpcs_'. + int sender_id = -1; + { + unique_lock<SpinLock> l(lock_); + // current_batch_ must be replaced with the returned batch. + current_batch_.reset(); + *next_batch = nullptr; + + // Wait until something shows up or we know we're done. + while (batch_queue_.empty() && !is_cancelled_ && num_remaining_senders_ > 0) { + VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id() + << " node=" << recvr_->dest_node_id(); + // Don't count time spent waiting on the sender as active time. + CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_); + CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_); + CANCEL_SAFE_SCOPED_TIMER( + received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_, + &is_cancelled_); + data_arrival_cv_.wait(l); + } + + if (UNLIKELY(is_cancelled_)) { + // Cancellation should have drained the entire 'deferred_rpcs_' queue. + // Make sure the senders were replied to or they may be stuck waiting for a reply.
+ DCHECK(deferred_rpcs_.empty()); + return Status::CANCELLED; + } + + // All senders have sent their row batches. Nothing to do. + if (num_remaining_senders_ == 0 && batch_queue_.empty()) { + // Note that it's an invariant that a sender cannot send the EOS RPC until all + // outstanding TransmitData() RPCs have been replied to. Therefore, it should be + // impossible for num_remaining_senders_ to reach 0 before all RPCs in + // 'deferred_rpcs_' have been replied to. + DCHECK(deferred_rpcs_.empty()); + DCHECK_EQ(num_pending_enqueue_, 0); + return Status::OK(); + } + + // Notify the deserialization threads to retry delivering the deferred RPCs. + if (!deferred_rpcs_.empty()) { + // Try dequeuing multiple entries from 'deferred_rpcs_' to parallelize the CPU + // bound deserialization work. No point in dequeuing more than number of + // deserialization threads available. + DCHECK_GE(deferred_rpcs_.size(), num_deserialize_tasks_pending_); + num_to_dequeue = min(FLAGS_datastream_service_num_deserialization_threads, + (int)deferred_rpcs_.size() - num_deserialize_tasks_pending_); + num_deserialize_tasks_pending_ += num_to_dequeue; + sender_id = deferred_rpcs_.front()->request->sender_id(); + } + + DCHECK(!batch_queue_.empty()); + received_first_batch_ = true; + RowBatch* result = batch_queue_.front().second.release(); + recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first); + batch_queue_.pop_front(); + VLOG_ROW << "fetched #rows=" << result->num_rows(); + current_batch_.reset(result); + *next_batch = current_batch_.get(); + } + // Don't hold lock when calling EnqueueDeserializeTask() as it may block. + // It's important that the dequeuing of 'deferred_rpcs_' is done after the entry + // has been removed from 'batch_queue_' or the deserialization threads may fail to + // insert into a non-empty 'batch_queue_' and the receiver will be waiting forever. + if (num_to_dequeue > 0) { + DCHECK_GE(sender_id, 0); + recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(), + recvr_->dest_node_id(), sender_id, num_to_dequeue); + } + return Status::OK(); } -[[noreturn]] KrpcDataStreamRecvr::KrpcDataStreamRecvr() { - AbortUnsupportedFeature(); +inline bool KrpcDataStreamRecvr::SenderQueue::CanEnqueue(int64_t batch_size) const { + // The queue is truly empty iff there is no pending insert. It's important that we + // enqueue the new batch regardless of buffer limit if the queue is currently empty. + // In the case of a merging receiver, batches are received from a specific queue + // based on data order, and the pipeline will stall if the merger is waiting for data + // from an empty queue that cannot be filled because the limit has been reached. + bool queue_empty = batch_queue_.empty() && num_pending_enqueue_ == 0; + return queue_empty || !recvr_->ExceedsLimit(batch_size); } -KrpcDataStreamRecvr::~KrpcDataStreamRecvr() { +Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest( + const TransmitDataRequestPB* request, RpcContext* rpc_context, + kudu::Slice* tuple_offsets, kudu::Slice* tuple_data, int64_t* batch_size) { + // Unpack the tuple offsets. + KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar( + request->tuple_offsets_sidecar_idx(), tuple_offsets), + "Failed to get the tuple offsets sidecar"); + // Unpack the tuple data. + KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar( + request->tuple_data_sidecar_idx(), tuple_data), + "Failed to get the tuple data sidecar"); + // Compute the size of the deserialized row batch. 
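+  // Computing the size up front lets callers test the soft limit via CanEnqueue()
+  // before paying for the deserialization cost.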
+ *batch_size = + RowBatch::GetDeserializedSize(request->row_batch_header(), *tuple_offsets); + return Status::OK(); } -[[noreturn]] Status KrpcDataStreamRecvr::GetBatch(RowBatch** next_batch) { - AbortUnsupportedFeature(); +void KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size, + const RowBatchHeaderPB& header, const kudu::Slice& tuple_offsets, + const kudu::Slice& tuple_data, unique_lock<SpinLock>* lock) { + DCHECK(lock != nullptr); + DCHECK(lock->owns_lock()); + + COUNTER_ADD(recvr_->num_accepted_batches_, 1); + COUNTER_ADD(recvr_->bytes_received_counter_, batch_size); + // Reserve queue space before dropping the lock below. + recvr_->num_buffered_bytes_.Add(batch_size); + DCHECK_GE(num_pending_enqueue_, 0); + ++num_pending_enqueue_; + + // Deserialization may take some time due to compression and memory allocation. + // Drop the lock so we can deserialize multiple batches in parallel. + lock->unlock(); + unique_ptr<RowBatch> batch; + { + SCOPED_TIMER(recvr_->deserialize_row_batch_timer_); + // At this point, the row batch will be inserted into batch_queue_. Close() will + // handle deleting any unconsumed batches from batch_queue_. Close() cannot proceed + // until there are no pending insertion to batch_queue_. + batch.reset(new RowBatch(recvr_->row_desc(), header, tuple_offsets, tuple_data, + recvr_->mem_tracker())); + } + lock->lock(); + + DCHECK_GT(num_pending_enqueue_, 0); + --num_pending_enqueue_; + VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size; + batch_queue_.emplace_back(batch_size, move(batch)); + data_arrival_cv_.notify_one(); } -[[noreturn]] void KrpcDataStreamRecvr::Close() { - AbortUnsupportedFeature(); +void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* request, + TransmitDataResponsePB* response, RpcContext* rpc_context) { + // TODO: Add timers for time spent in this function and queue time in 'batch_queue_'. + const RowBatchHeaderPB& header = request->row_batch_header(); + kudu::Slice tuple_offsets; + kudu::Slice tuple_data; + int64_t batch_size; + Status status = UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data, + &batch_size); + if (UNLIKELY(!status.ok())) { + status.ToProto(response->mutable_status()); + rpc_context->RespondSuccess(); + return; + } + + { + unique_lock<SpinLock> l(lock_); + // There should be one or more senders left when this function is called. The reason + // is that EndDataStream RPC is not sent until all outstanding TransmitData() RPC has + // been replied to. There is at least one TransmitData() RPC which hasn't yet been + // responded to if we reach here. + DCHECK_GT(num_remaining_senders_, 0); + if (UNLIKELY(is_cancelled_)) { + Status::OK().ToProto(response->mutable_status()); + rpc_context->RespondSuccess(); + return; + } + + // If there's something in the queue and this batch will push us over the buffer + // limit we need to wait until the queue gets drained. We store the rpc context + // so that we can signal it at a later time to resend the batch that we couldn't + // process here. If there are already deferred RPCs waiting in queue, the new + // batch needs to line up after the deferred RPCs to avoid starvation of senders + // in the non-merging case. 
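+    // For example, if sender A's batch was deferred and sender B's batch would now
+    // fit, accepting B's batch directly would let B starve A. Queueing B behind A
+    // keeps admission FIFO across all senders sharing this queue.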
+ if (UNLIKELY(!deferred_rpcs_.empty() || !CanEnqueue(batch_size))) { + auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context); + deferred_rpcs_.push(move(payload)); + COUNTER_ADD(recvr_->num_deferred_batches_, 1); + return; + } + + // At this point, we are committed to inserting the row batch into 'batch_queue_'. + AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l); + } + + // Respond to the sender to ack the insertion of the row batches. + Status::OK().ToProto(response->mutable_status()); + rpc_context->RespondSuccess(); +} + +void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() { + // Owns the first entry of 'deferred_rpcs_' if it ends up being popped. + std::unique_ptr<TransmitDataCtx> ctx; + { + unique_lock<SpinLock> l(lock_); + DCHECK_GT(num_deserialize_tasks_pending_, 0); + --num_deserialize_tasks_pending_; + + // Returns if the queue has been cancelled or if it's empty. + if (UNLIKELY(is_cancelled_) || deferred_rpcs_.empty()) return; + + // Try enqueuing the first entry into 'batch_queue_'. + ctx.swap(deferred_rpcs_.front()); + kudu::Slice tuple_offsets; + kudu::Slice tuple_data; + int64_t batch_size; + Status status = UnpackRequest(ctx->request, ctx->rpc_context, &tuple_offsets, + &tuple_data, &batch_size); + // Reply with error status if the entry cannot be unpacked. + if (UNLIKELY(!status.ok())) { + status.ToProto(ctx->response->mutable_status()); + ctx->rpc_context->RespondSuccess(); + deferred_rpcs_.pop(); + return; + } + + // Stops if inserting the batch causes us to go over the limit. + // Put 'ctx' back on the queue. + if (!CanEnqueue(batch_size)) { + ctx.swap(deferred_rpcs_.front()); + DCHECK(deferred_rpcs_.front().get() != nullptr); + return; + } + + // Dequeues the deferred batch and adds it to 'batch_queue_'. + deferred_rpcs_.pop(); + const RowBatchHeaderPB& header = ctx->request->row_batch_header(); + AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l); + } + + // Responds to the sender to ack the insertion of the row batches. + Status::OK().ToProto(ctx->response->mutable_status()); + ctx->rpc_context->RespondSuccess(); +} + +void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender( + unique_ptr<TransmitDataCtx> ctx) { + int sender_id = ctx->request->sender_id(); + COUNTER_ADD(recvr_->num_deferred_batches_, 1); + { + lock_guard<SpinLock> l(lock_); + deferred_rpcs_.push(move(ctx)); + ++num_deserialize_tasks_pending_; + } + recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(), + recvr_->dest_node_id(), sender_id, 1); +} + +void KrpcDataStreamRecvr::SenderQueue::DecrementSenders() { + lock_guard<SpinLock> l(lock_); + DCHECK_GT(num_remaining_senders_, 0); + num_remaining_senders_ = max(0, num_remaining_senders_ - 1); + VLOG_FILE << "decremented senders: fragment_instance_id=" + << recvr_->fragment_instance_id() + << " node_id=" << recvr_->dest_node_id() + << " #senders=" << num_remaining_senders_; + if (num_remaining_senders_ == 0) data_arrival_cv_.notify_one(); +} + +void KrpcDataStreamRecvr::SenderQueue::Cancel() { + { + lock_guard<SpinLock> l(lock_); + if (is_cancelled_) return; + is_cancelled_ = true; + + // Respond to deferred RPCs. 
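+    // Without a reply here, senders parked in 'deferred_rpcs_' would wait
+    // indefinitely after cancellation.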
+ while (!deferred_rpcs_.empty()) { + const unique_ptr<TransmitDataCtx>& payload = deferred_rpcs_.front(); + Status::OK().ToProto(payload->response->mutable_status()); + payload->rpc_context->RespondSuccess(); + deferred_rpcs_.pop(); + } + } + VLOG_QUERY << "cancelled stream: fragment_instance_id_=" + << recvr_->fragment_instance_id() + << " node_id=" << recvr_->dest_node_id(); + // Wake up all threads waiting to produce/consume batches. They will all + // notice that the stream is cancelled and handle it. + data_arrival_cv_.notify_all(); + PeriodicCounterUpdater::StopTimeSeriesCounter( + recvr_->bytes_received_time_series_counter_); +} + +void KrpcDataStreamRecvr::SenderQueue::Close() { + unique_lock<SpinLock> l(lock_); + // Note that the queue must be cancelled first before it can be closed or we may + // risk running into a race which can leak row batches. Please see IMPALA-3034. + DCHECK(is_cancelled_); + + // Wait for any pending insertion to complete first. + while (num_pending_enqueue_ > 0) data_arrival_cv_.wait(l); + + // Delete any batches queued in batch_queue_. + batch_queue_.clear(); + current_batch_.reset(); +} + +Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) { + DCHECK(is_merging_); + vector<SortedRunMerger::RunBatchSupplierFn> input_batch_suppliers; + input_batch_suppliers.reserve(sender_queues_.size()); + + // Create the merger that will produce a single stream of sorted rows. + merger_.reset(new SortedRunMerger(less_than, row_desc_, profile_, false)); + + for (SenderQueue* queue: sender_queues_) { + input_batch_suppliers.push_back( + [queue](RowBatch** next_batch) -> Status { + return queue->GetBatch(next_batch); + }); + } + + RETURN_IF_ERROR(merger_->Prepare(input_batch_suppliers)); + return Status::OK(); } -[[noreturn]] Status KrpcDataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) { - AbortUnsupportedFeature(); +void KrpcDataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) { + for (SenderQueue* sender_queue: sender_queues_) { + if (sender_queue->current_batch() != nullptr) { + sender_queue->current_batch()->TransferResourceOwnership(transfer_batch); + } + } +} + +KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr, + MemTracker* parent_tracker, const RowDescriptor* row_desc, + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + bool is_merging, int64_t total_buffer_limit, RuntimeProfile* profile) + : mgr_(stream_mgr), + fragment_instance_id_(fragment_instance_id), + dest_node_id_(dest_node_id), + total_buffer_limit_(total_buffer_limit), + row_desc_(row_desc), + is_merging_(is_merging), + num_buffered_bytes_(0), + profile_(profile), + recvr_side_profile_(profile_->CreateChild("RecvrSide")), + sender_side_profile_(profile_->CreateChild("SenderSide")) { + mem_tracker_.reset(new MemTracker(-1, "KrpcDataStreamRecvr", parent_tracker)); + // Create one queue per sender if is_merging is true. + int num_queues = is_merging ? num_senders : 1; + sender_queues_.reserve(num_queues); + int num_sender_per_queue = is_merging ?
1 : num_senders; + for (int i = 0; i < num_queues; ++i) { + SenderQueue* queue = + sender_queue_pool_.Add(new SenderQueue(this, num_sender_per_queue)); + sender_queues_.push_back(queue); + } + + // Initialize the counters + bytes_received_counter_ = + ADD_COUNTER(recvr_side_profile_, "TotalBytesReceived", TUnit::BYTES); + bytes_received_time_series_counter_ = ADD_TIME_SERIES_COUNTER( + recvr_side_profile_, "BytesReceived", bytes_received_counter_); + queue_get_batch_time_ = ADD_TIMER(recvr_side_profile_, "TotalGetBatchTime"); + data_arrival_timer_ = + ADD_CHILD_TIMER(recvr_side_profile_, "DataArrivalTimer", "TotalGetBatchTime"); + first_batch_wait_total_timer_ = + ADD_TIMER(recvr_side_profile_, "FirstBatchArrivalWaitTime"); + deserialize_row_batch_timer_ = + ADD_TIMER(sender_side_profile_, "DeserializeRowBatchTime"); + inactive_timer_ = profile_->inactive_timer(); + num_early_senders_ = + ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT); + num_deferred_batches_ = + ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::UNIT); + num_accepted_batches_ = + ADD_COUNTER(sender_side_profile_, "NumBatchesAccepted", TUnit::UNIT); } -[[noreturn]] Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) { - AbortUnsupportedFeature(); +Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) { + DCHECK(merger_.get() != nullptr); + return merger_->GetNext(output_batch, eos); +} + +void KrpcDataStreamRecvr::AddBatch(const TransmitDataRequestPB* request, + TransmitDataResponsePB* response, RpcContext* rpc_context) { + int use_sender_id = is_merging_ ? request->sender_id() : 0; + // Add all batches to the same queue if is_merging_ is false. + sender_queues_[use_sender_id]->AddBatch(request, response, rpc_context); +} + +void KrpcDataStreamRecvr::DequeueDeferredRpc(int sender_id) { + int use_sender_id = is_merging_ ? sender_id : 0; + // Add all batches to the same queue if is_merging_ is false. + sender_queues_[use_sender_id]->DequeueDeferredRpc(); +} + +void KrpcDataStreamRecvr::TakeOverEarlySender(unique_ptr<TransmitDataCtx> ctx) { + int use_sender_id = is_merging_ ? ctx->request->sender_id() : 0; + // Add all batches to the same queue if is_merging_ is false. + sender_queues_[use_sender_id]->TakeOverEarlySender(move(ctx)); + COUNTER_ADD(num_early_senders_, 1); +} + +void KrpcDataStreamRecvr::RemoveSender(int sender_id) { + int use_sender_id = is_merging_ ? sender_id : 0; + sender_queues_[use_sender_id]->DecrementSenders(); +} + +void KrpcDataStreamRecvr::CancelStream() { + for (auto& queue: sender_queues_) queue->Cancel(); +} + +void KrpcDataStreamRecvr::Close() { + // Remove this receiver from the KrpcDataStreamMgr that created it. + // All the sender queues will be cancelled after this call returns. 
+ const Status status = mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id()); + if (!status.ok()) { + LOG(ERROR) << "Error deregistering receiver: " << status.GetDetail(); + } + mgr_ = nullptr; + for (auto& queue: sender_queues_) queue->Close(); + merger_.reset(); + mem_tracker_->Close(); + recvr_side_profile_->StopPeriodicCounters(); +} + +KrpcDataStreamRecvr::~KrpcDataStreamRecvr() { + DCHECK(mgr_ == nullptr) << "Must call Close()"; } -[[noreturn]] void KrpcDataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) { - AbortUnsupportedFeature(); +Status KrpcDataStreamRecvr::GetBatch(RowBatch** next_batch) { + DCHECK(!is_merging_); + DCHECK_EQ(sender_queues_.size(), 1); + return sender_queues_[0]->GetBatch(next_batch); } } // namespace impala
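To make the receiver-side flow control above concrete, the following minimal sketch distills what SenderQueue does: accept a batch when the queue is empty or under the soft limit, defer it otherwise, and admit a deferred batch once a consumer frees space. This is a sketch under simplifying assumptions, not Impala code: SoftLimitQueue, Batch and DeferredRpc are invented names, a single std::mutex stands in for the SpinLock/condition-variable machinery, and deserialization, RPC replies and cancellation are omitted.

// Minimal sketch of the soft-limit + deferral pattern (invented names, not Impala's).
#include <cstdint>
#include <iostream>
#include <mutex>
#include <queue>

struct Batch { int64_t size; };
struct DeferredRpc { Batch batch; };  // stands in for TransmitDataCtx

class SoftLimitQueue {
 public:
  explicit SoftLimitQueue(int64_t soft_limit) : soft_limit_(soft_limit) {}

  // Mirrors AddBatch(): accept if under the limit (or empty), otherwise defer.
  // Deferred senders must be served first to keep admission FIFO.
  void Add(Batch b) {
    std::lock_guard<std::mutex> l(lock_);
    if (!deferred_.empty() || !CanEnqueue(b.size)) {
      deferred_.push(DeferredRpc{b});
      return;
    }
    Enqueue(b);
  }

  // Mirrors GetBatch() + DequeueDeferredRpc(): consuming a batch frees space,
  // which may let the next deferred sender's batch in.
  bool Get(Batch* out) {
    std::lock_guard<std::mutex> l(lock_);
    if (queue_.empty()) return false;
    *out = queue_.front();
    queue_.pop();
    buffered_bytes_ -= out->size;
    if (!deferred_.empty() && CanEnqueue(deferred_.front().batch.size)) {
      Enqueue(deferred_.front().batch);
      deferred_.pop();
    }
    return true;
  }

 private:
  // An empty queue always accepts one batch; otherwise a merging receiver
  // waiting on this queue could stall forever.
  bool CanEnqueue(int64_t size) const {
    return queue_.empty() || buffered_bytes_ + size <= soft_limit_;
  }

  void Enqueue(Batch b) {
    queue_.push(b);
    buffered_bytes_ += b.size;
  }

  const int64_t soft_limit_;
  std::mutex lock_;
  int64_t buffered_bytes_ = 0;
  std::queue<Batch> queue_;
  std::queue<DeferredRpc> deferred_;
};

int main() {
  SoftLimitQueue q(/*soft_limit=*/100);
  q.Add({80});  // accepted: queue was empty
  q.Add({50});  // deferred: 80 + 50 > 100
  Batch b;
  q.Get(&b);    // frees 80 bytes and admits the deferred 50-byte batch
  std::cout << (q.Get(&b) ? "got deferred batch" : "empty") << std::endl;
  return 0;
}

Note how Get() admits a deferred batch only after removing an entry from the queue, mirroring how GetBatch() above schedules DequeueDeferredRpc() only after popping 'batch_queue_'.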
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-recvr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h index be46ae3..8bd99cf 100644 --- a/be/src/runtime/krpc-data-stream-recvr.h +++ b/be/src/runtime/krpc-data-stream-recvr.h @@ -20,28 +20,207 @@ #include "data-stream-recvr-base.h" +#include <boost/scoped_ptr.hpp> +#include <boost/thread/mutex.hpp> + +#include "common/object-pool.h" #include "common/status.h" +#include "gen-cpp/Types_types.h" // for TUniqueId +#include "runtime/descriptors.h" +#include "util/tuple-row-compare.h" + +namespace kudu { +namespace rpc { +class RpcContext; +} // namespace rpc +} // namespace kudu namespace impala { +class KrpcDataStreamMgr; +class MemTracker; class RowBatch; -class TupleRowComparator; +class RuntimeProfile; +class SortedRunMerger; +class TransmitDataCtx; +class TransmitDataRequestPB; +class TransmitDataResponsePB; -/// TODO: Stub for the KRPC version of the DataStreamRecvr. Fill with actual -/// implementation. +/// Single receiver of an m:n data stream. +/// +/// KrpcDataStreamRecvr maintains one or more queues of row batches received by a +/// KrpcDataStreamMgr from one or more sender fragment instances. Receivers are created +/// via KrpcDataStreamMgr::CreateRecvr(). Ownership of a stream recvr is shared between +/// the KrpcDataStreamMgr that created it and the caller of +/// KrpcDataStreamMgr::CreateRecvr() (i.e. the exchange node). +/// +/// The is_merging_ member determines if the recvr merges input streams from different +/// sender fragment instances according to a specified sort order. +/// If is_merging_ is false: only one batch queue is maintained for row batches from all +/// sender fragment instances. These row batches are returned one at a time via GetBatch(). +/// If is_merging_ is true: one queue is created for the batches from each distinct +/// sender. A SortedRunMerger instance must be created via CreateMerger() prior to +/// retrieving any rows from the receiver. Rows are retrieved from the receiver via +/// GetNext(RowBatch* output_batch, bool* eos). After the final call to +/// GetNext(), TransferAllResources() must be called to transfer resources from the input +/// batches from each sender to the caller's output batch. +/// The receiver sets deep_copy to false on the merger: resources are transferred from +/// the input batches from each sender queue to the merger to the output batch by the +/// merger itself as it processes each run. +/// +/// KrpcDataStreamRecvr::Close() must be called by the caller of CreateRecvr() to remove +/// the recvr instance from the tracking structure of its KrpcDataStreamMgr in all cases. class KrpcDataStreamRecvr : public DataStreamRecvrBase { public: - [[noreturn]] KrpcDataStreamRecvr(); - virtual ~KrpcDataStreamRecvr() override; + ~KrpcDataStreamRecvr(); + + /// Returns next row batch in data stream; blocks if there aren't any. + /// Retains ownership of the returned batch. The caller must call TransferAllResources() + /// to acquire the resources from the returned batch before the next call to GetBatch(). + /// A NULL returned batch indicates eos. Must only be called if is_merging_ is false. + /// TODO: This is currently only exposed to the non-merging version of the exchange. + /// Refactor so both merging and non-merging exchange use GetNext(RowBatch*, bool* eos).
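+  /// Illustrative consumption loop (a sketch, not verbatim exchange-node code):
+  ///   RowBatch* batch;
+  ///   do {
+  ///     RETURN_IF_ERROR(recvr->GetBatch(&batch));
+  ///     if (batch == nullptr) break;  // eos
+  ///     // ... consume 'batch' ...
+  ///   } while (true);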
+ Status GetBatch(RowBatch** next_batch); + + /// Deregister from the KrpcDataStreamMgr instance, which shares ownership of this instance. + void Close(); + + /// Create a SortedRunMerger instance to merge rows from multiple senders according to + /// the specified row comparator. Fetches the first batches from the individual sender + /// queues. The exprs used in less_than must have already been prepared and opened. + Status CreateMerger(const TupleRowComparator& less_than); + + /// Fill output_batch with the next batch of rows obtained by merging the per-sender + /// input streams. Must only be called if is_merging_ is true. + Status GetNext(RowBatch* output_batch, bool* eos); + + /// Transfer all resources from the current batches being processed from each sender + /// queue to the specified batch. + void TransferAllResources(RowBatch* transfer_batch); + + const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; } + PlanNodeId dest_node_id() const { return dest_node_id_; } + const RowDescriptor* row_desc() const { return row_desc_; } + MemTracker* mem_tracker() const { return mem_tracker_.get(); } + + private: + friend class KrpcDataStreamMgr; + class SenderQueue; + + KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr, MemTracker* parent_tracker, + const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id, + PlanNodeId dest_node_id, int num_senders, bool is_merging, + int64_t total_buffer_limit, RuntimeProfile* profile); + + /// Adds a new row batch to the appropriate sender queue. If the row batch can be + /// inserted, the RPC will be responded to before this function returns. If the batch + /// can't be added without exceeding the buffer limit, it is appended to a queue for + /// deferred processing. The RPC will be responded to when the row batch is deserialized + /// later. + void AddBatch(const TransmitDataRequestPB* request, TransmitDataResponsePB* response, + kudu::rpc::RpcContext* context); + + /// Tries adding the first entry of the 'deferred_rpcs_' queue for the sender queue + /// identified by 'sender_id'. If is_merging_ is false, it always defaults to + /// queue 0; if is_merging_ is true, the sender queue is identified by 'sender_id'. + void DequeueDeferredRpc(int sender_id); + + /// Takes over the RPC state 'ctx' of an early sender for deferred processing and + /// kicks off a deserialization task to process it asynchronously. This makes sure + /// new incoming RPCs won't overtake the early senders, which would lead to their + /// starvation. + void TakeOverEarlySender(std::unique_ptr<TransmitDataCtx> ctx); + + /// Indicate that a particular sender is done. Delegated to the appropriate + /// sender queue. Called from KrpcDataStreamMgr. + void RemoveSender(int sender_id); + + /// Marks all sender queues as cancelled and notifies all waiting consumers of + /// cancellation. + void CancelStream(); + + /// Return true if the addition of a new batch of size 'batch_size' would exceed the + /// total buffer limit. + bool ExceedsLimit(int64_t batch_size) { + return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_; + } + + /// KrpcDataStreamMgr instance used to create this recvr. (Not owned) + KrpcDataStreamMgr* mgr_; + + /// Fragment and node id of the destination exchange node this receiver is used by. + TUniqueId fragment_instance_id_; + PlanNodeId dest_node_id_; + + /// Soft upper limit on the total amount of buffering in bytes allowed for this stream + /// across all sender queues.
We defer processing of incoming RPCs once the amount of + /// buffered data exceeds this value. + const int64_t total_buffer_limit_; + + /// Row schema. + const RowDescriptor* row_desc_; + + /// True if this receiver merges incoming rows from different senders. Per-sender + /// row batch queues are maintained in this case. + bool is_merging_; + + /// Total number of bytes held across all sender queues. + AtomicInt32 num_buffered_bytes_; + + /// MemTracker for batches in the sender queue(s). + boost::scoped_ptr<MemTracker> mem_tracker_; + + /// One or more queues of row batches received from senders. If is_merging_ is true, + /// there is one SenderQueue for each sender. Otherwise, row batches from all senders + /// are placed in the same SenderQueue. The SenderQueue instances are owned by the + /// receiver and placed in sender_queue_pool_. + std::vector<SenderQueue*> sender_queues_; + + /// SortedRunMerger used to merge rows from different senders. + boost::scoped_ptr<SortedRunMerger> merger_; + + /// Pool of sender queues. + ObjectPool sender_queue_pool_; + + /// Runtime profile storing the counters below. + RuntimeProfile* profile_; + + /// Maintain two child profiles - receiver side measurements (from the GetBatch() path), + /// and sender side measurements (from AddBatch()). + RuntimeProfile* recvr_side_profile_; + RuntimeProfile* sender_side_profile_; + + /// Number of bytes received. + RuntimeProfile::Counter* bytes_received_counter_; + + /// Time series of number of bytes received, samples bytes_received_counter_. + RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_; + + /// Total wall-clock time spent deserializing row batches. + RuntimeProfile::Counter* deserialize_row_batch_timer_; + + /// Number of senders which arrive before the receiver is ready. + RuntimeProfile::Counter* num_early_senders_; + + /// Time spent waiting until the first batch arrives across all queues. + /// TODO: Turn this into a wall-clock timer. + RuntimeProfile::Counter* first_batch_wait_total_timer_; + + /// Total number of batches received and deferred as sender queue is full. + RuntimeProfile::Counter* num_deferred_batches_; + + /// Total number of batches received and accepted into the sender queue. + RuntimeProfile::Counter* num_accepted_batches_; + + /// Total wall-clock time spent waiting for data to arrive in the recv buffer. + RuntimeProfile::Counter* data_arrival_timer_; - [[noreturn]] Status GetBatch(RowBatch** next_batch) override; - [[noreturn]] void Close() override; - [[noreturn]] Status CreateMerger(const TupleRowComparator& less_than) override; - [[noreturn]] Status GetNext(RowBatch* output_batch, bool* eos) override; - [[noreturn]] void TransferAllResources(RowBatch* transfer_batch) override; + /// Pointer to profile's inactive timer. + RuntimeProfile::Counter* inactive_timer_; + /// Total time spent in SenderQueue::GetBatch().
+ RuntimeProfile::Counter* queue_get_batch_time_; }; } // namespace impala -#endif /* IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H */ +#endif // IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-sender.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc new file mode 100644 index 0000000..32e20cd --- /dev/null +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -0,0 +1,754 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/krpc-data-stream-sender.h" + +#include <boost/bind.hpp> + +#include <chrono> +#include <condition_variable> +#include <iostream> +#include <thrift/protocol/TDebugProtocol.h> + +#include "common/logging.h" +#include "exec/kudu-util.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" +#include "gutil/strings/substitute.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/rpc_sidecar.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "rpc/rpc-mgr.inline.h" +#include "runtime/descriptors.h" +#include "runtime/exec-env.h" +#include "runtime/mem-tracker.h" +#include "runtime/raw-value.inline.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "runtime/tuple-row.h" +#include "util/aligned-new.h" +#include "util/debug-util.h" +#include "util/network-util.h" + +#include "gen-cpp/data_stream_service.pb.h" +#include "gen-cpp/data_stream_service.proxy.h" +#include "gen-cpp/Types_types.h" + +#include "common/names.h" + +using std::condition_variable_any; +using namespace apache::thrift; +using kudu::rpc::RpcController; +using kudu::rpc::RpcSidecar; +using kudu::MonoDelta; + +DECLARE_int32(rpc_retry_interval_ms); + +namespace impala { + +// A datastream sender may send row batches to multiple destinations. There is one +// channel for each destination. +// +// Clients can call TransmitData() to directly send a serialized row batch to the +// destination or it can call AddRow() to accumulate rows in an internal row batch +// to certain capacity before sending it. The underlying RPC layer is implemented +// with KRPC, which provides interfaces for asynchronous RPC calls. Normally, the +// calls above will return before the RPC has completed but they may block if there +// is already an in-flight RPC. +// +// Each channel internally has two OutboundRowBatch to serialize to. They are reused +// across multiple RPC calls. Having two OutboundRowBatch allows client to serialize +// the next row batch while the current row batch is being sent. Upon completion of +// a RPC, the callback TransmitDataCompleteCb() is invoked. 
If the RPC fails due to +// the remote service's queue being full, TransmitDataCompleteCb() will schedule the retry +// callback RetryCb() after some delay derived from 'FLAGS_rpc_retry_interval_ms'. +// +// When a data stream sender is shut down, it will call Teardown() on all channels to +// release resources. Teardown() will cancel any in-flight RPC and wait for the +// completion callback to be called before returning. The execution thread is expected +// to call FlushAndSendEos() before closing the data stream sender to flush all +// buffered row batches and send the end-of-stream message to the remote receiver. +// Note that the RPC payloads are owned solely by the channel and the KRPC layer will +// relinquish references to them before the completion callback is invoked so it's +// safe to free them once the callback has been invoked. +// +// Note that due to KUDU-2011, timeout cannot be used with outbound sidecars. The client +// has no idea when it is safe to reclaim the sidecar buffer (~RpcSidecar() should be the +// right place, except that's currently called too early). RpcController::Cancel() ensures +// that the callback is called only after the RPC layer no longer references the sidecar +// buffers. class KrpcDataStreamSender::Channel : public CacheLineAligned { + public: + // Creates a channel to send data to a particular ipaddress/port/fragment instance id/node + // combination. buffer_size is specified in bytes and is a soft limit on how much tuple + // data is accumulated before being sent; it only applies when data is added via + // AddRow() and not sent directly via SendBatch(). + Channel(KrpcDataStreamSender* parent, const RowDescriptor* row_desc, + const TNetworkAddress& destination, const TUniqueId& fragment_instance_id, + PlanNodeId dest_node_id, int buffer_size) + : parent_(parent), + row_desc_(row_desc), + address_(destination), + fragment_instance_id_(fragment_instance_id), + dest_node_id_(dest_node_id) { + DCHECK(IsResolvedAddress(address_)); + } + + // Initializes the channel. + // Returns OK if successful, error indication otherwise. + Status Init(RuntimeState* state); + + // Serializes the given row batch and sends it to the destination. If the preceding + // RPC is in progress, this function may block until the previous RPC finishes. + // Returns an error status if serialization or the preceding RPC failed. Returns OK + // otherwise. + Status SerializeAndSendBatch(RowBatch* batch); + + // Transmits the serialized row batch 'outbound_batch'. This function may block if the + // preceding RPC is still in-flight. This is expected to be called from the fragment + // instance execution thread. Returns an error status if initialization of the RPC request + // parameters failed or if the preceding RPC failed. Returns OK otherwise. + Status TransmitData(const OutboundRowBatch* outbound_batch); + + // Copies a single row into this channel's row batch and flushes the row batch once + // it reaches capacity. This call may block if the row batch's capacity is reached + // and the preceding RPC is still in progress. Returns an error status if serialization + // failed or if the preceding RPC failed. Returns OK otherwise. + Status AddRow(TupleRow* row); + + // Shuts down the channel and frees the row batch allocation. Any in-flight RPC will + // be cancelled. It's expected that clients normally call FlushAndSendEos() before + // calling Teardown() to flush all buffered row batches to destinations.
Teardown() + // may be called without FlushAndSendEos() in cases such as cancellation or error. + void Teardown(RuntimeState* state); + + // Flushes any buffered row batches and sends the EOS RPC to close the channel. + // Return error status if either the last TransmitData() RPC or EOS RPC failed. + // This function blocks until the EOS RPC is complete. + Status FlushAndSendEos(RuntimeState* state); + + int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; } + + // The type for a RPC worker function. + typedef boost::function<Status()> DoRpcFn; + + private: + // The parent data stream sender owning this channel. Not owned. + KrpcDataStreamSender* parent_; + + // The descriptor of the accumulated rows in 'batch_' below. Used for computing + // the capacity of 'batch_' and also when adding a row in AddRow(). + const RowDescriptor* row_desc_; + + // The triplet of IP-address:port/finst-id/node-id uniquely identifies the receiver. + const TNetworkAddress address_; + const TUniqueId fragment_instance_id_; + const PlanNodeId dest_node_id_; + + // Number of bytes of all serialized row batches sent successfully. + int64_t num_data_bytes_sent_ = 0; + + // The row batch for accumulating rows copied from AddRow(). + // Only used if the partitioning scheme is "KUDU" or "HASH_PARTITIONED". + scoped_ptr<RowBatch> batch_; + + // The outbound row batches are double-buffered so that we can serialize the next + // batch while the other is still referenced by the in-flight RPC. Each entry contains + // a RowBatchHeaderPB and the buffers for the serialized tuple offsets and data. + // + // TODO: replace this with an actual queue. Schedule another RPC callback in the + // completion callback if the queue is not empty. + // TODO: rethink whether to keep per-channel buffers vs having all buffers in the + // datastream sender and sharing them across all channels. These buffers are not used in + // "UNPARTITIONED" scheme. + OutboundRowBatch outbound_batches_[NUM_OUTBOUND_BATCHES]; + + // Index into 'outbound_batches_' for the next available OutboundRowBatch to serialize + // into. This is read and written by the main execution thread. + int next_batch_idx_ = 0; + + // Synchronize accesses to the following fields between the main execution thread and + // the KRPC reactor thread. Note that there should be only one reactor thread invoking + // the callbacks for a channel so there should be no races between multiple reactor + // threads. Protect all subsequent fields. + SpinLock lock_; + + // 'lock_' needs to be held when accessing the following fields. + // The client interface for making RPC calls to the remote DataStreamService. + std::unique_ptr<DataStreamServiceProxy> proxy_; + + // Controller for managing properties of a single RPC call (such as features required + // in the remote servers) and passing the payloads to the actual OutboundCall object. + RpcController rpc_controller_; + + // Protobuf response buffer for TransmitData() RPC. + TransmitDataResponsePB resp_; + + // Protobuf response buffer for EndDataStream() RPC. + EndDataStreamResponsePB eos_resp_; + + // Signaled when the in-flight RPC completes. + condition_variable_any rpc_done_cv_; + + // Status of the most recently completed RPC. + Status rpc_status_; + + // The pointer to the current serialized row batch being sent. + const OutboundRowBatch* rpc_in_flight_batch_ = nullptr; + + // True if there is an in-flight RPC. + bool rpc_in_flight_ = false; + + // True if the channel is being shut down or shut down already. 
+ bool shutdown_ = false; + + // True if the remote receiver is closed already. In which case, all rows would + // be dropped silently. + // TODO: Fix IMPALA-3990 + bool remote_recvr_closed_ = false; + + // Returns true if the channel should terminate because the parent sender + // has been closed or cancelled. + bool ShouldTerminate() const { return shutdown_ || parent_->state_->is_cancelled(); } + + // Send the rows accumulated in the internal row batch. This will serialize the + // internal row batch before sending them to the destination. This may block if + // the preceding RPC is still in progress. Returns error status if serialization + // fails or if the preceding RPC fails. + Status SendCurrentBatch(); + + // Called when an RPC failed. If it turns out that the RPC failed because the + // remote server is too busy, this function will schedule RetryCb() to be called + // after FLAGS_rpc_retry_interval_ms milliseconds, which in turn re-invokes the RPC. + // Otherwise, it will call MarkDone() to mark the RPC as done and failed. + // 'controller_status' is a Kudu status returned from the KRPC layer. + // 'rpc_fn' is a worker function which initializes the RPC parameters and invokes + // the actual RPC when the RPC is rescheduled. + // 'err_msg' is an error message to be prepended to the status converted from the + // Kudu status 'controller_status'. + void HandleFailedRPC(const DoRpcFn& rpc_fn, const kudu::Status& controller_status, + const string& err_msg); + + // Waits for the preceding RPC to complete. Expects to be called with 'lock_' held. + // May drop the lock while waiting for the RPC to complete. Return error status if + // the preceding RPC fails. Returns CANCELLED if the parent sender is cancelled or + // shut down. Returns OK otherwise. This should be only called from a fragment + // executor thread. + Status WaitForRpc(std::unique_lock<SpinLock>* lock); + + // A callback function called from KRPC reactor thread to retry an RPC which failed + // previously due to remote server being too busy. This will re-arm the request + // parameters of the RPC. The retry may not happen if the callback has been aborted + // internally by KRPC code (e.g. the reactor thread was being shut down) or if the + // parent sender has been cancelled or closed since the scheduling of this callback. + // In which case, MarkDone() will be called with the error status and the RPC is + // considered complete. 'status' is the error status passed by KRPC code in case the + // callback was aborted. + void RetryCb(DoRpcFn rpc_fn, const kudu::Status& status); + + // A callback function called from KRPC reactor threads upon completion of an in-flight + // TransmitData() RPC. This is called when the remote server responds to the RPC or + // when the RPC ends prematurely due to various reasons (e.g. cancellation). Upon a + // successful KRPC call, MarkDone() is called to update 'rpc_status_' based on the + // response. HandleFailedRPC() is called to handle failed KRPC call. The RPC may be + // rescheduled if it's due to remote server being too busy. + void TransmitDataCompleteCb(); + + // Initializes the parameters for TransmitData() RPC and invokes the async RPC call. + // It will add 'tuple_offsets_' and 'tuple_data_' in 'rpc_in_flight_batch_' as sidecars + // to the RpcController and store the sidecars' indices to TransmitDataRequestPB sent as + // part of the RPC. Returns error status if adding sidecars to the RpcController failed. 
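+  // Sidecars let the serialized tuple offsets and tuple data travel with the RPC
+  // without being copied into the protobuf request itself.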
+ Status DoTransmitDataRpc(); + + // A callback function called from KRPC reactor threads upon completion of an in-flight + // EndDataStream() RPC. This is called when the remote server responds to the RPC or + // when the RPC ends prematurely due to various reasons (e.g. cancellation). Upon a + // successful KRPC call, MarkDone() is called to update 'rpc_status_' based on the + // response. HandleFailedRPC() is called to handle failed KRPC calls. The RPC may be + // rescheduled if it's due to remote server being too busy. + void EndDataStreamCompleteCb(); + + // Initializes the parameters for EndDataStream() RPC and invokes the async RPC call. + Status DoEndDataStreamRpc(); + + // Marks the in-flight RPC as completed, updates 'rpc_status_' with the status of the + // RPC (indicated in parameter 'status') and notifies any thread waiting for RPC + // completion. Expects to be called with 'lock_' held. Called in the context of a + // reactor thread. + void MarkDone(const Status& status); +}; + +Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) { + // TODO: take into account of var-len data at runtime. + int capacity = + max(1, parent_->per_channel_buffer_size_ / max(row_desc_->GetRowSize(), 1)); + batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker())); + + // Create a DataStreamService proxy to the destination. + RpcMgr* rpc_mgr = ExecEnv::GetInstance()->rpc_mgr(); + RETURN_IF_ERROR(rpc_mgr->GetProxy(address_, &proxy_)); + return Status::OK(); +} + +void KrpcDataStreamSender::Channel::MarkDone(const Status& status) { + rpc_status_ = status; + rpc_in_flight_ = false; + rpc_in_flight_batch_ = nullptr; + rpc_done_cv_.notify_one(); +} + +Status KrpcDataStreamSender::Channel::WaitForRpc(std::unique_lock<SpinLock>* lock) { + DCHECK(lock != nullptr); + DCHECK(lock->owns_lock()); + + SCOPED_TIMER(parent_->state_->total_network_send_timer()); + + // Wait for in-flight RPCs to complete unless the parent sender is closed or cancelled. + while(rpc_in_flight_ && !ShouldTerminate()) { + rpc_done_cv_.wait_for(*lock, std::chrono::milliseconds(50)); + } + + if (UNLIKELY(ShouldTerminate())) { + // DSS is single-threaded so it's impossible for shutdown_ to be true here. + DCHECK(!shutdown_); + return Status::CANCELLED; + } + + DCHECK(!rpc_in_flight_); + if (UNLIKELY(!rpc_status_.ok())) { + LOG(ERROR) << "channel send status: " << rpc_status_.GetDetail(); + return rpc_status_; + } + return Status::OK(); +} + +void KrpcDataStreamSender::Channel::RetryCb( + DoRpcFn rpc_fn, const kudu::Status& cb_status) { + COUNTER_ADD(parent_->rpc_retry_counter_, 1); + std::unique_lock<SpinLock> l(lock_); + DCHECK(rpc_in_flight_); + // Aborted by KRPC layer as reactor thread was being shut down. + if (UNLIKELY(!cb_status.ok())) { + MarkDone(FromKuduStatus(cb_status, "KRPC retry failed")); + return; + } + // Parent datastream sender has been closed or cancelled. + if (UNLIKELY(ShouldTerminate())) { + MarkDone(Status::CANCELLED); + return; + } + // Retry the RPC. + Status status = rpc_fn(); + if (UNLIKELY(!status.ok())) { + MarkDone(status); + } +} + +void KrpcDataStreamSender::Channel::HandleFailedRPC(const DoRpcFn& rpc_fn, + const kudu::Status& controller_status, const string& prepend) { + // Retrying later if the destination is busy. We don't call ShouldTerminate() + // here as this is always checked in RetryCb() anyway. + // TODO: IMPALA-6159. Handle 'connection reset by peer' due to stale connections. 
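+  // A 'server too busy' rejection is returned before the call is queued on the
+  // remote side, so re-sending the identical RPC later is safe.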
+ if (RpcMgr::IsServerTooBusy(rpc_controller_)) { + RpcMgr* rpc_mgr = ExecEnv::GetInstance()->rpc_mgr(); + // RetryCb() is scheduled to be called in a reactor context. + rpc_mgr->messenger()->ScheduleOnReactor( + boost::bind(&KrpcDataStreamSender::Channel::RetryCb, this, rpc_fn, _1), + MonoDelta::FromMilliseconds(FLAGS_rpc_retry_interval_ms)); + return; + } + MarkDone(FromKuduStatus(controller_status, prepend)); +} + +void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() { + std::unique_lock<SpinLock> l(lock_); + DCHECK(rpc_in_flight_); + const kudu::Status controller_status = rpc_controller_.status(); + if (LIKELY(controller_status.ok())) { + Status rpc_status = Status::OK(); + int32_t status_code = resp_.status().status_code(); + if (LIKELY(status_code == TErrorCode::OK)) { + DCHECK(rpc_in_flight_batch_ != nullptr); + num_data_bytes_sent_ += RowBatch::GetSerializedSize(*rpc_in_flight_batch_); + VLOG_ROW << "incremented #data_bytes_sent=" << num_data_bytes_sent_; + } else if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) { + remote_recvr_closed_ = true; + } else { + rpc_status = Status(resp_.status()); + } + MarkDone(rpc_status); + } else { + DoRpcFn rpc_fn = + boost::bind(&KrpcDataStreamSender::Channel::DoTransmitDataRpc, this); + const string& prepend = + Substitute("TransmitData() to $0 failed", TNetworkAddressToString(address_)); + HandleFailedRPC(rpc_fn, controller_status, prepend); + } +} + +Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() { + DCHECK(rpc_in_flight_batch_ != nullptr); + DCHECK(rpc_in_flight_batch_->IsInitialized()); + + // Initialize some constant fields in the request protobuf. + TransmitDataRequestPB req; + UniqueIdPB* finstance_id_pb = req.mutable_dest_fragment_instance_id(); + finstance_id_pb->set_lo(fragment_instance_id_.lo); + finstance_id_pb->set_hi(fragment_instance_id_.hi); + req.set_sender_id(parent_->sender_id_); + req.set_dest_node_id(dest_node_id_); + + // Set the RowBatchHeader in the request. + req.set_allocated_row_batch_header( + const_cast<RowBatchHeaderPB*>(rpc_in_flight_batch_->header())); + + rpc_controller_.Reset(); + int sidecar_idx; + // Add 'tuple_offsets_' as sidecar. + KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar(RpcSidecar::FromSlice( + rpc_in_flight_batch_->TupleOffsetsAsSlice()), &sidecar_idx), + "Unable to add tuple offsets to sidecar"); + req.set_tuple_offsets_sidecar_idx(sidecar_idx); + + // Add 'tuple_data_' as sidecar. + KUDU_RETURN_IF_ERROR(rpc_controller_.AddOutboundSidecar( + RpcSidecar::FromSlice(rpc_in_flight_batch_->TupleDataAsSlice()), &sidecar_idx), + "Unable to add tuple data to sidecar"); + req.set_tuple_data_sidecar_idx(sidecar_idx); + + resp_.Clear(); + proxy_->TransmitDataAsync(req, &resp_, &rpc_controller_, + boost::bind(&KrpcDataStreamSender::Channel::TransmitDataCompleteCb, this)); + // 'req' took ownership of 'header'. Need to release its ownership or 'header' will be + // deleted by destructor. + req.release_row_batch_header(); + return Status::OK(); +} + +Status KrpcDataStreamSender::Channel::TransmitData( + const OutboundRowBatch* outbound_batch) { + VLOG_ROW << "Channel::TransmitData() finst_id=" << fragment_instance_id_ + << " dest_node=" << dest_node_id_ + << " #rows=" << outbound_batch->header()->num_rows(); + std::unique_lock<SpinLock> l(lock_); + RETURN_IF_ERROR(WaitForRpc(&l)); + DCHECK(!rpc_in_flight_); + DCHECK(rpc_in_flight_batch_ == nullptr); + // If the remote receiver is closed already, there is no point in sending anything. 
+Status KrpcDataStreamSender::Channel::TransmitData(
+    const OutboundRowBatch* outbound_batch) {
+  VLOG_ROW << "Channel::TransmitData() finst_id=" << fragment_instance_id_
+           << " dest_node=" << dest_node_id_
+           << " #rows=" << outbound_batch->header()->num_rows();
+  std::unique_lock<SpinLock> l(lock_);
+  RETURN_IF_ERROR(WaitForRpc(&l));
+  DCHECK(!rpc_in_flight_);
+  DCHECK(rpc_in_flight_batch_ == nullptr);
+  // If the remote receiver is closed already, there is no point in sending anything.
+  // TODO: Needs a better solution for IMPALA-3990 in the long run.
+  if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
+  rpc_in_flight_ = true;
+  rpc_in_flight_batch_ = outbound_batch;
+  RETURN_IF_ERROR(DoTransmitDataRpc());
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::SerializeAndSendBatch(RowBatch* batch) {
+  OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
+  DCHECK(outbound_batch != rpc_in_flight_batch_);
+  RETURN_IF_ERROR(parent_->SerializeBatch(batch, outbound_batch));
+  RETURN_IF_ERROR(TransmitData(outbound_batch));
+  next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::SendCurrentBatch() {
+  RETURN_IF_ERROR(SerializeAndSendBatch(batch_.get()));
+  batch_->Reset();
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
+  if (batch_->AtCapacity()) {
+    // batch_ is full, let's send it.
+    RETURN_IF_ERROR(SendCurrentBatch());
+  }
+  TupleRow* dest = batch_->GetRow(batch_->AddRow());
+  const vector<TupleDescriptor*>& descs = row_desc_->tuple_descriptors();
+  for (int i = 0; i < descs.size(); ++i) {
+    if (UNLIKELY(row->GetTuple(i) == nullptr)) {
+      dest->SetTuple(i, nullptr);
+    } else {
+      dest->SetTuple(i, row->GetTuple(i)->DeepCopy(*descs[i], batch_->tuple_data_pool()));
+    }
+  }
+  batch_->CommitLastRow();
+  return Status::OK();
+}
+
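+// SerializeAndSendBatch() above relies on 'outbound_batches_' being double-buffered
+// (NUM_OUTBOUND_BATCHES == 2): while one OutboundRowBatch is still referenced by an
+// in-flight RPC, the next row batch can be serialized into the other one. The
+// WaitForRpc() call inside TransmitData() is what guarantees that the buffer about to
+// be overwritten is no longer referenced. In sketch form:
+//
+//   serialize into outbound_batches_[next_batch_idx_];
+//   wait until the previous RPC (if any) has completed;
+//   start the async RPC on outbound_batches_[next_batch_idx_];
+//   next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
+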
+void KrpcDataStreamSender::Channel::EndDataStreamCompleteCb() {
+  std::unique_lock<SpinLock> l(lock_);
+  DCHECK(rpc_in_flight_);
+  const kudu::Status controller_status = rpc_controller_.status();
+  if (LIKELY(controller_status.ok())) {
+    MarkDone(Status(eos_resp_.status()));
+  } else {
+    DoRpcFn rpc_fn =
+        boost::bind(&KrpcDataStreamSender::Channel::DoEndDataStreamRpc, this);
+    const string& prepend =
+        Substitute("EndDataStream() to $0 failed", TNetworkAddressToString(address_));
+    HandleFailedRPC(rpc_fn, controller_status, prepend);
+  }
+}
+
+Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
+  DCHECK(rpc_in_flight_);
+  EndDataStreamRequestPB eos_req;
+  rpc_controller_.Reset();
+  UniqueIdPB* finstance_id_pb = eos_req.mutable_dest_fragment_instance_id();
+  finstance_id_pb->set_lo(fragment_instance_id_.lo);
+  finstance_id_pb->set_hi(fragment_instance_id_.hi);
+  eos_req.set_sender_id(parent_->sender_id_);
+  eos_req.set_dest_node_id(dest_node_id_);
+  eos_resp_.Clear();
+  proxy_->EndDataStreamAsync(eos_req, &eos_resp_, &rpc_controller_,
+      boost::bind(&KrpcDataStreamSender::Channel::EndDataStreamCompleteCb, this));
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
+  VLOG_RPC << "Channel::FlushAndSendEos() instance_id=" << fragment_instance_id_
+           << " dest_node=" << dest_node_id_
+           << " #rows=" << batch_->num_rows();
+
+  // We can return an error here without going on to send the EOS RPC: the returned
+  // error is propagated to the coordinator, which will then cancel all the remote
+  // fragments, including the one this sender is sending to.
+  if (batch_->num_rows() > 0) RETURN_IF_ERROR(SendCurrentBatch());
+  {
+    std::unique_lock<SpinLock> l(lock_);
+    RETURN_IF_ERROR(WaitForRpc(&l));
+    DCHECK(!rpc_in_flight_);
+    if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
+    VLOG_RPC << "calling EndDataStream() to terminate channel.";
+    rpc_in_flight_ = true;
+    RETURN_IF_ERROR(DoEndDataStreamRpc());
+    RETURN_IF_ERROR(WaitForRpc(&l));
+  }
+  return Status::OK();
+}
+
+void KrpcDataStreamSender::Channel::Teardown(RuntimeState* state) {
+  // Normally, FlushAndSendEos() should have been called before Teardown(), which means
+  // that all the data should already be drained. If the fragment was closed or
+  // cancelled, there may still be some in-flight RPCs and buffered row batches to be
+  // flushed.
+  std::unique_lock<SpinLock> l(lock_);
+  shutdown_ = true;
+  // Cancel any in-flight RPC.
+  if (rpc_in_flight_) {
+    rpc_controller_.Cancel();
+    while (rpc_in_flight_) rpc_done_cv_.wait(l);
+  }
+  batch_.reset();
+}
+
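+// Teardown() above uses the cancel-and-drain pattern: set 'shutdown_', ask KRPC to
+// cancel the outstanding call, then block on 'rpc_done_cv_' until the completion
+// callback has run MarkDone(). In sketch form (names as in Channel):
+//
+//   shutdown_ = true;
+//   if (rpc_in_flight_) {
+//     rpc_controller_.Cancel();
+//     while (rpc_in_flight_) rpc_done_cv_.wait(l);
+//   }
+//
+// Draining before freeing 'batch_' matters because a reactor thread may still be
+// running the RPC's completion callback against this channel until then.
+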
+KrpcDataStreamSender::KrpcDataStreamSender(int sender_id, const RowDescriptor* row_desc,
+    const TDataStreamSink& sink, const vector<TPlanFragmentDestination>& destinations,
+    int per_channel_buffer_size)
+  : DataSink(row_desc),
+    sender_id_(sender_id),
+    partition_type_(sink.output_partition.type),
+    per_channel_buffer_size_(per_channel_buffer_size),
+    dest_node_id_(sink.dest_node_id),
+    next_unknown_partition_(0) {
+  DCHECK_GT(destinations.size(), 0);
+  DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
+      || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
+      || sink.output_partition.type == TPartitionType::RANDOM
+      || sink.output_partition.type == TPartitionType::KUDU);
+
+  for (int i = 0; i < destinations.size(); ++i) {
+    channels_.push_back(
+        new Channel(this, row_desc, destinations[i].krpc_server,
+            destinations[i].fragment_instance_id, sink.dest_node_id,
+            per_channel_buffer_size));
+  }
+
+  if (partition_type_ == TPartitionType::UNPARTITIONED ||
+      partition_type_ == TPartitionType::RANDOM) {
+    // Randomize the order in which we open/transmit to channels to avoid thundering
+    // herd problems.
+    srand(reinterpret_cast<uint64_t>(this));
+    random_shuffle(channels_.begin(), channels_.end());
+  }
+}
+
+string KrpcDataStreamSender::GetName() {
+  return Substitute("KrpcDataStreamSender (dst_id=$0)", dest_node_id_);
+}
+
+KrpcDataStreamSender::~KrpcDataStreamSender() {
+  // TODO: check that the sender was either already closed or that there was an error
+  // on some channel.
+  for (int i = 0; i < channels_.size(); ++i) {
+    delete channels_[i];
+  }
+}
+
+Status KrpcDataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
+    const TDataSink& tsink, RuntimeState* state) {
+  DCHECK(tsink.__isset.stream_sink);
+  if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
+      partition_type_ == TPartitionType::KUDU) {
+    RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
+        *row_desc_, state, &partition_exprs_));
+  }
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Prepare(
+    RuntimeState* state, MemTracker* parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
+  state_ = state;
+  SCOPED_TIMER(profile_->total_time_counter());
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state,
+      state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
+      &partition_expr_evals_));
+  serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
+  rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
+  bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+  uncompressed_bytes_counter_ =
+      ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
+  total_sent_rows_counter_ = ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);
+  overall_throughput_ =
+      profile()->AddDerivedCounter("OverallThroughput", TUnit::BYTES_PER_SECOND,
+          bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
+              profile()->total_time_counter()));
+  for (int i = 0; i < channels_.size(); ++i) {
+    RETURN_IF_ERROR(channels_[i]->Init(state));
+  }
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::Open(RuntimeState* state) {
+  return ScalarExprEvaluator::Open(partition_expr_evals_, state);
+}
+
+Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
+  DCHECK(!closed_);
+  DCHECK(!flushed_);
+
+  if (batch->num_rows() == 0) return Status::OK();
+  if (partition_type_ == TPartitionType::UNPARTITIONED) {
+    OutboundRowBatch* outbound_batch = &outbound_batches_[next_batch_idx_];
+    RETURN_IF_ERROR(SerializeBatch(batch, outbound_batch));
+    // TransmitData() will block if there are still in-flight RPCs (and those will
+    // reference the previously serialized batch).
+    for (int i = 0; i < channels_.size(); ++i) {
+      RETURN_IF_ERROR(channels_[i]->TransmitData(outbound_batch));
+    }
+    next_batch_idx_ = (next_batch_idx_ + 1) % NUM_OUTBOUND_BATCHES;
+  } else if (partition_type_ == TPartitionType::RANDOM || channels_.size() == 1) {
+    // Round-robin batches among channels. Wait for the current channel to finish its
+    // RPC before overwriting its batch.
+    Channel* current_channel = channels_[current_channel_idx_];
+    RETURN_IF_ERROR(current_channel->SerializeAndSendBatch(batch));
+    current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size();
+  } else if (partition_type_ == TPartitionType::KUDU) {
+    DCHECK_EQ(partition_expr_evals_.size(), 1);
+    int num_channels = channels_.size();
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      TupleRow* row = batch->GetRow(i);
+      int32_t partition =
+          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+      if (partition < 0) {
+        // This row doesn't correspond to a partition, e.g. it's outside the given
+        // ranges.
+        partition = next_unknown_partition_;
+        ++next_unknown_partition_;
+      }
+      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
+    }
+  } else {
+    DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED);
+    // Hash-partition the batch's rows across channels.
+    // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this
+    // case once we have codegen here.
+    int num_channels = channels_.size();
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      TupleRow* row = batch->GetRow(i);
+      uint64_t hash_val = EXCHANGE_HASH_SEED;
+      for (int j = 0; j < partition_exprs_.size(); ++j) {
+        ScalarExprEvaluator* eval = partition_expr_evals_[j];
+        void* partition_val = eval->GetValue(row);
+        // We can't use the CRC hash function here because it does not result in
+        // uncorrelated hashes with different seeds. Instead we use FastHash.
+        // TODO: fix CRC hash/GetHashValue().
+        DCHECK(&(eval->root()) == partition_exprs_[j]);
+        hash_val = RawValue::GetHashValueFastHash(
+            partition_val, partition_exprs_[j]->type(), hash_val);
+      }
+      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
+    }
+  }
+  COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
+  expr_results_pool_->Clear();
+  RETURN_IF_ERROR(state->CheckQueryState());
+  return Status::OK();
+}
+
+Status KrpcDataStreamSender::FlushFinal(RuntimeState* state) {
+  DCHECK(!flushed_);
+  DCHECK(!closed_);
+  flushed_ = true;
+  for (int i = 0; i < channels_.size(); ++i) {
+    // If we hit an error here, we can return without closing the remaining channels as
+    // the error is propagated back to the coordinator, which in turn cancels the query,
+    // which will cause the remaining open channels to be closed.
+    RETURN_IF_ERROR(channels_[i]->FlushAndSendEos(state));
+  }
+  return Status::OK();
+}
+
+void KrpcDataStreamSender::Close(RuntimeState* state) {
+  if (closed_) return;
+  for (int i = 0; i < channels_.size(); ++i) {
+    channels_[i]->Teardown(state);
+  }
+  ScalarExprEvaluator::Close(partition_expr_evals_, state);
+  ScalarExpr::Close(partition_exprs_);
+  DataSink::Close(state);
+  closed_ = true;
+}
+
+Status KrpcDataStreamSender::SerializeBatch(
+    RowBatch* src, OutboundRowBatch* dest, int num_receivers) {
+  VLOG_ROW << "serializing " << src->num_rows() << " rows";
+  {
+    SCOPED_TIMER(profile_->total_time_counter());
+    SCOPED_TIMER(serialize_batch_timer_);
+    RETURN_IF_ERROR(src->Serialize(dest));
+    int64_t bytes = RowBatch::GetSerializedSize(*dest);
+    int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest);
+    COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
+    COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
+  }
+  return Status::OK();
+}
+
+int64_t KrpcDataStreamSender::GetNumDataBytesSent() const {
+  // TODO: do we need synchronization here or are reads & writes to 8-byte ints
+  // atomic?
+  int64_t result = 0;
+  for (int i = 0; i < channels_.size(); ++i) {
+    result += channels_[i]->num_data_bytes_sent();
+  }
+  return result;
+}
+
+} // namespace impala
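
As a standalone illustration of the hash-partitioned path in Send() above: the per-row
hash is chained across all partition expressions, starting from EXCHANGE_HASH_SEED, and
the final value is mapped onto a channel by modulo. The sketch below shows the same
chaining scheme in isolation; MixHash() is a simple 64-bit mixer standing in for
Impala's RawValue::GetHashValueFastHash(), and the rows are assumed to be pre-evaluated
integer partition-key values:

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // Stand-in for a seeded per-value hash such as FastHash: mixes one column value
    // into the running seed. Any 64-bit mixer with good avalanche behavior works.
    static uint64_t MixHash(uint64_t value, uint64_t seed) {
      uint64_t h = seed ^ (value + 0x9e3779b97f4a7c15ULL);
      h ^= h >> 30; h *= 0xbf58476d1ce4e5b9ULL;
      h ^= h >> 27; h *= 0x94d049bb133111ebULL;
      h ^= h >> 31;
      return h;
    }

    int main() {
      const uint64_t kSeed = 0x66bd68df22c3ef37ULL;  // same role as EXCHANGE_HASH_SEED
      const int num_channels = 4;
      // Each row is a vector of already-evaluated partition-key values.
      std::vector<std::vector<uint64_t>> rows = {{1, 10}, {1, 11}, {2, 10}, {2, 11}};
      for (const auto& row : rows) {
        uint64_t hash_val = kSeed;
        // Chain the hash across all partition columns, like the inner loop in Send().
        for (uint64_t col : row) hash_val = MixHash(col, hash_val);
        std::printf("row -> channel %d\n", static_cast<int>(hash_val % num_channels));
      }
      return 0;
    }

Chaining with a seeded hash (rather than XOR-ing independent column hashes) is what
makes multi-column keys distribute well: each column perturbs the seed for the next.
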
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
new file mode 100644
index 0000000..1a8b30f
--- /dev/null
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H
+#define IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H
+
+#include <vector>
+#include <string>
+
+#include "exec/data-sink.h"
+#include "common/global-types.h"
+#include "common/object-pool.h"
+#include "common/status.h"
+#include "runtime/row-batch.h"
+#include "util/runtime-profile.h"
+
+namespace impala {
+
+class RowDescriptor;
+class MemTracker;
+class TDataStreamSink;
+class TNetworkAddress;
+class TPlanFragmentDestination;
+
+/// Single sender of an m:n data stream.
+///
+/// Row batch data is routed to destinations based on the provided partitioning
+/// specification.
+/// *Not* thread-safe.
+///
+/// TODO: capture stats that describe distribution of rows/data volume
+/// across channels.
+/// TODO: create a PlanNode equivalent class for DataSink.
+class KrpcDataStreamSender : public DataSink {
+ public:
+  /// Constructs a sender according to the output specification (tsink), sending to the
+  /// given destinations:
+  /// 'sender_id' identifies this sender instance, and is unique within a fragment.
+  /// 'row_desc' is the descriptor of the tuple rows. It must out-live the sink.
+  /// 'destinations' are the receivers' network addresses. There is one channel for each
+  /// destination.
+  /// 'per_channel_buffer_size' is the soft limit in bytes on how much row data may
+  /// accumulate in a channel's buffered row batch before it is sent.
+  /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
+  /// RANDOM and KUDU.
+  KrpcDataStreamSender(int sender_id, const RowDescriptor* row_desc,
+      const TDataStreamSink& tsink,
+      const std::vector<TPlanFragmentDestination>& destinations,
+      int per_channel_buffer_size);
+
+  virtual ~KrpcDataStreamSender();
+
+  virtual std::string GetName();
+
+  /// Initializes the sender by initializing all the channels and allocating all the
+  /// stat counters. Returns an error status if any channel fails to initialize.
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
+
+  /// Initializes the evaluators of the partitioning expressions. Returns an error
+  /// status if initialization fails.
+  virtual Status Open(RuntimeState* state);
+
+  /// Flushes all buffered data and closes all existing channels to destination hosts.
+  /// Further Send() calls are illegal after calling FlushFinal(), and FlushFinal()
+  /// itself may be called at most once. Returns an error status if a Send() failed or
+  /// the end-of-stream call failed.
+  virtual Status FlushFinal(RuntimeState* state);
+
+  /// Sends data in 'batch' to destination nodes according to the partitioning
+  /// specification provided in the c'tor.
+  /// Blocks until all rows in 'batch' are placed in their appropriate outgoing
+  /// buffers (i.e. blocks if there are still in-flight RPCs from the last
+  /// Send() call).
+  virtual Status Send(RuntimeState* state, RowBatch* batch);
+
+  /// Shuts down all existing channels to destination hosts. Further FlushFinal() calls
+  /// are illegal after calling Close().
+  virtual void Close(RuntimeState* state);
+
+ protected:
+  friend class DataStreamTest;
+
+  /// Initializes any partitioning expressions based on 'thrift_output_exprs' and
+  /// stores them in 'partition_exprs_'. Returns an error status if initialization
+  /// fails.
+  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
+      const TDataSink& tsink, RuntimeState* state);
+
+  /// Returns the total number of bytes sent. If batches are broadcast to multiple
+  /// receivers, they are counted once per receiver.
+  int64_t GetNumDataBytesSent() const;
+
+ private:
+  class Channel;
+
+  /// Serializes the batch 'src' into the serialized row batch 'dest' and updates
+  /// various stat counters.
+  /// 'num_receivers' is the number of receivers this batch will be sent to. Used for
+  /// updating the stat counters.
+  Status SerializeBatch(RowBatch* src, OutboundRowBatch* dest, int num_receivers = 1);
+
+  /// Sender instance id, unique within a fragment.
+  const int sender_id_;
+
+  /// The type of partitioning to perform.
+  const TPartitionType::type partition_type_;
+
+  /// Amount of per-channel buffering for rows before sending them to the destination.
+  const int per_channel_buffer_size_;
+
+  /// RuntimeState of the fragment instance.
+  RuntimeState* state_ = nullptr;
+
+  /// Index of the current channel to send to when batches are round-robined across
+  /// channels (i.e. when the partitioning strategy is RANDOM).
+  int current_channel_idx_ = 0;
+
+  /// Index of the next OutboundRowBatch to use for serialization.
+  int next_batch_idx_ = 0;
+
+  /// The outbound row batches are double-buffered so that we can serialize the next
+  /// batch while the other is still referenced by the in-flight RPC. Each entry
+  /// contains a RowBatchHeaderPB and buffers for the serialized tuple offsets and
+  /// data. Used only when the partitioning strategy is UNPARTITIONED.
+  static const int NUM_OUTBOUND_BATCHES = 2;
+  OutboundRowBatch outbound_batches_[NUM_OUTBOUND_BATCHES];
+
+  /// If true, this sender has called FlushFinal() successfully. It is no longer valid
+  /// to call Send().
+  bool flushed_ = false;
+
+  /// If true, this sender has been closed. It is no longer valid to call Send().
+  bool closed_ = false;
+
+  /// List of all channels. One for each destination.
+  std::vector<Channel*> channels_;
+
+  /// Expressions of partition keys. Used to compute the per-row partition values for
+  /// shuffling exchanges.
+  std::vector<ScalarExpr*> partition_exprs_;
+  std::vector<ScalarExprEvaluator*> partition_expr_evals_;
+
+  /// Time for serializing row batches.
+  RuntimeProfile::Counter* serialize_batch_timer_ = nullptr;
+
+  /// Number of TransmitData() RPC retries due to the remote service being busy.
+  RuntimeProfile::Counter* rpc_retry_counter_ = nullptr;
+
+  /// Total number of bytes sent.
+  RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
+
+  /// Total number of bytes of the row batches before compression.
+  RuntimeProfile::Counter* uncompressed_bytes_counter_ = nullptr;
+
+  /// Total number of rows sent.
+  RuntimeProfile::Counter* total_sent_rows_counter_ = nullptr;
+
+  /// Throughput per total time spent in the sender.
+  RuntimeProfile::Counter* overall_throughput_ = nullptr;
+
+  /// Identifier of the destination plan node.
+  PlanNodeId dest_node_id_;
+
+  /// Used for Kudu partitioning to round-robin rows that don't correspond to a
+  /// partition or when errors are encountered.
+  int next_unknown_partition_;
+
+  /// An arbitrary hash seed used for exchanges.
+  static constexpr uint64_t EXCHANGE_HASH_SEED = 0x66bd68df22c3ef37;
+};
+
+} // namespace impala
+
+#endif // IMPALA_RUNTIME_KRPC_DATA_STREAM_SENDER_H
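
Taken together, the contract documented in this header is: Prepare()/Open() once, any
number of Send() calls, FlushFinal() at most once on the success path, and Close()
always, even after an error (in which case FlushFinal() may never run). A minimal
sketch of that lifecycle, using illustrative names rather than the real DataSink
interface:

    #include <cassert>
    #include <cstdio>

    // Toy model of the sender lifecycle: Send() is illegal after FlushFinal() or
    // Close(); FlushFinal() is legal at most once; Close() is always legal and
    // idempotent. Names here are illustrative, not the real Impala API.
    class SinkLifecycle {
     public:
      void Send() { assert(!flushed_ && !closed_); ++batches_sent_; }
      void FlushFinal() { assert(!flushed_ && !closed_); flushed_ = true; }
      void Close() { closed_ = true; }  // legal with or without a prior flush

     private:
      bool flushed_ = false;
      bool closed_ = false;
      int batches_sent_ = 0;
    };

    int main() {
      SinkLifecycle sink;
      sink.Send();
      sink.Send();
      sink.FlushFinal();  // exactly once on the success path
      sink.Close();       // always called, even if FlushFinal() was skipped
      std::printf("lifecycle ok\n");
      return 0;
    }
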
