This is an automated email from the ASF dual-hosted git repository. todd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fa2d4c756646b3f8428d2931113e8dfc3bc5af79 Author: Todd Lipcon <[email protected]> AuthorDate: Thu Mar 26 22:28:06 2020 -0700 client: add support for columnar format scan Change-Id: I69ec4487c78b872e7b9eba5facdd44cfd6b8f4fa Reviewed-on: http://gerrit.cloudera.org:8080/15622 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/client/CMakeLists.txt | 2 + src/kudu/client/client-test.cc | 45 ++++++++++ src/kudu/client/client.cc | 41 +++++---- src/kudu/client/client.h | 29 +++++++ src/kudu/client/columnar_scan_batch.cc | 55 ++++++++++++ src/kudu/client/columnar_scan_batch.h | 90 ++++++++++++++++++++ src/kudu/client/scanner-internal.cc | 121 +++++++++++++++++++++++++-- src/kudu/client/scanner-internal.h | 59 ++++++++++++- src/kudu/tools/tool_action_remote_replica.cc | 2 +- 9 files changed, 421 insertions(+), 23 deletions(-) diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt index ffd536a..f456757 100644 --- a/src/kudu/client/CMakeLists.txt +++ b/src/kudu/client/CMakeLists.txt @@ -34,6 +34,7 @@ set(CLIENT_SRCS client.cc client_builder-internal.cc client-internal.cc + columnar_scan_batch.cc error_collector.cc error-internal.cc hash.cc @@ -177,6 +178,7 @@ install(TARGETS kudu_client_exported install(FILES callbacks.h client.h + columnar_scan_batch.h hash.h row_result.h scan_batch.h diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index f8bca5d..668c9d3 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -51,6 +51,7 @@ #include "kudu/client/client-internal.h" #include "kudu/client/client-test-util.h" #include "kudu/client/client.pb.h" +#include "kudu/client/columnar_scan_batch.h" #include "kudu/client/error_collector.h" #include "kudu/client/meta_cache.h" #include "kudu/client/resource_metrics.h" @@ -104,6 +105,7 @@ #include "kudu/tserver/tablet_server_options.h" #include "kudu/tserver/ts_tablet_manager.h" #include "kudu/tserver/tserver.pb.h" +#include "kudu/util/array_view.h" #include "kudu/util/async_util.h" #include "kudu/util/barrier.h" #include "kudu/util/countdown_latch.h" @@ -1051,6 +1053,49 @@ TEST_F(ClientTest, TestScanAtFutureTimestamp) { ASSERT_STR_CONTAINS(s.ToString(), "in the future."); } +TEST_F(ClientTest, TestColumnarScan) { + // Set the batch size such that a full scan could yield either multi-batch + // or single-batch scans. + int64_t num_rows = rand() % 2000; + int64_t batch_size = rand() % 1000; + FLAGS_scanner_batch_size_rows = batch_size; + + NO_FATALS(InsertTestRows(client_table_.get(), num_rows)); + KuduScanner scanner(client_table_.get()); + ASSERT_OK(scanner.SetRowFormatFlags(KuduScanner::COLUMNAR_LAYOUT)); + + ASSERT_OK(scanner.Open()); + KuduColumnarScanBatch batch; + int total_rows = 0; + while (scanner.HasMoreRows()) { + ASSERT_OK(scanner.NextBatch(&batch)); + + // Verify the data. + Slice col_data[4]; + for (int i = 0; i < 4; i++) { + ASSERT_OK(batch.GetDataForColumn(i, &col_data[i])); + } + ArrayView<const int32_t> c0(reinterpret_cast<const int32_t*>(col_data[0].data()), + batch.NumRows()); + ArrayView<const int32_t> c1(reinterpret_cast<const int32_t*>(col_data[1].data()), + batch.NumRows()); + ArrayView<const Slice> c2(reinterpret_cast<const Slice*>(col_data[2].data()), + batch.NumRows()); + ArrayView<const int32_t> c3(reinterpret_cast<const int32_t*>(col_data[3].data()), + batch.NumRows()); + + for (int i = 0; i < batch.NumRows(); i++) { + int row_idx = total_rows + i; + EXPECT_EQ(row_idx, c0[i]); + EXPECT_EQ(row_idx * 2, c1[i]); + EXPECT_EQ(Substitute("hello $0", row_idx), c2[i]); + EXPECT_EQ(row_idx * 3, c3[i]); + } + total_rows += batch.NumRows(); + } + ASSERT_EQ(num_rows, total_rows); +} + const KuduScanner::ReadMode read_modes[] = { KuduScanner::READ_LATEST, KuduScanner::READ_AT_SNAPSHOT, diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index 571844a..968537b 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -37,6 +37,7 @@ #include "kudu/client/client-internal.h" #include "kudu/client/client.pb.h" #include "kudu/client/client_builder-internal.h" +#include "kudu/client/columnar_scan_batch.h" #include "kudu/client/error-internal.h" #include "kudu/client/error_collector.h" #include "kudu/client/master_proxy_rpc.h" @@ -1569,6 +1570,7 @@ Status KuduScanner::SetRowFormatFlags(uint64_t flags) { switch (flags) { case NO_FLAGS: case PAD_UNIXTIME_MICROS_TO_16_BYTES: + case COLUMNAR_LAYOUT: break; default: return Status::InvalidArgument(Substitute("Invalid row format flags: $0", flags)); @@ -1711,6 +1713,15 @@ Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) { } Status KuduScanner::NextBatch(KuduScanBatch* batch) { + return NextBatch(batch->data_); +} + +Status KuduScanner::NextBatch(KuduColumnarScanBatch* batch) { + return NextBatch(batch->data_); +} + +Status KuduScanner::NextBatch(internal::ScanBatchDataInterface* batch_data) { + // TODO: do some double-buffering here -- when we return this batch // we should already have fired off the RPC for the next batch, but // need to do some swapping of the response objects around to avoid @@ -1718,7 +1729,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) { CHECK(data_->open_); CHECK(data_->proxy_); - batch->data_->Clear(); + batch_data->Clear(); if (data_->short_circuit_) { return Status::OK(); @@ -1728,12 +1739,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) { // We have data from a previous scan. VLOG(2) << "Extracting data from " << data_->DebugString(); data_->data_in_open_ = false; - return batch->data_->Reset(&data_->controller_, - data_->configuration().projection(), - data_->configuration().client_projection(), - data_->configuration().row_format_flags(), - unique_ptr<RowwiseRowBlockPB>( - data_->last_response_.release_data())); + return batch_data->Reset(&data_->controller_, + data_->configuration().projection(), + data_->configuration().client_projection(), + data_->configuration().row_format_flags(), + &data_->last_response_); } if (data_->last_response_.has_more_results()) { @@ -1753,12 +1763,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) { data_->last_primary_key_ = data_->last_response_.last_primary_key(); } data_->scan_attempts_ = 0; - return batch->data_->Reset(&data_->controller_, - data_->configuration().projection(), - data_->configuration().client_projection(), - data_->configuration().row_format_flags(), - unique_ptr<RowwiseRowBlockPB>( - data_->last_response_.release_data())); + return batch_data->Reset(&data_->controller_, + data_->configuration().projection(), + data_->configuration().client_projection(), + data_->configuration().row_format_flags(), + &data_->last_response_); } data_->scan_attempts_++; @@ -1797,7 +1806,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) { set<string> blacklist; RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist)); - // No rows written, the next invocation will pick them up. + if (data_->data_in_open_) { + // Avoid returning an empty batch in between tablets if we have data + // we can return from this call. + return NextBatch(batch_data); + } return Status::OK(); } else { // No more data anywhere. diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 5c59ac7..f8d16d8 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -73,6 +73,7 @@ class RemoteKsckCluster; namespace client { +class KuduColumnarScanBatch; class KuduDelete; class KuduInsert; class KuduInsertIgnore; @@ -102,6 +103,7 @@ class RemoteTablet; class RemoteTabletServer; class ReplicaController; class RetrieveAuthzTokenRpc; +class ScanBatchDataInterface; class WriteRpc; template <class ReqClass, class RespClass> class AsyncLeaderMasterRpc; // IWYU pragma: keep @@ -2255,6 +2257,9 @@ class KUDU_EXPORT KuduScanner { /// Fetch the next batch of results for this scanner. /// + /// This variant may not be used when the scan is configured with the + /// COLUMNAR_LAYOUT RowFormatFlag. + /// /// A single KuduScanBatch object may be reused. Each subsequent call /// replaces the data from the previous call, and invalidates any /// KuduScanBatch::RowPtr objects previously obtained from the batch. @@ -2263,6 +2268,19 @@ class KUDU_EXPORT KuduScanner { /// @return Operation result status. Status NextBatch(KuduScanBatch* batch); + /// Fetch the next batch of columnar results for this scanner. + /// + /// This variant may only be used when the scan is configured with the + /// COLUMNAR_LAYOUT RowFormatFlag. + /// + /// A single KuduColumnarScanBatch object may be reused. Each subsequent call + /// replaces the data from the previous call, and invalidates any + /// Slice objects previously obtained from the batch. + /// @param [out] batch + /// Placeholder for the result. + /// @return Operation result status. + Status NextBatch(KuduColumnarScanBatch* batch); + /// Get the KuduTabletServer that is currently handling the scan. /// /// More concretely, this is the server that handled the most recent @@ -2389,6 +2407,15 @@ class KUDU_EXPORT KuduScanner { /// results and might even cause the client to crash. static const uint64_t PAD_UNIXTIME_MICROS_TO_16_BYTES = 1 << 0; + /// Enable column-oriented data transfer. The server will transfer data to the client + /// in a columnar format rather than a row-wise format. The KuduColumnarScanBatch API + /// must be used to fetch results from this scan. + /// + /// NOTE: older versions of the Kudu server do not support this feature. Clients + /// aiming to support compatibility with previous versions should have a fallback + /// code path. + static const uint64_t COLUMNAR_LAYOUT = 1 << 1; + /// Optionally set row format modifier flags. /// /// If flags is RowFormatFlags::NO_FLAGS, then no modifications will be made to the row @@ -2436,6 +2463,8 @@ class KUDU_EXPORT KuduScanner { private: class KUDU_NO_EXPORT Data; + Status NextBatch(internal::ScanBatchDataInterface* batch); + friend class KuduScanToken; FRIEND_TEST(ClientTest, TestBlockScannerHijackingAttempts); FRIEND_TEST(ClientTest, TestScanCloseProxy); diff --git a/src/kudu/client/columnar_scan_batch.cc b/src/kudu/client/columnar_scan_batch.cc new file mode 100644 index 0000000..b346445 --- /dev/null +++ b/src/kudu/client/columnar_scan_batch.cc @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/client/columnar_scan_batch.h" + +#include "kudu/client/scanner-internal.h" +#include "kudu/common/wire_protocol.pb.h" +#include "kudu/rpc/rpc_controller.h" + +namespace kudu { +class Slice; + +namespace client { + +KuduColumnarScanBatch::KuduColumnarScanBatch() + : data_(new KuduColumnarScanBatch::Data()) { +} + +KuduColumnarScanBatch::~KuduColumnarScanBatch() { + delete data_; +} + +int KuduColumnarScanBatch::NumRows() const { + return data_->resp_data_.num_rows(); +} + +Status KuduColumnarScanBatch::GetDataForColumn(int idx, Slice* data) const { + return data_->GetDataForColumn(idx, data); +} + +Status KuduColumnarScanBatch::GetNonNullBitmapForColumn(int idx, Slice* data) const { + RETURN_NOT_OK(data_->CheckColumnIndex(idx)); + const auto& col = data_->resp_data_.columns(idx); + if (!col.has_non_null_bitmap_sidecar()) { + return Status::NotFound("column is not nullable"); + } + return data_->controller_.GetInboundSidecar(col.non_null_bitmap_sidecar(), data); +} + +} // namespace client +} // namespace kudu diff --git a/src/kudu/client/columnar_scan_batch.h b/src/kudu/client/columnar_scan_batch.h new file mode 100644 index 0000000..23a3e7d --- /dev/null +++ b/src/kudu/client/columnar_scan_batch.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_CLIENT_COLUMNAR_SCAN_BATCH_H +#define KUDU_CLIENT_COLUMNAR_SCAN_BATCH_H + +#ifdef KUDU_HEADERS_NO_STUBS +#include "kudu/gutil/macros.h" +#else +#include "kudu/client/stubs.h" +#endif + +#include "kudu/util/kudu_export.h" +#include "kudu/util/status.h" + +namespace kudu { +class Slice; + +namespace client { + +/// @brief A batch of columnar data returned from a scanner +/// +/// Similar to KuduScanBatch, this contains a batch of rows returned from a scanner. +/// This type of batch is used if the KuduScanner::COLUMNAR_LAYOUT row format flag +/// is enabled. +/// +/// Retrieving rows in columnar layout can be significantly more efficient. It saves +/// some CPU cycles on the Kudu cluster and can also enable faster processing of the +/// returned data in certain client applications. +/// +/// NOTE: this class is not thread-safe. +class KUDU_EXPORT KuduColumnarScanBatch { + public: + KuduColumnarScanBatch(); + ~KuduColumnarScanBatch(); + + /// @return The number of rows in this batch. + int NumRows() const; + + /// Get the raw columnar data corresponding to the column with index 'idx'. + /// + /// The data is in little-endian packed array format. No alignment is guaranteed. + /// Space is reserved for all cells regardless of whether they might be null. + /// The data stored in a null cell may or may not be zeroed. + /// + /// If this returns an error for a given column, then a second call for the same + /// column has undefined results. + /// + /// @note The Slice returned is only valid for the lifetime of the KuduColumnarScanBatch. + Status GetDataForColumn(int idx, Slice* data) const; + + /// Get a bitmap corresponding to the non-null status of the cells in the given column. + /// + /// A set bit indicates a non-null cell. + /// If the number of rows is not a multiple of 8, the state of the trailing bits in the + /// bitmap is undefined. + /// + /// It is an error to call this function on a column which is not marked as nullable + /// in the schema. + /// + /// @note The Slice returned is only valid for the lifetime of the KuduColumnarScanBatch. + Status GetNonNullBitmapForColumn(int idx, Slice* data) const; + + private: + class KUDU_NO_EXPORT Data; + + friend class KuduScanner; + + Data* data_; + DISALLOW_COPY_AND_ASSIGN(KuduColumnarScanBatch); +}; + + +} // namespace client +} // namespace kudu + +#endif diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc index 9cb14aa..93006b3 100644 --- a/src/kudu/client/scanner-internal.cc +++ b/src/kudu/client/scanner-internal.cc @@ -38,6 +38,7 @@ #include "kudu/common/partition.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" +#include "kudu/common/types.h" #include "kudu/common/wire_protocol.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" @@ -49,11 +50,13 @@ #include "kudu/rpc/rpc_header.pb.h" #include "kudu/security/token.pb.h" #include "kudu/tserver/tserver_service.proxy.h" +#include "kudu/util/array_view.h" #include "kudu/util/async_util.h" #include "kudu/util/bitmap.h" #include "kudu/util/hexdump.h" #include "kudu/util/logging.h" #include "kudu/util/monotime.h" +#include "kudu/util/safe_math.h" using google::protobuf::FieldDescriptor; using google::protobuf::Reflection; @@ -70,6 +73,8 @@ using rpc::RpcController; using security::SignedTokenPB; using strings::Substitute; using tserver::NewScanRequestPB; +using tserver::RowFormatFlags; +using tserver::ScanResponsePB; using tserver::TabletServerFeatures; namespace client { @@ -367,6 +372,10 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline, if (configuration().row_format_flags() & KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES) { controller_.RequireServerFeature(TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES); } + if (configuration().row_format_flags() & KuduScanner::COLUMNAR_LAYOUT) { + controller_.RequireServerFeature(TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE); + } + if (next_req_.has_new_scan_request()) { // Only new scan requests require authz tokens. Scan continuations rely on // Kudu's prevention of scanner hijacking by different users. @@ -386,7 +395,9 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline, rpc_deadline, overall_deadline); if (scan_status.result == ScanRpcStatus::OK) { UpdateResourceMetrics(); - num_rows_returned_ += last_response_.data().num_rows(); + num_rows_returned_ += last_response_.has_data() ? last_response_.data().num_rows() : 0; + num_rows_returned_ += last_response_.has_columnar_data() ? + last_response_.columnar_data().num_rows() : 0; } return scan_status; } @@ -561,13 +572,15 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key, partition_pruner_.RemovePartitionKeyRange(remote_->partition().partition_key_end()); next_req_.clear_new_scan_request(); - data_in_open_ = last_response_.has_data() && last_response_.data().num_rows() > 0; + data_in_open_ = (last_response_.has_data() && last_response_.data().num_rows() > 0) || + (last_response_.has_columnar_data() && last_response_.columnar_data().num_rows() > 0); if (last_response_.has_more_results()) { next_req_.set_scanner_id(last_response_.scanner_id()); VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", scanner ID " << last_response_.scanner_id(); - } else if (last_response_.has_data()) { - VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned"; + } else if (last_response_.has_data() || last_response_.has_columnar_data()) { + VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned, " + << " data_in_open=" << data_in_open_; } else { VLOG(2) << "Opened tablet " << remote_->tablet_id() << " (no rows), no scanner ID assigned"; } @@ -670,13 +683,19 @@ Status KuduScanBatch::Data::Reset(RpcController* controller, const Schema* projection, const KuduSchema* client_projection, uint64_t row_format_flags, - unique_ptr<RowwiseRowBlockPB> resp_data) { + ScanResponsePB* response) { CHECK(controller->finished()); + if (row_format_flags & RowFormatFlags::COLUMNAR_LAYOUT) { + return Status::InvalidArgument("columnar layout specified, must use KuduColumnarScanBatch"); + } + controller_.Swap(controller); projection_ = projection; projected_row_size_ = CalculateProjectedRowSize(*projection_); client_projection_ = client_projection; row_format_flags_ = row_format_flags; + unique_ptr<RowwiseRowBlockPB> resp_data(response->release_data()); + if (!resp_data) { // No new data; just clear out the old stuff. resp_data_.Clear(); @@ -748,5 +767,97 @@ void KuduScanBatch::Data::Clear() { controller_.Reset(); } +//////////////////////////////////////////////////////////// +// KuduColumnarScanBatch +//////////////////////////////////////////////////////////// + +Status KuduColumnarScanBatch::Data::Reset( + rpc::RpcController* controller, + const Schema* projection, + const KuduSchema* client_projection, + uint64_t row_format_flags, + tserver::ScanResponsePB* response) { + if (!(row_format_flags & RowFormatFlags::COLUMNAR_LAYOUT)) { + return Status::InvalidArgument("rowwise layout specified, must use KuduScanBatch"); + } + CHECK(!response->has_data()) << "expected columnar data"; + + CHECK(controller->finished()); + controller_.Swap(controller); + projection_ = projection; + client_projection_ = client_projection; + rewritten_varlen_columns_.clear(); + + unique_ptr<ColumnarRowBlockPB> resp_data(response->release_columnar_data()); + if (!resp_data) { + // No new data; just clear out the old stuff. + resp_data_.Clear(); + return Status::OK(); + } + resp_data_ = std::move(*resp_data); + return Status::OK(); +} + +void KuduColumnarScanBatch::Data::Clear() { + resp_data_.Clear(); + controller_.Reset(); +} + +Status KuduColumnarScanBatch::Data::CheckColumnIndex(int idx) const { + if (idx >= resp_data_.columns_size()) { + return Status::InvalidArgument(Substitute("bad column index $0 ($1 columns present)", + idx, resp_data_.columns_size())); + } + return Status::OK(); +} + +Status KuduColumnarScanBatch::Data::GetDataForColumn(int idx, Slice* data) const { + RETURN_NOT_OK(CheckColumnIndex(idx)); + RETURN_NOT_OK(controller_.GetInboundSidecar( + resp_data_.columns(idx).data_sidecar(), + data)); + + // Rewrite slices to be real pointers instead of pointers relative to the + // indirect data buffer. + if (projection_->column(idx).type_info()->physical_type() == BINARY && + !ContainsKey(rewritten_varlen_columns_, idx)) { + + Slice indirect_data_slice; + RETURN_NOT_OK(controller_.GetInboundSidecar( + resp_data_.columns(idx).indirect_data_sidecar(), + &indirect_data_slice)); + + ArrayView<Slice> v(reinterpret_cast<Slice*>(data->mutable_data()), + data->size() / sizeof(Slice)); + for (int row_idx = 0; row_idx < v.size(); row_idx++) { + Slice* slice = &v[row_idx]; + size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data()); + // Ensure the updated pointer is within the bounds of the indirect data. + bool overflowed = false; + size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed); + if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) { + const auto& col = projection_->column(idx); + return Status::Corruption( + Substitute("Row #$0 contained bad indirect slice for column $1: ($2, $3)", + row_idx, col.ToString(), offset_in_indirect, slice->size())); + } + *slice = Slice(&indirect_data_slice[offset_in_indirect], slice->size()); + } + rewritten_varlen_columns_.insert(idx); + } + return Status::OK(); +} + +Status KuduColumnarScanBatch::Data::GetNonNullBitmapForColumn(int idx, Slice* data) const { + RETURN_NOT_OK(CheckColumnIndex(idx)); + const auto& col = resp_data_.columns(idx); + if (!col.has_non_null_bitmap_sidecar()) { + return Status::NotFound("column is not nullable"); + } + return controller_.GetInboundSidecar(col.non_null_bitmap_sidecar(), data); +} + + + } // namespace client } // namespace kudu diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h index f0d07a0..01da884 100644 --- a/src/kudu/client/scanner-internal.h +++ b/src/kudu/client/scanner-internal.h @@ -23,11 +23,13 @@ #include <ostream> #include <set> #include <string> +#include <unordered_set> #include <vector> #include <glog/logging.h> #include "kudu/client/client.h" +#include "kudu/client/columnar_scan_batch.h" #include "kudu/client/resource_metrics.h" #include "kudu/client/row_result.h" #include "kudu/client/scan_batch.h" @@ -294,7 +296,20 @@ class KuduScanner::Data { DISALLOW_COPY_AND_ASSIGN(Data); }; -class KuduScanBatch::Data { +namespace internal { +class ScanBatchDataInterface { + public: + virtual ~ScanBatchDataInterface() = default; + virtual Status Reset(rpc::RpcController* controller, + const Schema* projection, + const KuduSchema* client_projection, + uint64_t row_format_flags, + tserver::ScanResponsePB* response) = 0; + virtual void Clear() = 0; +}; +} // namespace internal + +class KuduScanBatch::Data : public internal::ScanBatchDataInterface { public: Data(); ~Data(); @@ -303,7 +318,7 @@ class KuduScanBatch::Data { const Schema* projection, const KuduSchema* client_projection, uint64_t row_format_flags, - std::unique_ptr<RowwiseRowBlockPB> resp_data); + tserver::ScanResponsePB* response) override; int num_rows() const { return resp_data_.num_rows(); @@ -324,7 +339,7 @@ class KuduScanBatch::Data { void ExtractRows(std::vector<KuduScanBatch::RowPtr>* rows); - void Clear(); + void Clear() override; // Returns the size of a row for the given projection 'proj'. static size_t CalculateProjectedRowSize(const Schema& proj); @@ -354,6 +369,44 @@ class KuduScanBatch::Data { size_t projected_row_size_; }; +class KuduColumnarScanBatch::Data : public internal::ScanBatchDataInterface { + public: + Status Reset(rpc::RpcController* controller, + const Schema* projection, + const KuduSchema* client_projection, + uint64_t row_format_flags, + tserver::ScanResponsePB* response) override; + void Clear() override; + + Status GetDataForColumn(int idx, Slice* data) const; + Status GetNonNullBitmapForColumn(int idx, Slice* data) const; + + private: + Status CheckColumnIndex(int idx) const; + + friend class KuduColumnarScanBatch; + + // The RPC controller for the RPC which returned this batch. + // Holding on to the controller ensures we hold on to the + // sidecars which contain the actual data. + rpc::RpcController controller_; + + // The PB which contains the "direct data" slice. + ColumnarRowBlockPB resp_data_; + + // Tracks for each variable-length (binary) column whether the pointers have been + // rewritten yet to be "real" pointers instead of sidecar-relative offsets. + // Mutable since the 'GetDataForColumn' call is semantically const, but in fact + // needs to modify this member to do the lazy pointer rewrites. + mutable std::unordered_set<int> rewritten_varlen_columns_; + + // The projection being scanned. + const Schema* projection_; + // The KuduSchema version of 'projection_' + const KuduSchema* client_projection_; +}; + + } // namespace client } // namespace kudu diff --git a/src/kudu/tools/tool_action_remote_replica.cc b/src/kudu/tools/tool_action_remote_replica.cc index 4a30000..44c2de6 100644 --- a/src/kudu/tools/tool_action_remote_replica.cc +++ b/src/kudu/tools/tool_action_remote_replica.cc @@ -155,7 +155,7 @@ class ReplicaDumper { &schema, &client_schema, client::KuduScanner::NO_FLAGS, - unique_ptr<RowwiseRowBlockPB>(resp.release_data()))); + &resp)); vector<KuduRowResult> rows; results.ExtractRows(&rows); for (const auto& r : rows) {
