This is an automated email from the ASF dual-hosted git repository.
mpercy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new d17e3ef KUDU-2741: De-flake TestMergeIterator.TestDeDupGhostRows
d17e3ef is described below
commit d17e3ef345498777e32f2b275f952abac1369a7a
Author: Mike Percy <[email protected]>
AuthorDate: Tue Mar 12 15:23:37 2019 -0700
KUDU-2741: De-flake TestMergeIterator.TestDeDupGhostRows
This test had a couple of test-only bugs related to bookkeeping,
resulting in this test being flaky in certain scenarios. This patch
fixes those bugs:
1. Ensure that we don't generate duplicate keys in the same list (a.k.a.
RowSet) during the test because that can't occur "in the wild" and
the MergeIterator does not support it.
2. Ensure that we don't maintain more than one instance of a single
row key in the 'expected' result set. Switch to using an ordered map
to enforce that test invariant. An additional benefit to using that
data structure is that we can skip the sorting step when iterating
the expected results.
Before this change, the test had a failure rate of about 10% according
to the flaky test dashboard. After this change, the tests passes
reliably and succeeded 1024/1024 times when I looped it using dist-test:
http://dist-test.cloudera.org/job?job_id=mpercy.1552437266.46341
Change-Id: I5ca9cdfa4620b1a9d4144de2289cc80ac37060aa
Reviewed-on: http://gerrit.cloudera.org:8080/12735
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
---
src/kudu/common/generic_iterators-test.cc | 99 ++++++++++++++++++-------------
1 file changed, 59 insertions(+), 40 deletions(-)
diff --git a/src/kudu/common/generic_iterators-test.cc
b/src/kudu/common/generic_iterators-test.cc
index 2578491..d002e83 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -20,12 +20,14 @@
#include <algorithm>
#include <cstdint>
#include <cstdlib>
+#include <map>
#include <memory>
#include <ostream>
#include <random>
#include <string>
#include <unordered_map>
#include <unordered_set>
+#include <utility>
#include <vector>
#include <gflags/gflags.h>
@@ -58,6 +60,7 @@ DEFINE_int32(num_rows, 1000, "Number of entries per list");
DEFINE_int32(num_iters, 1, "Number of times to run merge");
DECLARE_bool(materializing_iterator_do_pushdown);
+using std::map;
using std::string;
using std::unique_ptr;
using std::unordered_set;
@@ -241,23 +244,26 @@ class TestIntRangePredicate {
void TestMerge(const Schema& schema, const TestIntRangePredicate &predicate,
bool overlapping_ranges = true, bool include_deleted_rows =
false) {
struct List {
+ explicit List(int num_rows)
+ : sv(new SelectionVector(num_rows)) {
+ ints.reserve(num_rows);
+ is_deleted.reserve(num_rows);
+ }
+
vector<uint32_t> ints;
vector<uint8_t> is_deleted;
unique_ptr<SelectionVector> sv;
};
+
vector<List> all_ints;
- vector<uint32_t> expected;
- unordered_set<uint32_t> expected_ints_deleted;
+ map<uint32_t, bool> expected;
unordered_set<uint32_t> seen_live;
- expected.reserve(FLAGS_num_rows * FLAGS_num_lists);
Random prng(SeedRandom());
uint32_t entry = 0;
for (int i = 0; i < FLAGS_num_lists; i++) {
- vector<uint32_t> ints;
- vector<uint8_t> deleted_status;
- ints.reserve(FLAGS_num_rows);
- unique_ptr<SelectionVector> sv(new SelectionVector(FLAGS_num_rows));
+ List list(FLAGS_num_rows);
+ unordered_set<uint32_t> seen_this_list;
if (overlapping_ranges) {
entry = 0;
@@ -268,54 +274,60 @@ void TestMerge(const Schema& schema, const
TestIntRangePredicate &predicate,
// The merge iterator does not support duplicate non-deleted keys.
while (true) {
potential = entry + prng.Uniform(FLAGS_num_rows * FLAGS_num_lists *
10);
+ // Only one live version of a row can exist across all lists.
+ // Including several duplicate deleted keys is fine.
if (ContainsKey(seen_live, potential)) continue;
+ // No duplicate keys are allowed in the same list (same RowSet).
+ if (ContainsKey(seen_this_list, potential)) continue;
+ InsertOrDie(&seen_this_list, potential);
+
// If we are including deleted rows, with some probability make this a
// deleted row.
if (include_deleted_rows) {
is_deleted = prng.OneIn(4);
if (is_deleted) {
- // Including several duplicate deleted keys is fine.
- expected_ints_deleted.insert(potential);
break;
}
}
// This is a new live row. Un-mark it as deleted if necessary.
- InsertOrDie(&seen_live, potential);
- if (include_deleted_rows && ContainsKey(expected_ints_deleted,
potential)) {
- CHECK_EQ(1, expected_ints_deleted.erase(potential));
+ if (!is_deleted) {
+ InsertOrDie(&seen_live, potential);
}
break;
}
entry = potential;
- ints.emplace_back(entry);
- deleted_status.emplace_back(is_deleted);
+ list.ints.emplace_back(entry);
+ list.is_deleted.emplace_back(is_deleted);
// Some entries are randomly deselected in order to exercise the
selection
// vector logic in the MergeIterator. This is reflected both in the input
- // to the MeregIterator as well as the expected output (see below).
+ // to the MergeIterator as well as the expected output (see below).
bool row_selected = prng.Uniform(8) > 0;
VLOG(2) << Substitute("Row $0 with value $1 selected? $2",
j, entry, row_selected);
if (row_selected) {
- sv->SetRowSelected(j);
+ list.sv->SetRowSelected(j);
} else {
- sv->SetRowUnselected(j);
+ list.sv->SetRowUnselected(j);
}
- // Consider the predicate and the selection vector before pushing to
'expected'.
+ // Consider the predicate and the selection vector before inserting this
+ // row into 'expected'.
if (entry >= predicate.lower_ && entry < predicate.upper_ &&
row_selected) {
- expected.emplace_back(entry);
+ auto result = expected.emplace(entry, is_deleted);
+ if (!result.second) {
+ // We should only be overwriting a deleted row.
+ bool existing_is_deleted = result.first->second;
+ CHECK_EQ(true, existing_is_deleted);
+ result.first->second = is_deleted;
+ }
}
}
- all_ints.emplace_back(List{ std::move(ints), std::move(deleted_status),
std::move(sv) });
- }
-
- LOG_TIMING(INFO, "sorting the expected results") {
- std::sort(expected.begin(), expected.end());
+ all_ints.emplace_back(std::move(list));
}
LOG_TIMING(INFO, "shuffling the inputs") {
@@ -327,12 +339,13 @@ void TestMerge(const Schema& schema, const
TestIntRangePredicate &predicate,
VLOG(1) << "Predicate expects " << expected.size() << " results: " <<
expected;
const int kIsDeletedIndex = schema.find_first_is_deleted_virtual_column();
+
for (int trial = 0; trial < FLAGS_num_iters; trial++) {
vector<unique_ptr<RowwiseIterator>> to_merge;
- for (const auto& e : all_ints) {
- unique_ptr<VectorIterator> vec_it(new VectorIterator(e.ints,
e.is_deleted, schema));
+ for (const auto& list : all_ints) {
+ unique_ptr<VectorIterator> vec_it(new VectorIterator(list.ints,
list.is_deleted, schema));
vec_it->set_block_size(10);
- vec_it->set_selection_vector(e.sv.get());
+ vec_it->set_selection_vector(list.sv.get());
unique_ptr<RowwiseIterator>
mat_it(NewMaterializingIterator(std::move(vec_it)));
vector<unique_ptr<RowwiseIterator>> input;
input.emplace_back(std::move(mat_it));
@@ -353,29 +366,35 @@ void TestMerge(const Schema& schema, const
TestIntRangePredicate &predicate,
RowBlock dst(schema, 100, nullptr);
size_t total_idx = 0;
+ auto expected_iter = expected.cbegin();
while (merger->HasNext()) {
ASSERT_OK(merger->NextBlock(&dst));
ASSERT_GT(dst.nrows(), 0) <<
"if HasNext() returns true, must return some rows";
for (int i = 0; i < dst.nrows(); i++) {
- uint32_t row_val = *schema.ExtractColumnFromRow<UINT32>(dst.row(i),
kValColIdx);
- ASSERT_GE(row_val, predicate.lower_) << "Yielded integer excluded by
predicate";
- ASSERT_LT(row_val, predicate.upper_) << "Yielded integer excluded by
predicate";
- if (expected[total_idx] != row_val) {
- ASSERT_EQ(expected[total_idx], row_val) <<
- "Yielded out of order at idx " << total_idx;
- }
+ ASSERT_NE(expected.end(), expected_iter);
+ uint32_t expected_key = expected_iter->first;
+ bool expected_is_deleted = expected_iter->second;
+ uint32_t row_key = *schema.ExtractColumnFromRow<UINT32>(dst.row(i),
kValColIdx);
+ ASSERT_GE(row_key, predicate.lower_) << "Yielded integer excluded by
predicate";
+ ASSERT_LT(row_key, predicate.upper_) << "Yielded integer excluded by
predicate";
+ EXPECT_EQ(expected_key, row_key) << "Yielded out of order at idx "
<< total_idx;
+ bool is_deleted = false;
if (include_deleted_rows) {
- ASSERT_NE(Schema::kColumnNotFound, kIsDeletedIndex);
- bool is_deleted =
*schema.ExtractColumnFromRow<IS_DELETED>(dst.row(i), kIsDeletedIndex);
- ASSERT_EQ(is_deleted, ContainsKey(expected_ints_deleted, row_val))
- << "Row " << row_val << " has unexpected IS_DELETED value at
index " << total_idx;
+ CHECK_NE(Schema::kColumnNotFound, kIsDeletedIndex);
+ is_deleted = *schema.ExtractColumnFromRow<IS_DELETED>(dst.row(i),
kIsDeletedIndex);
+ EXPECT_EQ(expected_is_deleted, is_deleted)
+ << "Row " << row_key << " has unexpected IS_DELETED value at
index " << total_idx;
}
- total_idx++;
+ VLOG(2) << "Observed: val=" << row_key << ", is_deleted=" <<
is_deleted;
+ VLOG(2) << "Expected: val=" << expected_key << ", is_deleted=" <<
expected_is_deleted;
+ ++expected_iter;
+ ++total_idx;
}
}
- ASSERT_EQ(total_idx, expected.size());
+ ASSERT_EQ(expected.size(), total_idx);
+ ASSERT_EQ(expected.end(), expected_iter);
LOG(INFO) << "Total number of comparisons performed: "
<< GetMergeIteratorNumComparisonsForTests(merger);