This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 10c685222fc415586ae28a01e7896063a3f2f0d3 Merge: 33fd2dc 1bcfa08 Author: David Capwell <[email protected]> AuthorDate: Thu Nov 18 17:29:23 2021 -0800 Merge branch 'cassandra-4.0' into trunk CHANGES.txt | 1 + .../org/apache/cassandra/concurrent/Stage.java | 1 + .../statements/schema/AlterKeyspaceStatement.java | 8 -- .../statements/schema/CreateKeyspaceStatement.java | 13 +-- .../org/apache/cassandra/service/ClientWarn.java | 5 - .../DebuggableThreadPoolExecutorTest.java | 97 +++++++++++++++++ .../cassandra/concurrent/SEPExecutorTest.java | 27 +++++ .../schema/SchemaStatementWarningsTest.java | 117 +++++++++++++++++++++ 8 files changed, 244 insertions(+), 25 deletions(-) diff --cc CHANGES.txt index 676e92b,1d72c0f..316d14b --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,65 -1,5 +1,66 @@@ -4.0.2 +4.1 + * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159) + * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069) + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130) + * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065) + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914) + * Actively update auth cache in the background (CASSANDRA-16957) + * Add unix time conversion functions (CASSANDRA-17029) + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than only supporting direct only (CASSANDRA-17128) + * Forbid other Future implementations with checkstyle (CASSANDRA-17055) + * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085) + * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106) + * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054) + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023) + * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309) + * Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030) + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027) + * Log time spent writing keys during compaction (CASSANDRA-17037) + * Make nodetool compactionstats and sstable_tasks consistent (CASSANDRA-16976) + * Add metrics and logging around index summary redistribution (CASSANDRA-17036) + * Add configuration options for minimum allowable replication factor and default replication factor (CASSANDRA-14557) + * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795) + * Add broadcast_rpc_address to system.local (CASSANDRA-11181) + * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337) + * add credentials file support to CQLSH (CASSANDRA-16983) + * Skip remaining bytes in the Envelope buffer when a ProtocolException is thrown to avoid double decoding (CASSANDRA-17026) + * Allow reverse iteration of resources during permissions checking (CASSANDRA-17016) + * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879) + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666) + * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896) + * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290) + * Allow configuration of consistency levels on auth operations (CASSANDRA-12988) + * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844) + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153) + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806) + * Include SASI components to snapshots (CASSANDRA-15134) + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938) + * Reduce native transport max frame size to 16MB (CASSANDRA-16886) + * Add support for filtering using IN restrictions (CASSANDRA-14344) + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404) + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880) + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854) + * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850) + * Add TTL support to nodetool snapshots (CASSANDRA-16789) + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842) + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859) + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663) + * Implement nodetool getauditlog command (CASSANDRA-16725) + * Clean up repair code (CASSANDRA-13720) + * Background schedule to clean up orphaned hints files (CASSANDRA-16815) + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776) + * Batch the token metadata update to improve the speed (CASSANDRA-15291) + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775) + * Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760) + * Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671) + * Fix AbstractReadQuery::toCQLString not returning valid CQL (CASSANDRA-16510) + * Log when compacting many tombstones (CASSANDRA-16780) + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799) + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701) + * Add a system property to set hostId if not yet initialized (CASSANDRA-14582) + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651) +Merged from 4.0: + * DebuggableThreadPoolExecutor does not propagate client warnings (CASSANDRA-17072) * internode_send_buff_size_in_bytes and internode_recv_buff_size_in_bytes have new names. Backward compatibility with the old names added (CASSANDRA-17141) * Remove unused configuration parameters from cassandra.yaml (CASSANDRA-17132) * Queries performed with NODE_LOCAL consistency level do not update request metrics (CASSANDRA-17052) diff --cc src/java/org/apache/cassandra/concurrent/Stage.java index 66cd7cb,e00da7b..d8a5e54 --- a/src/java/org/apache/cassandra/concurrent/Stage.java +++ b/src/java/org/apache/cassandra/concurrent/Stage.java @@@ -158,47 -166,39 +158,48 @@@ public enum Stag ExecutorUtils.awaitTermination(timeout, units, executors); } - static LocalAwareExecutorService tracingExecutor(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize) + private static ExecutorPlus tracingStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize) + { + return executorFactory() + .withJmx(jmxType) + .configureSequential(jmxName) + .withQueueLimit(1000) + .withRejectedExecutionHandler((r, executor) -> MessagingService.instance().metrics.recordSelfDroppedMessage(Verb._TRACE)).build(); + } + + private static ExecutorPlus migrationStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize) { - RejectedExecutionHandler reh = (r, executor) -> MessagingService.instance().metrics.recordSelfDroppedMessage(Verb._TRACE); - return new TracingExecutor(1, - 1, - KEEP_ALIVE_SECONDS, - TimeUnit.SECONDS, - new ArrayBlockingQueue<>(1000), - new NamedThreadFactory(jmxName), - reh); + return executorFactory() ++ .localAware() + .withJmx(jmxType) + .sequential(jmxName); } - static LocalAwareExecutorService multiThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize) + private static LocalAwareExecutorPlus singleThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize) { - return new JMXEnabledThreadPoolExecutor(numThreads, - KEEP_ALIVE_SECONDS, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new NamedThreadFactory(jmxName), - jmxType); + return executorFactory() + .localAware() + .withJmx(jmxType) + .sequential(jmxName); } - static LocalAwareExecutorService multiThreadedLowSignalStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize) + static LocalAwareExecutorPlus multiThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize) { - return SharedExecutorPool.SHARED.newExecutor(numThreads, onSetMaximumPoolSize, jmxType, jmxName); + return executorFactory() + .localAware() + .withJmx(jmxType) + .pooled(jmxName, numThreads); } - static LocalAwareExecutorService singleThreadedStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize) + static LocalAwareExecutorPlus multiThreadedLowSignalStage(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize) { - return new JMXEnabledSingleThreadExecutor(jmxName, jmxType); + return executorFactory() + .localAware() + .withJmx(jmxType) + .shared(jmxName, numThreads, onSetMaximumPoolSize); } - static LocalAwareExecutorService immediateExecutor(String jmxName, String jmxType, int numThreads, LocalAwareExecutorService.MaximumPoolSizeListener onSetMaximumPoolSize) + static LocalAwareExecutorPlus immediateExecutor(String jmxName, String jmxType, int numThreads, LocalAwareExecutorPlus.MaximumPoolSizeListener onSetMaximumPoolSize) { return ImmediateExecutor.INSTANCE; } diff --cc src/java/org/apache/cassandra/service/ClientWarn.java index 46a42c7,5a6a878..6e2d3fc --- a/src/java/org/apache/cassandra/service/ClientWarn.java +++ b/src/java/org/apache/cassandra/service/ClientWarn.java @@@ -64,14 -64,9 +64,9 @@@ public class ClientWarn extends Executo return state.warnings; } - public int numWarnings() - { - return getWarnings() == null ? 0 : getWarnings().size(); - } - public void resetWarnings() { - warnLocal.remove(); + set(null); } public static class State diff --cc test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java index 17253ec,43c0fdf..adb0962 --- a/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java +++ b/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java @@@ -29,6 -32,8 +29,7 @@@ import java.util.function.Supplier import com.google.common.base.Throwables; import com.google.common.net.InetAddresses; -import com.google.common.util.concurrent.ListenableFutureTask; + import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@@ -38,11 -44,12 +40,16 @@@ import org.apache.cassandra.service.Cli import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.TraceStateImpl; import org.apache.cassandra.tracing.Tracing; + import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; + import org.assertj.core.api.Assertions; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; + + import static java.util.concurrent.TimeUnit.MILLISECONDS; ++import static org.assertj.core.api.Assertions.assertThat; + public class DebuggableThreadPoolExecutorTest { @BeforeClass @@@ -75,14 -87,82 +82,104 @@@ } @Test + public void testLocalStatePropagation() + { - DebuggableThreadPoolExecutor executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("TEST", 1); ++ ExecutorPlus executor = executorFactory().localAware().sequential("TEST"); ++ assertThat(executor).isInstanceOf(LocalAwareExecutorPlus.class); ++ try ++ { ++ checkLocalStateIsPropagated(executor); ++ } ++ finally ++ { ++ executor.shutdown(); ++ } ++ } ++ ++ @Test ++ public void testNoLocalStatePropagation() throws InterruptedException ++ { ++ ExecutorPlus executor = executorFactory().sequential("TEST"); ++ assertThat(executor).isNotInstanceOf(LocalAwareExecutorPlus.class); + try + { + checkLocalStateIsPropagated(executor); + } + finally + { + executor.shutdown(); + } + } + - public static void checkLocalStateIsPropagated(LocalAwareExecutorService executor) ++ public static void checkLocalStateIsPropagated(ExecutorPlus executor) + { + checkClientWarningsArePropagated(executor, () -> executor.execute(() -> ClientWarn.instance.warn("msg"))); + checkClientWarningsArePropagated(executor, () -> executor.submit(() -> ClientWarn.instance.warn("msg"))); + checkClientWarningsArePropagated(executor, () -> executor.submit(() -> ClientWarn.instance.warn("msg"), null)); + checkClientWarningsArePropagated(executor, () -> executor.submit((Callable<Void>) () -> { + ClientWarn.instance.warn("msg"); + return null; + })); + + checkTracingIsPropagated(executor, () -> executor.execute(() -> Tracing.trace("msg"))); + checkTracingIsPropagated(executor, () -> executor.submit(() -> Tracing.trace("msg"))); + checkTracingIsPropagated(executor, () -> executor.submit(() -> Tracing.trace("msg"), null)); + checkTracingIsPropagated(executor, () -> executor.submit((Callable<Void>) () -> { + Tracing.trace("msg"); + return null; + })); + } + - public static void checkClientWarningsArePropagated(LocalAwareExecutorService executor, Runnable schedulingTask) { ++ public static void checkClientWarningsArePropagated(ExecutorPlus executor, Runnable schedulingTask) { + ClientWarn.instance.captureWarnings(); - Assertions.assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty(); ++ assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty(); + + ClientWarn.instance.warn("msg0"); + long initCompletedTasks = executor.getCompletedTaskCount(); + schedulingTask.run(); + while (executor.getCompletedTaskCount() == initCompletedTasks) Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS); + ClientWarn.instance.warn("msg1"); + - Assertions.assertThat(ClientWarn.instance.getWarnings()).containsExactlyInAnyOrder("msg0", "msg", "msg1"); ++ if (executor instanceof LocalAwareExecutorPlus) ++ assertThat(ClientWarn.instance.getWarnings()).containsExactlyInAnyOrder("msg0", "msg", "msg1"); ++ else ++ assertThat(ClientWarn.instance.getWarnings()).containsExactlyInAnyOrder("msg0", "msg1"); + } + - public static void checkTracingIsPropagated(LocalAwareExecutorService executor, Runnable schedulingTask) { ++ public static void checkTracingIsPropagated(ExecutorPlus executor, Runnable schedulingTask) { + ClientWarn.instance.captureWarnings(); - Assertions.assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty(); ++ assertThat(ClientWarn.instance.getWarnings()).isNullOrEmpty(); + + ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<>(); + Tracing.instance.set(new TraceState(FBUtilities.getLocalAddressAndPort(), UUID.randomUUID(), Tracing.TraceType.NONE) + { + @Override + protected void traceImpl(String message) + { + q.add(message); + } + }); + Tracing.trace("msg0"); + long initCompletedTasks = executor.getCompletedTaskCount(); + schedulingTask.run(); + while (executor.getCompletedTaskCount() == initCompletedTasks) Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS); + Tracing.trace("msg1"); + - Assertions.assertThat(q.toArray()).containsExactlyInAnyOrder("msg0", "msg", "msg1"); ++ if (executor instanceof LocalAwareExecutorPlus) ++ assertThat(q.toArray()).containsExactlyInAnyOrder("msg0", "msg", "msg1"); ++ else ++ assertThat(q.toArray()).containsExactlyInAnyOrder("msg0", "msg1"); + } + + @Test public void testExecuteFutureTaskWhileTracing() { - LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1); - DebuggableThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(1, - Integer.MAX_VALUE, - TimeUnit.MILLISECONDS, - q, - new NamedThreadFactory("TEST")); + SettableUncaughtExceptionHandler ueh = new SettableUncaughtExceptionHandler(); + ExecutorPlus executor = executorFactory() + .localAware() + .configureSequential("TEST") + .withUncaughtExceptionHandler(ueh) + .withQueueLimit(1).build(); Runnable test = () -> executor.execute(failingTask()); try { diff --cc test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java index 2a8aeb9,97e389c..7a682ed --- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java +++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java @@@ -36,6 -38,7 +38,8 @@@ import org.apache.cassandra.utils.FBUti import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; + import static org.apache.cassandra.concurrent.DebuggableThreadPoolExecutorTest.checkLocalStateIsPropagated; ++import static org.assertj.core.api.Assertions.assertThat; public class SEPExecutorTest { @@@ -260,4 -270,20 +270,21 @@@ // Will return true if all of the LatchWaiters count down before the timeout Assert.assertTrue("Test tasks did not hit max concurrency goal", concurrencyGoal.await(3L, TimeUnit.SECONDS)); } + + @Test + public void testLocalStatePropagation() throws InterruptedException, TimeoutException + { + SharedExecutorPool sharedPool = new SharedExecutorPool("TestPool"); + try + { - LocalAwareExecutorService executor = sharedPool.newExecutor(1, "TEST", "TEST"); ++ LocalAwareExecutorPlus executor = sharedPool.newExecutor(1, "TEST", "TEST"); ++ assertThat(executor).isInstanceOf(LocalAwareExecutorPlus.class); + checkLocalStateIsPropagated(executor); + } + finally + { + sharedPool.shutdownAndWait(1, TimeUnit.SECONDS); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
