[ 
https://issues.apache.org/jira/browse/CASSANDRA-20085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904295#comment-17904295
 ] 

Jon Haddad edited comment on CASSANDRA-20085 at 12/10/24 12:06 AM:
-------------------------------------------------------------------

Unfortunately, the approach I thought I could use here won't work.  Here's the 
branch: 
https://github.com/apache/cassandra/compare/trunk...rustyrazorblade:cassandra:CASSANDRA-20085?expand=1

I'm currently getting this exception:

{code}
java.lang.IllegalStateException: The UnfilteredRowIterator returned by the last 
call to next() was initialized: it must be closed before calling hasNext() or 
next() again.
        at 
org.apache.cassandra.io.sstable.format.SSTableScanner$BaseKeyScanningIterator.computeNext(SSTableScanner.java:257)
        at 
org.apache.cassandra.io.sstable.format.SSTableScanner$BaseKeyScanningIterator.computeNext(SSTableScanner.java:244)
        at 
org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
        at 
org.apache.cassandra.io.sstable.format.SSTableScanner.hasNext(SSTableScanner.java:206)
        at 
org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:375)
        at 
org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:187)
        at 
org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:156)
        at 
org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
        at 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators$2.hasNext(UnfilteredPartitionIterators.java:201)
        at 
org.apache.cassandra.db.transform.BasePartitions.hasNext(BasePartitions.java:90)
        at 
org.apache.cassandra.db.compaction.CompactionIterator.hasNext(CompactionIterator.java:304)
        at 
org.apache.cassandra.db.compaction.CompactionTask.runMayThrow(CompactionTask.java:224)
        at 
org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:26)
        at 
org.apache.cassandra.db.compaction.CompactionTask.executeInternal(CompactionTask.java:94)
        at 
org.apache.cassandra.db.compaction.AbstractCompactionTask.execute(AbstractCompactionTask.java:100)
        at 
org.apache.cassandra.db.compaction.CompactionManager$9.runMayThrow(CompactionManager.java:1014)
        at 
org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:26)
        at 
org.apache.cassandra.concurrent.FutureTask$3.call(FutureTask.java:141)
        at org.apache.cassandra.concurrent.FutureTask.call(FutureTask.java:61)
        at org.apache.cassandra.concurrent.FutureTask.run(FutureTask.java:71)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
{code}

I think I need to consider a different approach here, as there's a lot of lazy 
iterators here, and if I understand the code correctly, they were designed 
without the intention to be used the way I'm trying to use them.  I think to 
work correctly, I'd need to greedily materialize the data before pushing it 
into the writer, and I don't think that's supported right now, although I could 
be wrong. 

If doing it greedily isn't possible, It might be possible to do this by going 
further into the stack, potentially in the SequentialWriter.  The risk there is 
I'm not sure what code is relying on this code to be synchronous, and there's 
points where we throw IO exceptions.

I'm going to look into materializing the data earlier first.


was (Author: rustyrazorblade):
Unfortunately, the approach I thought I could use here won't work.  Here's the 
branch: 
https://github.com/apache/cassandra/compare/trunk...rustyrazorblade:cassandra:CASSANDRA-20085?expand=1

I'm currently getting this exception:

{quote}
java.lang.IllegalStateException: The UnfilteredRowIterator returned by the last 
call to next() was initialized: it must be closed before calling hasNext() or 
next() again.
        at 
org.apache.cassandra.io.sstable.format.SSTableScanner$BaseKeyScanningIterator.computeNext(SSTableScanner.java:257)
        at 
org.apache.cassandra.io.sstable.format.SSTableScanner$BaseKeyScanningIterator.computeNext(SSTableScanner.java:244)
        at 
org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
        at 
org.apache.cassandra.io.sstable.format.SSTableScanner.hasNext(SSTableScanner.java:206)
        at 
org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:375)
        at 
org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:187)
        at 
org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:156)
        at 
org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
        at 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators$2.hasNext(UnfilteredPartitionIterators.java:201)
        at 
org.apache.cassandra.db.transform.BasePartitions.hasNext(BasePartitions.java:90)
        at 
org.apache.cassandra.db.compaction.CompactionIterator.hasNext(CompactionIterator.java:304)
        at 
org.apache.cassandra.db.compaction.CompactionTask.runMayThrow(CompactionTask.java:224)
        at 
org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:26)
        at 
org.apache.cassandra.db.compaction.CompactionTask.executeInternal(CompactionTask.java:94)
        at 
org.apache.cassandra.db.compaction.AbstractCompactionTask.execute(AbstractCompactionTask.java:100)
        at 
org.apache.cassandra.db.compaction.CompactionManager$9.runMayThrow(CompactionManager.java:1014)
        at 
org.apache.cassandra.utils.WrappedRunnable.run(WrappedRunnable.java:26)
        at 
org.apache.cassandra.concurrent.FutureTask$3.call(FutureTask.java:141)
        at org.apache.cassandra.concurrent.FutureTask.call(FutureTask.java:61)
        at org.apache.cassandra.concurrent.FutureTask.run(FutureTask.java:71)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
{quote}

I think I need to consider a different approach here, as there's a lot of lazy 
iterators here, and if I understand the code correctly, they were designed 
without the intention to be used the way I'm trying to use them.  I think to 
work correctly, I'd need to greedily materialize the data before pushing it 
into the writer, and I don't think that's supported right now, although I could 
be wrong. 

If doing it greedily isn't possible, It might be possible to do this by going 
further into the stack, potentially in the SequentialWriter.  The risk there is 
I'm not sure what code is relying on this code to be synchronous, and there's 
points where we throw IO exceptions.

I'm going to look into materializing the data earlier first.

> Move CompactionAwareWriter.append to separate thread
> ----------------------------------------------------
>
>                 Key: CASSANDRA-20085
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-20085
>             Project: Apache Cassandra
>          Issue Type: Improvement
>            Reporter: Jon Haddad
>            Assignee: Jon Haddad
>            Priority: Normal
>         Attachments: image-2024-11-15-09-45-51-450.png, 
> image-2024-11-15-09-50-41-854.png
>
>
> I'm examining a 5.0 cluster, mostly default settings except these overrides:
> {noformat}
> cluster_name: "density"
> num_tokens: 4
> concurrent_reads: 64
> concurrent_writes: 64
> trickle_fsync: true
> endpoint_snitch: "Ec2Snitch"
> compaction_throughput: 0MiB/s
> key_cache_migrate_during_compaction: false
> {noformat}
> Here's a flame graph of compaction taken with these settings:
> {noformat}
> asprof -e wall -X '*Unsafe.park*' -X '*Native.epollWait' -I '*compaction*' 
> {noformat}
> [^image-2024-11-15-09-50-41-854.png]
> and in reverse:
> [^image-2024-11-15-09-45-51-450.png]
> It's clear we can get an easy win by moving the writing portion of compaction 
> to a dedicated thread (ExecutorService.newSingleThreadExecutor() ?), instead 
> of blocking in the same one. For this use case, it would improve compaction 
> throughput by about 2x.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to