This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b2ccd0f ensure hint window is persistent across restarts of a node
b2ccd0f is described below
commit b2ccd0f3f588a34cd68222bdacd1914478914ac9
Author: kurt <[email protected]>
AuthorDate: Tue Feb 20 03:49:33 2018 +0000
ensure hint window is persistent across restarts of a node
patch by Kurt Greaves; reviewed by Brandon Williams, Mick Semb Wever and
Stefan Miklosovic for CASSANDRA-14309
---
CHANGES.txt | 1 +
NEWS.txt | 10 +++++
conf/cassandra.yaml | 14 +++++++
doc/source/configuration/cass_yaml_file.rst | 15 +++++++
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 5 +++
.../org/apache/cassandra/hints/HintsBuffer.java | 26 ++++++++++++
.../apache/cassandra/hints/HintsBufferPool.java | 17 ++++++++
.../org/apache/cassandra/hints/HintsService.java | 17 +++++++-
.../org/apache/cassandra/hints/HintsStore.java | 8 ++++
.../apache/cassandra/hints/HintsWriteExecutor.java | 28 +++++++++++--
.../org/apache/cassandra/service/StorageProxy.java | 34 +++++++++++++--
.../apache/cassandra/hints/HintsBufferTest.java | 47 +++++++++++++++++++++
.../apache/cassandra/hints/HintsServiceTest.java | 48 ++++++++++++++++++++++
14 files changed, 262 insertions(+), 9 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0103be2..3208ff8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309)
* Allow to GRANT or REVOKE multiple permissions in a single statement
(CASSANDRA-17030)
* Allow to grant permission for all tables in a keyspace (CASSANDRA-17027)
* Log time spent writing keys during compaction (CASSANDRA-17037)
diff --git a/NEWS.txt b/NEWS.txt
index 5c48685..7850b09 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -54,6 +54,16 @@ New features
(track_warnings.local_read_size.warn_threshold_kb and
track_warnings.local_read_size.abort_threshold_kb),
and RowIndexEntry estimated memory size
(track_warnings.row_index_size.warn_threshold_kb and
track_warnings.row_index_size.abort_threshold_kb) are supported; more
checks will be added over time.
+ - Prior to this version, the hint system was storing a window of hints as
defined by
+ configuration property max_hint_window_in_ms, however this window was not
persistent across restarts.
+ For example, if a node was restarted, it was still eligible for a
hint to be stored for it because it
+ was down less than max_hint_window_in_ms. Hence if that node kept
restarting without hint delivery completing,
+ hints would be stored for that node indefinitely, occupying more and
more disk space.
+ This behaviour was changed in CASSANDRA-14309. From now on, by default,
if a node is not down longer than
+ max_hint_window_in_ms, there is an additional check to see if there is a
hint to be delivered which is older
+ than max_hint_window_in_ms. If there is, the new hint is not persisted;
otherwise it is.
+ The previous behaviour can be restored by setting the property
hint_window_persistent_enabled
+ to false. This property is set to true by default.
Upgrading
---------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a9efa8d..87df25a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -98,6 +98,20 @@ auto_hints_cleanup_enabled: false
# parameters:
# -
+# Enable / disable persistent hint windows.
+#
+# If set to false, a hint will be stored only in case a respective node
+# that hint is for is down less than or equal to max_hint_window_in_ms.
+#
+# If set to true, a hint will additionally be stored only if there is not any
+# hint for that node older than max_hint_window_in_ms. This is for cases
+# when a node keeps restarting and hints are not delivered yet; otherwise we would be
saving
+# hints for that node indefinitely.
+#
+# Defaults to true.
+#
+# hint_window_persistent_enabled: true
+
# Maximum throttle in KBs per second, total. This will be
# reduced proportionally to the number of nodes in the cluster.
batchlog_replay_throttle_in_kb: 1024
diff --git a/doc/source/configuration/cass_yaml_file.rst
b/doc/source/configuration/cass_yaml_file.rst
index 8a14336..09b5cb4 100644
--- a/doc/source/configuration/cass_yaml_file.rst
+++ b/doc/source/configuration/cass_yaml_file.rst
@@ -150,6 +150,21 @@ are supported.
# parameters:
# -
+``hint_window_persistent_enabled``
+----------------------------------
+
+*This option is commented out by default.*
+
+If set to false, a hint will be stored only in case a respective node
+that hint is for is down less than or equal to max_hint_window_in_ms.
+
+If set to true, a hint will additionally be stored only if there is not any
+hint for that node older than max_hint_window_in_ms. This is for cases
+when a node keeps restarting and hints are not delivered yet; otherwise we would be
saving
+hints for that node indefinitely.
+
+*Default Value:* true
+
``batchlog_replay_throttle_in_kb``
----------------------------------
Maximum throttle in KBs per second, total. This will be
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 3b574ac..b9b5975 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -74,6 +74,7 @@ public class Config
public Set<String> hinted_handoff_disabled_datacenters =
Sets.newConcurrentHashSet();
public volatile int max_hint_window_in_ms = 3 * 3600 * 1000; // three hours
public String hints_directory;
+ public boolean hint_window_persistent_enabled = true;
public ParameterizedClass seed_provider;
public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5195646..7a3164a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2514,6 +2514,11 @@ public class DatabaseDescriptor
return new File(conf.hints_directory);
}
+ public static boolean hintWindowPersistentEnabled()
+ {
+ return conf.hint_window_persistent_enabled;
+ }
+
public static File getSerializedCachePath(CacheType cacheType, String
version, String extension)
{
String name = cacheType.toString()
diff --git a/src/java/org/apache/cassandra/hints/HintsBuffer.java
b/src/java/org/apache/cassandra/hints/HintsBuffer.java
index d944b4d..e86ce26 100644
--- a/src/java/org/apache/cassandra/hints/HintsBuffer.java
+++ b/src/java/org/apache/cassandra/hints/HintsBuffer.java
@@ -26,11 +26,13 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.OpOrder;
import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
@@ -58,6 +60,7 @@ final class HintsBuffer
private final ConcurrentMap<UUID, Queue<Integer>> offsets;
private final OpOrder appendOrder;
+ private final ConcurrentMap<UUID, Long> earliestHintByHost; // Stores time
of the earliest hint in the buffer for each host
private HintsBuffer(ByteBuffer slab)
{
@@ -66,6 +69,7 @@ final class HintsBuffer
position = new AtomicLong();
offsets = new ConcurrentHashMap<>();
appendOrder = new OpOrder();
+ earliestHintByHost = new ConcurrentHashMap<>();
}
static HintsBuffer create(int slabSize)
@@ -141,6 +145,21 @@ final class HintsBuffer
};
}
+ /**
+ * Retrieve the time of the earliest hint in the buffer for a specific node
+ * @param hostId UUID of the node
+ * @return recorded time of the earliest hint for that node, or the current
time from {@code Clock.Global} when no time is recorded for it
+ */
+ long getEarliestHintTime(UUID hostId)
+ {
+ return earliestHintByHost.getOrDefault(hostId,
Clock.Global.currentTimeMillis());
+ }
+
+ void clearEarliestHintForHostId(UUID hostId)
+ {
+ earliestHintByHost.remove(hostId);
+ }
+
@SuppressWarnings("resource")
Allocation allocate(int hintSize)
{
@@ -222,8 +241,15 @@ final class HintsBuffer
void write(Iterable<UUID> hostIds, Hint hint)
{
write(hint);
+ long ts = Clock.Global.currentTimeMillis();
for (UUID hostId : hostIds)
+ {
+ // We only need the time of the first hint in the buffer
+ if (DatabaseDescriptor.hintWindowPersistentEnabled())
+ earliestHintByHost.putIfAbsent(hostId, ts);
+
put(hostId, offset);
+ }
}
public void close()
diff --git a/src/java/org/apache/cassandra/hints/HintsBufferPool.java
b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
index 8d1db8d..78f07dd 100644
--- a/src/java/org/apache/cassandra/hints/HintsBufferPool.java
+++ b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.hints;
import java.io.Closeable;
+import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@@ -65,6 +66,22 @@ final class HintsBufferPool implements Closeable
}
}
+ /**
+ * Get the earliest hint time for a node across the current and reserve buffers
+ * @param hostId UUID of the node
+ * @return the smallest earliest-hint time reported by any of those buffers
+ */
+ long getEarliestHintForHost(UUID hostId)
+ {
+ long min = currentBuffer().getEarliestHintTime(hostId);
+ Iterator<HintsBuffer> it = reserveBuffers.iterator();
+
+ while (it.hasNext())
+ min = Math.min(min, it.next().getEarliestHintTime(hostId));
+
+ return min;
+ }
+
private HintsBuffer.Allocation allocate(int hintSize)
{
HintsBuffer current = currentBuffer();
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java
b/src/java/org/apache/cassandra/hints/HintsService.java
index 3fcc00e..f3957a1 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -193,7 +194,7 @@ public final class HintsService implements HintsServiceMBean
// judicious use of streams: eagerly materializing probably cheaper
// than performing filters / translations 2x extra via
Iterables.filter/transform
List<UUID> hostIds = replicas.stream()
- .filter(StorageProxy::shouldHint)
+ .filter(replica -> StorageProxy.shouldHint(replica, false))
.map(replica ->
StorageService.instance.getHostIdForEndpoint(replica.endpoint()))
.collect(Collectors.toList());
@@ -428,6 +429,20 @@ public final class HintsService implements
HintsServiceMBean
return dispatchExecutor.transfer(catalog, hostIdSupplier);
}
+ /**
+ * Get the time of the earliest hint written for a particular node.
+ * @param hostId UUID of the node to check its hints; not null-checked here.
+ * @return earliest hint time in ms, or about the current time when the store
has no descriptor and no buffer has a time recorded for the node
+ */
+ public long getEarliestHintForHost(UUID hostId)
+ {
+ // Only the first descriptor and the buffers are consulted; "now" stands in for a missing descriptor.
+ HintsStore store = catalog.get(hostId);
+ HintsDescriptor desc = store.getFirstDescriptor();
+ long timestamp = desc == null ? Clock.Global.currentTimeMillis() :
desc.timestamp;
+ return Math.min(timestamp, bufferPool.getEarliestHintForHost(hostId));
+ }
+
HintsCatalog getCatalog()
{
return catalog;
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java
b/src/java/org/apache/cassandra/hints/HintsStore.java
index 29e79bf..bc2fee7 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -247,6 +247,14 @@ final class HintsStore
corruptedFiles.add(descriptor);
}
+ /**
+ * @return the first {@link HintsDescriptor} in the queue for dispatch, as given by
{@code peekFirst()} (no copy is made here), or {@code null} if queue is empty.
+ */
+ HintsDescriptor getFirstDescriptor()
+ {
+ return dispatchDequeue.peekFirst();
+ }
+
/*
* Methods dealing with HintsWriter.
*
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
index c4bfff0..8089d61 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -194,7 +194,7 @@ final class HintsWriteExecutor
{
HintsBuffer buffer = bufferPool.currentBuffer();
buffer.waitForModifications();
- stores.forEach(store ->
flush(buffer.consumingHintsIterator(store.hostId), store));
+ stores.forEach(store ->
flush(buffer.consumingHintsIterator(store.hostId), store, buffer));
}
}
@@ -216,10 +216,10 @@ final class HintsWriteExecutor
private void flush(HintsBuffer buffer)
{
- buffer.hostIds().forEach(hostId ->
flush(buffer.consumingHintsIterator(hostId), catalog.get(hostId)));
+ buffer.hostIds().forEach(hostId ->
flush(buffer.consumingHintsIterator(hostId), catalog.get(hostId), buffer));
}
- private void flush(Iterator<ByteBuffer> iterator, HintsStore store)
+ private void flush(Iterator<ByteBuffer> iterator, HintsStore store,
HintsBuffer buffer)
{
while (true)
{
@@ -231,7 +231,27 @@ final class HintsWriteExecutor
// exceeded the size limit for an individual file, but still have
more to write
// close the current writer and continue flushing to a new one in
the next iteration
- store.closeWriter();
+ try
+ {
+ store.closeWriter();
+ }
+ finally
+ {
+ /*
+ We remove the earliest hint for a respective hostId of the
store from the buffer,
+ we are removing it specifically after we closed the store
above in try block
+ so hints are persisted on disk before.
+
+ There is a periodic flushing of a buffer driven by
hints_flush_period_in_ms and clearing
+ this entry upon every flush would remove the information what
is the earliest hint in the buffer
+ for a respective node prematurely.
+
+ Since this flushing method is called for every host id a
buffer holds, we will eventually
+ remove all hostIds of the earliest hints of the buffer, and it
will be added again as soon as there
+ is a new hint for that node to be delivered.
+ */
+ buffer.clearEarliestHintForHostId(store.hostId);
+ }
}
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index c02a77d..c8495dc 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -44,6 +44,8 @@ import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.slf4j.Logger;
@@ -2192,6 +2194,11 @@ public class StorageProxy implements StorageProxyMBean
public static boolean shouldHint(Replica replica)
{
+ return shouldHint(replica, true);
+ }
+
+ public static boolean shouldHint(Replica replica, boolean
tryEnablePersistentWindow)
+ {
if (!DatabaseDescriptor.hintedHandoffEnabled())
return false;
if (replica.isTransient() || replica.isSelf())
@@ -2207,12 +2214,31 @@ public class StorageProxy implements StorageProxyMBean
return false;
}
}
- boolean hintWindowExpired =
Gossiper.instance.getEndpointDowntime(replica.endpoint()) >
DatabaseDescriptor.getMaxHintWindow();
- if (hintWindowExpired)
+
+ InetAddressAndPort endpoint = replica.endpoint();
+ int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
+ long endpointDowntime =
Gossiper.instance.getEndpointDowntime(endpoint);
+ boolean hintWindowExpired = endpointDowntime > maxHintWindow;
+
+ if (tryEnablePersistentWindow && !hintWindowExpired &&
DatabaseDescriptor.hintWindowPersistentEnabled())
{
- HintsService.instance.metrics.incrPastWindow(replica.endpoint());
- Tracing.trace("Not hinting {} which has been down {} ms", replica,
Gossiper.instance.getEndpointDowntime(replica.endpoint()));
+ UUID hostIdForEndpoint =
StorageService.instance.getHostIdForEndpoint(endpoint);
+ if (hostIdForEndpoint != null)
+ {
+ long earliestHint =
HintsService.instance.getEarliestHintForHost(hostIdForEndpoint);
+ hintWindowExpired = Clock.Global.currentTimeMillis() -
maxHintWindow > earliestHint;
+ if (hintWindowExpired)
+ Tracing.trace("Not hinting {} for which there is the
earliest hint stored at {}", replica, earliestHint);
+ }
+ }
+ else if (hintWindowExpired)
+ {
+ Tracing.trace("Not hinting {} which has been down {} ms", replica,
endpointDowntime);
}
+
+ if (hintWindowExpired)
+ HintsService.instance.metrics.incrPastWindow(replica.endpoint());
+
return !hintWindowExpired;
}
diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
index 53c1c77..3020fb9 100644
--- a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
@@ -26,6 +26,7 @@ import java.util.zip.CRC32;
import com.google.common.collect.Iterables;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.concurrent.NamedThreadFactory;
@@ -40,12 +41,16 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.Clock;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import static junit.framework.Assert.*;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
+@RunWith(BMUnitRunner.class)
public class HintsBufferTest
{
private static final String KEYSPACE = "hints_buffer_test";
@@ -157,6 +162,48 @@ public class HintsBufferTest
buffer.free();
}
+ static volatile long timestampForHint = 0;
+ // BM rule to get the timestamp that was used to store the hint so that we
avoid any flakiness in timestamps between
+ // when we send the hint and when it actually got written.
+ @Test
+ @BMRule(name = "GetHintTS",
+ targetClass="HintsBuffer$Allocation",
+ targetMethod="write(Iterable, Hint)",
+ targetLocation="AFTER INVOKE putIfAbsent",
+
action="org.apache.cassandra.hints.HintsBufferTest.timestampForHint = $ts")
+ public void testEarliestHintTime()
+ {
+ int hintSize = (int) Hint.serializer.serializedSize(createHint(0,
Clock.Global.currentTimeMillis()), MessagingService.current_version);
+ int entrySize = hintSize + HintsBuffer.ENTRY_OVERHEAD_SIZE;
+ // allocate a slab to fit 10 hints
+ int slabSize = entrySize * 10;
+
+ // use a fixed timestamp base for all mutation timestamps
+ long baseTimestamp = Clock.Global.currentTimeMillis();
+
+ HintsBuffer buffer = HintsBuffer.create(slabSize);
+ UUID uuid = UUID.randomUUID();
+ // Track the first hints time
+ try (HintsBuffer.Allocation allocation = buffer.allocate(hintSize))
+ {
+ Hint hint = createHint(100, baseTimestamp);
+ allocation.write(Collections.singleton(uuid), hint);
+ }
+ long oldestHintTime = timestampForHint;
+
+ // Write some more hints to ensure we actually test getting the
earliest
+ for (int i = 0; i < 9; i++)
+ {
+ try (HintsBuffer.Allocation allocation = buffer.allocate(hintSize))
+ {
+ Hint hint = createHint(i, baseTimestamp);
+ allocation.write(Collections.singleton(uuid), hint);
+ }
+ }
+ long earliest = buffer.getEarliestHintTime(uuid);
+ assertEquals(oldestHintTime, earliest);
+ }
+
private static int validateEntry(UUID hostId, ByteBuffer buffer, long
baseTimestamp, UUID[] load) throws IOException
{
CRC32 crc = new CRC32();
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
index dd0eb5a..10a7040 100644
--- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.hints;
+import java.util.Collections;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -29,9 +31,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
import com.datastax.driver.core.utils.MoreFutures;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MockMessagingService;
@@ -40,12 +46,16 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import static org.apache.cassandra.hints.HintsTestUtil.MockFailureDetector;
import static org.apache.cassandra.hints.HintsTestUtil.sendHintsAndResponses;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+@RunWith(BMUnitRunner.class)
public class HintsServiceTest
{
private static final String KEYSPACE = "hints_service_test";
@@ -173,4 +183,42 @@ public class HintsServiceTest
assertTrue(dispatchOffset != null);
assertTrue(((ChecksummedDataInput.Position)
dispatchOffset).sourcePosition > 0);
}
+
+ // BM rule to get the timestamp that was used to store the hint so that we
avoid any flakiness in timestamps between
+ // when we send the hint and when it actually got written.
+ static volatile long timestampForHint = 0L;
+ @Test
+ @BMRule(name = "GetHintTS",
+ targetClass="HintsBuffer$Allocation",
+ targetMethod="write(Iterable, Hint)",
+ targetLocation="AFTER INVOKE putIfAbsent",
+ action="org.apache.cassandra.hints.HintsServiceTest.timestampForHint =
$ts")
+ public void testEarliestHint() throws InterruptedException
+ {
+ // create and write noOfHints using service
+ UUID hostId = StorageService.instance.getLocalHostUUID();
+ TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE,
TABLE);
+
+ long ts = System.currentTimeMillis();
+ DecoratedKey dkey = Util.dk(String.valueOf(1));
+ PartitionUpdate.SimpleBuilder builder =
PartitionUpdate.simpleBuilder(metadata, dkey).timestamp(ts);
+ builder.row("column0").add("val", "value0");
+ Hint hint = Hint.create(builder.buildAsMutation(), ts);
+ HintsService.instance.write(hostId, hint);
+ long oldestHintTime = timestampForHint;
+ Thread.sleep(1);
+ HintsService.instance.write(hostId, hint);
+ Thread.sleep(1);
+ HintsService.instance.write(hostId, hint);
+
+ // Close and fsync so that we get the timestamp from the descriptor
rather than the buffer.
+ HintsStore store = HintsService.instance.getCatalog().get(hostId);
+
HintsService.instance.flushAndFsyncBlockingly(Collections.singletonList(hostId));
+ store.closeWriter();
+
+ long earliest = HintsService.instance.getEarliestHintForHost(hostId);
+ assertEquals(oldestHintTime, earliest);
+ assertNotEquals(oldestHintTime, timestampForHint);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]