This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3a6f690 Limit the maximum hints size per host
3a6f690 is described below
commit 3a6f6907314670fdb2b316db8f08ffd85da88851
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Dec 7 18:54:58 2021 -0800
Limit the maximum hints size per host
patch by Yifan Cai; reviewed by Dinesh Joshi, Francisco Guerrero for
CASSANDRA-17142
---
NEWS.txt | 3 +
conf/cassandra.yaml | 5 +
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 15 +++
.../apache/cassandra/hints/HintsDescriptor.java | 10 ++
.../cassandra/hints/HintsDispatchExecutor.java | 4 +-
.../org/apache/cassandra/hints/HintsService.java | 13 +++
.../org/apache/cassandra/hints/HintsStore.java | 18 ++-
.../org/apache/cassandra/hints/HintsWriter.java | 4 +-
.../org/apache/cassandra/service/StorageProxy.java | 70 +++++++++---
.../cassandra/service/StorageProxyMBean.java | 2 +
.../cassandra/hints/HintsWriteThenReadTest.java | 6 +-
.../org/apache/cassandra/hints/AlteredHints.java | 2 +-
.../apache/cassandra/hints/HintsCatalogTest.java | 65 ++++++-----
.../cassandra/hints/HintsDescriptorTest.java | 2 +-
.../apache/cassandra/hints/HintsReaderTest.java | 2 +-
.../cassandra/net/WriteCallbackInfoTest.java | 22 +++-
.../apache/cassandra/service/StorageProxyTest.java | 121 +++++++++++++++++++++
18 files changed, 308 insertions(+), 57 deletions(-)
diff --git a/NEWS.txt b/NEWS.txt
index 966a729..f5d76d5 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,9 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - New configuration max_hints_size_per_host to limit the size of local
hints files per host in megabytes. Setting to
+ non-positive value disables the limit, which is the default behavior.
Setting to a positive value to ensure
+ the total size of the hints files per host does not exceed the limit.
- Added ability to configure auth caches through corresponding `nodetool`
commands.
- CDC data flushing now can be configured to be non-blocking with the
configuration cdc_block_writes. Setting to true,
any writes to the CDC-enabled tables will be blocked when reaching to
the limit for CDC data on disk, which is the
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 5e8bc55..f2feb40 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -86,6 +86,11 @@ hints_flush_period: 10000ms
# Maximum size for a single hints file, in megabytes.
max_hints_file_size: 128MiB
+# The file size limit to store hints for an unreachable host, in megabytes.
+# Once the local hints files have reached the limit, no more new hints will be
created.
+# Set a non-positive value will disable the size limit.
+# max_hints_size_per_host: 0MiB
+
# Enable / disable automatic cleanup for the expired and orphaned hints file.
# Disable the option in order to preserve those hints on the disk.
auto_hints_cleanup_enabled: false
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index df20954..f56066c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -365,6 +365,7 @@ public class Config
public SmallestDurationMilliseconds hints_flush_period = new
SmallestDurationMilliseconds("10s");
@Replaces(oldName = "max_hints_file_size_in_mb", converter =
Converters.MEBIBYTES_DATA_STORAGE, deprecated = true)
public SmallestDataStorageMebibytes max_hints_file_size = new
SmallestDataStorageMebibytes("128MiB");
+ public volatile DataStorageSpec max_hints_size_per_host = new
DataStorageSpec("0B"); // 0 means disabled
public ParameterizedClass hints_compression;
public volatile boolean auto_hints_cleanup_enabled = false;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 74c8b26..50cda68 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2643,6 +2643,21 @@ public class DatabaseDescriptor
return conf.max_hint_window.toMillisecondsAsInt();
}
+ public static void setMaxHintsSizePerHostInMb(int value)
+ {
+ conf.max_hints_size_per_host = DataStorageSpec.inMebibytes(value);
+ }
+
+ public static int getMaxHintsSizePerHostInMb()
+ {
+ return conf.max_hints_size_per_host.toMebibytesAsInt();
+ }
+
+ public static long getMaxHintsSizePerHost()
+ {
+ return conf.max_hints_size_per_host.toBytes();
+ }
+
public static File getHintsDirectory()
{
return new File(conf.hints_directory);
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index 4c7acf1..8e1f782 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -209,6 +209,16 @@ final class HintsDescriptor
return String.format("%s-%s-%s.crc32", hostId, timestamp, version);
}
+ File file(File hintsDirectory)
+ {
+ return new File(hintsDirectory, fileName());
+ }
+
+ File checksumFile(File hintsDirectory)
+ {
+ return new File(hintsDirectory, checksumFileName());
+ }
+
int messagingVersion()
{
return messagingVersion(version);
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 991e2a2..0f34db6 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -276,7 +276,7 @@ final class HintsDispatchExecutor
private boolean deliver(HintsDescriptor descriptor, InetAddressAndPort
address)
{
- File file = new File(hintsDirectory, descriptor.fileName());
+ File file = descriptor.file(hintsDirectory);
InputPosition offset = store.getDispatchOffset(descriptor);
BooleanSupplier shouldAbort = () -> !isAlive.test(address) ||
isPaused.get();
@@ -321,7 +321,7 @@ final class HintsDispatchExecutor
// for each hint in the hints file for a node that isn't part of the
ring anymore, write RF hints for each replica
private void convert(HintsDescriptor descriptor)
{
- File file = new File(hintsDirectory, descriptor.fileName());
+ File file = descriptor.file(hintsDirectory);
try (HintsReader reader = HintsReader.open(file, rateLimiter))
{
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java
b/src/java/org/apache/cassandra/hints/HintsService.java
index e80865b..8fbfab8 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -247,6 +247,19 @@ public final class HintsService implements
HintsServiceMBean
}
/**
+ * Get the total size in bytes of all the hints files associating with the
host on disk.
+ * @param hostId, belonging host
+ * @return total file size, in bytes
+ */
+ public long getTotalHintsSize(UUID hostId)
+ {
+ HintsStore store = catalog.getNullable(hostId);
+ if (store == null)
+ return 0;
+ return store.getTotalFileSize();
+ }
+
+ /**
* Gracefully and blockingly shut down the service.
*
* Will abort dispatch sessions that are currently in progress (which is
okay, it's idempotent),
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java
b/src/java/org/apache/cassandra/hints/HintsStore.java
index bc2fee7..a042cc3 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -209,7 +209,7 @@ final class HintsStore
void delete(HintsDescriptor descriptor)
{
- File hintsFile = new File(hintsDirectory, descriptor.fileName());
+ File hintsFile = descriptor.file(hintsDirectory);
if (hintsFile.tryDelete())
logger.info("Deleted hint file {}", descriptor.fileName());
else if (hintsFile.exists())
@@ -218,7 +218,7 @@ final class HintsStore
logger.info("Already deleted hint file {}", descriptor.fileName());
//noinspection ResultOfMethodCallIgnored
- new File(hintsDirectory, descriptor.checksumFileName()).tryDelete();
+ descriptor.checksumFile(hintsDirectory).tryDelete();
}
boolean hasFiles()
@@ -236,6 +236,20 @@ final class HintsStore
dispatchPositions.put(descriptor, inputPosition);
}
+
+ /**
+ * @return the total size of all files belonging to the hints store, in
bytes.
+ */
+ long getTotalFileSize()
+ {
+ long total = 0;
+ for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue,
corruptedFiles))
+ {
+ total += descriptor.file(hintsDirectory).length();
+ }
+ return total;
+ }
+
void cleanUp(HintsDescriptor descriptor)
{
dispatchPositions.remove(descriptor);
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java
b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 468eee2..663427a 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -67,7 +67,7 @@ class HintsWriter implements AutoCloseable
@SuppressWarnings("resource") // HintsWriter owns channel
static HintsWriter create(File directory, HintsDescriptor descriptor)
throws IOException
{
- File file = new File(directory, descriptor.fileName());
+ File file = descriptor.file(directory);
FileChannel channel = FileChannel.open(file.toPath(),
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
int fd = NativeLibrary.getfd(channel);
@@ -102,7 +102,7 @@ class HintsWriter implements AutoCloseable
private void writeChecksum()
{
- File checksumFile = new File(directory, descriptor.checksumFileName());
+ File checksumFile = descriptor.checksumFile(directory);
try (OutputStream out = Files.newOutputStream(checksumFile.toPath()))
{
out.write(Integer.toHexString((int)
globalCRC.getValue()).getBytes(StandardCharsets.UTF_8));
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 686192c..69c231d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2275,16 +2275,40 @@ public class StorageProxy implements StorageProxyMBean
DatabaseDescriptor.setMaxHintWindow(ms);
}
+ public int getMaxHintsSizePerHostInMb()
+ {
+ return DatabaseDescriptor.getMaxHintsSizePerHostInMb();
+ }
+
+ public void setMaxHintsSizePerHostInMb(int value)
+ {
+ DatabaseDescriptor.setMaxHintsSizePerHostInMb(value);
+ }
+
public static boolean shouldHint(Replica replica)
{
return shouldHint(replica, true);
}
+ /**
+ * Determines whether a hint should be stored or not.
+ * It rejects early if any of the condition is met:
+ * - Hints disabled entirely or for the belonging datacetner of the replica
+ * - The replica is transient or is the self node
+ * - The replica is no longer part of the ring
+ * - The hint window has expired
+ * - The hints have reached to the size limit for the node
+ * Otherwise, it permits.
+ *
+ * @param replica, the replica for the hint
+ * @param tryEnablePersistentWindow, true to consider
hint_window_persistent_enabled; otherwise, ignores
+ * @return true to permit or false to reject hint
+ */
public static boolean shouldHint(Replica replica, boolean
tryEnablePersistentWindow)
{
- if (!DatabaseDescriptor.hintedHandoffEnabled())
- return false;
- if (replica.isTransient() || replica.isSelf())
+ if (!DatabaseDescriptor.hintedHandoffEnabled()
+ || replica.isTransient()
+ || replica.isSelf())
return false;
Set<String> disabledDCs =
DatabaseDescriptor.hintedHandoffDisabledDCs();
@@ -2303,26 +2327,40 @@ public class StorageProxy implements StorageProxyMBean
long endpointDowntime =
Gossiper.instance.getEndpointDowntime(endpoint);
boolean hintWindowExpired = endpointDowntime > maxHintWindow;
- if (tryEnablePersistentWindow && !hintWindowExpired &&
DatabaseDescriptor.hintWindowPersistentEnabled())
+ UUID hostIdForEndpoint =
StorageService.instance.getHostIdForEndpoint(endpoint);
+ if (hostIdForEndpoint == null)
{
- UUID hostIdForEndpoint =
StorageService.instance.getHostIdForEndpoint(endpoint);
- if (hostIdForEndpoint != null)
- {
- long earliestHint =
HintsService.instance.getEarliestHintForHost(hostIdForEndpoint);
- hintWindowExpired = Clock.Global.currentTimeMillis() -
maxHintWindow > earliestHint;
- if (hintWindowExpired)
- Tracing.trace("Not hinting {} for which there is the
earliest hint stored at {}", replica, earliestHint);
- }
+ Tracing.trace("Discarding hint for endpoint not part of ring: {}",
endpoint);
+ return false;
}
- else if (hintWindowExpired)
+
+ // if persisting hints window, hintWindowExpired might be updated
according to the timestamp of the earliest hint
+ if (tryEnablePersistentWindow && !hintWindowExpired &&
DatabaseDescriptor.hintWindowPersistentEnabled())
{
- Tracing.trace("Not hinting {} which has been down {} ms", replica,
endpointDowntime);
+ long earliestHint =
HintsService.instance.getEarliestHintForHost(hostIdForEndpoint);
+ hintWindowExpired = Clock.Global.currentTimeMillis() -
maxHintWindow > earliestHint;
+ if (hintWindowExpired)
+ Tracing.trace("Not hinting {} for which there is the earliest
hint stored at {}", replica, earliestHint);
}
if (hintWindowExpired)
- HintsService.instance.metrics.incrPastWindow(replica.endpoint());
+ {
+ HintsService.instance.metrics.incrPastWindow(endpoint);
+ Tracing.trace("Not hinting {} which has been down {} ms",
endpoint, endpointDowntime);
+ return false;
+ }
+
+ long maxHintsSize = DatabaseDescriptor.getMaxHintsSizePerHost();
+ long actualTotalHintsSize =
HintsService.instance.getTotalHintsSize(hostIdForEndpoint);
+ boolean hasHintsReachedMaxSize = maxHintsSize > 0 &&
actualTotalHintsSize > maxHintsSize;
+ if (hasHintsReachedMaxSize)
+ {
+ Tracing.trace("Not hinting {} which has reached to the max hints
size {} bytes on disk. The actual hints size on disk: {}",
+ endpoint, maxHintsSize, actualTotalHintsSize);
+ return false;
+ }
- return !hintWindowExpired;
+ return true;
}
/**
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index cce7ff0..7ac83ae 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -32,6 +32,8 @@ public interface StorageProxyMBean
public Set<String> getHintedHandoffDisabledDCs();
public int getMaxHintWindow();
public void setMaxHintWindow(int ms);
+ public int getMaxHintsSizePerHostInMb();
+ public void setMaxHintsSizePerHostInMb(int value);
public int getMaxHintsInProgress();
public void setMaxHintsInProgress(int qs);
public int getHintsInProgress();
diff --git a/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
b/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
index e6a6478..1b78c48 100644
--- a/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
+++ b/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
@@ -98,8 +98,8 @@ public class HintsWriteThenReadTest
private static void verifyChecksum(File directory, HintsDescriptor
descriptor) throws IOException
{
- File hintsFile = new File(directory, descriptor.fileName());
- File checksumFile = new File(directory, descriptor.checksumFileName());
+ File hintsFile = descriptor.file(directory);
+ File checksumFile = descriptor.checksumFile(directory);
assertTrue(checksumFile.exists());
@@ -114,7 +114,7 @@ public class HintsWriteThenReadTest
long baseTimestamp = descriptor.timestamp;
int index = 0;
- try (HintsReader reader = HintsReader.open(new File(directory,
descriptor.fileName())))
+ try (HintsReader reader = HintsReader.open(descriptor.file(directory)))
{
for (HintsReader.Page page : reader)
{
diff --git a/test/unit/org/apache/cassandra/hints/AlteredHints.java
b/test/unit/org/apache/cassandra/hints/AlteredHints.java
index 0379c41..25975e6 100644
--- a/test/unit/org/apache/cassandra/hints/AlteredHints.java
+++ b/test/unit/org/apache/cassandra/hints/AlteredHints.java
@@ -103,7 +103,7 @@ public abstract class AlteredHints
}
}
- try (HintsReader reader = HintsReader.open(new File(dir,
descriptor.fileName())))
+ try (HintsReader reader = HintsReader.open(descriptor.file(dir)))
{
Assert.assertTrue(looksLegit(reader.getInput()));
List<Hint> deserialized = new ArrayList<>(hintNum);
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index 1f8c95d..8b377b2 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.hints;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.file.Files;
import java.util.*;
import com.google.common.collect.ImmutableMap;
@@ -28,9 +27,12 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import static junit.framework.Assert.*;
import static org.apache.cassandra.Util.dk;
@@ -54,18 +56,14 @@ public class HintsCatalogTest
SchemaLoader.standardCFMD(KEYSPACE, TABLE2));
}
+ @Rule
+ public TemporaryFolder testFolder = new TemporaryFolder();
+
@Test
public void loadCompletenessAndOrderTest() throws IOException
{
- File directory = new File(Files.createTempDirectory(null));
- try
- {
- loadCompletenessAndOrderTest(directory);
- }
- finally
- {
- directory.deleteOnExit();
- }
+ File directory = new File(testFolder.newFolder());
+ loadCompletenessAndOrderTest(directory);
}
private void loadCompletenessAndOrderTest(File directory) throws
IOException
@@ -73,10 +71,10 @@ public class HintsCatalogTest
UUID hostId1 = UUID.randomUUID();
UUID hostId2 = UUID.randomUUID();
- long timestamp1 = System.currentTimeMillis();
- long timestamp2 = System.currentTimeMillis() + 1;
- long timestamp3 = System.currentTimeMillis() + 2;
- long timestamp4 = System.currentTimeMillis() + 3;
+ long timestamp1 = Clock.Global.currentTimeMillis();
+ long timestamp2 = Clock.Global.currentTimeMillis() + 1;
+ long timestamp3 = Clock.Global.currentTimeMillis() + 2;
+ long timestamp4 = Clock.Global.currentTimeMillis() + 3;
HintsDescriptor descriptor1 = new HintsDescriptor(hostId1, timestamp1);
HintsDescriptor descriptor2 = new HintsDescriptor(hostId2, timestamp3);
@@ -107,10 +105,10 @@ public class HintsCatalogTest
@Test
public void deleteHintsTest() throws IOException
{
- File directory = new File(Files.createTempDirectory(null));
+ File directory = new File(testFolder.newFolder());
UUID hostId1 = UUID.randomUUID();
UUID hostId2 = UUID.randomUUID();
- long now = System.currentTimeMillis();
+ long now = Clock.Global.currentTimeMillis();
writeDescriptor(directory, new HintsDescriptor(hostId1, now));
writeDescriptor(directory, new HintsDescriptor(hostId1, now + 1));
writeDescriptor(directory, new HintsDescriptor(hostId2, now + 2));
@@ -138,14 +136,27 @@ public class HintsCatalogTest
@Test
public void exciseHintFiles() throws IOException
{
- File directory = new File(Files.createTempDirectory(null));
- try
- {
- exciseHintFiles(directory);
- }
- finally
+ File directory = new File(testFolder.newFolder());
+ exciseHintFiles(directory);
+ }
+
+ @Test
+ public void hintsTotalSizeTest() throws IOException
+ {
+ File directory = new File(testFolder.newFolder());
+ UUID hostId = UUID.randomUUID();
+ long now = Clock.Global.currentTimeMillis();
+ long totalSize = 0;
+ HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of());
+ HintsStore store = catalog.get(hostId);
+ assertEquals(totalSize, store.getTotalFileSize());
+ for (int i = 0; i < 3; i++)
{
- directory.deleteOnExit();
+ HintsDescriptor descriptor = new HintsDescriptor(hostId, now + i);
+ writeDescriptor(directory, descriptor);
+ store.offerLast(descriptor);
+ assertTrue("Total file size should increase after writing more
hints", store.getTotalFileSize() > totalSize);
+ totalSize = store.getTotalFileSize();
}
}
@@ -153,10 +164,10 @@ public class HintsCatalogTest
{
UUID hostId = UUID.randomUUID();
- HintsDescriptor descriptor1 = new HintsDescriptor(hostId,
System.currentTimeMillis());
- HintsDescriptor descriptor2 = new HintsDescriptor(hostId,
System.currentTimeMillis() + 1);
- HintsDescriptor descriptor3 = new HintsDescriptor(hostId,
System.currentTimeMillis() + 2);
- HintsDescriptor descriptor4 = new HintsDescriptor(hostId,
System.currentTimeMillis() + 3);
+ HintsDescriptor descriptor1 = new HintsDescriptor(hostId,
Clock.Global.currentTimeMillis());
+ HintsDescriptor descriptor2 = new HintsDescriptor(hostId,
Clock.Global.currentTimeMillis() + 1);
+ HintsDescriptor descriptor3 = new HintsDescriptor(hostId,
Clock.Global.currentTimeMillis() + 2);
+ HintsDescriptor descriptor4 = new HintsDescriptor(hostId,
Clock.Global.currentTimeMillis() + 3);
createHintFile(directory, descriptor1);
createHintFile(directory, descriptor2);
diff --git a/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
b/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
index ee79f89..04fd8c3 100644
--- a/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
@@ -110,7 +110,7 @@ public class HintsDescriptorTest
try (HintsWriter ignored = HintsWriter.create(directory, expected))
{
}
- HintsDescriptor actual = HintsDescriptor.readFromFile(new
File(directory, expected.fileName()));
+ HintsDescriptor actual =
HintsDescriptor.readFromFile(expected.file(directory));
assertEquals(expected, actual);
}
finally
diff --git a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
index af1c89b..41f86a0 100644
--- a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
@@ -108,7 +108,7 @@ public class HintsReaderTest
{
long baseTimestamp = descriptor.timestamp;
int index = 0;
- try (HintsReader reader = HintsReader.open(new File(directory,
descriptor.fileName())))
+ try (HintsReader reader = HintsReader.open(descriptor.file(directory)))
{
for (HintsReader.Page page : reader)
{
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index b4bf8b7..80d6d87 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.net;
import java.util.UUID;
+import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -32,6 +34,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -39,12 +42,27 @@ import static
org.apache.cassandra.locator.ReplicaUtils.full;
public class WriteCallbackInfoTest
{
+ private InetAddressAndPort testEp;
+
@BeforeClass
public static void initDD()
{
DatabaseDescriptor.daemonInitialization();
}
+ @Before
+ public void setup() throws Exception
+ {
+ testEp = InetAddressAndPort.getByName("192.168.1.1");
+
StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(),
testEp);
+ }
+
+ @After
+ public void teardown()
+ {
+ StorageService.instance.getTokenMetadata().removeEndpoint(testEp);
+ }
+
@Test
public void testShouldHint() throws Exception
{
@@ -57,14 +75,14 @@ public class WriteCallbackInfoTest
}
}
- private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean
allowHints, boolean expectHint) throws Exception
+ private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean
allowHints, boolean expectHint)
{
TableMetadata metadata = MockSchema.newTableMetadata("", "");
Object payload = verb == Verb.PAXOS_COMMIT_REQ
? new Commit(UUID.randomUUID(), new
PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER,
RegularAndStaticColumns.NONE, 1).build())
: new
Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
- RequestCallbacks.WriteCallbackInfo wcbi = new
RequestCallbacks.WriteCallbackInfo(Message.out(verb, payload),
full(InetAddressAndPort.getByName("192.168.1.1")), null, cl, allowHints);
+ RequestCallbacks.WriteCallbackInfo wcbi = new
RequestCallbacks.WriteCallbackInfo(Message.out(verb, payload), full(testEp),
null, cl, allowHints);
Assert.assertEquals(expectHint, wcbi.shouldHint());
if (expectHint)
{
diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java
b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
new file mode 100644
index 0000000..77e9952
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.HeartBeatState;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(BMUnitRunner.class)
+public class StorageProxyTest
+{
+ @BeforeClass
+ public static void initDD()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ ServerTestUtils.mkdirs();
+ }
+
+ @Test
+ public void testShouldHint() throws Exception
+ {
+ // HAPPY PATH with all defaults
+ shouldHintTest(replica -> {
+ assertThat(StorageProxy.shouldHint(replica)).isTrue();
+ assertThat(StorageProxy.shouldHint(replica, /*
tryEnablePersistentWindow */ false)).isTrue();
+ });
+ }
+
+ @Test
+ public void testShouldHintOnWindowExpiry() throws Exception
+ {
+ shouldHintTest(replica -> {
+ // wait for 5 ms, we will shorten the hints window later
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
+
+ final int originalHintWindow =
DatabaseDescriptor.getMaxHintWindow();
+ try
+ {
+ DatabaseDescriptor.setMaxHintWindow(1); // 1 ms. It should not
hint
+ assertThat(StorageProxy.shouldHint(replica)).isFalse();
+ }
+ finally
+ {
+ DatabaseDescriptor.setMaxHintWindow(originalHintWindow);
+ }
+ });
+ }
+
+ @Test
+ @BMRule(name = "Hints size exceeded the limit",
+ targetClass="org.apache.cassandra.hints.HintsService",
+ targetMethod="getTotalHintsSize",
+ action="return 2097152;") // 2MB
+ public void testShouldHintOnExceedingSize() throws Exception
+ {
+ shouldHintTest(replica -> {
+ final int originalHintsSizeLimit =
DatabaseDescriptor.getMaxHintsSizePerHostInMb();
+ try
+ {
+ DatabaseDescriptor.setMaxHintsSizePerHostInMb(1);
+ assertThat(StorageProxy.shouldHint(replica)).isFalse();
+ }
+ finally
+ {
+
DatabaseDescriptor.setMaxHintsSizePerHostInMb(originalHintsSizeLimit);
+ }
+ });
+ }
+
+ private void shouldHintTest(Consumer<Replica> test) throws Exception
+ {
+ InetAddressAndPort testEp =
InetAddressAndPort.getByName("192.168.1.1");
+ Replica replica = full(testEp);
+
StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(),
testEp);
+ EndpointState state = new EndpointState(new HeartBeatState(0, 0));
+ Gossiper.runInGossipStageBlocking(() ->
Gossiper.instance.markDead(replica.endpoint(), state));
+
+ try
+ {
+ test.accept(replica);
+ }
+ finally
+ {
+ StorageService.instance.getTokenMetadata().removeEndpoint(testEp);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]