This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 89a6a14 exactly_once_writes-itest: fix TSAN data race
89a6a14 is described below
commit 89a6a142ff9243cb986750e962fdfe6e7bfb4e78
Author: Adar Dembo <[email protected]>
AuthorDate: Mon Mar 11 18:12:37 2019 -0700
exactly_once_writes-itest: fix TSAN data race
The accesses of ExternalDaemon::bound_rpc_addr() in the test's helper
threads raced with the call to RestartAnyCrashedTabletServers() in the main
thread; the latter resets ExternalDaemon.status_, the member from which the
RPC address is derived.
Noticed this on the flaky test dashboard.
Change-Id: Iafd901b0efbe205136eae5ea5de31b6fe6565480
Reviewed-on: http://gerrit.cloudera.org:8080/12720
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
---
.../integration-tests/exactly_once_writes-itest.cc | 50 +++++++++++-----------
1 file changed, 26 insertions(+), 24 deletions(-)
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index c1fa863..7ab92c4 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -80,14 +80,18 @@ class ExactlyOnceSemanticsITest : public TabletServerIntegrationTestBase {
FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests;
}
- // Writes 'num_rows' to the tablet server listening on 'address' and collects all success
- // responses. If a write fails for some reason, continues to try until it succeeds. Since
- // followers are also able to return responses to the client, writes should succeed in bounded
- // time. Uses 'random' to generate the rows to write so that multiple threads try to write the
- // same rows.
- void WriteRowsAndCollectResponses(int thread_idx,
+ // Writes rows to the tablet server listening on 'address' and collects all
+ // successful responses.
+ //
+ // If a write fails for some reason, continues to try until it succeeds. Since
+ // followers are also able to return responses to the client, writes should
+ // succeed in bounded time.
+ //
+ // Uses a PRNG to generate the rows to write so that multiple threads try to
+ // write the same rows.
+ void WriteRowsAndCollectResponses(Sockaddr address,
+ int thread_idx,
int num_batches,
- int batch_size,
Barrier* barrier,
vector<WriteResponsePB>* responses);
@@ -97,21 +101,20 @@ class ExactlyOnceSemanticsITest : public TabletServerIntegrationTestBase {
bool allow_crashes);
protected:
- int seed_;
+ const int kBatchSize = 10;
+ int seed_;
};
-void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(int thread_idx,
+void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(Sockaddr address,
+ int thread_idx,
int num_batches,
- int batch_size,
Barrier* barrier,
vector<WriteResponsePB>* responses) {
const int64_t kMaxAttempts = 100000;
// Set the same seed in all threads so that they generate the same requests.
Random random(seed_);
- Sockaddr address = cluster_.get()->tablet_server(
- thread_idx % FLAGS_num_replicas)->bound_rpc_addr();
rpc::RpcController controller;
@@ -141,7 +144,7 @@ void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(int thread_idx,
// For 1/3 of the batches, perform an empty write. This will make sure that we also stress
// the path where writes aren't serialized by row locks.
if (i % 3 != 0) {
- for (int j = 0; j < batch_size; j++) {
+ for (int j = 0; j < kBatchSize; j++) {
int row_key = random.Next() % kNumDifferentRows;
AddTestRowToPB(RowOperationsPB::INSERT, schema, row_key, row_key, "",
request.mutable_row_operations());
@@ -190,7 +193,6 @@ void ExactlyOnceSemanticsITest::DoTestWritesWithExactlyOnceSemantics(
const vector<string>& master_flags,
int num_batches,
bool allow_crashes) {
- const int kBatchSize = 10;
const int kNumThreadsPerReplica = 2;
NO_FATALS(BuildAndStart(ts_flags, master_flags));
@@ -198,33 +200,33 @@ void ExactlyOnceSemanticsITest::DoTestWritesWithExactlyOnceSemantics(
vector<itest::TServerDetails*> tservers;
AppendValuesFromMap(tablet_servers_, &tservers);
- vector<scoped_refptr<kudu::Thread>> threads;
-
const int num_threads = FLAGS_num_replicas * kNumThreadsPerReplica;
vector<vector<WriteResponsePB>> responses(num_threads);
Barrier barrier(num_threads);
+ vector<scoped_refptr<kudu::Thread>> threads;
+ threads.reserve(num_threads);
// Create kNumThreadsPerReplica write threads per replica.
for (int i = 0; i < num_threads; i++) {
int thread_idx = i;
- int ts_idx = thread_idx % FLAGS_num_replicas;
- scoped_refptr<kudu::Thread> thread;
+ Sockaddr address = cluster_->tablet_server(
+ thread_idx % FLAGS_num_replicas)->bound_rpc_addr();
string worker_name = strings::Substitute(
- "writer-$0-$1", thread_idx,
- cluster_.get()->tablet_server(ts_idx)->bound_rpc_addr().ToString());
+ "writer-$0-$1", thread_idx, address.ToString());
- kudu::Thread::Create(
+ scoped_refptr<kudu::Thread> thread;
+ ASSERT_OK(kudu::Thread::Create(
"TestWritesWithExactlyOnceSemantics",
worker_name,
&ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses,
this,
+ address,
thread_idx,
num_batches,
- kBatchSize,
&barrier,
&responses[i],
- &thread);
- threads.push_back(thread);
+ &thread));
+ threads.emplace_back(thread);
}
bool done = false;