This is an automated email from the ASF dual-hosted git repository. todd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit c7c4d47ecca0ea0d90435dac736d22f4063d5507 Author: Todd Lipcon <[email protected]> AuthorDate: Tue Mar 31 12:03:09 2020 -0700 Avoid calling Schema::find_column() once per RowBlock in columnar serialization Prior to this patch, each row block being serialized in the columnar format would result in a call to Schema::find_column(name) for each projected column. That was relatively expensive, involving a hash computation and string equality check, etc. This changes the projection calculation to happen "up front" once per Scan RPC instead of in the per-rowblock calls. This optimization could also apply to the rowwise serialization, but I found that the other overheads inherent in that code path are so high that the find_column calls aren't particularly noticeable. Nonetheless I left a TODO. Change-Id: I1b683c7d6d6fe1026ee06c8b5ebfe2a5f1ee6cb1 Reviewed-on: http://gerrit.cloudera.org:8080/15678 Reviewed-by: Andrew Wong <[email protected]> Tested-by: Todd Lipcon <[email protected]> --- src/kudu/common/columnar_serialization.cc | 54 +++++++++----------- src/kudu/common/columnar_serialization.h | 41 +++++++++++---- src/kudu/common/wire_protocol-test.cc | 12 ++--- src/kudu/tserver/tablet_service.cc | 84 +++++++++++++++++++------------ 4 files changed, 112 insertions(+), 79 deletions(-) diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc index f6b289e..52a3425 100644 --- a/src/kudu/common/columnar_serialization.cc +++ b/src/kudu/common/columnar_serialization.cc @@ -587,34 +587,31 @@ void CopySelectedVarlenCellsFromColumn(const ColumnBlock& cblock, } // anonymous namespace } // namespace internal -int SerializeRowBlockColumnar( - const RowBlock& block, - const Schema* projection_schema, - ColumnarSerializedBatch* out) { - DCHECK_GT(block.nrows(), 0); - const Schema* tablet_schema = block.schema(); - - if (projection_schema == nullptr) { - projection_schema = tablet_schema; - } - +ColumnarSerializedBatch::ColumnarSerializedBatch(const 
Schema& rowblock_schema, + const Schema& client_schema) { // Initialize buffers for the columns. // TODO(todd) don't pre-size these to 1MB per column -- quite // expensive if there are a lot of columns! - if (out->columns.size() != projection_schema->num_columns()) { - CHECK_EQ(out->columns.size(), 0); - out->columns.reserve(projection_schema->num_columns()); - for (const auto& col : projection_schema->columns()) { - out->columns.emplace_back(); - out->columns.back().data.reserve(1024 * 1024); - if (col.type_info()->physical_type() == BINARY) { - out->columns.back().varlen_data.emplace(); - } - if (col.is_nullable()) { - out->columns.back().non_null_bitmap.emplace(); - } + columns_.reserve(client_schema.num_columns()); + for (const auto& schema_col : client_schema.columns()) { + columns_.emplace_back(); + auto& col = columns_.back(); + + col.rowblock_schema_col_idx = rowblock_schema.find_column(schema_col.name()); + CHECK_NE(col.rowblock_schema_col_idx, -1); + + col.data.reserve(1024 * 1024); + if (schema_col.type_info()->physical_type() == BINARY) { + col.varlen_data.emplace(); + } + if (schema_col.is_nullable()) { + col.non_null_bitmap.emplace(); } } +} + +int ColumnarSerializedBatch::AddRowBlock(const RowBlock& block) { + DCHECK_GT(block.nrows(), 0); SelectedRows sel = block.selection_vector()->GetSelectedRows(); if (sel.num_selected() == 0) { @@ -622,21 +619,18 @@ int SerializeRowBlockColumnar( } int col_idx = 0; - for (const auto& col : projection_schema->columns()) { - int t_schema_idx = tablet_schema->find_column(col.name()); - CHECK_NE(t_schema_idx, -1); - const ColumnBlock& column_block = block.column_block(t_schema_idx); - + for (const auto& col : columns_) { + const ColumnBlock& column_block = block.column_block(col.rowblock_schema_col_idx); if (column_block.type_info()->physical_type() == BINARY) { internal::CopySelectedVarlenCellsFromColumn( column_block, sel, - &out->columns[col_idx]); + &columns_[col_idx]); } else { 
internal::CopySelectedCellsFromColumn( column_block, sel, - &out->columns[col_idx]); + &columns_[col_idx]); } col_idx++; } diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h index bc74f08..b4862c7 100644 --- a/src/kudu/common/columnar_serialization.h +++ b/src/kudu/common/columnar_serialization.h @@ -17,6 +17,7 @@ #pragma once #include <cstdint> +#include <utility> #include <vector> #include <boost/optional/optional.hpp> @@ -30,8 +31,24 @@ class Schema; // A pending batch of serialized rows, suitable for easy conversion // into the protobuf representation and a set of sidecars. -struct ColumnarSerializedBatch { +class ColumnarSerializedBatch { + public: + // 'rowblock_schema': the schema of the RowBlocks that will be passed to + // AddRowBlock(). + // 'client_schema': the schema to be returned to the client, which may + // contain a subset of columns + ColumnarSerializedBatch(const Schema& rowblock_schema, + const Schema& client_schema); + + // Append the data in 'block' into this columnar batch. + // + // Returns the number of selected rows serialized. + int AddRowBlock(const RowBlock& block); + struct Column { + // The index of the column in the schema of the RowBlocks to be appended. + int rowblock_schema_col_idx; + // Underlying column data. faststring data; @@ -41,17 +58,19 @@ struct ColumnarSerializedBatch { // Each bit is set when a value is non-null boost::optional<faststring> non_null_bitmap; }; - std::vector<Column> columns; -}; -// Serialize the data in 'block' into the columnar batch 'out', appending to -// any data already serialized to the same batch. -// -// Returns the number of selected rows serialized. 
-int SerializeRowBlockColumnar( - const RowBlock& block, - const Schema* projection_schema, - ColumnarSerializedBatch* out); + const std::vector<Column>& columns() const { + return columns_; + } + + std::vector<Column> TakeColumns() && { + return std::move(columns_); + } + + private: + friend class WireProtocolTest; + std::vector<Column> columns_; +}; //////////////////////////////////////////////////////////// diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc index 4a1c791..9ab534d 100644 --- a/src/kudu/common/wire_protocol-test.cc +++ b/src/kudu/common/wire_protocol-test.cc @@ -306,13 +306,13 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) { } // Convert all of the RowBlocks to a single serialized (concatenated) columnar format. - ColumnarSerializedBatch batch; + ColumnarSerializedBatch batch(schema_, schema_); for (const auto& block : blocks) { - SerializeRowBlockColumnar(block, nullptr, &batch); + batch.AddRowBlock(block); } // Verify that the resulting serialized data matches the concatenated original data blocks. 
- ASSERT_EQ(5, batch.columns.size()); + ASSERT_EQ(5, batch.columns().size()); int dst_row_idx = 0; for (const auto& block : blocks) { for (int src_row_idx = 0; src_row_idx < block.nrows(); src_row_idx++) { @@ -325,7 +325,7 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) { for (int c = 0; c < schema_.num_columns(); c++) { SCOPED_TRACE(c); const auto& col = schema_.column(c); - const auto& serialized_col = batch.columns[c]; + const auto& serialized_col = batch.columns()[c]; if (col.is_nullable()) { bool expect_null = row.is_null(c);; EXPECT_EQ(!BitmapTest(serialized_col.non_null_bitmap->data(), dst_row_idx), @@ -464,8 +464,8 @@ struct RowwiseConverter { struct ColumnarConverter { static void Run(const RowBlock& block) { - ColumnarSerializedBatch batch; - SerializeRowBlockColumnar(block, nullptr, &batch); + ColumnarSerializedBatch batch(*block.schema(), *block.schema()); + batch.AddRowBlock(block); } static constexpr const char* kName = "columnar"; diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 1bb93ff..edfc1fd 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -26,6 +26,7 @@ #include <numeric> #include <ostream> #include <string> +#include <type_traits> #include <unordered_set> #include <vector> @@ -691,7 +692,9 @@ class ScanResultCollector { // request is decoded and checked for 'row_format_flags'. // // Does nothing by default. 
- virtual Status InitSerializer(uint64_t /* row_format_flags */) { + virtual Status InitSerializer(uint64_t /* row_format_flags */, + const Schema& /* scanner_schema */, + const Schema& /* client_schema */) { return Status::OK(); } @@ -754,6 +757,8 @@ class RowwiseResultSerializer : public ResultSerializer { int SerializeRowBlock(const RowBlock& row_block, const Schema* client_projection_schema) override { + // TODO(todd) create some kind of serializer object that caches the projection + // information to avoid recalculating it on every SerializeRowBlock call. int num_selected = kudu::SerializeRowBlock( row_block, client_projection_schema, &rows_data_, &indirect_data_, pad_unixtime_micros_to_16_bytes_); @@ -795,18 +800,21 @@ class RowwiseResultSerializer : public ResultSerializer { class ColumnarResultSerializer : public ResultSerializer { public: - static Status Create(uint64_t flags, unique_ptr<ResultSerializer>* serializer) { + static Status Create(uint64_t flags, + const Schema& scanner_schema, + const Schema& client_schema, + unique_ptr<ResultSerializer>* serializer) { if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) { return Status::InvalidArgument("Row format flags not supported with columnar layout"); } - serializer->reset(new ColumnarResultSerializer()); + serializer->reset(new ColumnarResultSerializer(scanner_schema, client_schema)); return Status::OK(); } int SerializeRowBlock(const RowBlock& row_block, - const Schema* client_projection_schema) override { + const Schema* /* unused */) override { CHECK(!done_); - int n_sel = SerializeRowBlockColumnar(row_block, client_projection_schema, &results_); + int n_sel = results_.AddRowBlock(row_block); num_rows_ += n_sel; return n_sel; } @@ -815,7 +823,7 @@ class ColumnarResultSerializer : public ResultSerializer { CHECK(!done_); int total = 0; - for (const auto& col : results_.columns) { + for (const auto& col : results_.columns()) { total += col.data.size(); if (col.varlen_data) { total += col.varlen_data->size(); 
@@ -831,7 +839,8 @@ class ColumnarResultSerializer : public ResultSerializer { CHECK(!done_); done_ = true; ColumnarRowBlockPB* data = resp->mutable_columnar_data(); - for (auto& col : results_.columns) { + auto cols = std::move(results_).TakeColumns(); + for (auto& col : cols) { auto* col_pb = data->add_columns(); int sidecar_idx; CHECK_OK(context->AddOutboundSidecar( @@ -854,7 +863,10 @@ class ColumnarResultSerializer : public ResultSerializer { } private: - ColumnarResultSerializer() {} + ColumnarResultSerializer(const Schema& scanner_schema, + const Schema& client_schema) + : results_(scanner_schema, client_schema) { + } int64_t num_rows_ = 0; ColumnarSerializedBatch results_; @@ -898,14 +910,17 @@ class ScanResultCopier : public ScanResultCollector { return num_rows_returned_; } - Status InitSerializer(uint64_t row_format_flags) override { + Status InitSerializer(uint64_t row_format_flags, + const Schema& scanner_schema, + const Schema& client_schema) override { if (serializer_) { // TODO(todd) for the NewScanner case, this gets called twice // which is a bit ugly. Refactor to avoid! 
return Status::OK(); } if (row_format_flags & COLUMNAR_LAYOUT) { - return ColumnarResultSerializer::Create(row_format_flags, &serializer_); + return ColumnarResultSerializer::Create( + row_format_flags, scanner_schema, client_schema, &serializer_); } serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags)); return Status::OK(); @@ -2432,14 +2447,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest", "tablet_id", scan_pb.tablet_id()); - Status s = result_collector->InitSerializer(scan_pb.row_format_flags()); - if (!s.ok()) { - *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; - return s; - } - - const Schema& tablet_schema = replica->tablet_metadata()->schema(); - SharedScanner scanner; server_->scanner_manager()->NewScanner(replica, rpc_context->remote_user(), @@ -2455,7 +2462,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, // Create the user's requested projection. // TODO(todd): Add test cases for bad projections including 0 columns. Schema projection; - s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection); + Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection); if (PREDICT_FALSE(!s.ok())) { *error_code = TabletServerErrorPB::INVALID_SCHEMA; return s; @@ -2480,6 +2487,8 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, } } + const Schema& tablet_schema = replica->tablet_metadata()->schema(); + ScanSpec spec; s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec); if (PREDICT_FALSE(!s.ok())) { @@ -2498,11 +2507,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, // NOTE: We should build the missing column after optimizing scan which will // remove unnecessary predicates. 
vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection); - if (spec.CanShortCircuit()) { - VLOG(1) << "short-circuiting without creating a server-side scanner."; - *has_more_results = false; - return Status::OK(); - } // Store the original projection. { @@ -2551,6 +2555,20 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, projection = projection_builder.BuildWithoutIds(); VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO); + s = result_collector->InitSerializer(scan_pb.row_format_flags(), + projection, + *scanner->client_projection_schema()); + if (!s.ok()) { + *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; + return s; + } + + if (spec.CanShortCircuit()) { + VLOG(1) << "short-circuiting without creating a server-side scanner."; + *has_more_results = false; + return Status::OK(); + } + // It's important to keep the reference to the tablet for the case when the // tablet replica's shutdown is run concurrently with the code below. shared_ptr<Tablet> tablet; @@ -2741,13 +2759,6 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req, << SecureShortDebugString(*req); TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id()); - // Set the row format flags on the ScanResultCollector. - s = result_collector->InitSerializer(scanner->row_format_flags()); - if (!s.ok()) { - *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; - return s; - } - if (batch_size_bytes == 0 && req->close_scanner()) { *has_more_results = false; return Status::OK(); @@ -2761,11 +2772,20 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req, RowwiseIterator* iter = scanner->iter(); + // Set the row format flags on the ScanResultCollector. 
+ s = result_collector->InitSerializer(scanner->row_format_flags(), + iter->schema(), + *scanner->client_projection_schema()); + if (!s.ok()) { + *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; + return s; + } + // TODO(todd): could size the RowBlock based on the user's requested batch size? // If people had really large indirect objects, we would currently overshoot // their requested batch size by a lot. Arena arena(32 * 1024); - RowBlock block(&scanner->iter()->schema(), + RowBlock block(&iter->schema(), FLAGS_scanner_batch_size_rows, &arena); // TODO(todd): in the future, use the client timeout to set a budget. For now,
