This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 6559983d6c00e964de4316a571df9482d4af854e Merge: 93692a4b17 858a2b2321 Author: Jon Meredith <[email protected]> AuthorDate: Thu Apr 25 14:00:52 2024 -0600 Merge branch 'cassandra-4.1' into cassandra-5.0 CHANGES.txt | 1 + .../org/apache/cassandra/db/lifecycle/LogFile.java | 36 ++-- .../cassandra/db/lifecycle/LogReplicaSet.java | 6 +- .../TransactionAlreadyCompletedException.java | 36 ++++ .../apache/cassandra/streaming/StreamSession.java | 22 ++- .../streaming/StreamFailedWhileReceivingTest.java | 208 +++++++++++++++++++++ .../cassandra/db/lifecycle/LogTransactionTest.java | 40 +++- 7 files changed, 333 insertions(+), 16 deletions(-) diff --cc CHANGES.txt index 1d51d2d27d,23dbee1941..ee0696f40a --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -36,11 -2,11 +36,12 @@@ Merged from 4.1 * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495) * Do not go to disk for reading hints file sizes (CASSANDRA-19477) * Fix system_views.settings to handle array types (CASSANDRA-19475) + * Memoize Cassandra version and add a backoff interval for failed schema pulls (CASSANDRA-18902) + * Fix StackOverflowError on ALTER after many previous schema changes (CASSANDRA-19166) Merged from 4.0: + * Streaming exception race creates corrupt transaction log files that prevent restart (CASSANDRA-18736) * Fix CQL tojson timestamp output on negative timestamp values before Gregorian calendar reform in 1582 (CASSANDRA-19566) * Fix few types issues and implement types compatibility tests (CASSANDRA-19479) - * Optionally avoid hint transfer during decommission (CASSANDRA-19525) * Change logging to TRACE when failing to get peer certificate (CASSANDRA-19508) * Push LocalSessions info logs to debug (CASSANDRA-18335) * Filter remote DC replicas out when constructing the initial replica plan for the local read repair (CASSANDRA-19120) diff --cc src/java/org/apache/cassandra/db/lifecycle/LogFile.java index abfb55a601,9db3cb477c..9decc248b9 --- 
a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java @@@ -35,8 -26,12 +35,10 @@@ import java.util.regex.Matcher import java.util.regex.Pattern; import java.util.stream.Collectors; + import javax.annotation.concurrent.NotThreadSafe; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; - -import org.apache.cassandra.io.util.File; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java index d76f1f6d25,17ceca137c..a942b9ee9a --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@@ -50,18 -47,15 +50,19 @@@ import io.netty.util.concurrent.Future import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.CompactionStrategyManager; + import org.apache.cassandra.db.lifecycle.TransactionAlreadyCompletedException; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.gms.*; +import org.apache.cassandra.io.util.File; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.metrics.StreamingMetrics; import org.apache.cassandra.schema.TableId; @@@ -703,20 -684,29 +714,29 @@@ public class StreamSessio planId(), peer.getHostAddressAndPort(), e); - - return closeSession(State.FAILED); + return closeSession(State.FAILED, "Failed because there was an " + 
e.getClass().getCanonicalName() + " with state=" + state.name()); } } + else if (e instanceof TransactionAlreadyCompletedException && isFailedOrAborted()) + { + // StreamDeserializer threads may actively be writing SSTables when the stream + // is failed or canceled, which aborts the lifecycle transaction and throws an exception + // when any new SSTable is added. Since the stream has already failed, suppress + // extra streaming log failure messages. + logger.debug("Stream lifecycle transaction already completed after stream failure (ignore)", e); + return null; + } logError(e); if (channel.connected()) { - state(State.FAILED); // make sure subsequent error handling sees the session in a final state + state(State.FAILED); // make sure subsequent error handling sees the session in a final state channel.sendControlMessage(new SessionFailedMessage()).awaitUninterruptibly(); } - - return closeSession(State.FAILED); + StringBuilder failureReason = new StringBuilder("Failed because of an unknown exception\n"); + boundStackTrace(e, DEBUG_STACKTRACE_LIMIT, failureReason); + return closeSession(State.FAILED, failureReason.toString()); } private void logError(Throwable e) diff --cc test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java index 0000000000,99dfd440b7..45cd92addf mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java @@@ -1,0 -1,208 +1,208 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.distributed.test.streaming; + + import java.io.IOException; + import java.util.concurrent.Callable; + import java.util.concurrent.TimeUnit; + + import org.junit.Test; + + import net.bytebuddy.ByteBuddy; + import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; + import net.bytebuddy.implementation.MethodDelegation; + import net.bytebuddy.implementation.bind.annotation.SuperCall; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.rows.UnfilteredRowIterator; + import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader; + import org.apache.cassandra.db.streaming.CassandraIncomingFile; + import org.apache.cassandra.db.streaming.CassandraStreamManager; + import org.apache.cassandra.db.streaming.CassandraStreamReceiver; + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.Feature; + import org.apache.cassandra.distributed.api.IInvokableInstance; + import org.apache.cassandra.distributed.test.TestBaseImpl; + import org.apache.cassandra.exceptions.StartupException; + import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; ++import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter; + import org.apache.cassandra.io.sstable.format.SSTableFormat; + import org.apache.cassandra.io.sstable.format.Version; + import org.apache.cassandra.io.util.File; + import org.apache.cassandra.schema.Schema; + import org.apache.cassandra.streaming.IncomingStream; + import 
org.apache.cassandra.streaming.StreamSession; + import org.apache.cassandra.streaming.messages.StreamMessageHeader; + import org.apache.cassandra.utils.concurrent.CountDownLatch; + + import static net.bytebuddy.matcher.ElementMatchers.named; + import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + + /** This is a somewhat brittle test to demonstrate transaction log corruption + when streaming is aborted as streamed sstables are added to the + transaction log concurrently. + + The transaction log should not be modified after streaming + has aborted or completed it. + */ + public class StreamFailedWhileReceivingTest extends TestBaseImpl + { + @Test + public void zeroCopy() throws IOException + { + streamClose(true); + } + + @Test + public void notZeroCopy() throws IOException + { + streamClose(false); + } + + private void streamClose(boolean zeroCopyStreaming) throws IOException + { + try (Cluster cluster = Cluster.build(2) + .withInstanceInitializer(BBHelper::install) + .withConfig(c -> c.with(Feature.values()) + .set("stream_entire_sstables", zeroCopyStreaming) + .set("autocompaction_on_startup_enabled", false)) + .start()) + { + init(cluster); + + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)")); + IInvokableInstance node1 = cluster.get(1); + IInvokableInstance node2 = cluster.get(2); + for (int i = 1; i <= 100; i++) + node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES (?)"), i); + node1.flush(KEYSPACE); + + // trigger streaming; expected to fail as streaming socket closed in the middle (currently this is an unrecoverable event) + node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl"); + + node2.runOnInstance(() -> { + try + { + // use the startup logic to check for corrupt txn logfiles from the streaming failure + // quicker than restarting the instance to check + ColumnFamilyStore.scrubDataDirectories(Schema.instance.getTableMetadata(KEYSPACE, "tbl")); + } + catch (StartupException ex) + { + throw 
new RuntimeException(ex); + } + }); + } + } + + + public static class BBHelper + { + static volatile StreamSession firstSession; + static CountDownLatch firstStreamAbort = CountDownLatch.newCountDownLatch(1); // per-instance + + // CassandraStreamManager.prepareIncomingStream + @SuppressWarnings("unused") + public static IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header, @SuperCall Callable<IncomingStream> zuper) throws Exception + { + if (firstSession == null) + firstSession = session; + return zuper.call(); + } + + // CassandraStreamReceiver.abort + @SuppressWarnings("unused") + public static void abort(@SuperCall Callable<Integer> zuper) throws Exception + { + firstStreamAbort.decrement(); + zuper.call(); + } + + // RangeAwareSSTableWriter.append + @SuppressWarnings("unused") + public static boolean append(UnfilteredRowIterator partition, @SuperCall Callable<Boolean> zuper) throws Exception + { + // handles compressed and non-compressed + if (isCaller(CassandraIncomingFile.class.getName(), "read")) + { + if (firstSession != null) + { + firstSession.abort(); + // delay here until CassandraStreamReceiver abort is called on NonPeriodic tasks + firstStreamAbort.awaitUninterruptibly(1, TimeUnit.MINUTES); + } + } + return zuper.call(); + } + - // ColumnFamilyStore.createWriter - for entire sstable streaming, before adding to LogTransaction ++ // ColumnFamilyStore.newSSTableDescriptor - for entire sstable streaming, before adding to LogTransaction + @SuppressWarnings("unused") - public static Descriptor newSSTableDescriptor(File directory, Version version, SSTableFormat.Type format, @SuperCall Callable<Descriptor> zuper) throws Exception ++ public static Descriptor newSSTableDescriptor(File directory, Version version, @SuperCall Callable<Descriptor> zuper) throws Exception + { + if (isCaller(CassandraEntireSSTableStreamReader.class.getName(), "read")) + // handles compressed and non-compressed + { + if (firstSession != null) + { + 
firstSession.abort(); + // delay here until CassandraStreamReceiver abort is called on NonPeriodic tasks + firstStreamAbort.awaitUninterruptibly(1, TimeUnit.MINUTES); + } + } + return zuper.call(); + } + + private static boolean isCaller(String klass, String method) + { + StackTraceElement[] stack = Thread.currentThread().getStackTrace(); + for (int i = 0; i < stack.length; i++) + { + StackTraceElement e = stack[i]; + if (klass.equals(e.getClassName()) && method.equals(e.getMethodName())) + return true; + } + return false; + } + + public static void install(ClassLoader classLoader, Integer num) + { + if (num != 2) // only target the second instance + return; + + new ByteBuddy().rebase(CassandraStreamManager.class) + .method(named("prepareIncomingStream").and(takesArguments(2))) + .intercept(MethodDelegation.to(BBHelper.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + new ByteBuddy().rebase(RangeAwareSSTableWriter.class) + .method(named("append").and(takesArguments(1))) + .intercept(MethodDelegation.to(BBHelper.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + new ByteBuddy().rebase(ColumnFamilyStore.class) + .method(named("newSSTableDescriptor").and(takesArguments(3))) + .intercept(MethodDelegation.to(BBHelper.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + new ByteBuddy().rebase(CassandraStreamReceiver.class) + .method(named("abort").and(takesArguments(0))) + .intercept(MethodDelegation.to(BBHelper.class)) + .make() + .load(classLoader, ClassLoadingStrategy.Default.INJECTION); + } + } + } diff --cc test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java index 710c869fab,88164555cb..c70b0ea0de --- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java @@@ -47,16 -46,13 +48,21 @@@ import org.apache.cassandra.db.Decorate import org.apache.cassandra.db.Directories; import 
org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.compaction.OperationType; ++import org.apache.cassandra.dht.AbstractBounds; ++import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; + import org.apache.cassandra.io.sstable.SSTable; + import org.apache.cassandra.io.sstable.SSTableId; import org.apache.cassandra.io.sstable.SequenceBasedSSTableId; + import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.io.sstable.format.big.BigFormat.Components; +import org.apache.cassandra.io.sstable.format.big.BigTableReader; +import org.apache.cassandra.io.sstable.format.bti.BtiFormat; +import org.apache.cassandra.io.sstable.format.bti.BtiTableReader; +import org.apache.cassandra.io.sstable.format.bti.PartitionIndex; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; @@@ -1440,4 -1400,29 +1446,36 @@@ public class LogTransactionTest extend .flatMap(LogTransactionTest::toCanonicalIgnoringNotFound) .collect(Collectors.toSet()); } - } + - final String DUMMY_KS = "ks"; - final String DUMMY_TBL = "tbl"; ++ static final String DUMMY_KS = "ks"; ++ static final String DUMMY_TBL = "tbl"; + final File dir = new File("."); + Supplier<SequenceBasedSSTableId> idSupplier = SequenceBasedSSTableId.Builder.instance.generator(Stream.of()); - final Set<Component> dummyComponents = Collections.singleton(Component.DATA); - final TableMetadataRef dummyMetadata = TableMetadataRef.forOfflineTools(TableMetadata.minimal(DUMMY_KS, DUMMY_TBL)); - final DiskOptimizationStrategy dummyOptimizationStrategy = new SpinningDiskOptimizationStrategy(); ++ final Set<Component> dummyComponents = 
Collections.singleton(SSTableFormat.Components.DATA); + + SSTable dummySSTable() + { + SSTableId id = idSupplier.get(); + Descriptor descriptor = new Descriptor(dir, DUMMY_KS, DUMMY_TBL, id); - return new SSTable(descriptor, dummyComponents, dummyMetadata, dummyOptimizationStrategy) ++ SSTable.Builder<?, ?> builder = new SSTable.Builder<>(descriptor); ++ builder.setComponents(dummyComponents); ++ return new SSTable(builder, null) + { ++ @Override ++ public DecoratedKey getFirst() { return null; } ++ @Override ++ public DecoratedKey getLast() { return null; } ++ @Override ++ public AbstractBounds<Token> getBounds() { return null; } + }; + } + + @Test(expected = TransactionAlreadyCompletedException.class) + public void useAfterCompletedTest() + { - LogTransaction txnFile = new LogTransaction(OperationType.STREAM); - txnFile.abort(); // this should complete the txn - txnFile.trackNew(dummySSTable()); // expect an IllegalStateException here - } -} ++ try (LogTransaction txnFile = new LogTransaction(OperationType.STREAM)) ++ { ++ txnFile.abort(); // this should complete the txn ++ txnFile.trackNew(dummySSTable()); // expect an IllegalStateException here ++ } ++ }} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
