This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3a6f690  Limit the maximum hints size per host
3a6f690 is described below

commit 3a6f6907314670fdb2b316db8f08ffd85da88851
Author: Yifan Cai <[email protected]>
AuthorDate: Tue Dec 7 18:54:58 2021 -0800

    Limit the maximum hints size per host
    
    patch by Yifan Cai; reviewed by Dinesh Joshi, Francisco Guerrero for 
CASSANDRA-17142
---
 NEWS.txt                                           |   3 +
 conf/cassandra.yaml                                |   5 +
 src/java/org/apache/cassandra/config/Config.java   |   1 +
 .../cassandra/config/DatabaseDescriptor.java       |  15 +++
 .../apache/cassandra/hints/HintsDescriptor.java    |  10 ++
 .../cassandra/hints/HintsDispatchExecutor.java     |   4 +-
 .../org/apache/cassandra/hints/HintsService.java   |  13 +++
 .../org/apache/cassandra/hints/HintsStore.java     |  18 ++-
 .../org/apache/cassandra/hints/HintsWriter.java    |   4 +-
 .../org/apache/cassandra/service/StorageProxy.java |  70 +++++++++---
 .../cassandra/service/StorageProxyMBean.java       |   2 +
 .../cassandra/hints/HintsWriteThenReadTest.java    |   6 +-
 .../org/apache/cassandra/hints/AlteredHints.java   |   2 +-
 .../apache/cassandra/hints/HintsCatalogTest.java   |  65 ++++++-----
 .../cassandra/hints/HintsDescriptorTest.java       |   2 +-
 .../apache/cassandra/hints/HintsReaderTest.java    |   2 +-
 .../cassandra/net/WriteCallbackInfoTest.java       |  22 +++-
 .../apache/cassandra/service/StorageProxyTest.java | 121 +++++++++++++++++++++
 18 files changed, 308 insertions(+), 57 deletions(-)

diff --git a/NEWS.txt b/NEWS.txt
index 966a729..f5d76d5 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,9 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+    - New configuration max_hints_size_per_host to limit the size of local 
hints files per host in megabytes. Setting it to a
+      non-positive value disables the limit, which is the default behavior. 
Setting it to a positive value ensures
+      the total size of the hints files per host does not exceed the limit.
     - Added ability to configure auth caches through corresponding `nodetool` 
commands.
     - CDC data flushing now can be configured to be non-blocking with the 
configuration cdc_block_writes. Setting to true,
       any writes to the CDC-enabled tables will be blocked when reaching to 
the limit for CDC data on disk, which is the
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 5e8bc55..f2feb40 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -86,6 +86,11 @@ hints_flush_period: 10000ms
 # Maximum size for a single hints file, in megabytes.
 max_hints_file_size: 128MiB
 
+# The file size limit to store hints for an unreachable host, in megabytes.
+# Once the local hints files have reached the limit, no more new hints will be 
created.
+# Setting a non-positive value disables the size limit.
+# max_hints_size_per_host: 0MiB
+
 # Enable / disable automatic cleanup for the expired and orphaned hints file.
 # Disable the option in order to preserve those hints on the disk.
 auto_hints_cleanup_enabled: false
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index df20954..f56066c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -365,6 +365,7 @@ public class Config
     public SmallestDurationMilliseconds hints_flush_period = new 
SmallestDurationMilliseconds("10s");
     @Replaces(oldName = "max_hints_file_size_in_mb", converter = 
Converters.MEBIBYTES_DATA_STORAGE, deprecated = true)
     public SmallestDataStorageMebibytes max_hints_file_size = new 
SmallestDataStorageMebibytes("128MiB");
+    public volatile DataStorageSpec max_hints_size_per_host = new 
DataStorageSpec("0B"); // 0 means disabled
 
     public ParameterizedClass hints_compression;
     public volatile boolean auto_hints_cleanup_enabled = false;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 74c8b26..50cda68 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2643,6 +2643,21 @@ public class DatabaseDescriptor
         return conf.max_hint_window.toMillisecondsAsInt();
     }
 
+    public static void setMaxHintsSizePerHostInMb(int value)
+    {
+        conf.max_hints_size_per_host = DataStorageSpec.inMebibytes(value);
+    }
+
+    public static int getMaxHintsSizePerHostInMb()
+    {
+        return conf.max_hints_size_per_host.toMebibytesAsInt();
+    }
+
+    public static long getMaxHintsSizePerHost()
+    {
+        return conf.max_hints_size_per_host.toBytes();
+    }
+
     public static File getHintsDirectory()
     {
         return new File(conf.hints_directory);
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java 
b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index 4c7acf1..8e1f782 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -209,6 +209,16 @@ final class HintsDescriptor
         return String.format("%s-%s-%s.crc32", hostId, timestamp, version);
     }
 
+    File file(File hintsDirectory)
+    {
+        return new File(hintsDirectory, fileName());
+    }
+
+    File checksumFile(File hintsDirectory)
+    {
+        return new File(hintsDirectory, checksumFileName());
+    }
+
     int messagingVersion()
     {
         return messagingVersion(version);
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java 
b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 991e2a2..0f34db6 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -276,7 +276,7 @@ final class HintsDispatchExecutor
 
         private boolean deliver(HintsDescriptor descriptor, InetAddressAndPort 
address)
         {
-            File file = new File(hintsDirectory, descriptor.fileName());
+            File file = descriptor.file(hintsDirectory);
             InputPosition offset = store.getDispatchOffset(descriptor);
 
             BooleanSupplier shouldAbort = () -> !isAlive.test(address) || 
isPaused.get();
@@ -321,7 +321,7 @@ final class HintsDispatchExecutor
         // for each hint in the hints file for a node that isn't part of the 
ring anymore, write RF hints for each replica
         private void convert(HintsDescriptor descriptor)
         {
-            File file = new File(hintsDirectory, descriptor.fileName());
+            File file = descriptor.file(hintsDirectory);
 
             try (HintsReader reader = HintsReader.open(file, rateLimiter))
             {
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java 
b/src/java/org/apache/cassandra/hints/HintsService.java
index e80865b..8fbfab8 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -247,6 +247,19 @@ public final class HintsService implements 
HintsServiceMBean
     }
 
     /**
+     * Get the total size in bytes of all the hints files associated with the 
host on disk.
+     * @param hostId the host the hints files belong to
+     * @return total file size, in bytes
+     */
+    public long getTotalHintsSize(UUID hostId)
+    {
+        HintsStore store = catalog.getNullable(hostId);
+        if (store == null)
+            return 0;
+        return store.getTotalFileSize();
+    }
+
+    /**
      * Gracefully and blockingly shut down the service.
      *
      * Will abort dispatch sessions that are currently in progress (which is 
okay, it's idempotent),
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java 
b/src/java/org/apache/cassandra/hints/HintsStore.java
index bc2fee7..a042cc3 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -209,7 +209,7 @@ final class HintsStore
 
     void delete(HintsDescriptor descriptor)
     {
-        File hintsFile = new File(hintsDirectory, descriptor.fileName());
+        File hintsFile = descriptor.file(hintsDirectory);
         if (hintsFile.tryDelete())
             logger.info("Deleted hint file {}", descriptor.fileName());
         else if (hintsFile.exists())
@@ -218,7 +218,7 @@ final class HintsStore
             logger.info("Already deleted hint file {}", descriptor.fileName());
 
         //noinspection ResultOfMethodCallIgnored
-        new File(hintsDirectory, descriptor.checksumFileName()).tryDelete();
+        descriptor.checksumFile(hintsDirectory).tryDelete();
     }
 
     boolean hasFiles()
@@ -236,6 +236,20 @@ final class HintsStore
         dispatchPositions.put(descriptor, inputPosition);
     }
 
+
+    /**
+     * @return the total size of all files belonging to the hints store, in 
bytes.
+     */
+    long getTotalFileSize()
+    {
+        long total = 0;
+        for (HintsDescriptor descriptor : Iterables.concat(dispatchDequeue, 
corruptedFiles))
+        {
+            total += descriptor.file(hintsDirectory).length();
+        }
+        return total;
+    }
+
     void cleanUp(HintsDescriptor descriptor)
     {
         dispatchPositions.remove(descriptor);
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java 
b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 468eee2..663427a 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -67,7 +67,7 @@ class HintsWriter implements AutoCloseable
     @SuppressWarnings("resource") // HintsWriter owns channel
     static HintsWriter create(File directory, HintsDescriptor descriptor) 
throws IOException
     {
-        File file = new File(directory, descriptor.fileName());
+        File file = descriptor.file(directory);
 
         FileChannel channel = FileChannel.open(file.toPath(), 
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
         int fd = NativeLibrary.getfd(channel);
@@ -102,7 +102,7 @@ class HintsWriter implements AutoCloseable
 
     private void writeChecksum()
     {
-        File checksumFile = new File(directory, descriptor.checksumFileName());
+        File checksumFile = descriptor.checksumFile(directory);
         try (OutputStream out = Files.newOutputStream(checksumFile.toPath()))
         {
             out.write(Integer.toHexString((int) 
globalCRC.getValue()).getBytes(StandardCharsets.UTF_8));
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 686192c..69c231d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2275,16 +2275,40 @@ public class StorageProxy implements StorageProxyMBean
         DatabaseDescriptor.setMaxHintWindow(ms);
     }
 
+    public int getMaxHintsSizePerHostInMb()
+    {
+        return DatabaseDescriptor.getMaxHintsSizePerHostInMb();
+    }
+
+    public void setMaxHintsSizePerHostInMb(int value)
+    {
+        DatabaseDescriptor.setMaxHintsSizePerHostInMb(value);
+    }
+
     public static boolean shouldHint(Replica replica)
     {
         return shouldHint(replica, true);
     }
 
+    /**
+     * Determines whether a hint should be stored or not.
+     * It rejects early if any of the following conditions is met:
+     * - Hints are disabled entirely or for the datacenter the replica belongs to
+     * - The replica is transient or is the self node
+     * - The replica is no longer part of the ring
+     * - The hint window has expired
+     * - The hints have reached the size limit for the node
+     * Otherwise, it permits.
+     *
+     * @param replica, the replica for the hint
+     * @param tryEnablePersistentWindow, true to consider 
hint_window_persistent_enabled; otherwise, ignores
+     * @return true to permit or false to reject hint
+     */
     public static boolean shouldHint(Replica replica, boolean 
tryEnablePersistentWindow)
     {
-        if (!DatabaseDescriptor.hintedHandoffEnabled())
-            return false;
-        if (replica.isTransient() || replica.isSelf())
+        if (!DatabaseDescriptor.hintedHandoffEnabled()
+            || replica.isTransient()
+            || replica.isSelf())
             return false;
 
         Set<String> disabledDCs = 
DatabaseDescriptor.hintedHandoffDisabledDCs();
@@ -2303,26 +2327,40 @@ public class StorageProxy implements StorageProxyMBean
         long endpointDowntime = 
Gossiper.instance.getEndpointDowntime(endpoint);
         boolean hintWindowExpired = endpointDowntime > maxHintWindow;
 
-        if (tryEnablePersistentWindow && !hintWindowExpired && 
DatabaseDescriptor.hintWindowPersistentEnabled())
+        UUID hostIdForEndpoint = 
StorageService.instance.getHostIdForEndpoint(endpoint);
+        if (hostIdForEndpoint == null)
         {
-            UUID hostIdForEndpoint = 
StorageService.instance.getHostIdForEndpoint(endpoint);
-            if (hostIdForEndpoint != null)
-            {
-                long earliestHint = 
HintsService.instance.getEarliestHintForHost(hostIdForEndpoint);
-                hintWindowExpired = Clock.Global.currentTimeMillis() - 
maxHintWindow > earliestHint;
-                if (hintWindowExpired)
-                    Tracing.trace("Not hinting {} for which there is the 
earliest hint stored at {}", replica, earliestHint);
-            }
+            Tracing.trace("Discarding hint for endpoint not part of ring: {}", 
endpoint);
+            return false;
         }
-        else if (hintWindowExpired)
+
+        // if persisting hints window, hintWindowExpired might be updated 
according to the timestamp of the earliest hint
+        if (tryEnablePersistentWindow && !hintWindowExpired && 
DatabaseDescriptor.hintWindowPersistentEnabled())
         {
-            Tracing.trace("Not hinting {} which has been down {} ms", replica, 
endpointDowntime);
+            long earliestHint = 
HintsService.instance.getEarliestHintForHost(hostIdForEndpoint);
+            hintWindowExpired = Clock.Global.currentTimeMillis() - 
maxHintWindow > earliestHint;
+            if (hintWindowExpired)
+                Tracing.trace("Not hinting {} for which there is the earliest 
hint stored at {}", replica, earliestHint);
         }
 
         if (hintWindowExpired)
-            HintsService.instance.metrics.incrPastWindow(replica.endpoint());
+        {
+            HintsService.instance.metrics.incrPastWindow(endpoint);
+            Tracing.trace("Not hinting {} which has been down {} ms", 
endpoint, endpointDowntime);
+            return false;
+        }
+
+        long maxHintsSize = DatabaseDescriptor.getMaxHintsSizePerHost();
+        long actualTotalHintsSize = 
HintsService.instance.getTotalHintsSize(hostIdForEndpoint);
+        boolean hasHintsReachedMaxSize = maxHintsSize > 0 && 
actualTotalHintsSize > maxHintsSize;
+        if (hasHintsReachedMaxSize)
+        {
+            Tracing.trace("Not hinting {} which has reached to the max hints 
size {} bytes on disk. The actual hints size on disk: {}",
+                          endpoint, maxHintsSize, actualTotalHintsSize);
+            return false;
+        }
 
-        return !hintWindowExpired;
+        return true;
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index cce7ff0..7ac83ae 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -32,6 +32,8 @@ public interface StorageProxyMBean
     public Set<String> getHintedHandoffDisabledDCs();
     public int getMaxHintWindow();
     public void setMaxHintWindow(int ms);
+    public int getMaxHintsSizePerHostInMb();
+    public void setMaxHintsSizePerHostInMb(int value);
     public int getMaxHintsInProgress();
     public void setMaxHintsInProgress(int qs);
     public int getHintsInProgress();
diff --git a/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java 
b/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
index e6a6478..1b78c48 100644
--- a/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
+++ b/test/long/org/apache/cassandra/hints/HintsWriteThenReadTest.java
@@ -98,8 +98,8 @@ public class HintsWriteThenReadTest
 
     private static void verifyChecksum(File directory, HintsDescriptor 
descriptor) throws IOException
     {
-        File hintsFile = new File(directory, descriptor.fileName());
-        File checksumFile = new File(directory, descriptor.checksumFileName());
+        File hintsFile = descriptor.file(directory);
+        File checksumFile = descriptor.checksumFile(directory);
 
         assertTrue(checksumFile.exists());
 
@@ -114,7 +114,7 @@ public class HintsWriteThenReadTest
         long baseTimestamp = descriptor.timestamp;
         int index = 0;
 
-        try (HintsReader reader = HintsReader.open(new File(directory, 
descriptor.fileName())))
+        try (HintsReader reader = HintsReader.open(descriptor.file(directory)))
         {
             for (HintsReader.Page page : reader)
             {
diff --git a/test/unit/org/apache/cassandra/hints/AlteredHints.java 
b/test/unit/org/apache/cassandra/hints/AlteredHints.java
index 0379c41..25975e6 100644
--- a/test/unit/org/apache/cassandra/hints/AlteredHints.java
+++ b/test/unit/org/apache/cassandra/hints/AlteredHints.java
@@ -103,7 +103,7 @@ public abstract class AlteredHints
             }
         }
 
-        try (HintsReader reader = HintsReader.open(new File(dir, 
descriptor.fileName())))
+        try (HintsReader reader = HintsReader.open(descriptor.file(dir)))
         {
             Assert.assertTrue(looksLegit(reader.getInput()));
             List<Hint> deserialized = new ArrayList<>(hintNum);
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java 
b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index 1f8c95d..8b377b2 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.hints;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
 import java.util.*;
 
 import com.google.common.collect.ImmutableMap;
@@ -28,9 +27,12 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import static junit.framework.Assert.*;
 import static org.apache.cassandra.Util.dk;
@@ -54,18 +56,14 @@ public class HintsCatalogTest
                 SchemaLoader.standardCFMD(KEYSPACE, TABLE2));
     }
 
+    @Rule
+    public TemporaryFolder testFolder = new TemporaryFolder();
+
     @Test
     public void loadCompletenessAndOrderTest() throws IOException
     {
-        File directory = new File(Files.createTempDirectory(null));
-        try
-        {
-            loadCompletenessAndOrderTest(directory);
-        }
-        finally
-        {
-            directory.deleteOnExit();
-        }
+        File directory = new File(testFolder.newFolder());
+        loadCompletenessAndOrderTest(directory);
     }
 
     private void loadCompletenessAndOrderTest(File directory) throws 
IOException
@@ -73,10 +71,10 @@ public class HintsCatalogTest
         UUID hostId1 = UUID.randomUUID();
         UUID hostId2 = UUID.randomUUID();
 
-        long timestamp1 = System.currentTimeMillis();
-        long timestamp2 = System.currentTimeMillis() + 1;
-        long timestamp3 = System.currentTimeMillis() + 2;
-        long timestamp4 = System.currentTimeMillis() + 3;
+        long timestamp1 = Clock.Global.currentTimeMillis();
+        long timestamp2 = Clock.Global.currentTimeMillis() + 1;
+        long timestamp3 = Clock.Global.currentTimeMillis() + 2;
+        long timestamp4 = Clock.Global.currentTimeMillis() + 3;
 
         HintsDescriptor descriptor1 = new HintsDescriptor(hostId1, timestamp1);
         HintsDescriptor descriptor2 = new HintsDescriptor(hostId2, timestamp3);
@@ -107,10 +105,10 @@ public class HintsCatalogTest
     @Test
     public void deleteHintsTest() throws IOException
     {
-        File directory = new File(Files.createTempDirectory(null));
+        File directory = new File(testFolder.newFolder());
         UUID hostId1 = UUID.randomUUID();
         UUID hostId2 = UUID.randomUUID();
-        long now = System.currentTimeMillis();
+        long now = Clock.Global.currentTimeMillis();
         writeDescriptor(directory, new HintsDescriptor(hostId1, now));
         writeDescriptor(directory, new HintsDescriptor(hostId1, now + 1));
         writeDescriptor(directory, new HintsDescriptor(hostId2, now + 2));
@@ -138,14 +136,27 @@ public class HintsCatalogTest
     @Test
     public void exciseHintFiles() throws IOException
     {
-        File directory = new File(Files.createTempDirectory(null));
-        try
-        {
-            exciseHintFiles(directory);
-        }
-        finally
+        File directory = new File(testFolder.newFolder());
+        exciseHintFiles(directory);
+    }
+
+    @Test
+    public void hintsTotalSizeTest() throws IOException
+    {
+        File directory = new File(testFolder.newFolder());
+        UUID hostId = UUID.randomUUID();
+        long now = Clock.Global.currentTimeMillis();
+        long totalSize = 0;
+        HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of());
+        HintsStore store = catalog.get(hostId);
+        assertEquals(totalSize, store.getTotalFileSize());
+        for (int i = 0; i < 3; i++)
         {
-            directory.deleteOnExit();
+            HintsDescriptor descriptor = new HintsDescriptor(hostId, now + i);
+            writeDescriptor(directory, descriptor);
+            store.offerLast(descriptor);
+            assertTrue("Total file size should increase after writing more 
hints", store.getTotalFileSize() > totalSize);
+            totalSize = store.getTotalFileSize();
         }
     }
 
@@ -153,10 +164,10 @@ public class HintsCatalogTest
     {
         UUID hostId = UUID.randomUUID();
 
-        HintsDescriptor descriptor1 = new HintsDescriptor(hostId, 
System.currentTimeMillis());
-        HintsDescriptor descriptor2 = new HintsDescriptor(hostId, 
System.currentTimeMillis() + 1);
-        HintsDescriptor descriptor3 = new HintsDescriptor(hostId, 
System.currentTimeMillis() + 2);
-        HintsDescriptor descriptor4 = new HintsDescriptor(hostId, 
System.currentTimeMillis() + 3);
+        HintsDescriptor descriptor1 = new HintsDescriptor(hostId, 
Clock.Global.currentTimeMillis());
+        HintsDescriptor descriptor2 = new HintsDescriptor(hostId, 
Clock.Global.currentTimeMillis() + 1);
+        HintsDescriptor descriptor3 = new HintsDescriptor(hostId, 
Clock.Global.currentTimeMillis() + 2);
+        HintsDescriptor descriptor4 = new HintsDescriptor(hostId, 
Clock.Global.currentTimeMillis() + 3);
 
         createHintFile(directory, descriptor1);
         createHintFile(directory, descriptor2);
diff --git a/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java 
b/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
index ee79f89..04fd8c3 100644
--- a/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsDescriptorTest.java
@@ -110,7 +110,7 @@ public class HintsDescriptorTest
             try (HintsWriter ignored = HintsWriter.create(directory, expected))
             {
             }
-            HintsDescriptor actual = HintsDescriptor.readFromFile(new 
File(directory, expected.fileName()));
+            HintsDescriptor actual = 
HintsDescriptor.readFromFile(expected.file(directory));
             assertEquals(expected, actual);
         }
         finally
diff --git a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java 
b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
index af1c89b..41f86a0 100644
--- a/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsReaderTest.java
@@ -108,7 +108,7 @@ public class HintsReaderTest
     {
         long baseTimestamp = descriptor.timestamp;
         int index = 0;
-        try (HintsReader reader = HintsReader.open(new File(directory, 
descriptor.fileName())))
+        try (HintsReader reader = HintsReader.open(descriptor.file(directory)))
         {
             for (HintsReader.Page page : reader)
             {
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java 
b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index b4bf8b7..80d6d87 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.net;
 
 import java.util.UUID;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -32,6 +34,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.MockSchema;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -39,12 +42,27 @@ import static 
org.apache.cassandra.locator.ReplicaUtils.full;
 
 public class WriteCallbackInfoTest
 {
+    private InetAddressAndPort testEp;
+
     @BeforeClass
     public static void initDD()
     {
         DatabaseDescriptor.daemonInitialization();
     }
 
+    @Before
+    public void setup() throws Exception
+    {
+        testEp = InetAddressAndPort.getByName("192.168.1.1");
+        
StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(), 
testEp);
+    }
+
+    @After
+    public void teardown()
+    {
+        StorageService.instance.getTokenMetadata().removeEndpoint(testEp);
+    }
+
     @Test
     public void testShouldHint() throws Exception
     {
@@ -57,14 +75,14 @@ public class WriteCallbackInfoTest
         }
     }
 
-    private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean 
allowHints, boolean expectHint) throws Exception
+    private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean 
allowHints, boolean expectHint)
     {
         TableMetadata metadata = MockSchema.newTableMetadata("", "");
         Object payload = verb == Verb.PAXOS_COMMIT_REQ
                          ? new Commit(UUID.randomUUID(), new 
PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, 
RegularAndStaticColumns.NONE, 1).build())
                          : new 
Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
 
-        RequestCallbacks.WriteCallbackInfo wcbi = new 
RequestCallbacks.WriteCallbackInfo(Message.out(verb, payload), 
full(InetAddressAndPort.getByName("192.168.1.1")), null, cl, allowHints);
+        RequestCallbacks.WriteCallbackInfo wcbi = new 
RequestCallbacks.WriteCallbackInfo(Message.out(verb, payload), full(testEp), 
null, cl, allowHints);
         Assert.assertEquals(expectHint, wcbi.shouldHint());
         if (expectHint)
         {
diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java 
b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
new file mode 100644
index 0000000..77e9952
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/StorageProxyTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.HeartBeatState;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(BMUnitRunner.class)
+public class StorageProxyTest
+{
+    @BeforeClass
+    public static void initDD()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        ServerTestUtils.mkdirs();
+    }
+
+    @Test
+    public void testShouldHint() throws Exception
+    {
+        // HAPPY PATH with all defaults
+        shouldHintTest(replica -> {
+            assertThat(StorageProxy.shouldHint(replica)).isTrue();
+            assertThat(StorageProxy.shouldHint(replica, /* 
tryEnablePersistentWindow */ false)).isTrue();
+        });
+    }
+
+    @Test
+    public void testShouldHintOnWindowExpiry() throws Exception
+    {
+        shouldHintTest(replica -> {
+            // wait for 5 ms, we will shorten the hints window later
+            Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
+
+            final int originalHintWindow = 
DatabaseDescriptor.getMaxHintWindow();
+            try
+            {
+                DatabaseDescriptor.setMaxHintWindow(1); // 1 ms. It should not 
hint
+                assertThat(StorageProxy.shouldHint(replica)).isFalse();
+            }
+            finally
+            {
+                DatabaseDescriptor.setMaxHintWindow(originalHintWindow);
+            }
+        });
+    }
+
+    @Test
+    @BMRule(name = "Hints size exceeded the limit",
+            targetClass="org.apache.cassandra.hints.HintsService",
+            targetMethod="getTotalHintsSize",
+            action="return 2097152;") // 2MB
+    public void testShouldHintOnExceedingSize() throws Exception
+    {
+        shouldHintTest(replica -> {
+            final int originalHintsSizeLimit = 
DatabaseDescriptor.getMaxHintsSizePerHostInMb();
+            try
+            {
+                DatabaseDescriptor.setMaxHintsSizePerHostInMb(1);
+                assertThat(StorageProxy.shouldHint(replica)).isFalse();
+            }
+            finally
+            {
+                
DatabaseDescriptor.setMaxHintsSizePerHostInMb(originalHintsSizeLimit);
+            }
+        });
+    }
+
+    private void shouldHintTest(Consumer<Replica> test) throws Exception
+    {
+        InetAddressAndPort testEp = 
InetAddressAndPort.getByName("192.168.1.1");
+        Replica replica = full(testEp);
+        
StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(), 
testEp);
+        EndpointState state = new EndpointState(new HeartBeatState(0, 0));
+        Gossiper.runInGossipStageBlocking(() -> 
Gossiper.instance.markDead(replica.endpoint(), state));
+
+        try
+        {
+            test.accept(replica);
+        }
+        finally
+        {
+            StorageService.instance.getTokenMetadata().removeEndpoint(testEp);
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to