This is an automated email from the ASF dual-hosted git repository.
jonmeredith pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new 9157d98e4c Streaming exception race creates corrupt transaction log
files that prevent restart
9157d98e4c is described below
commit 9157d98e4cc5c00d74cef6128c16659ff43f3585
Author: Jon Meredith <[email protected]>
AuthorDate: Thu Apr 25 13:53:28 2024 -0600
Streaming exception race creates corrupt transaction log files that prevent
restart
patch by Jon Meredith; reviewed by Caleb Rackliffe, David Capwell for
CASSANDRA-18736
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/lifecycle/LogFile.java | 36 ++--
.../cassandra/db/lifecycle/LogReplicaSet.java | 6 +-
.../TransactionAlreadyCompletedException.java | 36 ++++
.../apache/cassandra/streaming/StreamSession.java | 20 ++
.../streaming/StreamFailedWhileReceivingTest.java | 208 +++++++++++++++++++++
test/unit/org/apache/cassandra/Util.java | 2 +
.../cassandra/db/lifecycle/LogTransactionTest.java | 68 ++++++-
8 files changed, 353 insertions(+), 24 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f9a61a4237..64c63912ba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.13
+ * Streaming exception race creates corrupt transaction log files that prevent
restart (CASSANDRA-18736)
* Fix CQL tojson timestamp output on negative timestamp values before
Gregorian calendar reform in 1582 (CASSANDRA-19566)
* Fix few types issues and implement types compatibility tests
(CASSANDRA-19479)
* Optionally avoid hint transfer during decommission (CASSANDRA-19525)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 9053034dba..d67019008f 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -27,6 +27,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import javax.annotation.concurrent.NotThreadSafe;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
@@ -53,6 +55,7 @@ import static org.apache.cassandra.utils.Throwables.merge;
*
* @see LogTransaction
*/
+@NotThreadSafe
final class LogFile implements AutoCloseable
{
private static final Logger logger =
LoggerFactory.getLogger(LogFile.class);
@@ -66,8 +69,9 @@ final class LogFile implements AutoCloseable
private final LogReplicaSet replicas = new LogReplicaSet();
// The transaction records, this set must be ORDER PRESERVING
- private final Set<LogRecord> records = Collections.synchronizedSet(new
LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554
- private final Set<LogRecord> onDiskRecords =
Collections.synchronizedSet(new LinkedHashSet<>());
+ private final Set<LogRecord> records = new LinkedHashSet<>();
+ private final Set<LogRecord> onDiskRecords = new LinkedHashSet<>();
+ private boolean completed = false;
// The type of the transaction
private final OperationType type;
@@ -123,6 +127,11 @@ final class LogFile implements AutoCloseable
deleteFilesForRecordsOfType(committed() ? Type.REMOVE : Type.ADD);
+ // safe to release memory for both types of records, the completed
flag will prevent
+ // new records being added.
+ records.clear();
+ onDiskRecords.clear();
+
// we sync the parent directories between contents and log deletion
// to ensure there is a happens before edge between them
Throwables.maybeFail(syncDirectory(accumulate));
@@ -162,6 +171,11 @@ final class LogFile implements AutoCloseable
logger.error("Failed to read records for transaction log {}",
this);
return false;
}
+ LogRecord lastRecord = getLastRecord();
+ if (lastRecord != null &&
+ (lastRecord.type == Type.COMMIT || lastRecord.type == Type.ABORT)
&&
+ lastRecord.isValid())
+ completed = true;
Set<String> absolutePaths = new HashSet<>();
for (LogRecord record : records)
@@ -276,11 +290,13 @@ final class LogFile implements AutoCloseable
void commit()
{
addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
+ completed = true;
}
void abort()
{
addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
+ completed = true;
}
private boolean isLastRecordValidWithType(Type type)
@@ -296,14 +312,9 @@ final class LogFile implements AutoCloseable
return isLastRecordValidWithType(Type.COMMIT);
}
- boolean aborted()
- {
- return isLastRecordValidWithType(Type.ABORT);
- }
-
boolean completed()
{
- return committed() || aborted();
+ return completed;
}
void add(SSTable table)
@@ -353,8 +364,8 @@ final class LogFile implements AutoCloseable
void addRecord(LogRecord record)
{
- if (completed())
- throw new IllegalStateException("Transaction already completed");
+ if (completed)
+ throw TransactionAlreadyCompletedException.create(getFiles());
if (records.contains(record))
throw new IllegalStateException("Record already exists");
@@ -367,6 +378,9 @@ final class LogFile implements AutoCloseable
void remove(SSTable table)
{
+ if (completed)
+ throw TransactionAlreadyCompletedException.create(getFiles());
+
LogRecord record = makeAddRecord(table);
assert records.contains(record) : String.format("[%s] is not tracked
by %s", record, id);
assert record.absolutePath.isPresent();
@@ -401,8 +415,6 @@ final class LogFile implements AutoCloseable
for (List<File> toDelete : existingFiles.values())
LogFile.deleteRecordFiles(toDelete);
-
- records.clear();
}
private static void deleteRecordFiles(List<File> existingFiles)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
index 0295357e8f..3fa4d95579 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.lifecycle;
import java.io.File;
import java.util.Collection;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -28,6 +27,8 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import javax.annotation.concurrent.NotThreadSafe;
+
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +44,12 @@ import org.apache.cassandra.utils.Throwables;
* @see LogReplica
* @see LogFile
*/
+@NotThreadSafe
public class LogReplicaSet implements AutoCloseable
{
private static final Logger logger =
LoggerFactory.getLogger(LogReplicaSet.class);
- private final Map<File, LogReplica> replicasByFile =
Collections.synchronizedMap(new LinkedHashMap<>()); // TODO: Hack until we fix
CASSANDRA-14554
+ private final Map<File, LogReplica> replicasByFile = new LinkedHashMap<>();
private Collection<LogReplica> replicas()
{
diff --git
a/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
b/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
new file mode 100644
index 0000000000..0ee3c3e5cb
--- /dev/null
+++
b/src/java/org/apache/cassandra/db/lifecycle/TransactionAlreadyCompletedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.util.List;
+
+
+public class TransactionAlreadyCompletedException extends IllegalStateException
+{
+ private TransactionAlreadyCompletedException(List<File> files)
+ {
+ super("Transaction already completed. " + files);
+ }
+
+ static TransactionAlreadyCompletedException create(List<File> files)
+ {
+ return new TransactionAlreadyCompletedException(files);
+ }
+}
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a2e479c524..21d5afe480 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -36,6 +36,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.TransactionAlreadyCompletedException;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
@@ -560,6 +561,16 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
return state == State.COMPLETE;
}
+ /**
+ * Return if this session was failed or aborted
+ *
+ * @return true if session was failed or aborted
+ */
+ public boolean isFailedOrAborted()
+ {
+ return state == State.FAILED || state == State.ABORTED;
+ }
+
public synchronized void messageReceived(StreamMessage message)
{
if (message.type != StreamMessage.Type.KEEP_ALIVE)
@@ -650,6 +661,15 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
return closeSession(State.FAILED);
}
}
+ else if (e instanceof TransactionAlreadyCompletedException &&
isFailedOrAborted())
+ {
+ // StreamDeserializer threads may actively be writing SSTables
when the stream
+ // is failed or canceled, which aborts the lifecycle transaction
and throws an exception
+ // when any new SSTable is added. Since the stream has already
failed, suppress
+ // extra streaming log failure messages.
+ logger.debug("Stream lifecycle transaction already completed after
stream failure (ignore)", e);
+ return null;
+ }
logError(e);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
new file mode 100644
index 0000000000..7cd346cd9a
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailedWhileReceivingTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.db.streaming.CassandraStreamManager;
+import org.apache.cassandra.db.streaming.CassandraStreamReceiver;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.streaming.IncomingStream;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+/** This is a somewhat brittle test to demonstrate transaction log corruption
+ when streaming is aborted as streamed sstables are added to the
+ transaction log concurrently.
+
+ The transaction log should not be modified after streaming
+ has aborted or completed it.
+*/
+public class StreamFailedWhileReceivingTest extends TestBaseImpl
+{
+ @Test
+ public void zeroCopy() throws IOException
+ {
+ streamClose(true);
+ }
+
+ @Test
+ public void notZeroCopy() throws IOException
+ {
+ streamClose(false);
+ }
+
+ private void streamClose(boolean zeroCopyStreaming) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withInstanceInitializer(BBHelper::install)
+ .withConfig(c -> c.with(Feature.values())
+
.set("stream_entire_sstables", zeroCopyStreaming)
+
.set("autocompaction_on_startup_enabled", false))
+ .start())
+ {
+ init(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY)"));
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+ for (int i = 1; i <= 100; i++)
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk)
VALUES (?)"), i);
+ node1.flush(KEYSPACE);
+
+ // trigger streaming; expected to fail as streaming socket closed
in the middle (currently this is an unrecoverable event)
+ node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl");
+
+ node2.runOnInstance(() -> {
+ try
+ {
+ // use the startup logic to check for corrupt txn logfiles
from the streaming failure
+ // quicker than restarting the instance to check
+
ColumnFamilyStore.scrubDataDirectories(Schema.instance.getTableMetadata(KEYSPACE,
"tbl"));
+ }
+ catch (StartupException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ });
+ }
+ }
+
+
+ public static class BBHelper
+ {
+ static volatile StreamSession firstSession;
+ static CountDownLatch firstStreamAbort = new CountDownLatch(1); //
per-instance
+
+ // CassandraStreamManager.prepareIncomingStream
+ @SuppressWarnings("unused")
+ public static IncomingStream prepareIncomingStream(StreamSession
session, StreamMessageHeader header, @SuperCall Callable<IncomingStream> zuper)
throws Exception
+ {
+ if (firstSession == null)
+ firstSession = session;
+ return zuper.call();
+ }
+
+ // CassandraStreamReceiver.abort
+ @SuppressWarnings("unused")
+ public static void abort(@SuperCall Callable<Integer> zuper) throws
Exception
+ {
+ firstStreamAbort.countDown();
+ zuper.call();
+ }
+
+ // RangeAwareSSTableWriter.append
+ @SuppressWarnings("unused")
+ public static boolean append(UnfilteredRowIterator partition,
@SuperCall Callable<Boolean> zuper) throws Exception
+ {
+ // handles compressed and non-compressed
+ if (isCaller(CassandraIncomingFile.class.getName(), "read"))
+ {
+ if (firstSession != null)
+ {
+ firstSession.abort();
+ // delay here until CassandraStreamReceiver abort is
called on NonPeriodic tasks
+ firstStreamAbort.await(1, TimeUnit.MINUTES);
+ }
+ }
+ return zuper.call();
+ }
+
+ // ColumnFamilyStore.createWriter - for entire sstable streaming,
before adding to LogTransaction
+ @SuppressWarnings("unused")
+ public static Descriptor newSSTableDescriptor(File directory, Version
version, SSTableFormat.Type format, @SuperCall Callable<Descriptor> zuper)
throws Exception
+ {
+ if (isCaller(CassandraEntireSSTableStreamReader.class.getName(),
"read"))
+ // handles compressed and non-compressed
+ {
+ if (firstSession != null)
+ {
+ firstSession.abort();
+ // delay here until CassandraStreamReceiver abort is
called on NonPeriodic tasks
+ firstStreamAbort.await(1, TimeUnit.MINUTES);
+ }
+ }
+ return zuper.call();
+ }
+
+ private static boolean isCaller(String klass, String method)
+ {
+ StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+ for (int i = 0; i < stack.length; i++)
+ {
+ StackTraceElement e = stack[i];
+ if (klass.equals(e.getClassName()) &&
method.equals(e.getMethodName()))
+ return true;
+ }
+ return false;
+ }
+
+ public static void install(ClassLoader classLoader, Integer num)
+ {
+ if (num != 2) // only target the second instance
+ return;
+
+ new ByteBuddy().rebase(CassandraStreamManager.class)
+
.method(named("prepareIncomingStream").and(takesArguments(2)))
+ .intercept(MethodDelegation.to(BBHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
+ .method(named("append").and(takesArguments(1)))
+ .intercept(MethodDelegation.to(BBHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ new ByteBuddy().rebase(ColumnFamilyStore.class)
+
.method(named("newSSTableDescriptor").and(takesArguments(3)))
+ .intercept(MethodDelegation.to(BBHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ new ByteBuddy().rebase(CassandraStreamReceiver.class)
+ .method(named("abort").and(takesArguments(0)))
+ .intercept(MethodDelegation.to(BBHelper.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/Util.java
b/test/unit/org/apache/cassandra/Util.java
index caeb88c35c..ab1df99a20 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -23,8 +23,10 @@ import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOError;
+import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Callable;
diff --git
a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index a4e74ce0d7..7d1cb39ae3 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -21,7 +21,9 @@ import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.io.UncheckedIOException;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -32,6 +34,7 @@ import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -45,15 +48,21 @@ import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DiskOptimizationStrategy;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy;
import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.AlwaysPresentFilter;
+import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -1359,6 +1368,28 @@ public class LogTransactionTest extends
AbstractTransactionalTest
return listFiles(folder, Directories.FileType.FINAL);
}
+ // Used by listFiles - this test is deliberately racing with files being
+ // removed and cleaned up, so it is possible that files are removed
between listing and getting their
+ // canonical names, in which case they can be dropped from the stream.
+ private static Stream<File> toCanonicalIgnoringNotFound(File file)
+ {
+ try
+ {
+ return Stream.of(file.getCanonicalFile());
+ }
+ catch (IOException io)
+ {
+ if (Throwables.isCausedBy(io, t -> t instanceof
NoSuchFileException))
+ {
+ return Stream.empty();
+ }
+ else
+ {
+ throw new UncheckedIOException(io);
+ }
+ }
+ }
+
static Set<File> listFiles(File folder, Directories.FileType... types)
{
Collection<Directories.FileType> match = Arrays.asList(types);
@@ -1366,16 +1397,33 @@ public class LogTransactionTest extends
AbstractTransactionalTest
(file, type) -> match.contains(type),
Directories.OnTxnErr.IGNORE).list()
.stream()
- .map(f -> {
- try
- {
- return f.getCanonicalFile();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- })
+
.flatMap(LogTransactionTest::toCanonicalIgnoringNotFound)
.collect(Collectors.toSet());
}
+
+ final String DUMMY_KS = "ks";
+ final String DUMMY_TBL = "tbl";
+ final File dir = new File(".");
+ int nextGeneration = 1;
+ final Set<Component> dummyComponents =
Collections.singleton(Component.DATA);
+ final TableMetadataRef dummyMetadata =
TableMetadataRef.forOfflineTools(TableMetadata.minimal(DUMMY_KS, DUMMY_TBL));
+ final DiskOptimizationStrategy dummyOptimizationStrategy = new
SpinningDiskOptimizationStrategy();
+
+ SSTable dummySSTable()
+ {
+ int id = nextGeneration;
+ nextGeneration++;
+ Descriptor descriptor = new Descriptor(dir, DUMMY_KS, DUMMY_TBL, id);
+ return new SSTable(descriptor, dummyComponents, dummyMetadata,
dummyOptimizationStrategy)
+ {
+ };
+ }
+
+ @Test(expected = TransactionAlreadyCompletedException.class)
+ public void useAfterCompletedTest()
+ {
+ LogTransaction txnFile = new LogTransaction(OperationType.STREAM);
+ txnFile.abort(); // this should complete the txn
+ txnFile.trackNew(dummySSTable()); // expect an IllegalStateException
here
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]