This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 6559983d6c00e964de4316a571df9482d4af854e
Merge: 93692a4b17 858a2b2321
Author: Jon Meredith <[email protected]>
AuthorDate: Thu Apr 25 14:00:52 2024 -0600

    Merge branch 'cassandra-4.1' into cassandra-5.0

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/lifecycle/LogFile.java |  36 ++--
 .../cassandra/db/lifecycle/LogReplicaSet.java      |   6 +-
 .../TransactionAlreadyCompletedException.java      |  36 ++++
 .../apache/cassandra/streaming/StreamSession.java  |  22 ++-
 .../streaming/StreamFailedWhileReceivingTest.java  | 208 +++++++++++++++++++++
 .../cassandra/db/lifecycle/LogTransactionTest.java |  40 +++-
 7 files changed, 333 insertions(+), 16 deletions(-)

diff --cc CHANGES.txt
index 1d51d2d27d,23dbee1941..ee0696f40a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -36,11 -2,11 +36,12 @@@ Merged from 4.1
   * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495)
   * Do not go to disk for reading hints file sizes (CASSANDRA-19477)
   * Fix system_views.settings to handle array types (CASSANDRA-19475)
 + * Memoize Cassandra version and add a backoff interval for failed schema 
pulls (CASSANDRA-18902)
 + * Fix StackOverflowError on ALTER after many previous schema changes 
(CASSANDRA-19166)
  Merged from 4.0:
+  * Streaming exception race creates corrupt transaction log files that 
prevent restart (CASSANDRA-18736)
   * Fix CQL tojson timestamp output on negative timestamp values before 
Gregorian calendar reform in 1582 (CASSANDRA-19566)
   * Fix few types issues and implement types compatibility tests 
(CASSANDRA-19479)
 - * Optionally avoid hint transfer during decommission (CASSANDRA-19525)
   * Change logging to TRACE when failing to get peer certificate 
(CASSANDRA-19508)
   * Push LocalSessions info logs to debug (CASSANDRA-18335)
   * Filter remote DC replicas out when constructing the initial replica plan 
for the local read repair (CASSANDRA-19120)
diff --cc src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index abfb55a601,9db3cb477c..9decc248b9
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@@ -35,8 -26,12 +35,10 @@@ import java.util.regex.Matcher
  import java.util.regex.Pattern;
  import java.util.stream.Collectors;
  
+ import javax.annotation.concurrent.NotThreadSafe;
+ 
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.Iterables;
 -
 -import org.apache.cassandra.io.util.File;
  import org.apache.commons.lang3.StringUtils;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index d76f1f6d25,17ceca137c..a942b9ee9a
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -50,18 -47,15 +50,19 @@@ import io.netty.util.concurrent.Future
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Directories;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.compaction.CompactionStrategyManager;
+ import org.apache.cassandra.db.lifecycle.TransactionAlreadyCompletedException;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.gms.*;
 +import org.apache.cassandra.io.util.File;
  import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.RangesAtEndpoint;
  import org.apache.cassandra.locator.Replica;
  import org.apache.cassandra.metrics.StreamingMetrics;
  import org.apache.cassandra.schema.TableId;
@@@ -703,20 -684,29 +714,29 @@@ public class StreamSessio
                               planId(),
                               peer.getHostAddressAndPort(),
                               e);
 -
 -                return closeSession(State.FAILED);
 +                return closeSession(State.FAILED, "Failed because there was 
an " + e.getClass().getCanonicalName() + " with state=" + state.name());
              }
          }
+         else if (e instanceof TransactionAlreadyCompletedException && 
isFailedOrAborted())
+         {
+             // StreamDeserializer threads may actively be writing SSTables 
when the stream
+             // is failed or canceled, which aborts the lifecycle transaction 
and throws an exception
+             // when any new SSTable is added.  Since the stream has already 
failed, suppress
+             // extra streaming log failure messages.
+             logger.debug("Stream lifecycle transaction already completed 
after stream failure (ignore)", e);
+             return null;
+         }
  
          logError(e);
  
          if (channel.connected())
          {
-             state(State.FAILED); // make sure subsequent error handling sees 
the session in a final state 
+             state(State.FAILED); // make sure subsequent error handling sees 
the session in a final state
              channel.sendControlMessage(new 
SessionFailedMessage()).awaitUninterruptibly();
          }
 -
 -        return closeSession(State.FAILED);
 +        StringBuilder failureReason = new StringBuilder("Failed because of an 
unknown exception\n");
 +        boundStackTrace(e, DEBUG_STACKTRACE_LIMIT, failureReason);
 +        return closeSession(State.FAILED, failureReason.toString());
      }
  
      private void logError(Throwable e)
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
index 0000000000,99dfd440b7..45cd92addf
mode 000000,100644..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
@@@ -1,0 -1,208 +1,208 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.distributed.test.streaming;
+ 
+ import java.io.IOException;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.junit.Test;
+ 
+ import net.bytebuddy.ByteBuddy;
+ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+ import net.bytebuddy.implementation.MethodDelegation;
+ import net.bytebuddy.implementation.bind.annotation.SuperCall;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
+ import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+ import org.apache.cassandra.db.streaming.CassandraStreamManager;
+ import org.apache.cassandra.db.streaming.CassandraStreamReceiver;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.distributed.test.TestBaseImpl;
+ import org.apache.cassandra.exceptions.StartupException;
+ import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
++import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter;
+ import org.apache.cassandra.io.sstable.format.SSTableFormat;
+ import org.apache.cassandra.io.sstable.format.Version;
+ import org.apache.cassandra.io.util.File;
+ import org.apache.cassandra.schema.Schema;
+ import org.apache.cassandra.streaming.IncomingStream;
+ import org.apache.cassandra.streaming.StreamSession;
+ import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+ import org.apache.cassandra.utils.concurrent.CountDownLatch;
+ 
+ import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+ 
+ /** This is a somewhat brittle test to demonstrate transaction log corruption
+     when streaming is aborted as streamed sstables are added to the
+     transaction log concurrently.
+ 
+     Once streaming has aborted or completed the transaction, its
+     transaction log must not be modified.
+ */
+ public class StreamFailedWhileReceivingTest extends TestBaseImpl
+ {
+     @Test
+     public void zeroCopy() throws IOException
+     {
+         streamClose(true);
+     }
+ 
+     @Test
+     public void notZeroCopy() throws IOException
+     {
+         streamClose(false);
+     }
+ 
+     private void streamClose(boolean zeroCopyStreaming) throws IOException
+     {
+         try (Cluster cluster = Cluster.build(2)
+                                       
.withInstanceInitializer(BBHelper::install)
+                                       .withConfig(c -> 
c.with(Feature.values())
+                                                         
.set("stream_entire_sstables", zeroCopyStreaming)
+                                                         
.set("autocompaction_on_startup_enabled", false))
+                                       .start())
+         {
+             init(cluster);
+ 
+             cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int 
PRIMARY KEY)"));
+             IInvokableInstance node1 = cluster.get(1);
+             IInvokableInstance node2 = cluster.get(2);
+             for (int i = 1; i <= 100; i++)
+                 node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) 
VALUES (?)"), i);
+             node1.flush(KEYSPACE);
+ 
+             // trigger streaming; expected to fail as streaming socket closed 
in the middle (currently this is an unrecoverable event)
+             node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl");
+ 
+             node2.runOnInstance(() -> {
+                 try
+                 {
+                     // use the startup logic to check for corrupt txn 
logfiles from the streaming failure
+                     // quicker than restarting the instance to check
+                     
ColumnFamilyStore.scrubDataDirectories(Schema.instance.getTableMetadata(KEYSPACE,
 "tbl"));
+                 }
+                 catch (StartupException ex)
+                 {
+                     throw new RuntimeException(ex);
+                 }
+             });
+         }
+     }
+ 
+ 
+     public static class BBHelper
+     {
+         static volatile StreamSession firstSession;
+         static CountDownLatch firstStreamAbort = 
CountDownLatch.newCountDownLatch(1); // per-instance
+ 
+         // CassandraStreamManager.prepareIncomingStream
+         @SuppressWarnings("unused")
+         public static IncomingStream prepareIncomingStream(StreamSession 
session, StreamMessageHeader header, @SuperCall Callable<IncomingStream> zuper) 
throws Exception
+         {
+             if (firstSession == null)
+                 firstSession = session;
+             return zuper.call();
+         }
+ 
+         // CassandraStreamReceiver.abort
+         @SuppressWarnings("unused")
+         public static void abort(@SuperCall Callable<Integer> zuper) throws 
Exception
+         {
+             firstStreamAbort.decrement();
+             zuper.call();
+         }
+ 
+         // RangeAwareSSTableWriter.append
+         @SuppressWarnings("unused")
+         public static boolean append(UnfilteredRowIterator partition, 
@SuperCall Callable<Boolean> zuper) throws Exception
+         {
+             // handles compressed and non-compressed
+             if (isCaller(CassandraIncomingFile.class.getName(), "read"))
+              {
+                 if (firstSession != null)
+                 {
+                     firstSession.abort();
+                     // delay here until CassandraStreamReceiver abort is 
called on NonPeriodic tasks
+                     firstStreamAbort.awaitUninterruptibly(1, 
TimeUnit.MINUTES);
+                 }
+              }
+             return zuper.call();
+         }
+ 
 -        // ColumnFamilyStore.createWriter - for entire sstable streaming, 
before adding to LogTransaction
++        // ColumnFamilyStore.newSSTableDescriptor - for entire sstable 
streaming, before adding to LogTransaction
+         @SuppressWarnings("unused")
 -        public static Descriptor newSSTableDescriptor(File directory, Version 
version, SSTableFormat.Type format, @SuperCall Callable<Descriptor> zuper) 
throws Exception
++        public static Descriptor newSSTableDescriptor(File directory, Version 
version, @SuperCall Callable<Descriptor> zuper) throws Exception
+         {
+             if (isCaller(CassandraEntireSSTableStreamReader.class.getName(), 
"read"))
+             // handles compressed and non-compressed
+             {
+                 if (firstSession != null)
+                 {
+                     firstSession.abort();
+                     // delay here until CassandraStreamReceiver abort is 
called on NonPeriodic tasks
+                     firstStreamAbort.awaitUninterruptibly(1, 
TimeUnit.MINUTES);
+                 }
+             }
+             return zuper.call();
+         }
+ 
+         private static boolean isCaller(String klass, String method)
+         {
+             StackTraceElement[] stack = 
Thread.currentThread().getStackTrace();
+             for (int i = 0; i < stack.length; i++)
+             {
+                 StackTraceElement e = stack[i];
+                 if (klass.equals(e.getClassName()) && 
method.equals(e.getMethodName()))
+                     return true;
+             }
+             return false;
+         }
+ 
+         public static void install(ClassLoader classLoader, Integer num)
+         {
+             if (num != 2) // only target the second instance
+                 return;
+ 
+             new ByteBuddy().rebase(CassandraStreamManager.class)
+                            
.method(named("prepareIncomingStream").and(takesArguments(2)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+             new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
+                            .method(named("append").and(takesArguments(1)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+             new ByteBuddy().rebase(ColumnFamilyStore.class)
+                            
.method(named("newSSTableDescriptor").and(takesArguments(3)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+             new ByteBuddy().rebase(CassandraStreamReceiver.class)
+                            .method(named("abort").and(takesArguments(0)))
+                            .intercept(MethodDelegation.to(BBHelper.class))
+                            .make()
+                            .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+         }
+     }
+ }
diff --cc test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 710c869fab,88164555cb..c70b0ea0de
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@@ -47,16 -46,13 +48,21 @@@ import org.apache.cassandra.db.Decorate
  import org.apache.cassandra.db.Directories;
  import org.apache.cassandra.db.SerializationHeader;
  import org.apache.cassandra.db.compaction.OperationType;
++import org.apache.cassandra.dht.AbstractBounds;
++import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.SSTable;
+ import org.apache.cassandra.io.sstable.SSTableId;
  import org.apache.cassandra.io.sstable.SequenceBasedSSTableId;
+ import org.apache.cassandra.io.sstable.format.SSTableFormat;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.big.BigFormat;
 +import org.apache.cassandra.io.sstable.format.big.BigFormat.Components;
 +import org.apache.cassandra.io.sstable.format.big.BigTableReader;
 +import org.apache.cassandra.io.sstable.format.bti.BtiFormat;
 +import org.apache.cassandra.io.sstable.format.bti.BtiTableReader;
 +import org.apache.cassandra.io.sstable.format.bti.PartitionIndex;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.sstable.metadata.MetadataType;
  import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
@@@ -1440,4 -1400,29 +1446,36 @@@ public class LogTransactionTest extend
                         
.flatMap(LogTransactionTest::toCanonicalIgnoringNotFound)
                         .collect(Collectors.toSet());
      }
- }
+ 
 -    final String DUMMY_KS = "ks";
 -    final String DUMMY_TBL = "tbl";
++    static final String DUMMY_KS = "ks";
++    static final String DUMMY_TBL = "tbl";
+     final File dir = new File(".");
+     Supplier<SequenceBasedSSTableId> idSupplier = 
SequenceBasedSSTableId.Builder.instance.generator(Stream.of());
 -    final Set<Component> dummyComponents = 
Collections.singleton(Component.DATA);
 -    final TableMetadataRef dummyMetadata = 
TableMetadataRef.forOfflineTools(TableMetadata.minimal(DUMMY_KS, DUMMY_TBL));
 -    final DiskOptimizationStrategy dummyOptimizationStrategy = new 
SpinningDiskOptimizationStrategy();
++    final Set<Component> dummyComponents = 
Collections.singleton(SSTableFormat.Components.DATA);
+ 
+     SSTable dummySSTable()
+     {
+         SSTableId id = idSupplier.get();
+         Descriptor descriptor = new Descriptor(dir, DUMMY_KS, DUMMY_TBL, id);
 -        return new SSTable(descriptor, dummyComponents, dummyMetadata, 
dummyOptimizationStrategy)
++        SSTable.Builder<?, ?> builder = new SSTable.Builder<>(descriptor);
++        builder.setComponents(dummyComponents);
++        return new SSTable(builder, null)
+         {
++            @Override
++            public DecoratedKey getFirst() { return null; }
++            @Override
++            public DecoratedKey getLast() { return null; }
++            @Override
++            public AbstractBounds<Token> getBounds() { return null; }
+         };
+     }
+ 
+     @Test(expected = TransactionAlreadyCompletedException.class)
+     public void useAfterCompletedTest()
+     {
 -        LogTransaction txnFile = new LogTransaction(OperationType.STREAM);
 -        txnFile.abort(); // this should complete the txn
 -        txnFile.trackNew(dummySSTable()); // expect an IllegalStateException 
here
 -    }
 -}
++        try (LogTransaction txnFile = new 
LogTransaction(OperationType.STREAM))
++        {
++            txnFile.abort(); // this should complete the txn
++            txnFile.trackNew(dummySSTable()); // expect an 
IllegalStateException here
++        }
++    }}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to