This is an automated email from the ASF dual-hosted git repository.
bereng pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new ada958eaf7 org.apache.cassandra.db.commitlog.GroupCommitLogTest tests failing on trunk
ada958eaf7 is described below
commit ada958eaf7bd37c38cda55ef469c87f1e7e958bb
Author: Bereng <[email protected]>
AuthorDate: Fri May 27 09:29:00 2022 +0200
org.apache.cassandra.db.commitlog.GroupCommitLogTest tests failing on trunk
patch by Berenguer Blasi; reviewed by Andres de la Peña for CASSANDRA-17232
---
.../commitlog/AbstractCommitLogSegmentManager.java | 37 +++++++++++++++++++---
.../apache/cassandra/db/commitlog/CommitLog.java | 3 +-
2 files changed, 34 insertions(+), 6 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 627c885cd0..d8eb0e720f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.io.util.SimpleCachedBufferPool;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.*;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
@@ -131,6 +132,7 @@ public abstract class AbstractCommitLogSegmentManager
@Override
public void run(Interruptible.State state) throws InterruptedException
{
+ boolean interrupted = false;
try
{
switch (state)
@@ -148,7 +150,7 @@ public abstract class AbstractCommitLogSegmentManager
synchronized (this)
{
- Thread.interrupted();
+ interrupted = Thread.interrupted();
logger.trace("No segments in reserve; creating a
fresh one");
availableSegment = createSegment();
@@ -168,7 +170,10 @@ public abstract class AbstractCommitLogSegmentManager
catch (Throwable t)
{
if (!CommitLog.handleCommitError("Failed managing commit log
segments", t))
+ {
+ discardAvailableSegment();
throw new TerminateException();
+ }
// sleep some arbitrary period to avoid spamming CL
Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
@@ -177,7 +182,25 @@ public abstract class AbstractCommitLogSegmentManager
// There could be a new segment in next not offered, but only
on failure to discard it while
// shutting down-- nothing more can or needs to be done in
that case.
}
- WaitQueue.waitOnCondition(managerThreadWaitCondition,
managerThreadWaitQueue);
+
+ interrupted = interrupted || Thread.interrupted();
+ if (!interrupted)
+ {
+ try
+ {
+ WaitQueue.waitOnCondition(managerThreadWaitCondition,
managerThreadWaitQueue);
+ }
+ catch(InterruptedException e)
+ {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted)
+ {
+ discardAvailableSegment();
+ throw new InterruptedException();
+ }
}
}
@@ -445,6 +468,7 @@ public abstract class AbstractCommitLogSegmentManager
* Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
* Only call this after the AbstractCommitLogService is shut down.
*/
+ @VisibleForTesting
public void stopUnsafe(boolean deleteSegments)
{
logger.debug("CLSM closing and clearing existing commit log
segments...");
@@ -452,7 +476,8 @@ public abstract class AbstractCommitLogSegmentManager
shutdown();
try
{
- awaitTermination();
+ // On heavily loaded test envs we need a longer wait
+ assert awaitTermination(5L, TimeUnit.MINUTES) : "Assert waiting
for termination failed on " + FBUtilities.now().toString();
}
catch (InterruptedException e)
{
@@ -522,14 +547,16 @@ public abstract class AbstractCommitLogSegmentManager
/**
* Returns when the management thread terminates.
*/
- public void awaitTermination() throws InterruptedException
+ public boolean awaitTermination(long timeout, TimeUnit units) throws
InterruptedException
{
- executor.awaitTermination(1L, TimeUnit.MINUTES);
+ boolean res = executor.awaitTermination(timeout, units);
for (CommitLogSegment segment : activeSegments)
segment.close();
if (bufferPool != null)
bufferPool.emptyBufferPool();
+
+ return res;
}
/**
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 37df1f9451..a832b5ea42 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileStore;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.zip.CRC32;
@@ -456,7 +457,7 @@ public class CommitLog implements CommitLogMBean
executor.shutdown();
executor.awaitTermination();
segmentManager.shutdown();
- segmentManager.awaitTermination();
+ segmentManager.awaitTermination(1L, TimeUnit.MINUTES);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]