This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new e721705152 Avoid re-initializing underlying iterator in
LazilyInitializedUnfilteredRowIterator after closing
e721705152 is described below
commit e721705152259ff5d4e9ec28d7fa25a1bced573f
Author: Marcus Eriksson <[email protected]>
AuthorDate: Wed Oct 8 15:44:23 2025 +0200
Avoid re-initializing underlying iterator in
LazilyInitializedUnfilteredRowIterator after closing
Patch by marcuse; reviewed by Aleksey Yeschenko and Branimir Lambov for
CASSANDRA-20972
---
CHANGES.txt | 1 +
.../LazilyInitializedUnfilteredRowIterator.java | 12 +++---
.../distributed/test/DistinctReadTest.java | 47 ++++++++++++++++++++++
.../db/compaction/AntiCompactionTest.java | 40 +++++++++---------
.../compaction/LeveledCompactionStrategyTest.java | 8 +++-
.../cassandra/db/compaction/TTLExpiryTest.java | 6 ++-
.../db/rows/ThrottledUnfilteredIteratorTest.java | 6 +--
.../cassandra/io/sstable/SSTableScannerTest.java | 7 +++-
8 files changed, 96 insertions(+), 31 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4c983ca1e4..aebbae0693 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0.6
+ * Avoid re-initializing underlying iterator in
LazilyInitializedUnfilteredRowIterator after closing (CASSANDRA-20972)
* Flush SAI segment builder when current SSTable writer is switched
(CASSANDRA-20752)
* Throw RTE instead of FSError when RTE is thrown from FileUtis.write in
TOCComponent (CASSANDRA-20917)
* Upgrade jackson-dataformat-yaml to 2.19.2 and snakeyaml to 2.1
(CASSANDRA-18875)
diff --git
a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
index 8a8b22966c..516b1d8cb2 100644
---
a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
+++
b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java
@@ -32,8 +32,8 @@ import org.apache.cassandra.utils.AbstractIterator;
public abstract class LazilyInitializedUnfilteredRowIterator extends
AbstractIterator<Unfiltered> implements UnfilteredRowIterator
{
private final DecoratedKey partitionKey;
-
private UnfilteredRowIterator iterator;
+ private boolean closed = false;
public LazilyInitializedUnfilteredRowIterator(DecoratedKey partitionKey)
{
@@ -97,15 +97,17 @@ public abstract class
LazilyInitializedUnfilteredRowIterator extends AbstractIte
public void close()
{
+ // don't use iterator == null as indicator if this is closed since
some methods are called after the iterator is
+ // closed and maybeInit would re-initialize the underlying iterator in
that case
+ closed = true;
if (iterator != null)
- {
iterator.close();
- iterator = null;
- }
}
public boolean isOpen()
{
- return iterator != null;
+ if (closed)
+ return false;
+ return iterator != null; // for backwards compatibility - if
`maybeInit` has not been run on this class, consider it not open, for example
SSTableExport seems to rely on this
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/DistinctReadTest.java
b/test/distributed/org/apache/cassandra/distributed/test/DistinctReadTest.java
new file mode 100644
index 0000000000..5bdb0910f8
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/DistinctReadTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+public class DistinctReadTest extends TestBaseImpl
+{
+ @Test
+ public void test() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build()
+ .withNodes(1)
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int, ck
int, x int, PRIMARY KEY (id, ck))"));
+ cluster.coordinator(1).execute(withKeyspace("DELETE FROM %s.tbl
USING TIMESTAMP 100 WHERE id = 1 AND ck < 10 "), ConsistencyLevel.ONE);
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl
(id, ck, x) VALUES (1, 5, 7) USING TIMESTAMP 101"), ConsistencyLevel.ONE);
+ cluster.get(1).flush(KEYSPACE);
+ // all these failed before fix;
+ cluster.coordinator(1).execute(withKeyspace("select distinct id
from %s.tbl where token(id) > " + Long.MIN_VALUE), ConsistencyLevel.ONE);
+ cluster.coordinator(1).execute(withKeyspace("select distinct id
from %s.tbl where id > 0 allow filtering"), ConsistencyLevel.ONE);
+ cluster.coordinator(1).execute(withKeyspace("select id from %s.tbl
where token(id) > " + Long.MIN_VALUE +" PER PARTITION LIMIT 1"),
ConsistencyLevel.ONE);
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 97da2a4076..2b1181e6c6 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -169,26 +169,28 @@ public class AntiCompactionTest
{
while (scanner.hasNext())
{
- UnfilteredRowIterator row = scanner.next();
- Token token = row.partitionKey().getToken();
- if (sstable.isPendingRepair() && !sstable.isTransient())
+ try (UnfilteredRowIterator row = scanner.next())
{
- assertTrue(fullContains.test(token));
- assertFalse(transContains.test(token));
- stats.pendingKeys++;
- }
- else if (sstable.isPendingRepair() &&
sstable.isTransient())
- {
-
- assertTrue(transContains.test(token));
- assertFalse(fullContains.test(token));
- stats.transKeys++;
- }
- else
- {
- assertFalse(fullContains.test(token));
- assertFalse(transContains.test(token));
- stats.unrepairedKeys++;
+ Token token = row.partitionKey().getToken();
+ if (sstable.isPendingRepair() &&
!sstable.isTransient())
+ {
+ assertTrue(fullContains.test(token));
+ assertFalse(transContains.test(token));
+ stats.pendingKeys++;
+ }
+ else if (sstable.isPendingRepair() &&
sstable.isTransient())
+ {
+
+ assertTrue(transContains.test(token));
+ assertFalse(fullContains.test(token));
+ stats.transKeys++;
+ }
+ else
+ {
+ assertFalse(fullContains.test(token));
+ assertFalse(transContains.test(token));
+ stats.unrepairedKeys++;
+ }
}
}
}
diff --git
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index ec8c8ad975..3aeacb0fbb 100644
---
a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -286,7 +287,12 @@ public class LeveledCompactionStrategyTest
ISSTableScanner scanner = scanners.get(0);
// scan through to the end
while (scanner.hasNext())
- scanner.next();
+ {
+ try (UnfilteredRowIterator ignored = scanner.next())
+ {
+ // just close the iterator
+ }
+ }
// scanner.getCurrentPosition should be equal to total bytes of L1
sstables
assertEquals(scanner.getCurrentPosition(),
SSTableReader.getTotalUncompressedBytes(sstables));
diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index e7e97bed99..6e1dd7fd36 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@ -258,8 +258,10 @@ public class TTLExpiryTest
assertTrue(scanner.hasNext());
while(scanner.hasNext())
{
- UnfilteredRowIterator iter = scanner.next();
- assertEquals(Util.dk(noTTLKey), iter.partitionKey());
+ try (UnfilteredRowIterator iter = scanner.next())
+ {
+ assertEquals(Util.dk(noTTLKey), iter.partitionKey());
+ }
}
scanner.close();
}
diff --git
a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
index d2a9aa7824..3b78fffbdb 100644
---
a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
+++
b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
@@ -214,8 +214,6 @@ public class ThrottledUnfilteredIteratorTest extends
CQLTester
{
try (UnfilteredRowIterator rowIterator = scanner.next())
{
- // only 1 partition data
- assertFalse(scanner.hasNext());
List<Unfiltered> expectedUnfiltereds = new ArrayList<>();
rowIterator.forEachRemaining(expectedUnfiltereds::add);
@@ -227,15 +225,17 @@ public class ThrottledUnfilteredIteratorTest extends
CQLTester
assertTrue(scannerForThrottle.hasNext());
try (UnfilteredRowIterator rowIteratorForThrottle =
scannerForThrottle.next())
{
- assertFalse(scannerForThrottle.hasNext());
verifyThrottleIterator(expectedUnfiltereds,
rowIteratorForThrottle,
new
ThrottledUnfilteredIterator(rowIteratorForThrottle, throttle),
throttle);
}
+ assertFalse(scannerForThrottle.hasNext());
}
}
}
+ // only 1 partition data
+ assertFalse(scanner.hasNext());
}
}
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
index 73195b0617..136b23ff86 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@ -345,7 +345,12 @@ public class SSTableScannerTest
// full range scan
ISSTableScanner scanner = sstable.getScanner();
for (int i = 4; i < 10; i++)
- assertEquals(toKey(i), new
String(scanner.next().partitionKey().getKey().array()));
+ {
+ try (UnfilteredRowIterator row = scanner.next())
+ {
+ assertEquals(toKey(i), new
String(row.partitionKey().getKey().array()));
+ }
+ }
scanner.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]