This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 99ce007 Correctly set repaired data tracking flag on range commands
99ce007 is described below
commit 99ce007c5beb7988ce83fb1443a1e0ca259264cc
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Tue Feb 12 15:21:25 2019 +0000
Correctly set repaired data tracking flag on range commands
Patch by Sam Tunnicliffe; reviewed by Jordan West for CASSANDRA-15019
---
CHANGES.txt | 1 +
.../org/apache/cassandra/service/StorageProxy.java | 16 ++-
.../distributed/test/RepairDigestTrackingTest.java | 152 +++++++++++++++++++++
3 files changed, 162 insertions(+), 7 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2254452..b511c18 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Set repaired data tracking flag on range reads if enabled (CASSANDRA-15019)
* Calculate pending ranges for BOOTSTRAP_REPLACE correctly (CASSANDRA-14802)
* Make TableCQLHelper reuse the single quote pattern (CASSANDRA-15033)
* Add Zstd compressor (CASSANDRA-14482)
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 439d8cf..ce78674 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2047,6 +2047,15 @@ public class StorageProxy implements StorageProxyMBean
private SingleRangeResponse query(ReplicaPlan.ForRangeRead
replicaPlan, boolean isFirst)
{
PartitionRangeReadCommand rangeCommand =
command.forSubRange(replicaPlan.range(), isFirst);
+ // If enabled, request repaired data tracking info from full
replicas but
+ // only if there are multiple full replicas to compare results from
+ if
(DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled()
+ && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
+ {
+ command.trackRepairedStatus();
+ rangeCommand.trackRepairedStatus();
+ }
+
ReplicaPlan.SharedForRangeRead sharedReplicaPlan =
ReplicaPlan.shared(replicaPlan);
ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
= ReadRepair.create(command, sharedReplicaPlan,
queryStartNanoTime);
@@ -2055,13 +2064,6 @@ public class StorageProxy implements StorageProxyMBean
ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
= new ReadCallback<>(resolver, rangeCommand,
sharedReplicaPlan, queryStartNanoTime);
- // If enabled, request repaired data tracking info from full
replicas but
- // only if there are multiple full replicas to compare results from
- if
(DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()
- && replicaPlan.contacts().filter(Replica::isFull).size() >
1)
- {
- command.trackRepairedStatus();
- }
if (replicaPlan.contacts().size() == 1 &&
replicaPlan.contacts().get(0).isSelf())
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
new file mode 100644
index 0000000..a987ea3
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageProxy;
+
+public class RepairDigestTrackingTest extends DistributedTestBase implements
Serializable
+{
+
+ @Test
+ public void testInconsistenciesFound() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(2)))
+ {
+
+ cluster.get(1).runOnInstance(() -> {
+
StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+ });
+
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k INT, c
INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
+ for (int i = 0; i < 10; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE +
".tbl (k, c, v) VALUES (?, ?, ?)",
+ ConsistencyLevel.ALL,
+ i, i, i);
+ }
+
+ cluster.get(1).runOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
+ );
+ cluster.get(2).runOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
+ );
+
+ for (int i = 10; i < 20; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE +
".tbl (k, c, v) VALUES (?, ?, ?)",
+ ConsistencyLevel.ALL,
+ i, i, i);
+ }
+
+ cluster.get(1).runOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
+ );
+ cluster.get(2).runOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
+ );
+
+ cluster.get(1).runOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
+ );
+ cluster.get(2).runOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
+ );
+
+ cluster.get(2).runOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::markRepaired)
+ );
+
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl
(k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
+ cluster.get(1).runOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
+ );
+ cluster.get(2).runOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertRepaired)
+ );
+
+ long ccBefore = cluster.get(1).callOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
+ );
+
+ cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE +
".tbl", ConsistencyLevel.ALL);
+ long ccAfter = cluster.get(1).callOnInstance(() ->
+
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
+ );
+
+ Assert.assertEquals("confirmed count should differ by 1 after
range read", ccBefore + 1, ccAfter);
+ }
+ }
+
+ private void assertNotRepaired(SSTableReader reader) {
+ Assert.assertTrue("repaired at is set for sstable: " +
reader.descriptor, getRepairedAt(reader) ==
ActiveRepairService.UNREPAIRED_SSTABLE);
+ }
+
+ private void assertRepaired(SSTableReader reader) {
+ Assert.assertTrue("repaired at is not set for sstable: " +
reader.descriptor, getRepairedAt(reader) > 0);
+ }
+
+ private long getRepairedAt(SSTableReader reader)
+ {
+ Descriptor descriptor = reader.descriptor;
+ try
+ {
+ Map<MetadataType, MetadataComponent> metadata =
descriptor.getMetadataSerializer()
+
.deserialize(descriptor, EnumSet.of(MetadataType.STATS));
+
+ StatsMetadata stats = (StatsMetadata)
metadata.get(MetadataType.STATS);
+ return stats.repairedAt;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ private void markRepaired(SSTableReader reader) {
+ Descriptor descriptor = reader.descriptor;
+ try
+ {
+
descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor,
System.currentTimeMillis(), null, false);
+ reader.reloadSSTableMetadata();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]