[ 
https://issues.apache.org/jira/browse/CASSANDRA-21188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18060787#comment-18060787
 ] 

Maxim Muzafarov commented on CASSANDRA-21188:
---------------------------------------------

Reproducer:


{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.db.compression;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.Uninterruptibles;

import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import 
org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.ReflectionUtils;

import static org.apache.cassandra.Util.spinUntilTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;


public class CompressionDictionaryTrainingStatusTest extends CQLTester
{
    @BeforeClass
    public static void setup() throws Throwable
    {
        requireNetwork();
        startJMXServer();
    }

    @Test
    public void testTrainingStatusWhenSStablesCompactedBeforeRefAcquired() 
throws Throwable
    {
        String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data 
text) " +
                                   "WITH compression = {'class': 
'ZstdDictionaryCompressor', 'chunk_length_in_kb': '4'}");

        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(table);

        cfs.disableAutoCompaction();

        CompressionDictionaryManager manager = 
cfs.compressionDictionaryManager();
        ICompressionDictionaryTrainer trainer = manager.trainer();

        Field cacheField = 
ReflectionUtils.getField(CompressionDictionaryManager.class, "cache");
        cacheField.setAccessible(true);
        ICompressionDictionaryCache cache = (ICompressionDictionaryCache) 
cacheField.get(manager);

        CompressionDictionarySchedulerTester tester = new 
CompressionDictionarySchedulerTester(keyspace(),
                                                                                
               table,
                                                                                
               cfs.metadata().id.toLongString(),
                                                                                
               cache,
                                                                                
               true);
        ICompressionDictionaryScheduler originalScheduler = 
getScheduler(manager);

        try
        {
            setScheduler(manager, tester);

            assertFalse(tester.isManualTrainingRunning());
            Thread trainThread = new Thread(() -> manager.train(true, 
Collections.emptyMap()));
            trainThread.start();

            // trainer.start() updates the status
            spinUntilTrue(() -> trainer.getTrainingState().getStatus() == 
TrainingStatus.SAMPLING,
                          10, TimeUnit.SECONDS);

            cfs.enableAutoCompaction();
            compact();

            tester.scheduled.countDown();
            trainThread.join(TimeUnit.SECONDS.toMillis(30));

            spinUntilTrue(() -> !tester.isManualTrainingRunning(), 10, 
TimeUnit.SECONDS);

            TrainingStatus actual = trainer.getTrainingState().getStatus();
            assertNotEquals("Training status should not remain SAMPLING " +
                            "when all SSTables were compacted before tryRef()",
                            TrainingStatus.SAMPLING, actual);
        }
        finally
        {
            setScheduler(manager, originalScheduler);
        }
    }

    private static ICompressionDictionaryScheduler 
getScheduler(CompressionDictionaryManager manager) throws Exception
    {
        Field f = ReflectionUtils.getField(CompressionDictionaryManager.class, 
"scheduler");
        f.setAccessible(true);
        return (ICompressionDictionaryScheduler) f.get(manager);
    }

    private static void setScheduler(CompressionDictionaryManager manager, 
ICompressionDictionaryScheduler scheduler) throws Exception
    {
        Field f = ReflectionUtils.getField(CompressionDictionaryManager.class, 
"scheduler");
        f.setAccessible(true);
        f.set(manager, scheduler);
    }

    private static final class CompressionDictionarySchedulerTester extends 
CompressionDictionaryScheduler
    {
        private final CountDownLatch scheduled = new CountDownLatch(1);
        public CompressionDictionarySchedulerTester(String keyspaceName,
                                                  String tableName,
                                                  String tableId,
                                                  ICompressionDictionaryCache 
cache,
                                                  boolean isEnabled)
        {
            super(keyspaceName, tableName, tableId, cache, isEnabled);
        }

        @Override
        public void scheduleSSTableBasedTraining(ICompressionDictionaryTrainer 
trainer,
                                                 Set<SSTableReader> sstables,
                                                 
CompressionDictionaryTrainingConfig config,
                                                 boolean force)
        {
            Uninterruptibles.awaitUninterruptibly(scheduled);
            super.scheduleSSTableBasedTraining(trainer, sstables, config, 
force);
        }
    }
}
 {code}

> Race between compaction and dictionary compression training. Status stuck at 
> SAMPLING. ExportImportListCompressionDictionaryTest hangs
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-21188
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-21188
>             Project: Apache Cassandra
>          Issue Type: Bug
>            Reporter: Maxim Muzafarov
>            Priority: Normal
>
> There is a race between the compaction process and the start of dictionary 
> compression training:
>  # CompressionDictionaryManager: We collect all live sstables
>  # ICompressionDictionaryTrainer: start a new training
>  # currentTrainingStatus moved to SAMPLING
>  # All SSTables get compacted within concurrent compaction thread
>  # SSTableSamplingTask: in the constructor sstable.tryRef returns null
>  # We run this task on a thread pool and it exits in cancelManualTraining
>  # the currentTrainingStatus remains SAMPLING (should be *FAILED* or 
> {*}COMPLETED!{*})
> ExportImportListCompressionDictionaryTest hangs for 10 minutes (configured 
> constant) for no reason. 
> The logs:
> {code}
> INFO  [PerDiskMemtableFlushWriter_0:1] 2026-02-21T17:07:05,061 
> Flushing.java:157 - Writing 
> Memtable-table_testexportingspecificdictionary_strateg_18@1268950324(61.523KiB
>  serialized bytes, 1000 ops, 506.836KiB (0%) on-heap, 0B (0%) off-heap), 
> flushed range = [min(-9223372036854775808), max(9223372036854775807))
> INFO  [PerDiskMemtableFlushWriter_0:1] 2026-02-21T17:07:05,061 
> Flushing.java:197 - Completed flushing 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-11-big-Data.db
>  (28.198KiB) for commitlog position 
> CommitLogPosition(segmentId=1771693567140, position=654098), time spent: 0 
> ms, bytes flushed: 28875 / (rate: 28.198KiB/s), partitions flushed: 1000 / 
> (rate: 1000/s), rows: 1000 / (rate: 1000/s), cpu time: 0 ms, heap allocated: 
> 220.711KiB
> INFO  [MemtableFlushWriter:1] 2026-02-21T17:07:05,084 LogTransaction.java:266 
> - Unfinished transaction log, deleting 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa_txn_flush_bf3ac330-0f47-11f1-88d2-574197b4b378.log
>  
> DEBUG [MemtableFlushWriter:1] 2026-02-21T17:07:05,087 
> ColumnFamilyStore.java:1416 - Flushed to 
> [BigTableReader:big(path='/Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-11-big-Data.db')]
>  (1 sstables, 30.889KiB), biggest 30.889KiB, smallest 30.889KiB
> INFO  [main] 2026-02-21T17:07:05,091 ColumnFamilyStore.java:1088 - Enqueuing 
> flush of cql_test_keyspace.table_testexportingspecificdictionary_strateg_18, 
> Reason: UNIT_TESTS, Usage: 506.836KiB (0%) on-heap, 0B (0%) off-heap
> INFO  [PerDiskMemtableFlushWriter_0:2] 2026-02-21T17:07:05,092 
> Flushing.java:157 - Writing 
> Memtable-table_testexportingspecificdictionary_strateg_18@957877902(61.523KiB 
> serialized bytes, 1000 ops, 506.836KiB (0%) on-heap, 0B (0%) off-heap), 
> flushed range = [min(-9223372036854775808), max(9223372036854775807))
> INFO  [PerDiskMemtableFlushWriter_0:2] 2026-02-21T17:07:05,094 
> Flushing.java:197 - Completed flushing 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-12-big-Data.db
>  (28.201KiB) for commitlog position 
> CommitLogPosition(segmentId=1771693567140, position=726098), time spent: 0 
> ms, bytes flushed: 28878 / (rate: 28.201KiB/s), partitions flushed: 1000 / 
> (rate: 1000/s), rows: 1000 / (rate: 1000/s), cpu time: 0 ms, heap allocated: 
> 220.711KiB
> INFO  [MemtableFlushWriter:2] 2026-02-21T17:07:05,112 LogTransaction.java:266 
> - Unfinished transaction log, deleting 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa_txn_flush_bf3fa530-0f47-11f1-88d2-574197b4b378.log
>  
> DEBUG [MemtableFlushWriter:2] 2026-02-21T17:07:05,116 
> ColumnFamilyStore.java:1416 - Flushed to 
> [BigTableReader:big(path='/Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-12-big-Data.db')]
>  (1 sstables, 30.887KiB), biggest 30.887KiB, smallest 30.887KiB
> DEBUG [CompactionExecutor:2] 2026-02-21T17:07:05,117 Directories.java:554 - 
> FileStore /System/Volumes/Data (/dev/disk3s5) has 593792975872 bytes 
> available, checking if we can write 103847 bytes
> INFO  [CompactionExecutor:2] 2026-02-21T17:07:05,117 CompactionTask.java:229 
> - Compacting (bf4375c0-0f47-11f1-88d2-574197b4b378) 
> [/Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-11-big-Data.db,
>  
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-12-big-Data.db,
>  
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-9-big-Data.db,
>  
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-10-big-Data.db,
>  ]
> DEBUG [CompactionExecutor:2] 2026-02-21T17:07:05,118 CursorCompactor.java:152 
> - Cursor compaction for table: 
> table_testexportingspecificdictionary_strateg_18 keyspace: cql_test_keyspace 
> is supported.
> INFO  [RMI TCP Connection(26)-127.0.0.1] 2026-02-21T17:07:05,146 
> CommandInvokerService.java:185 - Executing command 'train' with execution ID: 
> a5960218-7f58-41a2-a06f-d627acf20efd
> INFO  [RMI TCP Connection(26)-127.0.0.1] 2026-02-21T17:07:05,147 
> CompressionDictionaryManager.java:237 - Starting SSTable-based training for 
> cql_test_keyspace.table_testexportingspecificdictionary_strateg_18 with 1 
> SSTables
> INFO  [RMI TCP Connection(26)-127.0.0.1] 2026-02-21T17:07:05,150 
> CompressionDictionaryScheduler.java:101 - Starting SSTable-based dictionary 
> training for 
> cql_test_keyspace.table_testexportingspecificdictionary_strateg_18 from 1 
> SSTables
> DEBUG [RMI TCP Connection(26)-127.0.0.1] 2026-02-21T17:07:05,150 
> CompressionDictionaryScheduler.java:198 - Couldn't acquire reference to 
> SSTable 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-13-big.
>  It may have been removed.
> WARN  [NonPeriodicTasks:1] 2026-02-21T17:07:05,150 
> CompressionDictionaryScheduler.java:213 - No SSTables available for sampling 
> in cql_test_keyspace.table_testexportingspecificdictionary_strateg_18
> INFO  [RMI TCP Connection(26)-127.0.0.1] 2026-02-21T17:07:05,150 
> ToolRunner.java:927 - >>>> Polling training status...SAMPLING
> INFO  [CompactionExecutor:2] 2026-02-21T17:07:05,152 
> CursorCompactor.java:1574 - Compaction ended 
> bf4375c0-0f47-11f1-88d2-574197b4b378: { data bytes read = 294620, data bytes 
> written = 297868,  input (keys = [1:10000,] = 10000, rows = [1:10000,] = 
> 10000, cells = [1:10000,] = 10000),  output (keys = 10000, rows = 10000, 
> cells = 10000)}
> INFO  [CompactionExecutor:2] 2026-02-21T17:07:05,153 CompactionTask.java:336 
> - Compacted (bf4375c0-0f47-11f1-88d2-574197b4b378) 4 sstables to 
> [build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-13-big,]
>  to level=0.  101.413KiB to 91.637KiB (~90% of original) in 35ms.  Read 
> Throughput = 2.826MiB/s, Write Throughput = 2.554MiB/s, Row Throughput = 
> ~10,000/s.  10,000 total partitions merged to 10,000.  Partition merge counts 
> were {1:10000, }. Time spent writing keys = 10ms
> INFO  [NonPeriodicTasks:1] 2026-02-21T17:07:05,153 BigFormat.java:324 - 
> Deleting sstable: 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-11-big
> INFO  [NonPeriodicTasks:1] 2026-02-21T17:07:05,154 BigFormat.java:324 - 
> Deleting sstable: 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-12-big
> INFO  [NonPeriodicTasks:1] 2026-02-21T17:07:05,154 BigFormat.java:324 - 
> Deleting sstable: 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-9-big
> INFO  [NonPeriodicTasks:1] 2026-02-21T17:07:05,155 BigFormat.java:324 - 
> Deleting sstable: 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa-10-big
> INFO  [NonPeriodicTasks:1] 2026-02-21T17:07:05,155 LogTransaction.java:266 - 
> Unfinished transaction log, deleting 
> /Users/maxim.muzafarov/IdeaProjects/cassandra/build/test/cassandra/data/cql_test_keyspace/table_testexportingspecificdictionary_strateg_18-1b255f4def2540a60000000000000056/pa_txn_compaction_bf4375c0-0f47-11f1-88d2-574197b4b378.log
>  
> INFO  [RMI TCP Connection(26)-127.0.0.1] 2026-02-21T17:07:06,155 
> ToolRunner.java:927 - >>>> Polling training status...SAMPLING
> INFO  [RMI TCP Connection(26)-127.0.0.1] 2026-02-21T17:07:07,159 
> ToolRunner.java:927 - >>>> Polling training status...SAMPLING
> INFO  [RMI TCP Connection(26)-127.0.0.1] 2026-02-21T17:07:08,163 
> ToolRunner.java:927 - >>>> Polling training status...SAMPLING
> INFO  [RMI TCP Connection(26)-127.0.0.1] 2026-02-21T17:07:09,168 
> ToolRunner.java:927 - >>>> Polling training status...SAMPLING
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to