This is an automated email from the ASF dual-hosted git repository.

jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 34c6c382 CASSANALYTICS-124: Commitlog reading not progressing in CDC due to incorrect CommitLogReader.isFullyRead (#176)
34c6c382 is described below

commit 34c6c38253d59a9fe8485368fea3145e2dffe205
Author: Jyothsna konisa <[email protected]>
AuthorDate: Mon Mar 9 11:13:50 2026 -0700

    CASSANALYTICS-124: Commitlog reading not progressing in CDC due to incorrect CommitLogReader.isFullyRead (#176)
    
    Patch by Jyothsna Konisa; Reviewed by Josh McKenzie for CASSANALYTICS-124
---
 .../commitlog/BufferingCommitLogReaderTests.java   | 23 +++++++++++++++++----
 .../db/commitlog/BufferingCommitLogReader.java     | 24 +++++++++++++++++++---
 2 files changed, 40 insertions(+), 7 deletions(-)

diff --git 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java
 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java
index fa4dfd0b..a5459ae7 100644
--- 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java
+++ 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.db.commitlog;
 
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,6 +33,7 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.cassandra.cdc.CdcTester;
 import org.apache.cassandra.cdc.CdcTests;
+import org.apache.cassandra.cdc.LocalCommitLog;
 import org.apache.cassandra.cdc.api.CommitLog;
 import org.apache.cassandra.cdc.api.Marker;
 import org.apache.cassandra.cdc.stats.CdcStats;
@@ -82,10 +84,18 @@ public class BufferingCommitLogReaderTests
         CdcTester.testCommitLog.sync();
 
         List<Marker> markers = Collections.synchronizedList(new ArrayList<>());
-        CommitLog firstLog = CdcTests.logProvider(directory)
-                                     .logs()
-                                     .min(CommitLog::compareTo)
-                                     .orElseThrow(() -> new 
RuntimeException("Commit log file not found"));
+        CommitLog firstLog = new 
LocalCommitLog(Paths.get(CdcTests.logProvider(directory)
+                                                                              
.logs()
+                                                                              
.min(CommitLog::compareTo)
+                                                                              
.orElseThrow(() -> new RuntimeException("Commit log file not found"))
+                                                                              
.path()))
+        {
+            @Override
+            public boolean completed()
+            {
+                return true;
+            }
+        };
 
         // read entire commit log and verify correct
         Consumer<Marker> listener = markers::add;
@@ -145,6 +155,11 @@ public class BufferingCommitLogReaderTests
                 assertThat(keys).contains(key);
             }
 
+            // Verify the position fix: after reading (from any start offset), position must
+            // reach maxOffset and isFullyRead() must return true.
+            assertThat(reader.position()).isEqualTo((int) logFile.maxOffset());
+            assertThat(result.isFullyRead()).isTrue();
+
             return keysRead;
         }
         catch (Exception e)
diff --git 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
index 8a808d98..fd457d8a 100644
--- 
a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
+++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java
@@ -284,6 +284,9 @@ public class BufferingCommitLogReader implements 
CommitLogReadHandler,
             {
                 stats.commitLogBytesSkippedOnRead(startMarker.position() - 
reader.getFilePointer());
                 segmentReader.seek(startMarker.position());
+                // When starting from an offset, position must be initialized to startMarker.position()
+                // rather than 0; an incorrect value causes isFullyRead() to fail.
+                this.position = startMarker.position();
             }
 
             for (CommitLogSegmentReader.SyncSegment syncSegment : 
segmentReader)
@@ -295,9 +298,13 @@ public class BufferingCommitLogReader implements 
CommitLogReadHandler,
 
                 readSection(syncSegment.input, syncSegment.endPosition);
 
-                // track the position at end of previous section after successfully reading mutations
-                // so we can update highwater mark after reading
-                this.position = (int) reader.getFilePointer();
+                // Only advance position if the section completed normally.
+                // An early termination (e.g. LEGACY_END_OF_SEGMENT_MARKER) sets this.position
+                // to the correct value inside readSection and must not be overridden here.
+                if (statusTracker.shouldContinue())
+                {
+                    this.position = (int) reader.getFilePointer();
+                }
 
                 if (listener != null)
                 {
@@ -309,6 +316,14 @@ public class BufferingCommitLogReader implements 
CommitLogReadHandler,
                     break;
                 }
             }
+
+            // If the loop finished naturally (iterator exhausted) without hitting an error or limit,
+            // ensure the position reflects the end of the file. If we aborted early due to an error
+            // or mutation limit, 'this.position' remains at the last valid read offset.
+            if (statusTracker.shouldContinue())
+            {
+                this.position = (int) log.maxOffset();
+            }
         }
         // Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) 
cannot throw a checked exception,
         // so we check to see if a RuntimeException is wrapping an IOException.
@@ -427,6 +442,9 @@ public class BufferingCommitLogReader implements 
CommitLogReadHandler,
                 if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
                 {
                     logger.trace("Encountered end of segment marker at", 
"position", reader.getFilePointer());
+                    // Mark the log as fully consumed so isFullyRead() returns true.
+                    // The shouldContinue() check after readSection() in the caller's sync-segment loop
+                    // ensures this value is not overridden once readSection returns.
+                    this.position = (int) log.maxOffset();
                     statusTracker.requestTermination();
                     return;
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to