This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 10c685222fc415586ae28a01e7896063a3f2f0d3
Merge: 33fd2dc 1bcfa08
Author: David Capwell <[email protected]>
AuthorDate: Thu Nov 18 17:29:23 2021 -0800

    Merge branch 'cassandra-4.0' into trunk

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/concurrent/Stage.java     |   1 +
 .../statements/schema/AlterKeyspaceStatement.java  |   8 --
 .../statements/schema/CreateKeyspaceStatement.java |  13 +--
 .../org/apache/cassandra/service/ClientWarn.java   |   5 -
 .../DebuggableThreadPoolExecutorTest.java          |  97 +++++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |  27 +++++
 .../schema/SchemaStatementWarningsTest.java        | 117 +++++++++++++++++++++
 8 files changed, 244 insertions(+), 25 deletions(-)

diff --cc CHANGES.txt
index 676e92b,1d72c0f..316d14b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,65 -1,5 +1,66 @@@
 -4.0.2
 +4.1
 + * Log queries that fail on timeout or unavailable errors up to once per 
minute by default (CASSANDRA-17159)
 + * Refactor normal/preview/IR repair to standardize repair cleanup and error 
handling of failed RepairJobs (CASSANDRA-17069)
 + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
 + * Introduce separate rate limiting settings for entire SSTable streaming 
(CASSANDRA-17065)
 + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
 + * Actively update auth cache in the background (CASSANDRA-16957)
 + * Add unix time conversion functions (CASSANDRA-17029)
 + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap 
OOMs rather than supporting only direct OOMs (CASSANDRA-17128)
 + * Forbid other Future implementations with checkstyle (CASSANDRA-17055)
 + * Commit log was switched from non-daemon to daemon threads, which causes 
the JVM to exit in some cases as no non-daemon threads are active 
(CASSANDRA-17085)
 + * Add a Denylist to block reads and writes on specific partition keys 
(CASSANDRA-12106)
 + * v4+ protocol did not clean up client warnings, which caused leaking the 
state (CASSANDRA-17054)
 + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
 + * Ensure hint window is persistent across restarts of a node 
(CASSANDRA-14309)
 + * Allow to GRANT or REVOKE multiple permissions in a single statement 
(CASSANDRA-17030)
 + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027)
 + * Log time spent writing keys during compaction (CASSANDRA-17037)
 + * Make nodetool compactionstats and sstable_tasks consistent 
(CASSANDRA-16976)
 + * Add metrics and logging around index summary redistribution 
(CASSANDRA-17036)
 + * Add configuration options for minimum allowable replication factor and 
default replication factor (CASSANDRA-14557)
 + * Expose information about stored hints via a nodetool command and a virtual 
table (CASSANDRA-14795)
 + * Add broadcast_rpc_address to system.local (CASSANDRA-11181)
 + * Add support for type casting in WHERE clause components and in the values 
of INSERT/UPDATE statements (CASSANDRA-14337)
 + * Add credentials file support to CQLSH (CASSANDRA-16983)
 + * Skip remaining bytes in the Envelope buffer when a ProtocolException is 
thrown to avoid double decoding (CASSANDRA-17026)
 + * Allow reverse iteration of resources during permissions checking 
(CASSANDRA-17016)
 + * Add feature to verify correct ownership of attached locations on disk at 
startup (CASSANDRA-16879)
 + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666)
 + * Add soft/hard limits to local reads to protect against reading too much 
data in a single query (CASSANDRA-16896)
 + * Avoid token cache invalidation for removing a non-member node 
(CASSANDRA-15290)
 + * Allow configuration of consistency levels on auth operations 
(CASSANDRA-12988)
 + * Add number of sstables in a compaction to compactionstats output 
(CASSANDRA-16844)
 + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153)
 + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation 
allows it (CASSANDRA-16806)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` 
(CASSANDRA-16938)
 + * Reduce native transport max frame size to 16MB (CASSANDRA-16886)
 + * Add support for filtering using IN restrictions (CASSANDRA-14344)
 + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404)
 + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies 
(CASSANDRA-16854)
 + * Add client warnings and abort to tombstone and coordinator reads which go 
past a low/high watermark (CASSANDRA-16850)
 + * Add TTL support to nodetool snapshots (CASSANDRA-16789)
 + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks 
(CASSANDRA-16842)
 + * Allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
 + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
 + * Implement nodetool getauditlog command (CASSANDRA-16725)
 + * Clean up repair code (CASSANDRA-13720)
 + * Background schedule to clean up orphaned hints files (CASSANDRA-16815)
 + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for 
which indexes are actually being built (CASSANDRA-16776)
 + * Batch the token metadata update to improve the speed (CASSANDRA-15291)
 + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775)
 + * Make JMXTimer expose attributes using consistent time unit 
(CASSANDRA-16760)
 + * Remove check on gossip status from DynamicEndpointSnitch::updateScores 
(CASSANDRA-11671)
 + * Fix AbstractReadQuery::toCQLString not returning valid CQL 
(CASSANDRA-16510)
 + * Log when compacting many tombstones (CASSANDRA-16780)
 + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
 + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond 
to single WaitingOnCommit data points (CASSANDRA-16701)
 + * Add a system property to set hostId if not yet initialized 
(CASSANDRA-14582)
 + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version 
changes, fixed to rely on latest version (CASSANDRA-16651)
 +Merged from 4.0:
+  * DebuggableThreadPoolExecutor does not propagate client warnings 
(CASSANDRA-17072)
   * internode_send_buff_size_in_bytes and internode_recv_buff_size_in_bytes 
have new names. Backward compatibility with the old names added 
(CASSANDRA-17141)
   * Remove unused configuration parameters from cassandra.yaml 
(CASSANDRA-17132)
   * Queries performed with NODE_LOCAL consistency level do not update request 
metrics (CASSANDRA-17052)
diff --cc src/java/org/apache/cassandra/concurrent/Stage.java
index 66cd7cb,e00da7b..d8a5e54
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@@ -158,47 -166,39 +158,48 @@@ public enum Stag
          ExecutorUtils.awaitTermination(timeout, units, executors);
      }
  
 -    static LocalAwareExecutorService tracingExecutor(String jmxName, String 
jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener 
onSetMaximumPoolSize)
 +    private static ExecutorPlus tracingStage(String jmxName, String jmxType, 
int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener 
onSetMaximumPoolSize)
 +    {
 +        return executorFactory()
 +                .withJmx(jmxType)
 +                .configureSequential(jmxName)
 +                .withQueueLimit(1000)
 +                .withRejectedExecutionHandler((r, executor) -> 
MessagingService.instance().metrics.recordSelfDroppedMessage(Verb._TRACE)).build();
 +    }
 +
 +    private static ExecutorPlus migrationStage(String jmxName, String 
jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener 
onSetMaximumPoolSize)
      {
 -        RejectedExecutionHandler reh = (r, executor) -> 
MessagingService.instance().metrics.recordSelfDroppedMessage(Verb._TRACE);
 -        return new TracingExecutor(1,
 -                                   1,
 -                                   KEEP_ALIVE_SECONDS,
 -                                   TimeUnit.SECONDS,
 -                                   new ArrayBlockingQueue<>(1000),
 -                                   new NamedThreadFactory(jmxName),
 -                                   reh);
 +        return executorFactory()
++               .localAware()
 +               .withJmx(jmxType)
 +               .sequential(jmxName);
      }
  
 -    static LocalAwareExecutorService multiThreadedStage(String jmxName, 
String jmxType, int numThreads, 
LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
 +    private static LocalAwareExecutorPlus singleThreadedStage(String jmxName, 
String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener 
onSetMaximumPoolSize)
      {
 -        return new JMXEnabledThreadPoolExecutor(numThreads,
 -                                                KEEP_ALIVE_SECONDS,
 -                                                TimeUnit.SECONDS,
 -                                                new LinkedBlockingQueue<>(),
 -                                                new 
NamedThreadFactory(jmxName),
 -                                                jmxType);
 +        return executorFactory()
 +                .localAware()
 +                .withJmx(jmxType)
 +                .sequential(jmxName);
      }
  
 -    static LocalAwareExecutorService multiThreadedLowSignalStage(String 
jmxName, String jmxType, int numThreads, 
LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
 +    static LocalAwareExecutorPlus multiThreadedStage(String jmxName, String 
jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener 
onSetMaximumPoolSize)
      {
 -        return SharedExecutorPool.SHARED.newExecutor(numThreads, 
onSetMaximumPoolSize, jmxType, jmxName);
 +        return executorFactory()
 +                .localAware()
 +                .withJmx(jmxType)
 +                .pooled(jmxName, numThreads);
      }
  
 -    static LocalAwareExecutorService singleThreadedStage(String jmxName, 
String jmxType, int numThreads, 
LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize)
 +    static LocalAwareExecutorPlus multiThreadedLowSignalStage(String jmxName, 
String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener 
onSetMaximumPoolSize)
      {
 -        return new JMXEnabledSingleThreadExecutor(jmxName, jmxType);
 +        return executorFactory()
 +                .localAware()
 +                .withJmx(jmxType)
 +                .shared(jmxName, numThreads, onSetMaximumPoolSize);
      }
  
 -    static LocalAwareExecutorService immediateExecutor(String jmxName, String 
jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener 
onSetMaximumPoolSize)
 +    static LocalAwareExecutorPlus immediateExecutor(String jmxName, String 
jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener 
onSetMaximumPoolSize)
      {
          return ImmediateExecutor.INSTANCE;
      }
diff --cc src/java/org/apache/cassandra/service/ClientWarn.java
index 46a42c7,5a6a878..6e2d3fc
--- a/src/java/org/apache/cassandra/service/ClientWarn.java
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@@ -64,14 -64,9 +64,9 @@@ public class ClientWarn extends Executo
          return state.warnings;
      }
  
-     public int numWarnings()
-     {
-         return getWarnings() == null ? 0 : getWarnings().size();
-     }
- 
      public void resetWarnings()
      {
 -        warnLocal.remove();
 +        set(null);
      }
  
      public static class State
diff --cc 
test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
index 17253ec,43c0fdf..adb0962
--- 
a/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
+++ 
b/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
@@@ -29,6 -32,8 +29,7 @@@ import java.util.function.Supplier
  
  import com.google.common.base.Throwables;
  import com.google.common.net.InetAddresses;
 -import com.google.common.util.concurrent.ListenableFutureTask;
+ import com.google.common.util.concurrent.Uninterruptibles;
  import org.junit.Assert;
  import org.junit.BeforeClass;
  import org.junit.Test;
@@@ -38,11 -44,12 +40,16 @@@ import org.apache.cassandra.service.Cli
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.TraceStateImpl;
  import org.apache.cassandra.tracing.Tracing;
+ import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.WrappedRunnable;
+ import org.assertj.core.api.Assertions;
  
 +import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 +import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 +
+ import static java.util.concurrent.TimeUnit.MILLISECONDS;
++import static org.assertj.core.api.Assertions.assertThat;
+ 
  public class DebuggableThreadPoolExecutorTest
  {
      @BeforeClass
@@@ -75,14 -87,82 +82,104 @@@
      }
  
      @Test
+     public void testLocalStatePropagation()
+     {
 -        DebuggableThreadPoolExecutor executor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("TEST", 1);
++        ExecutorPlus executor = 
executorFactory().localAware().sequential("TEST");
++        assertThat(executor).isInstanceOf(LocalAwareExecutorPlus.class);
++        try
++        {
++            checkLocalStateIsPropagated(executor);
++        }
++        finally
++        {
++            executor.shutdown();
++        }
++    }
++
++    @Test
++    public void testNoLocalStatePropagation() throws InterruptedException
++    {
++        ExecutorPlus executor = executorFactory().sequential("TEST");
++        assertThat(executor).isNotInstanceOf(LocalAwareExecutorPlus.class);
+         try
+         {
+             checkLocalStateIsPropagated(executor);
+         }
+         finally
+         {
+             executor.shutdown();
+         }
+     }
+ 
 -    public static void checkLocalStateIsPropagated(LocalAwareExecutorService 
executor)
++    public static void checkLocalStateIsPropagated(ExecutorPlus executor)
+     {
+         checkClientWarningsArePropagated(executor, () -> executor.execute(() 
-> ClientWarn.instance.warn("msg")));
+         checkClientWarningsArePropagated(executor, () -> executor.submit(() 
-> ClientWarn.instance.warn("msg")));
+         checkClientWarningsArePropagated(executor, () -> executor.submit(() 
-> ClientWarn.instance.warn("msg"), null));
+         checkClientWarningsArePropagated(executor, () -> 
executor.submit((Callable<Void>) () -> {
+             ClientWarn.instance.warn("msg");
+             return null;
+         }));
+ 
+         checkTracingIsPropagated(executor, () -> executor.execute(() -> 
Tracing.trace("msg")));
+         checkTracingIsPropagated(executor, () -> executor.submit(() -> 
Tracing.trace("msg")));
+         checkTracingIsPropagated(executor, () -> executor.submit(() -> 
Tracing.trace("msg"), null));
+         checkTracingIsPropagated(executor, () -> 
executor.submit((Callable<Void>) () -> {
+             Tracing.trace("msg");
+             return null;
+         }));
+     }
+ 
 -    public static void 
checkClientWarningsArePropagated(LocalAwareExecutorService executor, Runnable 
schedulingTask) {
++    public static void checkClientWarningsArePropagated(ExecutorPlus 
executor, Runnable schedulingTask) {
+         ClientWarn.instance.captureWarnings();
 -        
Assertions.assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty();
++        assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty();
+ 
+         ClientWarn.instance.warn("msg0");
+         long initCompletedTasks = executor.getCompletedTaskCount();
+         schedulingTask.run();
+         while (executor.getCompletedTaskCount() == initCompletedTasks) 
Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS);
+         ClientWarn.instance.warn("msg1");
+ 
 -        
Assertions.assertThat(ClientWarn.instance.getWarnings()).containsExactlyInAnyOrder("msg0",
 "msg", "msg1");
++        if (executor instanceof LocalAwareExecutorPlus)
++            
assertThat(ClientWarn.instance.getWarnings()).containsExactlyInAnyOrder("msg0", 
"msg", "msg1");
++        else
++            
assertThat(ClientWarn.instance.getWarnings()).containsExactlyInAnyOrder("msg0", 
"msg1");
+     }
+ 
 -    public static void checkTracingIsPropagated(LocalAwareExecutorService 
executor, Runnable schedulingTask) {
++    public static void checkTracingIsPropagated(ExecutorPlus executor, 
Runnable schedulingTask) {
+         ClientWarn.instance.captureWarnings();
 -        
Assertions.assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty();
++        assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty();
+ 
+         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>();
+         Tracing.instance.set(new 
TraceState(FBUtilities.getLocalAddressAndPort(), UUID.randomUUID(), 
Tracing.TraceType.NONE)
+         {
+             @Override
+             protected void traceImpl(String message)
+             {
+                 q.add(message);
+             }
+         });
+         Tracing.trace("msg0");
+         long initCompletedTasks = executor.getCompletedTaskCount();
+         schedulingTask.run();
+         while (executor.getCompletedTaskCount() == initCompletedTasks) 
Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS);
+         Tracing.trace("msg1");
+ 
 -        Assertions.assertThat(q.toArray()).containsExactlyInAnyOrder("msg0", 
"msg", "msg1");
++        if (executor instanceof LocalAwareExecutorPlus)
++            assertThat(q.toArray()).containsExactlyInAnyOrder("msg0", "msg", 
"msg1");
++        else
++            assertThat(q.toArray()).containsExactlyInAnyOrder("msg0", "msg1");
+     }
+ 
+     @Test
      public void testExecuteFutureTaskWhileTracing()
      {
 -        LinkedBlockingQueue<Runnable> q = new 
LinkedBlockingQueue<Runnable>(1);
 -        DebuggableThreadPoolExecutor executor = new 
DebuggableThreadPoolExecutor(1,
 -                                                                              
   Integer.MAX_VALUE,
 -                                                                              
   TimeUnit.MILLISECONDS,
 -                                                                              
   q,
 -                                                                              
   new NamedThreadFactory("TEST"));
 +        SettableUncaughtExceptionHandler ueh = new 
SettableUncaughtExceptionHandler();
 +        ExecutorPlus executor = executorFactory()
 +                                .localAware()
 +                                .configureSequential("TEST")
 +                                .withUncaughtExceptionHandler(ueh)
 +                                .withQueueLimit(1).build();
          Runnable test = () -> executor.execute(failingTask());
          try
          {
diff --cc test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
index 2a8aeb9,97e389c..7a682ed
--- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
@@@ -36,6 -38,7 +38,8 @@@ import org.apache.cassandra.utils.FBUti
  
  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.MINUTES;
+ import static 
org.apache.cassandra.concurrent.DebuggableThreadPoolExecutorTest.checkLocalStateIsPropagated;
++import static org.assertj.core.api.Assertions.assertThat;
  
  public class SEPExecutorTest
  {
@@@ -260,4 -270,20 +270,21 @@@
          // Will return true if all of the LatchWaiters count down before the 
timeout
          Assert.assertTrue("Test tasks did not hit max concurrency goal", 
concurrencyGoal.await(3L, TimeUnit.SECONDS));
      }
+ 
+     @Test
+     public void testLocalStatePropagation() throws InterruptedException, 
TimeoutException
+     {
+         SharedExecutorPool sharedPool = new SharedExecutorPool("TestPool");
+         try
+         {
 -            LocalAwareExecutorService executor = sharedPool.newExecutor(1, 
"TEST", "TEST");
++            LocalAwareExecutorPlus executor = sharedPool.newExecutor(1, 
"TEST", "TEST");
++            assertThat(executor).isInstanceOf(LocalAwareExecutorPlus.class);
+             checkLocalStateIsPropagated(executor);
+         }
+         finally
+         {
+             sharedPool.shutdownAndWait(1, TimeUnit.SECONDS);
+         }
+     }
+ 
  }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to