This is an automated email from the ASF dual-hosted git repository.

bereng pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new ada958eaf7 org.apache.cassandra.db.commitlog.GroupCommitLogTest tests 
failing on trunk
ada958eaf7 is described below

commit ada958eaf7bd37c38cda55ef469c87f1e7e958bb
Author: Bereng <[email protected]>
AuthorDate: Fri May 27 09:29:00 2022 +0200

    org.apache.cassandra.db.commitlog.GroupCommitLogTest tests failing on trunk
    
    patch by Berenguer Blasi; reviewed by Andres de la Peña for CASSANDRA-17232
---
 .../commitlog/AbstractCommitLogSegmentManager.java | 37 +++++++++++++++++++---
 .../apache/cassandra/db/commitlog/CommitLog.java   |  3 +-
 2 files changed, 34 insertions(+), 6 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
 
b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 627c885cd0..d8eb0e720f 100644
--- 
a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ 
b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.io.util.SimpleCachedBufferPool;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.*;
 
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
@@ -131,6 +132,7 @@ public abstract class AbstractCommitLogSegmentManager
         @Override
         public void run(Interruptible.State state) throws InterruptedException
         {
+            boolean interrupted = false;
             try
             {
                 switch (state)
@@ -148,7 +150,7 @@ public abstract class AbstractCommitLogSegmentManager
 
                         synchronized (this)
                         {
-                            Thread.interrupted();
+                            interrupted = Thread.interrupted();
                             logger.trace("No segments in reserve; creating a 
fresh one");
                             availableSegment = createSegment();
 
@@ -168,7 +170,10 @@ public abstract class AbstractCommitLogSegmentManager
             catch (Throwable t)
             {
                 if (!CommitLog.handleCommitError("Failed managing commit log 
segments", t))
+                {
+                    discardAvailableSegment();
                     throw new TerminateException();
+                }
 
                 // sleep some arbitrary period to avoid spamming CL
                 Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
@@ -177,7 +182,25 @@ public abstract class AbstractCommitLogSegmentManager
                 // There could be a new segment in next not offered, but only 
on failure to discard it while
                 // shutting down-- nothing more can or needs to be done in 
that case.
             }
-            WaitQueue.waitOnCondition(managerThreadWaitCondition, 
managerThreadWaitQueue);
+
+            interrupted = interrupted || Thread.interrupted();
+            if (!interrupted)
+            {
+                try
+                {
+                    WaitQueue.waitOnCondition(managerThreadWaitCondition, 
managerThreadWaitQueue);
+                }
+                catch(InterruptedException e)
+                {
+                    interrupted = true;
+                }
+            }
+            
+            if (interrupted)
+            {
+                discardAvailableSegment();
+                throw new InterruptedException();
+            }
         }
     }
 
@@ -445,6 +468,7 @@ public abstract class AbstractCommitLogSegmentManager
      * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
      * Only call this after the AbstractCommitLogService is shut down.
      */
+    @VisibleForTesting
     public void stopUnsafe(boolean deleteSegments)
     {
         logger.debug("CLSM closing and clearing existing commit log 
segments...");
@@ -452,7 +476,8 @@ public abstract class AbstractCommitLogSegmentManager
         shutdown();
         try
         {
-            awaitTermination();
+            // On heavily loaded test envs we need a longer wait
+            assert awaitTermination(5L, TimeUnit.MINUTES) : "Assert waiting 
for termination failed on " + FBUtilities.now().toString();
         }
         catch (InterruptedException e)
         {
@@ -522,14 +547,16 @@ public abstract class AbstractCommitLogSegmentManager
     /**
      * Returns when the management thread terminates.
      */
-    public void awaitTermination() throws InterruptedException
+    public boolean awaitTermination(long timeout, TimeUnit units) throws 
InterruptedException
     {
-        executor.awaitTermination(1L, TimeUnit.MINUTES);
+        boolean res = executor.awaitTermination(timeout, units);
         for (CommitLogSegment segment : activeSegments)
             segment.close();
 
         if (bufferPool != null)
             bufferPool.emptyBufferPool();
+
+        return res;
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 37df1f9451..a832b5ea42 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.FileStore;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.zip.CRC32;
@@ -456,7 +457,7 @@ public class CommitLog implements CommitLogMBean
         executor.shutdown();
         executor.awaitTermination();
         segmentManager.shutdown();
-        segmentManager.awaitTermination();
+        segmentManager.awaitTermination(1L, TimeUnit.MINUTES);
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to