This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 98e7cd5d99 Flush SAI segment builder when current SSTable writer is switched
98e7cd5d99 is described below

commit 98e7cd5d999f9e6d9323bc8d4301751814b66059
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Mon Oct 13 15:17:27 2025 -0500

    Flush SAI segment builder when current SSTable writer is switched
    
    patch by Pranav Shenoy; reviewed by Caleb Rackliffe and Zhao Yang for CASSANDRA-20752
---
 CHANGES.txt                                        |  1 +
 .../index/sai/disk/PerColumnIndexWriter.java       |  6 +++
 .../index/sai/disk/StorageAttachedIndexWriter.java | 19 ++++++++
 .../index/sai/disk/v1/MemtableIndexWriter.java     |  7 +++
 .../index/sai/disk/v1/SSTableIndexWriter.java      | 12 +++++
 .../index/sai/disk/v1/segment/SegmentBuilder.java  |  2 +-
 .../cassandra/io/sstable/SSTableFlushObserver.java |  6 +++
 .../cassandra/io/sstable/SSTableRewriter.java      |  1 +
 .../cassandra/io/sstable/format/SSTableWriter.java |  6 +++
 .../index/sai/functional/CompactionTest.java       | 55 ++++++++++++++++++++++
 .../io/sstable/SSTableFlushObserverTest.java       | 14 +++++-
 11 files changed, 126 insertions(+), 3 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c8ce598f49..4c983ca1e4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.6
+ * Flush SAI segment builder when current SSTable writer is switched (CASSANDRA-20752)
  * Throw RTE instead of FSError when RTE is thrown from FileUtis.write in TOCComponent (CASSANDRA-20917)
  * Upgrade jackson-dataformat-yaml to 2.19.2 and snakeyaml to 2.1 (CASSANDRA-18875)
  * Represent complex settings as JSON on system_views.settings table (CASSANDRA-20827)
diff --git a/src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java
index 8f1d657515..1eaa406c82 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java
@@ -41,6 +41,12 @@ public interface PerColumnIndexWriter
       */
      void complete(Stopwatch stopwatch) throws IOException;
 
+    /**
+     * Called when current SSTable writer is switched during sharded compaction to free any in-memory resources associated
+     * with the SSTable for current index without waiting for full transaction to complete
+     */
+    void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException;
+    
      /**
       * Aborts accumulating data. Allows to clean up resources on error.
       * <p> 
diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
index 341bcaac5e..cf899567b4 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java
@@ -164,6 +164,25 @@ public class StorageAttachedIndexWriter implements SSTableFlushObserver
         }
     }
 
+    @Override
+    public void onSSTableWriterSwitched()
+    {
+        if (aborted) return;
+
+        try
+        {
+            for (PerColumnIndexWriter w : perIndexWriters)
+            {
+                w.onSSTableWriterSwitched(stopwatch);
+            }
+        }
+        catch (Throwable t)
+        {
+            logger.error(indexDescriptor.logMessage("Failed to flush segment on sstable writer switched"), t);
+            abort(t, true);
+        }
+    }
+
     @Override
     public void complete()
     {
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
index c5f833f4ac..846f65505b 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
@@ -166,6 +166,13 @@ public class MemtableIndexWriter implements PerColumnIndexWriter
         }
     }
 
+    @Override
+    public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
+    {
+        // no-op for memtable index where all terms are already inside memory index, we can't get rid of memory index
+        // until full flush are completed
+    }
+
     private long flush(MemtableTermsIterator terms) throws IOException
     {
         SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier)
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
index cb62c91e1e..c3c83992b2 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
@@ -100,6 +100,18 @@ public class SSTableIndexWriter implements PerColumnIndexWriter
         }
     }
 
+    @Override
+    public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
+    {
+        if (maybeAbort())
+            return;
+
+        boolean emptySegment = currentBuilder == null || currentBuilder.isEmpty();
+        logger.debug(index.identifier().logMessage("Flushing index with {}buffered data on SSTable writer switched..."), emptySegment ? "no " : "");
+        if (!emptySegment)
+            flushSegment();
+    }
+
     @Override
     public void complete(Stopwatch stopwatch) throws IOException
     {
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java
index fbc0775363..d02f14582d 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java
@@ -46,7 +46,7 @@ public abstract class SegmentBuilder
     public static final long LAST_VALID_SEGMENT_ROW_ID = (Integer.MAX_VALUE / 2) - 1L;
     private static long testLastValidSegmentRowId = -1;
 
-    /** The number of column indexes being built globally. (Starts at one to avoid divide by zero.) */
+    /** The number of column indexes being built globally. */
     private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(0);
 
     /** Minimum flush size, dynamically updated as segment builds are started and completed/aborted. */
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java
index 159e0d43bd..0f28f62dab 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java
@@ -68,6 +68,12 @@ public interface SSTableFlushObserver
       */
      void complete();
 
+    /**
+     * Called when current sstable writer is switched during sharded compaction to free any in-memory resources associated
+     * with the sstable without waiting for full transaction to complete
+     */
+    default void onSSTableWriterSwitched() {}
+
      /**
       * Clean up resources on error. There should be no side effects if called multiple times.
       */
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 31df8b1120..6b20dc4320 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -277,6 +277,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
 
         currentlyOpenedEarlyAt = 0;
         bytesWritten += writer.getFilePointer();
+        writer.onSSTableWriterSwitched();
         writer = newWriter;
     }
 
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 90b8985730..6714b93a2c 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -282,6 +282,12 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         txnProxy.prepareToCommit();
     }
 
+    // notify sstable flush observer about sstable writer switched
+    public final void onSSTableWriterSwitched()
+    {
+        observers.forEach(SSTableFlushObserver::onSSTableWriterSwitched);
+    }
+
     public final Throwable commit(Throwable accumulate)
     {
         try
diff --git a/test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java b/test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java
index 5b38876de3..cb03bf16a0 100644
--- a/test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java
@@ -22,6 +22,10 @@ package org.apache.cassandra.index.sai.functional;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
 import org.junit.Assert;
@@ -42,6 +46,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.sai.SAITester;
 import org.apache.cassandra.index.sai.utils.IndexIdentifier;
 import org.apache.cassandra.index.sai.disk.v1.SSTableIndexWriter;
+import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder;
 import org.apache.cassandra.index.sai.utils.IndexTermType;
 import org.apache.cassandra.inject.ActionBuilder;
 import org.apache.cassandra.inject.Expression;
@@ -61,6 +66,7 @@ import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.concurrent.Refs;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
@@ -352,4 +358,53 @@ public class CompactionTest extends SAITester
         .isInstanceOf(InvalidQueryException.class)
         .hasMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE);
     }
+
+    @Test
+    public void testSegmentBuilderFlushWithShardedCompaction() throws Throwable
+    {
+        int shards = 64;
+        String createTable = "CREATE TABLE %s (id1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
+                             "{'class' : 'UnifiedCompactionStrategy', 'enabled' : false, 'base_shard_count': " + shards + ", 'min_sstable_size': '1KiB' }";
+        createTable(createTable);
+        createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1"));
+        createIndex(String.format(CREATE_INDEX_TEMPLATE, "v2"));
+        disableCompaction(keyspace(), currentTable());
+
+        int rowsPerSSTable = 2000;
+        int numSSTables = 4;
+        int key = 0;
+        for (int s = 0; s < numSSTables; s++)
+        {
+            for (int i = 0; i < rowsPerSSTable; i++)
+            {
+                execute("INSERT INTO %s (id1, v1, v2) VALUES (?, 0, '01e2wefnewirui32e21e21wre')", Integer.toString(key++));
+            }
+            flush();
+        }
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try
+        {
+            Future<?> future = executor.submit(() -> {
+                getCurrentColumnFamilyStore().forceMajorCompaction(false);
+                waitForCompactions();
+            });
+
+            // verify that it's not accumulating segment builders
+            while (!future.isDone())
+            {
+                // ACTIVE_BUILDER_COUNT starts from 0. There are 2 segments for 2 indexes
+                assertThat(SegmentBuilder.getActiveBuilderCount()).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(2);
+            }
+            future.get(30, TimeUnit.SECONDS);
+
+            // verify results are sharded
+            assertThat(getCurrentColumnFamilyStore().getLiveSSTables()).hasSize(shards);
+        }
+        finally
+        {
+            executor.shutdown();
+            assertThat(executor.awaitTermination(30, TimeUnit.SECONDS)).isTrue();
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java
index 4f3ff5a6fe..f9f46a1b8c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java
@@ -36,6 +36,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DecoratedKey;
@@ -73,6 +74,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+
 public class SSTableFlushObserverTest
 {
 
@@ -167,14 +169,15 @@ public class SSTableFlushObserverTest
                                                    BufferCell.live(getColumn(cfm, "height"), now, LongType.instance.decompose(178L))));
 
                 writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key)))));
-
+                writer.onSSTableWriterSwitched();
                 reader = writer.finish(true);
             }
             finally
             {
                 FileUtils.closeQuietly(writer);
             }
-
+            
+            Assert.assertTrue(observer.isWriterSwitched);
             Assert.assertTrue(observer.isComplete);
             Assert.assertEquals(expected.size(), observer.rows.size());
 
@@ -265,6 +268,7 @@ public class SSTableFlushObserverTest
         private boolean beginCalled;
         private boolean failOnBegin;
         private boolean abortCalled;
+        private boolean isWriterSwitched;
 
         @Override
         public void begin()
@@ -274,6 +278,12 @@ public class SSTableFlushObserverTest
                 throw new RuntimeException("Failed to initialize");
         }
 
+        @Override
+        public void onSSTableWriterSwitched()
+        {
+            isWriterSwitched = true;
+        }
+
         @Override
         public void startPartition(DecoratedKey key, long dataPosition, long indexPosition)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to