Repository: incubator-impala Updated Branches: refs/heads/master a772f8456 -> b4ea57a7e
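This change adds a protobuf/KRPC serialization path for row batches alongside the existing Thrift one: a new OutboundRowBatch holds the serialized RowBatchHeaderPB plus the tuple-offsets and tuple-data buffers, RowBatch::Serialize()/Deserialize() are refactored so the Thrift and protobuf paths share one implementation, and a new DataStreamService (TransmitData/EndDataStream) carries the two buffers as KRPC sidecars. The fragment below is only a hedged sketch of the sender-side flow implied by the diff, not code from this patch; it assumes a Status-returning context, and AddOutboundSidecar() is a hypothetical stand-in for whatever sidecar-registration API the KRPC sender actually uses.

#include "runtime/row-batch.h"
#include "gen-cpp/data_stream_service.pb.h"

// Sketch: serialize 'batch' (a RowBatch*) and wire it into a TransmitDataRequestPB.
OutboundRowBatch outbound;
RETURN_IF_ERROR(batch->Serialize(&outbound));  // fills header_, tuple_offsets_, tuple_data_
DCHECK(outbound.IsInitialized());

TransmitDataRequestPB request;
request.mutable_row_batch_header()->CopyFrom(*outbound.header());

// The two buffers travel out-of-band as KRPC sidecars, referenced by index in the request.
int offsets_idx = AddOutboundSidecar(outbound.TupleOffsetsAsSlice());  // hypothetical helper
int data_idx = AddOutboundSidecar(outbound.TupleDataAsSlice());        // hypothetical helper
request.set_tuple_offsets_sidecar_idx(offsets_idx);
request.set_tuple_data_sidecar_idx(data_idx);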
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index a6e935a..cd8c936 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -21,7 +21,6 @@ #include <memory> #include <boost/scoped_ptr.hpp> -#include "gen-cpp/Results_types.h" #include "runtime/exec-env.h" #include "runtime/mem-tracker.h" #include "runtime/string-value.h" @@ -32,6 +31,9 @@ #include "util/fixed-size-hash-table.h" #include "util/scope-exit-trigger.h" +#include "gen-cpp/Results_types.h" +#include "gen-cpp/row_batch.pb.h" + #include "common/names.h" namespace impala { @@ -76,21 +78,56 @@ RowBatch::RowBatch( tuple_data_pool_(mem_tracker), row_desc_(row_desc), mem_tracker_(mem_tracker) { - DCHECK(mem_tracker_ != NULL); - tuple_ptrs_size_ = num_rows_ * input_batch.row_tuples.size() * sizeof(Tuple*); - DCHECK_EQ(input_batch.row_tuples.size(), row_desc->tuple_descriptors().size()); - DCHECK_GT(tuple_ptrs_size_, 0); + DCHECK(mem_tracker_ != nullptr); + kudu::Slice tuple_data = + kudu::Slice(input_batch.tuple_data.c_str(), input_batch.tuple_data.size()); + kudu::Slice tuple_offsets = kudu::Slice( + reinterpret_cast<const char*>(input_batch.tuple_offsets.data()), + input_batch.tuple_offsets.size() * sizeof(int32_t)); + const THdfsCompression::type& compression_type = input_batch.compression_type; + DCHECK(compression_type == THdfsCompression::NONE || + compression_type == THdfsCompression::LZ4) + << "Unexpected compression type: " << input_batch.compression_type; + Deserialize(tuple_offsets, tuple_data, input_batch.uncompressed_size, + compression_type == THdfsCompression::LZ4); +} + +RowBatch::RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header, + const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data, + MemTracker* mem_tracker) + : num_rows_(header.num_rows()), + capacity_(header.num_rows()), + flush_(FlushMode::NO_FLUSH_RESOURCES), + needs_deep_copy_(false), + num_tuples_per_row_(header.num_tuples_per_row()), + attached_buffer_bytes_(0), + tuple_data_pool_(mem_tracker), + row_desc_(row_desc), + mem_tracker_(mem_tracker) { + DCHECK(mem_tracker_ != nullptr); + const CompressionType& compression_type = header.compression_type(); + DCHECK(compression_type == CompressionType::NONE || + compression_type == CompressionType::LZ4) + << "Unexpected compression type: " << compression_type; + Deserialize(tuple_offsets, tuple_data, header.uncompressed_size(), + compression_type == CompressionType::LZ4); +} + +void RowBatch::Deserialize(const kudu::Slice& input_tuple_offsets, + const kudu::Slice& input_tuple_data, int64_t uncompressed_size, bool is_compressed) { // TODO: switch to Init() pattern so we can check memory limit and return Status. 
+ DCHECK_EQ(num_tuples_per_row_, row_desc_->tuple_descriptors().size()); + tuple_ptrs_size_ = num_rows_ * num_tuples_per_row_ * sizeof(Tuple*); + DCHECK_GT(tuple_ptrs_size_, 0); mem_tracker_->Consume(tuple_ptrs_size_); tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_)); - DCHECK(tuple_ptrs_ != NULL); + DCHECK(tuple_ptrs_ != nullptr); + uint8_t* tuple_data; - if (input_batch.compression_type != THdfsCompression::NONE) { - DCHECK_EQ(THdfsCompression::LZ4, input_batch.compression_type) - << "Unexpected compression type: " << input_batch.compression_type; + if (is_compressed) { // Decompress tuple data into data pool - uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str(); - size_t compressed_size = input_batch.tuple_data.size(); + const uint8_t* compressed_data = input_tuple_data.data(); + size_t compressed_size = input_tuple_data.size(); Lz4Decompressor decompressor(nullptr, false); Status status = decompressor.Init(); @@ -98,7 +135,6 @@ RowBatch::RowBatch( auto compressor_cleanup = MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); }); - int64_t uncompressed_size = input_batch.uncompressed_size; DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed"; tuple_data = tuple_data_pool_.Allocate(uncompressed_size); status = decompressor.ProcessBlock( @@ -106,18 +142,21 @@ RowBatch::RowBatch( DCHECK(status.ok()) << "RowBatch decompression failed."; } else { // Tuple data uncompressed, copy directly into data pool - tuple_data = tuple_data_pool_.Allocate(input_batch.tuple_data.size()); - memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size()); + tuple_data = tuple_data_pool_.Allocate(input_tuple_data.size()); + memcpy(tuple_data, input_tuple_data.data(), input_tuple_data.size()); } // Convert input_batch.tuple_offsets into pointers - int tuple_idx = 0; - for (vector<int32_t>::const_iterator offset = input_batch.tuple_offsets.begin(); - offset != input_batch.tuple_offsets.end(); ++offset) { - if (*offset == -1) { - tuple_ptrs_[tuple_idx++] = NULL; + const int32_t* tuple_offsets = + reinterpret_cast<const int32_t*>(input_tuple_offsets.data()); + DCHECK_EQ(input_tuple_offsets.size() % sizeof(int32_t), 0); + int num_tuples = input_tuple_offsets.size() / sizeof(int32_t); + for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) { + int32_t offset = tuple_offsets[tuple_idx]; + if (offset == -1) { + tuple_ptrs_[tuple_idx] = nullptr; } else { - tuple_ptrs_[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + *offset); + tuple_ptrs_[tuple_idx] = reinterpret_cast<Tuple*>(tuple_data + offset); } } @@ -126,9 +165,9 @@ RowBatch::RowBatch( // For every unique tuple, convert string offsets contained in tuple data into // pointers. Tuples were serialized in the order we are deserializing them in, - // so the first occurrence of a tuple will always have a higher offset than any tuple - // we already converted. - Tuple* last_converted = NULL; + // so the first occurrence of a tuple will always have a higher offset than any + // tuple we already converted. 
+ Tuple* last_converted = nullptr; for (int i = 0; i < num_rows_; ++i) { for (int j = 0; j < num_tuples_per_row_; ++j) { const TupleDescriptor* desc = row_desc_->tuple_descriptors()[j]; @@ -148,10 +187,10 @@ RowBatch::~RowBatch() { ExecEnv::GetInstance()->buffer_pool()->FreeBuffer( buffer_info.client, &buffer_info.buffer); } - DCHECK(tuple_ptrs_ != NULL); + DCHECK(tuple_ptrs_ != nullptr); free(tuple_ptrs_); mem_tracker_->Release(tuple_ptrs_size_); - tuple_ptrs_ = NULL; + tuple_ptrs_ = nullptr; } Status RowBatch::Serialize(TRowBatch* output_batch) { @@ -162,11 +201,41 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) { // why does Thrift not generate a Clear() function? output_batch->row_tuples.clear(); output_batch->tuple_offsets.clear(); - output_batch->compression_type = THdfsCompression::NONE; - - output_batch->num_rows = num_rows_; + int64_t uncompressed_size; + bool is_compressed; + RETURN_IF_ERROR(Serialize(full_dedup, &output_batch->tuple_offsets, + &output_batch->tuple_data, &uncompressed_size, &is_compressed)); + // TODO: max_size() is much larger than the amount of memory we could feasibly + // allocate. Need better way to detect problem. + DCHECK_LE(uncompressed_size, output_batch->tuple_data.max_size()); + output_batch->__set_num_rows(num_rows_); + output_batch->__set_uncompressed_size(uncompressed_size); + output_batch->__set_compression_type( + is_compressed ? THdfsCompression::LZ4 : THdfsCompression::NONE); row_desc_->ToThrift(&output_batch->row_tuples); + return Status::OK(); +} +Status RowBatch::Serialize(OutboundRowBatch* output_batch) { + int64_t uncompressed_size; + bool is_compressed; + output_batch->tuple_offsets_.clear(); + RETURN_IF_ERROR(Serialize(UseFullDedup(), &output_batch->tuple_offsets_, + &output_batch->tuple_data_, &uncompressed_size, &is_compressed)); + + // Initialize the RowBatchHeaderPB + RowBatchHeaderPB* header = &output_batch->header_; + header->Clear(); + header->set_num_rows(num_rows_); + header->set_num_tuples_per_row(row_desc_->tuple_descriptors().size()); + header->set_uncompressed_size(uncompressed_size); + header->set_compression_type( + is_compressed ? CompressionType::LZ4 : CompressionType::NONE); + return Status::OK(); +} + +Status RowBatch::Serialize(bool full_dedup, vector<int32_t>* tuple_offsets, + string* tuple_data, int64_t* uncompressed_size, bool* is_compressed) { // As part of the serialization process we deduplicate tuples to avoid serializing a // Tuple multiple times for the RowBatch. By default we only detect duplicate tuples // in adjacent rows only. If full deduplication is enabled, we will build a @@ -179,11 +248,13 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) { RETURN_IF_ERROR(distinct_tuples.Init(num_rows_ * num_tuples_per_row_ * 2, 0)); size = TotalByteSize(&distinct_tuples); distinct_tuples.Clear(); // Reuse allocated hash table. 
- SerializeInternal(size, &distinct_tuples, output_batch); + SerializeInternal(size, &distinct_tuples, tuple_offsets, tuple_data); } else { - size = TotalByteSize(NULL); - SerializeInternal(size, NULL, output_batch); + size = TotalByteSize(nullptr); + SerializeInternal(size, nullptr, tuple_offsets, tuple_data); } + *uncompressed_size = size; + *is_compressed = false; if (size > 0) { // Try compressing tuple_data to compression_scratch_, swap if compressed data is @@ -197,15 +268,14 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) { if (compression_scratch_.size() < compressed_size) { compression_scratch_.resize(compressed_size); } - uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str(); + uint8_t* input = (uint8_t*)tuple_data->c_str(); uint8_t* compressed_output = (uint8_t*)compression_scratch_.c_str(); RETURN_IF_ERROR( compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output)); - if (LIKELY(compressed_size < size)) { compression_scratch_.resize(compressed_size); - output_batch->tuple_data.swap(compression_scratch_); - output_batch->compression_type = THdfsCompression::LZ4; + tuple_data->swap(compression_scratch_); + *is_compressed = true; } VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size; } @@ -227,54 +297,52 @@ bool RowBatch::UseFullDedup() { } void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples, - TRowBatch* output_batch) { - DCHECK(distinct_tuples == NULL || distinct_tuples->size() == 0); + vector<int32_t>* tuple_offsets, string* tuple_data_str) { + DCHECK(distinct_tuples == nullptr || distinct_tuples->size() == 0); // TODO: max_size() is much larger than the amount of memory we could feasibly // allocate. Need better way to detect problem. - DCHECK_LE(size, output_batch->tuple_data.max_size()); + DCHECK_LE(size, tuple_data_str->max_size()); // TODO: track memory usage // TODO: detect if serialized size is too large to allocate and return proper error. - output_batch->tuple_data.resize(size); - output_batch->uncompressed_size = size; - output_batch->tuple_offsets.reserve(num_rows_ * num_tuples_per_row_); + tuple_data_str->resize(size); + tuple_offsets->reserve(num_rows_ * num_tuples_per_row_); // Copy tuple data of unique tuples, including strings, into output_batch (converting // string pointers into offsets in the process). int offset = 0; // current offset into output_batch->tuple_data - char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str()); + char* tuple_data = const_cast<char*>(tuple_data_str->c_str()); for (int i = 0; i < num_rows_; ++i) { vector<TupleDescriptor*>::const_iterator desc = row_desc_->tuple_descriptors().begin(); for (int j = 0; desc != row_desc_->tuple_descriptors().end(); ++desc, ++j) { Tuple* tuple = GetRow(i)->GetTuple(j); - if (UNLIKELY(tuple == NULL)) { + if (UNLIKELY(tuple == nullptr)) { // NULLs are encoded as -1 - output_batch->tuple_offsets.push_back(-1); + tuple_offsets->push_back(-1); continue; } else if (LIKELY(i > 0) && UNLIKELY(GetRow(i - 1)->GetTuple(j) == tuple)) { // Fast tuple deduplication for adjacent rows. 
- int prev_row_idx = output_batch->tuple_offsets.size() - num_tuples_per_row_; - output_batch->tuple_offsets.push_back( - output_batch->tuple_offsets[prev_row_idx]); + int prev_row_idx = tuple_offsets->size() - num_tuples_per_row_; + tuple_offsets->push_back((*tuple_offsets)[prev_row_idx]); continue; - } else if (UNLIKELY(distinct_tuples != NULL)) { + } else if (UNLIKELY(distinct_tuples != nullptr)) { if ((*desc)->byte_size() == 0) { - // Zero-length tuples can be represented as NULL. - output_batch->tuple_offsets.push_back(-1); + // Zero-length tuples can be represented as nullptr. + tuple_offsets->push_back(-1); continue; } int* dedupd_offset = distinct_tuples->FindOrInsert(tuple, offset); if (*dedupd_offset != offset) { // Repeat of tuple DCHECK_GE(*dedupd_offset, 0); - output_batch->tuple_offsets.push_back(*dedupd_offset); + tuple_offsets->push_back(*dedupd_offset); continue; } } // Record offset before creating copy (which increments offset and tuple_data) - output_batch->tuple_offsets.push_back(offset); + tuple_offsets->push_back(offset); tuple->DeepCopy(**desc, &tuple_data, &offset, /* convert_ptrs */ true); DCHECK_LE(offset, size); } @@ -323,10 +391,7 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) { } int64_t RowBatch::GetDeserializedSize(const TRowBatch& batch) { - int64_t result = batch.uncompressed_size; - result += batch.row_tuples.size() * sizeof(TTupleId); - result += batch.tuple_offsets.size() * sizeof(int32_t); - return result; + return batch.uncompressed_size + batch.tuple_offsets.size() * sizeof(Tuple*); } int64_t RowBatch::GetSerializedSize(const TRowBatch& batch) { @@ -336,6 +401,21 @@ int64_t RowBatch::GetSerializedSize(const TRowBatch& batch) { return result; } +int64_t RowBatch::GetDeserializedSize(const RowBatchHeaderPB& header, + const kudu::Slice& tuple_offsets) { + DCHECK_EQ(tuple_offsets.size() % sizeof(int32_t), 0); + return header.uncompressed_size() + + (tuple_offsets.size() / sizeof(int32_t)) * sizeof(Tuple*); +} + +int64_t RowBatch::GetDeserializedSize(const OutboundRowBatch& batch) { + return batch.header_.uncompressed_size() + batch.tuple_offsets_.size() * sizeof(Tuple*); +} + +int64_t RowBatch::GetSerializedSize(const OutboundRowBatch& batch) { + return batch.tuple_data_.size() + batch.tuple_offsets_.size() * sizeof(int32_t); +} + void RowBatch::AcquireState(RowBatch* src) { DCHECK(row_desc_->LayoutEquals(*src->row_desc_)); DCHECK_EQ(num_tuples_per_row_, src->num_tuples_per_row_); @@ -370,7 +450,7 @@ void RowBatch::DeepCopyTo(RowBatch* dst) { // TODO: consider computing size of batches as they are built up int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) { - DCHECK(distinct_tuples == NULL || distinct_tuples->size() == 0); + DCHECK(distinct_tuples == nullptr || distinct_tuples->size() == 0); int64_t result = 0; vector<int> tuple_count(row_desc_->tuple_descriptors().size(), 0); @@ -378,12 +458,12 @@ int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) { for (int i = 0; i < num_rows_; ++i) { for (int j = 0; j < num_tuples_per_row_; ++j) { Tuple* tuple = GetRow(i)->GetTuple(j); - if (UNLIKELY(tuple == NULL)) continue; + if (UNLIKELY(tuple == nullptr)) continue; // Only count the data of unique tuples. if (LIKELY(i > 0) && UNLIKELY(GetRow(i - 1)->GetTuple(j) == tuple)) { // Fast tuple deduplication for adjacent rows. 
continue; - } else if (UNLIKELY(distinct_tuples != NULL)) { + } else if (UNLIKELY(distinct_tuples != nullptr)) { if (row_desc_->tuple_descriptors()[j]->byte_size() == 0) continue; bool inserted = distinct_tuples->InsertIfNotPresent(tuple, -1); if (!inserted) continue; @@ -413,7 +493,7 @@ Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state, MemPool* pool } *buffer_size = static_cast<int64_t>(row_size) * *capacity; *buffer = pool->TryAllocate(*buffer_size); - if (*buffer == NULL) { + if (*buffer == nullptr) { return pool->mem_tracker()->MemLimitExceeded( state, "Failed to allocate tuple buffer", *buffer_size); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/row-batch.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index 5a7edd4..d246024 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -25,11 +25,17 @@ #include "codegen/impala-ir.h" #include "common/compiler-util.h" #include "common/logging.h" +#include "gen-cpp/row_batch.pb.h" +#include "kudu/util/slice.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/descriptors.h" #include "runtime/disk-io-mgr.h" #include "runtime/mem-pool.h" +namespace kudu { +class Slice; +} // namespace kudu + namespace impala { template <typename K, typename V> class FixedSizeHashTable; @@ -41,6 +47,47 @@ class Tuple; class TupleRow; class TupleDescriptor; +/// A KRPC outbound row batch which contains the serialized row batch header and buffers +/// for holding the tuple offsets and tuple data. +class OutboundRowBatch { + public: + const RowBatchHeaderPB* header() const { return &header_; } + + /// Returns the serialized tuple offsets' vector as a kudu::Slice. + /// The tuple offsets vector is sent as KRPC sidecar. + kudu::Slice TupleOffsetsAsSlice() const { + return kudu::Slice((uint8_t*)tuple_offsets_.data(), + tuple_offsets_.size() * sizeof(tuple_offsets_[0])); + } + + /// Returns the serialized tuple data's buffer as a kudu::Slice. + /// The tuple data is sent as KRPC sidecar. + kudu::Slice TupleDataAsSlice() const { + return kudu::Slice((uint8_t*)tuple_data_.data(), tuple_data_.length()); + } + + /// Returns true if the header has been intialized and ready to be sent. + /// This entails setting some fields initialized in RowBatch::Serialize(). + bool IsInitialized() const { + return header_.has_num_rows() && header_.has_uncompressed_size() && + header_.has_compression_type(); + } + + private: + friend class RowBatch; + + /// The serialized header which contains the meta-data of the row batch such as the + /// number of rows and compression scheme used etc. + RowBatchHeaderPB header_; + + /// Contains offsets into 'tuple_data_' of all tuples in a row batch. -1 refers to + /// a NULL tuple. + vector<int32_t> tuple_offsets_; + + /// Contains the actual data of all the tuples. The data could be compressed. + string tuple_data_; +}; + /// A RowBatch encapsulates a batch of rows, each composed of a number of tuples. /// The maximum number of rows is fixed at the time of construction. /// The row batch can reference various types of memory. @@ -55,10 +102,10 @@ class TupleDescriptor; /// used. /// TODO: IMPALA-4179: simplify the ownership transfer model. /// -/// In order to minimize memory allocations, RowBatches and TRowBatches that have been -/// serialized and sent over the wire should be reused (this prevents compression_scratch_ -/// from being needlessly reallocated). 
-// +/// In order to minimize memory allocations, RowBatches and TRowBatches or +/// OutboundRowBatch that have been serialized and sent over the wire should be reused +/// (this prevents compression_scratch_ from being needlessly reallocated). +/// /// Row batches and memory usage: We attempt to stream row batches through the plan /// tree without copying the data. This means that row batches are often not-compact /// and reference memory outside of the row batch. This results in most row batches @@ -88,13 +135,21 @@ class RowBatch { /// tracker cannot be NULL. RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* tracker); - /// Populate a row batch from input_batch by copying input_batch's - /// tuple_data into the row batch's mempool and converting all offsets - /// in the data back into pointers. + /// Populate a row batch from a serialized thrift input_batch by copying + /// input_batch's tuple_data into the row batch's mempool and converting all + /// offsets in the data back into pointers. /// TODO: figure out how to transfer the data from input_batch to this RowBatch /// (so that we don't need to make yet another copy) - RowBatch( - const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* tracker); + RowBatch(const RowDescriptor* row_desc, const TRowBatch& input_batch, + MemTracker* tracker); + + /// Populate a row batch from the serialized row batch header, decompress / copy + /// the tuple's data into a buffer and convert all offsets in 'tuple_offsets' back + /// into pointers into the tuple data's buffer. The tuple data's buffer is allocated + /// from the row batch's MemPool tracked by 'mem_tracker'. + RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header, + const kudu::Slice& input_tuple_data, const kudu::Slice& input_tuple_offsets, + MemTracker* mem_tracker); /// Releases all resources accumulated at this row batch. This includes /// - tuple_ptrs @@ -288,12 +343,13 @@ class RowBatch { void DeepCopyTo(RowBatch* dst); /// Create a serialized version of this row batch in output_batch, attaching all of the - /// data it references to output_batch.tuple_data. This function attempts to - /// detect duplicate tuples in the row batch to reduce the serialized size. - /// output_batch.tuple_data will be snappy-compressed unless the compressed data is - /// larger than the uncompressed data. Use output_batch.is_compressed to determine + /// data it references to output_batch.tuple_data. This function attempts to detect + /// duplicate tuples in the row batch to reduce the serialized size. + /// output_batch.tuple_data will be LZ4-compressed unless the compressed data is larger + /// larger than the uncompressed data. Use output_batch.compression_type to determine /// whether tuple_data is compressed. If an in-flight row is present in this row batch, /// it is ignored. This function does not Reset(). + Status Serialize(OutboundRowBatch* output_batch); Status Serialize(TRowBatch* output_batch); /// Utility function: returns total byte size of a batch in either serialized or @@ -301,6 +357,10 @@ class RowBatch { /// less than the deserialized size. 
static int64_t GetSerializedSize(const TRowBatch& batch); static int64_t GetDeserializedSize(const TRowBatch& batch); + static int64_t GetSerializedSize(const OutboundRowBatch& batch); + static int64_t GetDeserializedSize(const OutboundRowBatch& batch); + static int64_t GetDeserializedSize(const RowBatchHeaderPB& header, + const kudu::Slice& tuple_offsets); int ALWAYS_INLINE num_rows() const { return num_rows_; } int ALWAYS_INLINE capacity() const { return capacity_; } @@ -353,6 +413,38 @@ class RowBatch { /// Overload for testing that allows the test to force the deduplication level. Status Serialize(TRowBatch* output_batch, bool full_dedup); + /// Shared implementation between thrift and protobuf to serialize this row batch. + /// + /// 'full_dedup': true if full deduplication is used. + /// 'tuple_offsets': Updated to contain offsets of all tuples into 'tuple_data' upon + /// return. There are a total of num_rows * num_tuples_per_row offsets. + /// An offset of -1 records a NULL. + /// 'tuple_data': Updated to hold the serialized tuples' data. If 'is_compressed' + /// is true, this is LZ4 compressed. + /// 'uncompressed_size': Updated with the uncompressed size of 'tuple_data'. + /// 'is_compressed': true if compression is applied on 'tuple_data'. + /// + /// Returns error status if serialization failed. Returns OK otherwise. + /// TODO: clean this up once the thrift RPC implementation is removed. + Status Serialize(bool full_dedup, vector<int32_t>* tuple_offsets, string* tuple_data, + int64_t* uncompressed_size, bool* is_compressed); + + /// Shared implementation between thrift and protobuf to deserialize a row batch. + /// + /// 'input_tuple_offsets': an int32_t array of tuples; offsets into 'input_tuple_data'. + /// Used for populating the tuples in the row batch with actual pointers. + /// + /// 'input_tuple_data': contains pointer and size of tuples' data buffer. + /// If 'is_compressed' is true, the data is compressed. + /// + /// 'uncompressed_size': the uncompressed size of 'input_tuple_data' if it's compressed. + /// + /// 'is_compressed': True if 'input_tuple_data' is compressed. + /// + /// TODO: clean this up once the thrift RPC implementation is removed. + void Deserialize(const kudu::Slice& input_tuple_offsets, + const kudu::Slice& input_tuple_data, int64_t uncompressed_size, bool is_compressed); + typedef FixedSizeHashTable<Tuple*, int> DedupMap; /// The total size of all data represented in this row batch (tuples and referenced @@ -363,7 +455,7 @@ class RowBatch { int64_t TotalByteSize(DedupMap* distinct_tuples); void SerializeInternal(int64_t size, DedupMap* distinct_tuples, - TRowBatch* output_batch); + vector<int32_t>* tuple_offsets, string* tuple_data); /// All members below need to be handled in RowBatch::AcquireState() @@ -419,10 +511,11 @@ class RowBatch { std::vector<BufferInfo> buffers_; /// String to write compressed tuple data to in Serialize(). - /// This is a string so we can swap() with the string in the TRowBatch we're serializing - /// to (we don't compress directly into the TRowBatch in case the compressed data is - /// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch - /// and avoids excess memory allocations: since we reuse RowBatchs and TRowBatchs, and + /// This is a string so we can swap() with the string in the serialized row batch + /// (i.e. 
TRowBatch or OutboundRowBatch) we're serializing to (we don't compress + /// directly into the serialized row batch in case the compressed data is longer than + /// the uncompressed data). Swapping avoids copying data to the serialized row batch + /// and avoids excess memory allocations: since we reuse the serialized row batches, and /// assuming all row batches are roughly the same size, all strings will eventually be /// allocated to the right size. std::string compression_scratch_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/service/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index ab51740..c78116c 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -21,20 +21,25 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/service") # where to put generated binaries. set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/service") +# Mark the protobuf file as generated +set_source_files_properties(${DATA_STREAM_SVC_PROTO_SRCS} PROPERTIES GENERATED TRUE) + add_library(Service + child-query.cc + client-request-state.cc + ${DATA_STREAM_SVC_PROTO_SRCS} + data-stream-service.cc frontend.cc fe-support.cc hs2-util.cc - impala-server.cc - impala-http-handler.cc - impala-hs2-server.cc impala-beeswax-server.cc + impala-hs2-server.cc + impala-http-handler.cc impala-internal-service.cc - client-request-state.cc + impalad-main.cc + impala-server.cc query-options.cc query-result-set.cc - child-query.cc - impalad-main.cc ) add_dependencies(Service gen-deps) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/service/data-stream-service.cc ---------------------------------------------------------------------- diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc new file mode 100644 index 0000000..dcf0c1f --- /dev/null +++ b/be/src/service/data-stream-service.cc @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "service/data-stream-service.h" + +#include "common/status.h" +#include "exec/kudu-util.h" +#include "kudu/rpc/rpc_context.h" +#include "rpc/rpc-mgr.h" +#include "runtime/krpc-data-stream-mgr.h" +#include "runtime/exec-env.h" +#include "runtime/row-batch.h" +#include "testutil/fault-injection-util.h" + +#include "gen-cpp/data_stream_service.pb.h" + +#include "common/names.h" + +using kudu::rpc::RpcContext; + +namespace impala { + +DataStreamService::DataStreamService(RpcMgr* mgr) + : DataStreamServiceIf(mgr->metric_entity(), mgr->result_tracker()) {} + +void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request, + EndDataStreamResponsePB* response, RpcContext* rpc_context) { + // CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here. + ExecEnv::GetInstance()->KrpcStreamMgr()->CloseSender(request, response, rpc_context); +} + +void DataStreamService::TransmitData(const TransmitDataRequestPB* request, + TransmitDataResponsePB* response, RpcContext* rpc_context) { + FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA); + // AddData() is guaranteed to eventually respond to this RPC so we don't do it here. + ExecEnv::GetInstance()->KrpcStreamMgr()->AddData(request, response, rpc_context); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/service/data-stream-service.h ---------------------------------------------------------------------- diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h new file mode 100644 index 0000000..7f3c6e4 --- /dev/null +++ b/be/src/service/data-stream-service.h @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_SERVICE_DATA_STREAM_SERVICE_H +#define IMPALA_SERVICE_DATA_STREAM_SERVICE_H + +#include "gen-cpp/data_stream_service.service.h" + +namespace kudu { +namespace rpc { +class RpcContext; +} // namespace rpc +} // namespace kudu + +namespace impala { + +class RpcMgr; + +/// This is singleton class which provides data transmission services between fragment +/// instances. The client for this service is implemented in KrpcDataStreamSender. +/// The processing of incoming requests is implemented in KrpcDataStreamRecvr. +/// KrpcDataStreamMgr is responsible for routing the incoming requests to the +/// appropriate receivers. +class DataStreamService : public DataStreamServiceIf { + public: + DataStreamService(RpcMgr* rpc_mgr); + + /// Notifies the receiver to close the data stream specified in 'request'. + /// The receiver replies to the client with a status serialized in 'response'. 
+ virtual void EndDataStream(const EndDataStreamRequestPB* request, + EndDataStreamResponsePB* response, kudu::rpc::RpcContext* context); + + /// Sends a row batch to the receiver specified in 'request'. + /// The receiver replies to the client with a status serialized in 'response'. + virtual void TransmitData(const TransmitDataRequestPB* request, + TransmitDataResponsePB* response, kudu::rpc::RpcContext* context); +}; + +} // namespace impala +#endif // IMPALA_SERVICE_DATA_STREAM_SERVICE_H http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 07a83eb..4c540b7 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1195,7 +1195,7 @@ void ImpalaServer::TransmitData( } if (params.eos) { - exec_env_->stream_mgr()->CloseSender( + exec_env_->ThriftStreamMgr()->CloseSender( params.dest_fragment_instance_id, params.dest_node_id, params.sender_id).SetTStatus(&return_val); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/cmake_modules/FindProtobuf.cmake ---------------------------------------------------------------------- diff --git a/cmake_modules/FindProtobuf.cmake b/cmake_modules/FindProtobuf.cmake index 4c2ab2f..f300141 100644 --- a/cmake_modules/FindProtobuf.cmake +++ b/cmake_modules/FindProtobuf.cmake @@ -180,7 +180,7 @@ function(PROTOBUF_GENERATE_CPP SRCS HDRS TGTS) # This custom target enforces that there's just one invocation of protoc # when there are multiple consumers of the generated files. The target name # must be unique; adding parts of the filename helps ensure this. - set(TGT_NAME ${REL_DIR}${FIL}) + set(TGT_NAME PROTO_${REL_DIR}${FIL}) string(REPLACE "/" "-" TGT_NAME ${TGT_NAME}) add_custom_target(${TGT_NAME} DEPENDS "${PROTO_CC_OUT}" "${PROTO_H_OUT}") http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/protobuf/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/common/protobuf/CMakeLists.txt b/common/protobuf/CMakeLists.txt index 4d5f121..2faec7e 100644 --- a/common/protobuf/CMakeLists.txt +++ b/common/protobuf/CMakeLists.txt @@ -20,6 +20,30 @@ cmake_minimum_required(VERSION 2.6) set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/) +PROTOBUF_GENERATE_CPP( + COMMON_PROTO_SRCS COMMON_PROTO_HDRS COMMON_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR} + BINARY_ROOT ${PROTOBUF_OUTPUT_DIR} + PROTO_FILES common.proto) +add_custom_target(common_proto DEPENDS ${COMMON_PROTO_TGTS}) +set(COMMON_PROTO_SRCS ${COMMON_PROTO_SRCS} PARENT_SCOPE) + +PROTOBUF_GENERATE_CPP( + ROW_BATCH_PROTO_SRCS ROW_BATCH_PROTO_HDRS ROW_BATCH_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR} + BINARY_ROOT ${PROTOBUF_OUTPUT_DIR} + PROTO_FILES row_batch.proto) +add_custom_target(row_batch_proto DEPENDS ${ROW_BATCH_PROTO_TGTS}) +set(ROW_BATCH_PROTO_SRCS ${ROW_BATCH_PROTO_SRCS} PARENT_SCOPE) + +KRPC_GENERATE(DATA_STREAM_SVC_PROTO_SRCS DATA_STREAM_SVC_PROTO_HDRS + DATA_STREAM_SVC_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR} + BINARY_ROOT ${PROTOBUF_OUTPUT_DIR} + PROTO_FILES data_stream_service.proto) +add_custom_target(data_stream_svc_proto DEPENDS ${DATA_STREAM_SVC_PROTO_TGTS}) +set(DATA_STREAM_SVC_PROTO_SRCS ${DATA_STREAM_SVC_PROTO_SRCS} PARENT_SCOPE) + KRPC_GENERATE(RPC_TEST_PROTO_SRCS RPC_TEST_PROTO_HDRS RPC_TEST_PROTO_TGTS SOURCE_ROOT 
${CMAKE_CURRENT_SOURCE_DIR} @@ -28,4 +52,4 @@ KRPC_GENERATE(RPC_TEST_PROTO_SRCS RPC_TEST_PROTO_HDRS add_custom_target(rpc_test_proto_tgt DEPENDS ${RPC_TEST_PROTO_TGTS}) set(RPC_TEST_PROTO_SRCS ${RPC_TEST_PROTO_SRCS} PARENT_SCOPE) -add_custom_target(proto-deps ALL DEPENDS token_proto rpc_header_proto) \ No newline at end of file +add_custom_target(proto-deps DEPENDS token_proto rpc_header_proto common_proto row_batch_proto data_stream_svc_proto) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/protobuf/common.proto ---------------------------------------------------------------------- diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto new file mode 100644 index 0000000..17f9fc6 --- /dev/null +++ b/common/protobuf/common.proto @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Common protobuf definitions. + +package impala; + +// Proto-serialized version of Impala's Status object. +message StatusPB { + optional int32 status_code = 1; + repeated string error_msgs = 2; +} + +// 128-bit ID (equivalent to TUniqueID). +message UniqueIdPB { + optional int64 hi = 1; + optional int64 lo = 2; +} + +// The compression codec. Currently used in row batch's header to +// indicate the type of compression applied to the row batch. +enum CompressionType { + NONE = 0; // No compression. + LZ4 = 1; +}; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/protobuf/data_stream_service.proto ---------------------------------------------------------------------- diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto new file mode 100644 index 0000000..3aa3f28 --- /dev/null +++ b/common/protobuf/data_stream_service.proto @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package impala; + +import "common.proto"; +import "row_batch.proto"; + +// All fields are required in V1. +message TransmitDataRequestPB { + // The fragment instance id of the receiver. 
+ optional UniqueIdPB dest_fragment_instance_id = 1; + + // Sender instance id, unique within a fragment. + optional int32 sender_id = 2; + + // PlanNodeId of the exchange node which owns the receiver. + optional int32 dest_node_id = 3; + + // The header which contains the meta-data of the row batch. + optional RowBatchHeaderPB row_batch_header = 4; + + // The sidecar index of tuple offsets' buffer which is an array of int32 containing the + // offsets of tuples into the buffer pointed to by tuple data's sidecar below. There are + // num_rows * num_tuples_per_row offsets in total. An offset of -1 records a NULL. + optional int32 tuple_offsets_sidecar_idx = 5; + + // The sidecar index of the tuple's data which is a (compressed) row batch. + // The details of the row batch (e.g. # of rows) is in 'row_batch_header' above. + optional int32 tuple_data_sidecar_idx = 6; +} + +// All fields are required in V1. +message TransmitDataResponsePB { + // Status::OK() on success; Error status on failure. + optional StatusPB status = 1; +} + +// All fields are required in V1. +message EndDataStreamRequestPB { + // The fragment instance id of the receiver. + optional UniqueIdPB dest_fragment_instance_id = 1; + + // Sender instance id, unique within a fragment. + optional int32 sender_id = 2; + + // PlanNodeId of the exchange node which owns the receiver. + optional int32 dest_node_id = 3; +} + +// All fields are required in V1. +message EndDataStreamResponsePB { + optional StatusPB status = 1; +} + +// Handles data transmission between fragment instances. +service DataStreamService { + // Called by sender to transmit a single row batch. Returns error indication + // if params.fragmentId or params.destNodeId are unknown or if data couldn't + // be read. + rpc TransmitData(TransmitDataRequestPB) returns (TransmitDataResponsePB); + + // Called by a sender to close the channel between fragment instances. + rpc EndDataStream(EndDataStreamRequestPB) returns (EndDataStreamResponsePB); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/protobuf/row_batch.proto ---------------------------------------------------------------------- diff --git a/common/protobuf/row_batch.proto b/common/protobuf/row_batch.proto new file mode 100644 index 0000000..5aef55b --- /dev/null +++ b/common/protobuf/row_batch.proto @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package impala; + +import "common.proto"; + +// The serialized version of a header of a RowBatch (in be/src/runtime/row-batch.h). +// It contains the meta-data of a row batch. The actual data of a row batch is sent +// as KRPC sidecars. Please see TransmitDataRequestPB for details. +// All fields are required in V1. 
+message RowBatchHeaderPB { + // Total number of rows contained in this batch. + optional int32 num_rows = 1; + + // Number of tuples per row in this batch. + optional int32 num_tuples_per_row = 2; + + // Size of 'tuple_data' in bytes before any compression is applied. + optional int64 uncompressed_size = 3; + + // The compression codec (if any) used for compressing the row batch. + optional CompressionType compression_type = 4; +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index ad07963..dd7afc5 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -336,7 +336,8 @@ error_codes = ( ("DISK_IO_ERROR", 110, "Disk I/O error: $0"), - + ("DATASTREAM_RECVR_CLOSED", 111, + "DataStreamRecvr for fragment=$0, node=$1 is closed already"), ) import sys
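For completeness, a receiver-side sketch under similar assumptions: 'request', the two sidecar slices ('offsets_slice', 'data_slice'), a RowDescriptor* 'row_desc' and a MemTracker* 'mem_tracker' are presumed to have already been pulled out of the RpcContext (the real routing lives in KrpcDataStreamMgr / KrpcDataStreamRecvr, which this patch only references). Note the argument order follows the constructor definition in row-batch.cc: offsets slice first, then data slice.

#include "runtime/row-batch.h"
#include "gen-cpp/data_stream_service.pb.h"

// Sketch: size the batch up front, then materialize it from the header and sidecars.
const RowBatchHeaderPB& header = request->row_batch_header();
int64_t deserialized_size = RowBatch::GetDeserializedSize(header, offsets_slice);
VLOG_ROW << "materializing row batch of " << deserialized_size << " bytes";

// The constructor copies (or LZ4-decompresses) the tuple data into the batch's MemPool
// and converts the int32 offsets back into Tuple* pointers.
RowBatch batch(row_desc, header, offsets_slice, data_slice, mem_tracker);
DCHECK_EQ(batch.num_rows(), header.num_rows());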
