Repository: kudu
Updated Branches:
refs/heads/master 1fc87e993 -> 1e27c9fc4
(01/05) delta_store: avoid copying deleted row data in ApplyUpdates
When applying deltas, the scan path will first populate a selection vector
with deletes (i.e. a row is unset if there's a relevant DELETE for it), and
then use it to enable two optimizations:
1. Short-circuit out if all rows in the block have been deleted.
2. Skip predicate evaluation and base data copying for deleted rows.
To this we can add a third optimization: don't apply a delta whose mutation
is to a deleted row. Note that it's not incorrect to apply such deltas (as
we'll skip deleted rows when serializing the scan response to the client);
it's just unnecessary work.
I wrote a microbenchmark to evaluate the impact of this optimization. Below
are the timing and perf stat output of the microbenchmark before and after
applying the optimization. Both runs were made with this patch applied: for
the first run (the baseline, shown first) the filtering in
DeltaPreparer::ApplyUpdates was commented out, and for the second run (the
optimized case, shown second) it was uncommented.
Time spent running 1000 scans with 0 deletes: real 0.523s user 0.524s
sys 0.000s
Time spent running 1000 scans with 10 deletes: real 0.515s user 0.516s
sys 0.000s
Time spent running 1000 scans with 100 deletes: real 0.512s user 0.512s
sys 0.000s
Time spent running 1000 scans with 1000 deletes: real 0.553s user 0.552s
sys 0.000s
Performance counter stats for 'bin/deltamemstore-test
--gtest_filter=*Varying* --benchmark_num_passes=1000':
2201.029290 task-clock (msec) # 0.991 CPUs utilized
5 context-switches # 0.002 K/sec
0 cpu-migrations # 0.000 K/sec
4,950 page-faults # 0.002 M/sec
8,276,723,832 cycles # 3.760 GHz
24,539,935,941 instructions # 2.96 insn per cycle
4,709,709,705 branches # 2139.776 M/sec
12,631,579 branch-misses # 0.27% of all branches
2.220370506 seconds time elapsed
Time spent running 1000 scans with 0 deletes: real 0.474s user 0.475s
sys 0.000s
Time spent running 1000 scans with 10 deletes: real 0.475s user 0.472s
sys 0.004s
Time spent running 1000 scans with 100 deletes: real 0.478s user 0.476s
sys 0.004s
Time spent running 1000 scans with 1000 deletes: real 0.550s user 0.552s
sys 0.000s
Performance counter stats for 'bin/deltamemstore-test
--gtest_filter=*Varying* --benchmark_num_passes=1000':
2074.795741 task-clock (msec) # 0.990 CPUs utilized
23 context-switches # 0.011 K/sec
1 cpu-migrations # 0.000 K/sec
4,951 page-faults # 0.002 M/sec
7,675,100,058 cycles # 3.699 GHz
23,100,692,252 instructions # 3.01 insn per cycle
4,539,777,117 branches # 2188.060 M/sec
11,819,267 branch-misses # 0.26% of all branches
2.096193851 seconds time elapsed
Note: I originally wrote this patch thinking it was necessary for diff scan
correctness, but have since convinced myself that it's just an optimization.
Change-Id: I6a646d0816a96e9aba486c0d0a1e6b4a7e15c144
Reviewed-on: http://gerrit.cloudera.org:8080/11856
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <[email protected]>
Reviewed-by: Mike Percy <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1e27c9fc
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1e27c9fc
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1e27c9fc
Branch: refs/heads/master
Commit: 1e27c9fc4ca9d92d0a053a24e76f1420aac0abbf
Parents: 1fc87e9
Author: Adar Dembo <[email protected]>
Authored: Thu Nov 1 15:58:56 2018 -0700
Committer: Adar Dembo <[email protected]>
Committed: Fri Nov 9 04:23:04 2018 +0000
----------------------------------------------------------------------
src/kudu/tablet/delta_applier.cc | 2 +-
src/kudu/tablet/delta_applier.h | 2 +-
src/kudu/tablet/delta_iterator_merger.cc | 5 +-
src/kudu/tablet/delta_iterator_merger.h | 3 +-
src/kudu/tablet/delta_store.cc | 6 ++-
src/kudu/tablet/delta_store.h | 12 +++--
src/kudu/tablet/deltafile-test.cc | 7 ++-
src/kudu/tablet/deltafile.cc | 5 +-
src/kudu/tablet/deltafile.h | 3 +-
src/kudu/tablet/deltamemstore-test.cc | 71 +++++++++++++++++++++------
src/kudu/tablet/deltamemstore.cc | 5 +-
src/kudu/tablet/deltamemstore.h | 3 +-
src/kudu/tablet/tablet-test-util.h | 38 ++++++++------
13 files changed, 116 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/delta_applier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_applier.cc b/src/kudu/tablet/delta_applier.cc
index 73bc407..459b5a9 100644
--- a/src/kudu/tablet/delta_applier.cc
+++ b/src/kudu/tablet/delta_applier.cc
@@ -109,7 +109,7 @@ Status
DeltaApplier::MaterializeColumn(ColumnMaterializationContext *ctx) {
if (delta_iter_->MayHaveDeltas()) {
ctx->SetDecoderEvalNotSupported();
RETURN_NOT_OK(base_iter_->MaterializeColumn(ctx));
- RETURN_NOT_OK(delta_iter_->ApplyUpdates(ctx->col_idx(), ctx->block()));
+ RETURN_NOT_OK(delta_iter_->ApplyUpdates(ctx->col_idx(), ctx->block(),
*ctx->sel()));
} else {
RETURN_NOT_OK(base_iter_->MaterializeColumn(ctx));
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/delta_applier.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_applier.h b/src/kudu/tablet/delta_applier.h
index 27e8041..00e37ef 100644
--- a/src/kudu/tablet/delta_applier.h
+++ b/src/kudu/tablet/delta_applier.h
@@ -18,8 +18,8 @@
#define KUDU_TABLET_DELTA_APPLIER_H
#include <cstddef>
-#include <string>
#include <memory>
+#include <string>
#include <vector>
#include <gtest/gtest_prod.h>
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/delta_iterator_merger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.cc
b/src/kudu/tablet/delta_iterator_merger.cc
index 4230c1c..23ed670 100644
--- a/src/kudu/tablet/delta_iterator_merger.cc
+++ b/src/kudu/tablet/delta_iterator_merger.cc
@@ -64,9 +64,10 @@ Status DeltaIteratorMerger::PrepareBatch(size_t nrows,
PrepareFlag flag) {
return Status::OK();
}
-Status DeltaIteratorMerger::ApplyUpdates(size_t col_to_apply, ColumnBlock*
dst) {
+Status DeltaIteratorMerger::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
+ const SelectionVector& filter) {
for (const unique_ptr<DeltaIterator> &iter : iters_) {
- RETURN_NOT_OK(iter->ApplyUpdates(col_to_apply, dst));
+ RETURN_NOT_OK(iter->ApplyUpdates(col_to_apply, dst, filter));
}
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/delta_iterator_merger.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.h
b/src/kudu/tablet/delta_iterator_merger.h
index cbfb685..e5e3bb1 100644
--- a/src/kudu/tablet/delta_iterator_merger.h
+++ b/src/kudu/tablet/delta_iterator_merger.h
@@ -62,7 +62,8 @@ class DeltaIteratorMerger : public DeltaIterator {
Status PrepareBatch(size_t nrows, PrepareFlag flag) override;
- Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
+ Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
+ const SelectionVector& filter) override;
Status ApplyDeletes(SelectionVector* sel_vec) override;
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/delta_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index 94ee510..13b1e4a 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -214,7 +214,8 @@ Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key,
Slice val, bool* fin
}
template<class Traits>
-Status DeltaPreparer<Traits>::ApplyUpdates(size_t col_to_apply, ColumnBlock*
dst) {
+Status DeltaPreparer<Traits>::ApplyUpdates(size_t col_to_apply, ColumnBlock*
dst,
+ const SelectionVector& filter) {
DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, dst->nrows());
@@ -222,6 +223,9 @@ Status DeltaPreparer<Traits>::ApplyUpdates(size_t
col_to_apply, ColumnBlock* dst
for (const ColumnUpdate& cu : updates_by_col_[col_to_apply]) {
int32_t idx_in_block = cu.row_id - prev_prepared_idx_;
DCHECK_GE(idx_in_block, 0);
+ if (!filter.IsRowSelected(idx_in_block)) {
+ continue;
+ }
SimpleConstCell src(col_schema, cu.new_val_ptr);
ColumnBlock::Cell dst_cell = dst->cell(idx_in_block);
RETURN_NOT_OK(CopyCell(src, &dst_cell, dst->arena()));
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/delta_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index e9e8846..12ec9c6 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -137,10 +137,15 @@ class PreparedDeltas {
public:
// Applies the snapshotted updates to one of the columns.
//
- // 'dst' must be the same length as was previously passed to PrepareBatch()
+ // 'dst' must be the same length as was previously passed to PrepareBatch().
+ //
+ // Updates belonging to unselected rows in 'filter' will be skipped. This is
+ // intended as an optimization; if the caller knows with certainty that some
+ // rows are irrelevant (e.g. they've been deleted), we can avoid some
copying.
//
// Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
- virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) = 0;
+ virtual Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
+ const SelectionVector& filter) = 0;
// Applies any deletes to the given selection vector.
//
@@ -280,7 +285,8 @@ class DeltaPreparer : public PreparedDeltas {
// Call when a new delta becomes available in DeltaIterator::PrepareBatch.
Status AddDelta(const DeltaKey& key, Slice val, bool* finished_row);
- Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
+ Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
+ const SelectionVector& filter) override;
Status ApplyDeletes(SelectionVector* sel_vec) override;
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/deltafile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile-test.cc
b/src/kudu/tablet/deltafile-test.cc
index 7c4a454..1226363 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -193,8 +193,11 @@ class TestDeltaFile : public KuduTest {
arena_.Reset();
ASSERT_OK_FAST(it->PrepareBatch(block.nrows(),
DeltaIterator::PREPARE_FOR_APPLY));
+ SelectionVector sv(block.nrows());
+ sv.SetAllTrue();
+ ASSERT_OK_FAST(it->ApplyDeletes(&sv));
ColumnBlock dst_col = block.column_block(0);
- ASSERT_OK_FAST(it->ApplyUpdates(0, &dst_col));
+ ASSERT_OK_FAST(it->ApplyUpdates(0, &dst_col, sv));
for (int i = 0; i < block.nrows(); i++) {
uint32_t row = start_row + i;
@@ -523,7 +526,7 @@ TYPED_TEST(DeltaTypeTestDeltaFile,
BenchmarkPrepareAndApply) {
scb[j] = 0;
}
for (int j = 0; j < projection.num_columns(); j++) {
- ASSERT_OK(iter->ApplyUpdates(j, &scb));
+ ASSERT_OK(iter->ApplyUpdates(j, &scb, sel_vec));
}
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 3a0f017..575895a 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -661,8 +661,9 @@ Status DeltaFileIterator<Type>::AddDeltas(rowid_t
start_row, rowid_t stop_row) {
}
template<DeltaType Type>
-Status DeltaFileIterator<Type>::ApplyUpdates(size_t col_to_apply, ColumnBlock*
dst) {
- return preparer_.ApplyUpdates(col_to_apply, dst);
+Status DeltaFileIterator<Type>::ApplyUpdates(size_t col_to_apply, ColumnBlock*
dst,
+ const SelectionVector& filter) {
+ return preparer_.ApplyUpdates(col_to_apply, dst, filter);
}
template<DeltaType Type>
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 8ec98ce..e99a4f4 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -229,7 +229,8 @@ class DeltaFileIterator : public DeltaIterator {
Status PrepareBatch(size_t nrows, PrepareFlag flag) override;
- Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
+ Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
+ const SelectionVector& filter) override;
Status ApplyDeletes(SelectionVector* sel_vec) override;
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/deltamemstore-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore-test.cc
b/src/kudu/tablet/deltamemstore-test.cc
index 856a4aa..3edaa51 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -38,6 +38,7 @@
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row_changelist.h"
+#include "kudu/common/rowblock.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
@@ -48,7 +49,6 @@
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_key.h"
@@ -101,14 +101,6 @@ class TestDeltaMemStore : public KuduTest {
CHECK_OK(dms_->Init(nullptr));
}
- void SetUp() OVERRIDE {
- KuduTest::SetUp();
-
- fs_manager_.reset(new FsManager(env_, GetTestPath("fs_root")));
- ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
- ASSERT_OK(fs_manager_->Open());
- }
-
static Schema CreateSchema() {
SchemaBuilder builder;
CHECK_OK(builder.AddColumn("col1", STRING));
@@ -156,10 +148,12 @@ class TestDeltaMemStore : public KuduTest {
ASSERT_OK(iter->Init(nullptr));
ASSERT_OK(iter->SeekToOrdinal(row_idx));
ASSERT_OK(iter->PrepareBatch(cb->nrows(),
DeltaIterator::PREPARE_FOR_APPLY));
- ASSERT_OK(iter->ApplyUpdates(0, cb));
+ SelectionVector filter(cb->nrows());
+ filter.SetAllTrue();
+ ASSERT_OK(iter->ApplyDeletes(&filter));
+ ASSERT_OK(iter->ApplyUpdates(0, cb, filter));
}
-
protected:
static const int kStringColumn = 1;
static const int kIntColumn = 2;
@@ -170,7 +164,6 @@ class TestDeltaMemStore : public KuduTest {
shared_ptr<DeltaMemStore> dms_;
scoped_refptr<clock::Clock> clock_;
MvccManager mvcc_;
- gscoped_ptr<FsManager> fs_manager_;
};
static void GenerateRandomIndexes(uint32_t range, uint32_t count,
@@ -213,8 +206,12 @@ TEST_F(TestDeltaMemStore, TestUpdateCount) {
// Flush the delta file so that the stats get updated.
+
+ FsManager fs(env_, GetTestPath("fs_root"));
+ ASSERT_OK(fs.CreateInitialFileSystemLayout());
+ ASSERT_OK(fs.Open());
unique_ptr<WritableBlock> block;
- ASSERT_OK(fs_manager_->CreateNewBlock({}, &block));
+ ASSERT_OK(fs.CreateNewBlock({}, &block));
DeltaFileWriter dfw(std::move(block));
ASSERT_OK(dfw.Start());
gscoped_ptr<DeltaStats> stats;
@@ -341,6 +338,48 @@ TEST_P(TestDeltaMemStoreNumUpdates,
BenchmarkSnapshotScans) {
}
}
+class TestDeltaMemStoreNumDeletes : public TestDeltaMemStore,
+ public ::testing::WithParamInterface<int> {
+};
+
+INSTANTIATE_TEST_CASE_P(DifferentNumDeletes,
+ TestDeltaMemStoreNumDeletes, ::testing::Values(0, 10,
100, 1000));
+
+TEST_P(TestDeltaMemStoreNumDeletes, BenchmarkScansWithVaryingNumberOfDeletes) {
+ const int kNumUpdates = 10000;
+
+ // Populate the DMS with kNumRows updates, one to each row.
+ faststring buf;
+ RowChangeListEncoder update(&buf);
+ rowid_t delete_stepping = GetParam() != 0 ? kNumUpdates / GetParam() : 0;
+ for (rowid_t row_idx = 0; row_idx < kNumUpdates; row_idx++) {
+ update.Reset();
+
+ uint32_t new_val = row_idx;
+ update.AddColumnUpdate(schema_.column(kIntColumn),
+ schema_.column_id(kIntColumn), &new_val);
+ ASSERT_OK(dms_->Update(Timestamp(row_idx), row_idx, RowChangeList(buf),
op_id_));
+
+ // When appropriate, add a DELETE too.
+ if (delete_stepping != 0 && row_idx % delete_stepping == 0) {
+ update.Reset();
+ update.SetToDelete();
+ ASSERT_OK(dms_->Update(Timestamp(row_idx + 1), row_idx,
RowChangeList(buf), op_id_));
+ }
+ }
+
+ // Now scan the DMS. The scans are repeated in a number of passes to
stabilize
+ // the results.
+ ScopedColumnBlock<UINT32> ints(kNumUpdates);
+ MvccSnapshot snap(MvccSnapshot::CreateSnapshotIncludingAllTransactions());
+ LOG_TIMING(INFO, Substitute("running $0 scans with $1 deletes",
+ FLAGS_benchmark_num_passes, GetParam())) {
+ for (int pass = 0; pass < FLAGS_benchmark_num_passes; pass++) {
+ NO_FATALS(ApplyUpdates(snap, 0, kIntColumn, &ints));
+ }
+ }
+}
+
// Test when a slice column has been updated multiple times in the
// memrowset that the referred to values properly end up in the
// right arena.
@@ -523,7 +562,9 @@ TEST_F(TestDeltaMemStore, TestIteratorDoesUpdates) {
int block_start_row = 50;
ASSERT_OK(iter->SeekToOrdinal(block_start_row));
ASSERT_OK(iter->PrepareBatch(block.nrows(),
DeltaIterator::PREPARE_FOR_APPLY));
- ASSERT_OK(iter->ApplyUpdates(kIntColumn, &block));
+ SelectionVector sv(block.nrows());
+ sv.SetAllTrue();
+ ASSERT_OK(iter->ApplyUpdates(kIntColumn, &block, sv));
for (int i = 0; i < 100; i++) {
int actual_row = block_start_row + i;
@@ -533,7 +574,7 @@ TEST_F(TestDeltaMemStore, TestIteratorDoesUpdates) {
// Apply the next block
block_start_row += block.nrows();
ASSERT_OK(iter->PrepareBatch(block.nrows(),
DeltaIterator::PREPARE_FOR_APPLY));
- ASSERT_OK(iter->ApplyUpdates(kIntColumn, &block));
+ ASSERT_OK(iter->ApplyUpdates(kIntColumn, &block, sv));
for (int i = 0; i < 100; i++) {
int actual_row = block_start_row + i;
ASSERT_EQ(actual_row * 10, block[i]) << "at row " << actual_row;
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/deltamemstore.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index 212873d..8676d33 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -275,8 +275,9 @@ Status DMSIterator::PrepareBatch(size_t nrows, PrepareFlag
flag) {
return Status::OK();
}
-Status DMSIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) {
- return preparer_.ApplyUpdates(col_to_apply, dst);
+Status DMSIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
+ const SelectionVector& filter) {
+ return preparer_.ApplyUpdates(col_to_apply, dst, filter);
}
Status DMSIterator::ApplyDeletes(SelectionVector* sel_vec) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/deltamemstore.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index fa093b3..d9f094b 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -201,7 +201,8 @@ class DMSIterator : public DeltaIterator {
Status PrepareBatch(size_t nrows, PrepareFlag flag) override;
- Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst) override;
+ Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
+ const SelectionVector& filter) override;
Status ApplyDeletes(SelectionVector* sel_vec) override;
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e27c9fc/src/kudu/tablet/tablet-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test-util.h
b/src/kudu/tablet/tablet-test-util.h
index f724533..89a816e 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -399,10 +399,13 @@ class MirroredDeltas {
// Applies tracked UPDATE and REINSERT values to 'cb'.
//
+ // Rows not set in 'filter' are skipped.
+ //
// Deltas not relevant to 'ts' are skipped. The set of rows considered is
// determined by 'start_row_idx' and the number of rows in 'cb'.
Status ApplyUpdates(const Schema& projection, Timestamp ts,
- rowid_t start_row_idx, int col_idx, ColumnBlock* cb) {
+ rowid_t start_row_idx, int col_idx, ColumnBlock* cb,
+ const SelectionVector& filter) {
for (int i = 0; i < cb->nrows(); i++) {
rowid_t row_idx = start_row_idx + i;
for (const auto& e : all_deltas_[row_idx]) {
@@ -411,6 +414,9 @@ class MirroredDeltas {
// be irrelevant.
break;
}
+ if (!filter.IsRowSelected(i)) {
+ continue;
+ }
RowChangeList changes(e.second);
if (changes.is_delete()) {
continue;
@@ -813,6 +819,21 @@ void RunDeltaFuzzTest(const DeltaStore& store,
start_row_idx, batch_size));
ASSERT_OK(iter->PrepareBatch(batch_size,
DeltaIterator::PREPARE_FOR_APPLY));
+ // Test ApplyDeletes: the selection vector is all true and a row is unset
+ // if the last relevant update deleted it.
+ //
+ // Note: we retain 'actual_deleted' for use as a filter in the
+ // ApplyUpdates test below.
+ SelectionVector actual_deleted(batch_size);
+ {
+ SelectionVector expected_deleted(batch_size);
+ expected_deleted.SetAllTrue();
+ actual_deleted.SetAllTrue();
+ ASSERT_OK(mirror->ApplyDeletes(ts, start_row_idx, &expected_deleted));
+ ASSERT_OK(iter->ApplyDeletes(&actual_deleted));
+ ASSERT_EQ(expected_deleted, actual_deleted);
+ }
+
// Test ApplyUpdates: all relevant updates are applied to the column
block.
for (int j = 0; j < opts.projection->num_columns(); j++) {
SCOPED_TRACE(strings::Substitute("Column $0", j));
@@ -823,24 +844,13 @@ void RunDeltaFuzzTest(const DeltaStore& store,
actual_scb[k] = 0;
}
ASSERT_OK(mirror->ApplyUpdates(*opts.projection, ts, start_row_idx, j,
- &expected_scb));
- ASSERT_OK(iter->ApplyUpdates(j, &actual_scb));
+ &expected_scb, actual_deleted));
+ ASSERT_OK(iter->ApplyUpdates(j, &actual_scb, actual_deleted));
ASSERT_EQ(expected_scb, actual_scb)
<< "Expected column block: " << expected_scb.ToString()
<< "\nActual column block: " << actual_scb.ToString();
}
- // Test ApplyDeletes: the selection vector is all true and a row is unset
- // if the last relevant update deleted it.
- {
- SelectionVector expected_sv(batch_size);
- SelectionVector actual_sv(batch_size);
- expected_sv.SetAllTrue();
- actual_sv.SetAllTrue();
- ASSERT_OK(mirror->ApplyDeletes(ts, start_row_idx, &expected_sv));
- ASSERT_OK(iter->ApplyDeletes(&actual_sv));
- ASSERT_EQ(expected_sv, actual_sv);
- }
start_row_idx += batch_size;
}