This is an automated email from the ASF dual-hosted git repository.
jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 34c6c382 CASSANALYTICS-124: Commitlog reading not progressing in CDC
due to incorrect CommitLogReader.isFullyRead (#176)
34c6c382 is described below
commit 34c6c38253d59a9fe8485368fea3145e2dffe205
Author: Jyothsna konisa <[email protected]>
AuthorDate: Mon Mar 9 11:13:50 2026 -0700
CASSANALYTICS-124: Commitlog reading not progressing in CDC due to
incorrect CommitLogReader.isFullyRead (#176)
Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSANALYTICS-124
---
.../commitlog/BufferingCommitLogReaderTests.java | 23 +++++++++++++++++----
.../db/commitlog/BufferingCommitLogReader.java | 24 +++++++++++++++++++---
2 files changed, 40 insertions(+), 7 deletions(-)
diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java
index fa4dfd0b..a5459ae7 100644
--- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java
+++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.commitlog;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -32,6 +33,7 @@ import org.junit.jupiter.api.Test;
import org.apache.cassandra.cdc.CdcTester;
import org.apache.cassandra.cdc.CdcTests;
+import org.apache.cassandra.cdc.LocalCommitLog;
import org.apache.cassandra.cdc.api.CommitLog;
import org.apache.cassandra.cdc.api.Marker;
import org.apache.cassandra.cdc.stats.CdcStats;
@@ -82,10 +84,18 @@ public class BufferingCommitLogReaderTests
CdcTester.testCommitLog.sync();
List<Marker> markers = Collections.synchronizedList(new ArrayList<>());
- CommitLog firstLog = CdcTests.logProvider(directory)
- .logs()
- .min(CommitLog::compareTo)
- .orElseThrow(() -> new RuntimeException("Commit log file not found"));
+ CommitLog firstLog = new LocalCommitLog(Paths.get(CdcTests.logProvider(directory)
+ .logs()
+ .min(CommitLog::compareTo)
+ .orElseThrow(() -> new RuntimeException("Commit log file not found"))
+ .path()))
+ {
+ @Override
+ public boolean completed()
+ {
+ return true;
+ }
+ };
// read entire commit log and verify correct
Consumer<Marker> listener = markers::add;
@@ -145,6 +155,11 @@ public class BufferingCommitLogReaderTests
assertThat(keys).contains(key);
}
+ // Verify the position fix: after reading (from any start offset), position must
+ // reach maxOffset and isFullyRead() must return true.
+ assertThat(reader.position()).isEqualTo((int) logFile.maxOffset());
+ assertThat(result.isFullyRead()).isTrue();
+
return keysRead;
}
catch (Exception e)
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
index 8a808d98..fd457d8a 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
@@ -284,6 +284,9 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
{
stats.commitLogBytesSkippedOnRead(startMarker.position() - reader.getFilePointer());
segmentReader.seek(startMarker.position());
+ // When starting from an offset, position must be initialized to startMarker.position()
+ // rather than 0; an incorrect value causes isFullyRead to fail.
+ this.position = startMarker.position();
}
for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
@@ -295,9 +298,13 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
readSection(syncSegment.input, syncSegment.endPosition);
- // track the position at end of previous section after successfully reading mutations
- // so we can update highwater mark after reading
- this.position = (int) reader.getFilePointer();
+ // Only advance position if the section completed normally.
+ // An early termination (e.g. LEGACY_END_OF_SEGMENT_MARKER) sets this.position
+ // to the correct value inside readSection and must not be overridden here.
+ if (statusTracker.shouldContinue())
+ {
+ this.position = (int) reader.getFilePointer();
+ }
if (listener != null)
{
@@ -309,6 +316,14 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
break;
}
}
+
+ // If the loop finished naturally (iterator exhausted) without hitting an error or limit,
+ // ensure the position reflects the end of the file. If we aborted early due to an error
+ // or mutation limit, 'this.position' remains at the last valid read offset.
+ if (statusTracker.shouldContinue())
+ {
+ this.position = (int) log.maxOffset();
+ }
}
// Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) cannot throw a checked exception,
// so we check to see if a RuntimeException is wrapping an IOException.
@@ -427,6 +442,9 @@ public class BufferingCommitLogReader implements CommitLogReadHandler,
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
{
logger.trace("Encountered end of segment marker at", "position", reader.getFilePointer());
+ // Mark the log as fully consumed so isFullyRead() returns true.
+ // The guard above ensures this is not overridden after readSection returns.
+ this.position = (int) log.maxOffset();
statusTracker.requestTermination();
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]