This is an automated email from the ASF dual-hosted git repository.
jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2427d273 CASSANALYTICS-127: Fixing CdcTests.testMockedCdc broken due
to incorrect position update in BufferingCommitLogReader (#179)
2427d273 is described below
commit 2427d2736524b0949df87e4783965f815dca0ac6
Author: Jyothsna konisa <[email protected]>
AuthorDate: Wed Mar 11 13:38:02 2026 -0700
CASSANALYTICS-127: Fixing CdcTests.testMockedCdc broken due to incorrect
position update in BufferingCommitLogReader (#179)
Patch by Jyothsna Konisa; Reviewed by Josh McKenzie & Lukasz Antoniak for
CASSANALYTICS-127
---
CHANGES.txt | 4 ++++
.../org/apache/cassandra/cdc/LocalCommitLog.java | 22 ++++++++++++++++++++--
.../db/commitlog/BufferingCommitLogReader.java | 8 --------
3 files changed, 24 insertions(+), 10 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b973c39..e2740c94 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+0.4.0
+-----
+ * Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127)
+
0.3.0
-----
* Assign data file start offset based on BTI index (CASSANALYTICS-121)
diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java
index da7e06f0..3c696bca 100644
--- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java
+++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/LocalCommitLog.java
@@ -20,7 +20,9 @@
package org.apache.cassandra.cdc;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.List;
import org.apache.cassandra.cdc.api.CommitLog;
import org.apache.cassandra.spark.data.FileSystemSource;
@@ -56,7 +58,8 @@ public class LocalCommitLog implements CommitLog
public long maxOffset()
{
- return length;
+ List<String> lines = readIdxFile();
+ return lines.isEmpty() ? length : Long.parseLong(lines.get(0).trim());
}
public long length()
@@ -67,7 +70,22 @@ public class LocalCommitLog implements CommitLog
public boolean completed()
{
// for CDC Cassandra writes COMPLETED in the final line of the CommitLog-7-*_cdc.idx index file.
- return maxOffset() >= 67108818;
+ List<String> lines = readIdxFile();
+ return lines.size() >= 2 && lines.get(1).trim().equalsIgnoreCase("COMPLETED");
+ }
+
+ // Each commit log .log file has a corresponding _cdc.idx file that tracks the CDC offset and completion status.
+ private List<String> readIdxFile()
+ {
+ Path idxPath = path.resolveSibling(name.replace(".log", "_cdc.idx"));
+ try
+ {
+ return Files.exists(idxPath) ? Files.readAllLines(idxPath) : List.of();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
public FileSystemSource<CommitLog> source()
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
index fd457d8a..d6476087 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
@@ -316,14 +316,6 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
break;
}
}
-
- // If the loop finished naturally (iterator exhausted) without hitting an error or limit,
- // ensure the position reflects the end of the file. If we aborted early due to an error
- // or mutation limit, 'this.position' remains at the last valid read offset.
- if (statusTracker.shouldContinue())
- {
- this.position = (int) log.maxOffset();
- }
}
// Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) cannot throw a checked exception,
// so we check to see if a RuntimeException is wrapping an IOException.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]