This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 9157d98e4c Streaming exception race creates corrupt transaction log 
files that prevent restart
9157d98e4c is described below

commit 9157d98e4cc5c00d74cef6128c16659ff43f3585
Author: Jon Meredith <[email protected]>
AuthorDate: Thu Apr 25 13:53:28 2024 -0600

    Streaming exception race creates corrupt transaction log files that prevent 
restart
    
    patch by Jon Meredith; reviewed by Caleb Rackliffe, David Capwell for 
CASSANDRA-18736
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/lifecycle/LogFile.java |  36 ++--
 .../cassandra/db/lifecycle/LogReplicaSet.java      |   6 +-
 .../TransactionAlreadyCompletedException.java      |  36 ++++
 .../apache/cassandra/streaming/StreamSession.java  |  20 ++
 .../streaming/StreamFailedWhileReceivingTest.java  | 208 +++++++++++++++++++++
 test/unit/org/apache/cassandra/Util.java           |   2 +
 .../cassandra/db/lifecycle/LogTransactionTest.java |  68 ++++++-
 8 files changed, 353 insertions(+), 24 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f9a61a4237..64c63912ba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.13
+ * Streaming exception race creates corrupt transaction log files that prevent 
restart (CASSANDRA-18736)
  * Fix CQL tojson timestamp output on negative timestamp values before 
Gregorian calendar reform in 1582 (CASSANDRA-19566)
  * Fix few types issues and implement types compatibility tests 
(CASSANDRA-19479)
  * Optionally avoid hint transfer during decommission (CASSANDRA-19525)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 9053034dba..d67019008f 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -27,6 +27,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
@@ -53,6 +55,7 @@ import static org.apache.cassandra.utils.Throwables.merge;
  *
  * @see LogTransaction
  */
+@NotThreadSafe
 final class LogFile implements AutoCloseable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(LogFile.class);
@@ -66,8 +69,9 @@ final class LogFile implements AutoCloseable
     private final LogReplicaSet replicas = new LogReplicaSet();
 
     // The transaction records, this set must be ORDER PRESERVING
-    private final Set<LogRecord> records = Collections.synchronizedSet(new 
LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554
-    private final Set<LogRecord> onDiskRecords = 
Collections.synchronizedSet(new LinkedHashSet<>());
+    private final Set<LogRecord> records = new LinkedHashSet<>();
+    private final Set<LogRecord> onDiskRecords = new LinkedHashSet<>();
+    private boolean completed = false;
 
     // The type of the transaction
     private final OperationType type;
@@ -123,6 +127,11 @@ final class LogFile implements AutoCloseable
 
             deleteFilesForRecordsOfType(committed() ? Type.REMOVE : Type.ADD);
 
+            // safe to release memory for both types of records, the completed 
flag will prevent
+            // new records being added.
+            records.clear();
+            onDiskRecords.clear();
+
             // we sync the parent directories between contents and log deletion
             // to ensure there is a happens before edge between them
             Throwables.maybeFail(syncDirectory(accumulate));
@@ -162,6 +171,11 @@ final class LogFile implements AutoCloseable
             logger.error("Failed to read records for transaction log {}", 
this);
             return false;
         }
+        LogRecord lastRecord = getLastRecord();
+        if (lastRecord != null &&
+            (lastRecord.type == Type.COMMIT || lastRecord.type == Type.ABORT) 
&&
+            lastRecord.isValid())
+            completed = true;
 
         Set<String> absolutePaths = new HashSet<>();
         for (LogRecord record : records)
@@ -276,11 +290,13 @@ final class LogFile implements AutoCloseable
     void commit()
     {
         addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
+        completed = true;
     }
 
     void abort()
     {
         addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
+        completed = true;
     }
 
     private boolean isLastRecordValidWithType(Type type)
@@ -296,14 +312,9 @@ final class LogFile implements AutoCloseable
         return isLastRecordValidWithType(Type.COMMIT);
     }
 
-    boolean aborted()
-    {
-        return isLastRecordValidWithType(Type.ABORT);
-    }
-
     boolean completed()
     {
-        return committed() || aborted();
+        return completed;
     }
 
     void add(SSTable table)
@@ -353,8 +364,8 @@ final class LogFile implements AutoCloseable
 
     void addRecord(LogRecord record)
     {
-        if (completed())
-            throw new IllegalStateException("Transaction already completed");
+        if (completed)
+            throw TransactionAlreadyCompletedException.create(getFiles());
 
         if (records.contains(record))
             throw new IllegalStateException("Record already exists");
@@ -367,6 +378,9 @@ final class LogFile implements AutoCloseable
 
     void remove(SSTable table)
     {
+        if (completed)
+            throw TransactionAlreadyCompletedException.create(getFiles());
+
         LogRecord record = makeAddRecord(table);
         assert records.contains(record) : String.format("[%s] is not tracked 
by %s", record, id);
         assert record.absolutePath.isPresent();
@@ -401,8 +415,6 @@ final class LogFile implements AutoCloseable
 
         for (List<File> toDelete : existingFiles.values())
             LogFile.deleteRecordFiles(toDelete);
-
-        records.clear();
     }
 
     private static void deleteRecordFiles(List<File> existingFiles)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
index 0295357e8f..3fa4d95579 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.lifecycle;
 
 import java.io.File;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +27,8 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,11 +44,12 @@ import org.apache.cassandra.utils.Throwables;
  * @see LogReplica
  * @see LogFile
  */
+@NotThreadSafe
 public class LogReplicaSet implements AutoCloseable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(LogReplicaSet.class);
 
-    private final Map<File, LogReplica> replicasByFile = 
Collections.synchronizedMap(new LinkedHashMap<>()); // TODO: Hack until we fix 
CASSANDRA-14554
+    private final Map<File, LogReplica> replicasByFile = new LinkedHashMap<>();
 
     private Collection<LogReplica> replicas()
     {
diff --git 
a/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
 
b/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
new file mode 100644
index 0000000000..0ee3c3e5cb
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.util.List;
+
+
+public class TransactionAlreadyCompletedException extends IllegalStateException
+{
+    private TransactionAlreadyCompletedException(List<File> files)
+    {
+        super("Transaction already completed. " + files);
+    }
+
+    static TransactionAlreadyCompletedException create(List<File> files)
+    {
+        return new TransactionAlreadyCompletedException(files);
+    }
+}
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a2e479c524..21d5afe480 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -36,6 +36,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.TransactionAlreadyCompletedException;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
@@ -560,6 +561,16 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         return state == State.COMPLETE;
     }
 
+    /**
+     * Return if this session was failed or aborted
+     *
+     * @return true if session was failed or aborted
+     */
+    public boolean isFailedOrAborted()
+    {
+        return state == State.FAILED || state == State.ABORTED;
+    }
+
     public synchronized void messageReceived(StreamMessage message)
     {
         if (message.type != StreamMessage.Type.KEEP_ALIVE)
@@ -650,6 +661,15 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                 return closeSession(State.FAILED);
             }
         }
+        else if (e instanceof TransactionAlreadyCompletedException && 
isFailedOrAborted())
+        {
+            // StreamDeserializer threads may actively be writing SSTables 
when the stream
+            // is failed or canceled, which aborts the lifecycle transaction 
and throws an exception
+            // when any new SSTable is added.  Since the stream has already 
failed, suppress
+            // extra streaming log failure messages.
+            logger.debug("Stream lifecycle transaction already completed after 
stream failure (ignore)", e);
+            return null;
+        }
 
         logError(e);
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
new file mode 100644
index 0000000000..7cd346cd9a
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.db.streaming.CassandraStreamManager;
+import org.apache.cassandra.db.streaming.CassandraStreamReceiver;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.streaming.IncomingStream;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+/** This is a somewhat brittle test to demonstrate transaction log corruption
+    when streaming is aborted as streamed sstables are added to the
+    transaction log concurrently.
+
+    The transaction log should not be modified after streaming
+    has aborted or completed it.
+*/
+public class StreamFailedWhileReceivingTest extends TestBaseImpl
+{
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        streamClose(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        streamClose(false);
+    }
+
+    private void streamClose(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = Cluster.build(2)
+                                      
.withInstanceInitializer(BBHelper::install)
+                                      .withConfig(c -> c.with(Feature.values())
+                                                        
.set("stream_entire_sstables", zeroCopyStreaming)
+                                                        
.set("autocompaction_on_startup_enabled", false))
+                                      .start())
+        {
+            init(cluster);
+
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int 
PRIMARY KEY)"));
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+            for (int i = 1; i <= 100; i++)
+                node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) 
VALUES (?)"), i);
+            node1.flush(KEYSPACE);
+
+            // trigger streaming; expected to fail as streaming socket closed 
in the middle (currently this is an unrecoverable event)
+            node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl");
+
+            node2.runOnInstance(() -> {
+                try
+                {
+                    // use the startup logic to check for corrupt txn logfiles 
from the streaming failure
+                    // quicker than restarting the instance to check
+                    
ColumnFamilyStore.scrubDataDirectories(Schema.instance.getTableMetadata(KEYSPACE,
 "tbl"));
+                }
+                catch (StartupException ex)
+                {
+                    throw new RuntimeException(ex);
+                }
+            });
+        }
+    }
+
+
+    public static class BBHelper
+    {
+        static volatile StreamSession firstSession;
+        static CountDownLatch firstStreamAbort = new CountDownLatch(1); // 
per-instance
+
+        // CassandraStreamManager.prepareIncomingStream
+        @SuppressWarnings("unused")
+        public static IncomingStream prepareIncomingStream(StreamSession 
session, StreamMessageHeader header, @SuperCall Callable<IncomingStream> zuper) 
throws Exception
+        {
+            if (firstSession == null)
+                firstSession = session;
+            return zuper.call();
+        }
+
+        // CassandraStreamReceiver.abort
+        @SuppressWarnings("unused")
+        public static void abort(@SuperCall Callable<Integer> zuper) throws 
Exception
+        {
+            firstStreamAbort.countDown();
+            zuper.call();
+        }
+
+        // RangeAwareSSTableWriter.append
+        @SuppressWarnings("unused")
+        public static boolean append(UnfilteredRowIterator partition, 
@SuperCall Callable<Boolean> zuper) throws Exception
+        {
+            // handles compressed and non-compressed
+            if (isCaller(CassandraIncomingFile.class.getName(), "read"))
+             {
+                if (firstSession != null)
+                {
+                    firstSession.abort();
+                    // delay here until CassandraStreamReceiver abort is 
called on NonPeriodic tasks
+                    firstStreamAbort.await(1, TimeUnit.MINUTES);
+                }
+             }
+            return zuper.call();
+        }
+
+        // ColumnFamilyStore.createWriter - for entire sstable streaming, 
before adding to LogTransaction
+        @SuppressWarnings("unused")
+        public static Descriptor newSSTableDescriptor(File directory, Version 
version, SSTableFormat.Type format, @SuperCall Callable<Descriptor> zuper) 
throws Exception
+        {
+            if (isCaller(CassandraEntireSSTableStreamReader.class.getName(), 
"read"))
+            // handles compressed and non-compressed
+            {
+                if (firstSession != null)
+                {
+                    firstSession.abort();
+                    // delay here until CassandraStreamReceiver abort is 
called on NonPeriodic tasks
+                    firstStreamAbort.await(1, TimeUnit.MINUTES);
+                }
+            }
+            return zuper.call();
+        }
+
+        private static boolean isCaller(String klass, String method)
+        {
+            StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+            for (int i = 0; i < stack.length; i++)
+            {
+                StackTraceElement e = stack[i];
+                if (klass.equals(e.getClassName()) && 
method.equals(e.getMethodName()))
+                    return true;
+            }
+            return false;
+        }
+
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            if (num != 2) // only target the second instance
+                return;
+
+            new ByteBuddy().rebase(CassandraStreamManager.class)
+                           
.method(named("prepareIncomingStream").and(takesArguments(2)))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+            new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
+                           .method(named("append").and(takesArguments(1)))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+            new ByteBuddy().rebase(ColumnFamilyStore.class)
+                           
.method(named("newSSTableDescriptor").and(takesArguments(3)))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+            new ByteBuddy().rebase(CassandraStreamReceiver.class)
+                           .method(named("abort").and(takesArguments(0)))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/Util.java 
b/test/unit/org/apache/cassandra/Util.java
index caeb88c35c..ab1df99a20 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -23,8 +23,10 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOError;
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.Callable;
diff --git 
a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java 
b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index a4e74ce0d7..7d1cb39ae3 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -21,7 +21,9 @@ import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.io.UncheckedIOException;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,6 +34,7 @@ import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -45,15 +48,21 @@ import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DiskOptimizationStrategy;
 import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy;
 import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
@@ -1359,6 +1368,28 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
         return listFiles(folder, Directories.FileType.FINAL);
     }
 
+    // Used by listFiles - this test is deliberately racing with files being
+    // removed and cleaned up, so it is possible that files are removed 
between listing and getting their
+    // canonical names, in which case they can be dropped from the stream.
+    private static Stream<File> toCanonicalIgnoringNotFound(File file)
+    {
+        try
+        {
+            return Stream.of(file.getCanonicalFile());
+        }
+        catch (IOException io)
+        {
+            if (Throwables.isCausedBy(io, t -> t instanceof 
NoSuchFileException))
+            {
+                return Stream.empty();
+            }
+            else
+            {
+                throw new UncheckedIOException(io);
+            }
+        }
+    }
+
     static Set<File> listFiles(File folder, Directories.FileType... types)
     {
         Collection<Directories.FileType> match = Arrays.asList(types);
@@ -1366,16 +1397,33 @@ public class LogTransactionTest extends 
AbstractTransactionalTest
                                       (file, type) -> match.contains(type),
                                       Directories.OnTxnErr.IGNORE).list()
                        .stream()
-                       .map(f -> {
-                           try
-                           {
-                               return f.getCanonicalFile();
-                           }
-                           catch (IOException e)
-                           {
-                               throw new IOError(e);
-                           }
-                       })
+                       
.flatMap(LogTransactionTest::toCanonicalIgnoringNotFound)
                        .collect(Collectors.toSet());
     }
+
+    final String DUMMY_KS = "ks";
+    final String DUMMY_TBL = "tbl";
+    final File dir = new File(".");
+    int nextGeneration = 1;
+    final Set<Component> dummyComponents = 
Collections.singleton(Component.DATA);
+    final TableMetadataRef dummyMetadata = 
TableMetadataRef.forOfflineTools(TableMetadata.minimal(DUMMY_KS, DUMMY_TBL));
+    final DiskOptimizationStrategy dummyOptimizationStrategy = new 
SpinningDiskOptimizationStrategy();
+
+    SSTable dummySSTable()
+    {
+        int id = nextGeneration;
+        nextGeneration++;
+        Descriptor descriptor = new Descriptor(dir, DUMMY_KS, DUMMY_TBL, id);
+        return new SSTable(descriptor, dummyComponents, dummyMetadata, 
dummyOptimizationStrategy)
+        {
+        };
+    }
+
+    @Test(expected = TransactionAlreadyCompletedException.class)
+    public void useAfterCompletedTest()
+    {
+        LogTransaction txnFile = new LogTransaction(OperationType.STREAM);
+        txnFile.abort(); // this should complete the txn
+        txnFile.trackNew(dummySSTable()); // expect an IllegalStateException 
here
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to