Copilot commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r2684440781


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +239,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())

Review Comment:
   The condition checks if stagedWalSize is greater than or equal to 
getMaxBufferSize(), but getMaxBufferSize() can return -1 for non-buffering 
endpoints (as documented in ReplicationEndpoint). This means the comparison 
'stagedWalSize >= -1' would always be true when stagedWalSize > 0, causing 
immediate flushes for non-buffering endpoints. While this preserves existing 
behavior, the intent is unclear and could be confusing. Consider explicitly 
checking for -1 to make the logic more explicit.
   ```suggestion
       long maxBufferSize = source.getReplicationEndpoint().getMaxBufferSize();
       // For non-buffering endpoints, getMaxBufferSize() returns a negative value (e.g., -1).
       // In that case, we always trigger a flush based on size as soon as there is staged data.
       boolean sizeBasedFlush =
         (maxBufferSize < 0) || (stagedWalSize >= maxBufferSize);
       return sizeBasedFlush
   ```
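
A minimal, self-contained sketch of the size check may make the sentinel behavior easier to see. `SizeFlushCheck` and its method names are hypothetical and not part of HBase; only the `-1` sentinel and the `>=` comparison are taken from the diff above.

```java
final class SizeFlushCheck {
  // Implicit form from the diff: a negative limit makes the comparison
  // true for any positive staged size.
  static boolean implicitCheck(long stagedWalSize, long maxBufferSize) {
    return stagedWalSize >= maxBufferSize;
  }

  // Explicit form from the suggestion: the negative sentinel is named and
  // handled before the size comparison.
  static boolean explicitCheck(long stagedWalSize, long maxBufferSize) {
    boolean nonBuffering = maxBufferSize < 0;
    return nonBuffering || stagedWalSize >= maxBufferSize;
  }

  public static void main(String[] args) {
    // Each pair is {stagedWalSize, maxBufferSize}.
    long[][] cases = { { 1, -1 }, { 512, 1024 }, { 1024, 1024 } };
    for (long[] c : cases) {
      System.out.println("staged=" + c[0] + " max=" + c[1]
        + " implicit=" + implicitCheck(c[0], c[1])
        + " explicit=" + explicitCheck(c[0], c[1]));
    }
  }
}
```

Both forms agree on every input; the explicit one only makes the non-buffering case visible to the reader.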



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +239,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs
+          >= source.getReplicationEndpoint().maxFlushInterval());
+  }
+
+  private void persistLogPosition() {
+    if (lastShippedBatch == null) {
+      return;
+    }
+    if (stagedWalSize > 0) {
+      source.getReplicationEndpoint().beforePersistingReplicationOffset();
+    }
+    stagedWalSize = 0;
+    lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+
+    // Clean up hfile references
+    for (Entry entry : entriesForCleanUpHFileRefs) {
+      try {
+        cleanUpHFileRefs(entry.getEdit());
+      } catch (IOException e) {

Review Comment:
   The IOException handling now catches and logs the exception instead of 
propagating it. Cleanup failures are therefore visible only in the logs, which 
could hide serious issues such as file system problems. If cleanup failures 
should be non-fatal, document that explicitly, or at least increment a failure 
metric to track these errors.
   ```suggestion
         } catch (IOException e) {
           // Cleanup failures are intentionally treated as non-fatal: replication has already
           // succeeded for these entries, so we log the failure and continue.
   ```
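
As a rough illustration of the failure-metric option, the sketch below counts cleanup failures while still treating them as non-fatal. Everything in it (`CleanupFailureTracking`, `CleanupAction`, the `cleanupFailures` counter) is hypothetical; a real change would presumably report through the existing source metrics rather than a static counter.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class CleanupFailureTracking {
  /** Hypothetical stand-in for a per-entry cleanup step that may fail. */
  interface CleanupAction {
    void run() throws IOException;
  }

  // Hypothetical counter standing in for a real metric.
  static final AtomicLong cleanupFailures = new AtomicLong();

  // Runs every action and returns how many succeeded. A failure is logged
  // and counted, and the loop continues with the next action.
  static int cleanUpAll(List<CleanupAction> actions) {
    int succeeded = 0;
    for (CleanupAction action : actions) {
      try {
        action.run();
        succeeded++;
      } catch (IOException e) {
        // Non-fatal by design in this sketch: record the failure and keep going.
        cleanupFailures.incrementAndGet();
        System.err.println("Cleanup failed: " + e.getMessage());
      }
    }
    return succeeded;
  }

  public static void main(String[] args) {
    List<CleanupAction> actions = List.of(
      () -> { },
      () -> { throw new IOException("simulated file system error"); },
      () -> { });
    int ok = cleanUpAll(actions);
    System.out.println("succeeded=" + ok + " failures=" + cleanupFailures.get());
  }
}
```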



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java:
##########
@@ -283,4 +283,29 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
+  // file. So we return config value CONF_BACKUP_MAX_WAL_SIZE for
+  // ContinuousBackupReplicationEndpoint
+  // and -1 for other ReplicationEndpoint since they don't buffer.
+  // For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
+  // offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()

Review Comment:
   The comment references 'shouldFlushStagedWal()' but the actual method name 
in ReplicationSourceShipper is 'shouldPersistLogPosition()'. This inconsistency 
will confuse developers trying to understand the interaction between these 
components.



##########
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperBufferedFlush.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Tests staged WAL flush behavior in ReplicationSourceShipper.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationSourceShipperBufferedFlush {
+
+  static class CountingReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private final AtomicInteger beforePersistCalls = new AtomicInteger();
+
+    @Override
+    public void start() {
+      startAsync().awaitRunning();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync().awaitTerminated();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext ctx) {
+      return true;
+    }
+
+    @Override
+    public void beforePersistingReplicationOffset() {
+      beforePersistCalls.incrementAndGet();
+    }
+
+    @Override
+    public long getMaxBufferSize() {
+      return 1L; // force immediate flush
+    }
+
+    @Override
+    public long maxFlushInterval() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public UUID getPeerUUID() {
+      return null;
+    }
+
+    int getBeforePersistCalls() {
+      return beforePersistCalls.get();
+    }
+  }
+
+  @Test
+  public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();
+    endpoint.start();
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);
+
+    Mockito.when(source.isPeerEnabled()).thenReturn(true);
+    Mockito.when(source.isSourceActive()).thenReturn(true);
+    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
+    Mockito.when(source.getPeerId()).thenReturn("1");
+    Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class));
+
+    WALEntryBatch batch = new WALEntryBatch(1, null);
+    batch.setLastWalPath(new Path("wal"));
+    batch.setLastWalPosition(1L);
+    // no entries, no heap size
+
+    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
+
+    ReplicationSourceShipper shipper =
+      new ReplicationSourceShipper(conf, "wal-group", source, walReader);
+
+    shipper.start();
+
+    // Allow loop to run
+    Waiter.waitFor(conf, 3000, () -> true);
+
+    shipper.interrupt();

Review Comment:
   The test calls Waiter.waitFor(conf, 3000, () -> true) with a predicate that 
always returns true. Because the predicate is already satisfied on the first 
check, the call returns without waiting and does not verify any condition, so 
the shipper may not have processed the batch before it is interrupted. The test 
should wait for a meaningful condition, such as the shipper having processed 
the batch or the thread having reached a specific state. As written, the test 
is timing-dependent and unreliable.
   ```suggestion
       // Wait until the shipper thread has finished processing the batch
       Waiter.waitFor(conf, 3000, () -> !shipper.isAlive());
   ```
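
The difference between a constant predicate and a meaningful one can be sketched with a small polling helper. `ConditionWait.waitFor` below is a hypothetical stand-in written for this illustration, not HBase's `Waiter`, and the `processed` counter is an assumed signal that a worker has handled its batch.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class ConditionWait {
  // Polls the condition until it holds or the timeout elapses.
  // Returns true if the condition was observed, false on timeout or interrupt.
  static boolean waitFor(long timeoutMs, long intervalMs, BooleanSupplier condition) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger processed = new AtomicInteger();
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(50); // simulate work before the batch counts as processed
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      processed.incrementAndGet();
    });
    worker.start();

    // A constant-true predicate is satisfied on the first check, so nothing is verified.
    waitFor(3000, 10, () -> true);
    // A real condition blocks until the worker has actually done its job.
    boolean sawBatch = waitFor(3000, 10, () -> processed.get() == 1);
    worker.join();
    System.out.println("batch processed: " + sawBatch);
  }
}
```

In the test under review, the equivalent condition would need some observable signal from the shipper or endpoint; which signal is appropriate depends on what the test intends to assert.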



##########
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperBufferedFlush.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Tests staged WAL flush behavior in ReplicationSourceShipper.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationSourceShipperBufferedFlush {
+
+  static class CountingReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private final AtomicInteger beforePersistCalls = new AtomicInteger();
+
+    @Override
+    public void start() {
+      startAsync().awaitRunning();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync().awaitTerminated();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext ctx) {
+      return true;
+    }
+
+    @Override
+    public void beforePersistingReplicationOffset() {
+      beforePersistCalls.incrementAndGet();
+    }
+
+    @Override
+    public long getMaxBufferSize() {
+      return 1L; // force immediate flush
+    }
+
+    @Override
+    public long maxFlushInterval() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public UUID getPeerUUID() {
+      return null;
+    }
+
+    int getBeforePersistCalls() {
+      return beforePersistCalls.get();
+    }
+  }
+
+  @Test
+  public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();
+    endpoint.start();
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);
+
+    Mockito.when(source.isPeerEnabled()).thenReturn(true);
+    Mockito.when(source.isSourceActive()).thenReturn(true);
+    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
+    Mockito.when(source.getPeerId()).thenReturn("1");
+    Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class));
+
+    WALEntryBatch batch = new WALEntryBatch(1, null);
+    batch.setLastWalPath(new Path("wal"));
+    batch.setLastWalPosition(1L);
+    // no entries, no heap size
+
+    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);

Review Comment:
   The mock stubs 'walReader.take()', but ReplicationSourceShipper actually 
calls 'entryReader.poll(getEntriesTimeout)' (line 118 in 
ReplicationSourceShipper.java). The stub therefore has no effect: the shipper 
calls the unstubbed poll() method, which returns null on a mock, and skips 
processing any batches. The test should stub 'poll(anyInt())' instead of 
'take()'.
   ```suggestion
       Mockito.when(walReader.poll(Mockito.anyInt())).thenReturn(batch).thenReturn(null);
   ```
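
The effect of stubbing the wrong method can be reproduced without Mockito. In the sketch below, `Reader` and `fetchOnce` are hypothetical stand-ins for the WAL reader and the shipper's fetch step; the base class returns null from both methods to mimic how unstubbed methods on a mock behave.

```java
final class StubMismatchDemo {
  /** Hypothetical reader with two ways to fetch a batch. */
  static class Reader {
    // Both defaults return null, mimicking unstubbed methods on a mock.
    String take() {
      return null;
    }

    String poll(long timeoutMs) {
      return null;
    }
  }

  /** Hypothetical consumer that only ever calls poll(timeout). */
  static String fetchOnce(Reader reader) {
    return reader.poll(100);
  }

  public static void main(String[] args) {
    // "Stubs" take(), which the consumer never calls.
    Reader takeStubbed = new Reader() {
      @Override
      String take() {
        return "batch";
      }
    };
    // "Stubs" poll(), the method the consumer actually uses.
    Reader pollStubbed = new Reader() {
      @Override
      String poll(long timeoutMs) {
        return "batch";
      }
    };
    System.out.println("stubbing take(): " + fetchOnce(takeStubbed));
    System.out.println("stubbing poll(): " + fetchOnce(pollStubbed));
  }
}
```

Only the second reader delivers a batch to the consumer, which is the situation the review comment describes for the shipper test.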



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java:
##########
@@ -283,4 +283,29 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
+  // file. So we return config value CONF_BACKUP_MAX_WAL_SIZE for
+  // ContinuousBackupReplicationEndpoint
+  // and -1 for other ReplicationEndpoint since they don't buffer.
+  // For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
+  // offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
+  default long getMaxBufferSize() {
+    return -1;
+  }
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
+  // file. So we return config value CONF_STAGED_WAL_FLUSH_INTERVAL for
+  // ContinuousBackupReplicationEndpoint
+  // and Long.MAX_VALUE for other ReplicationEndpoint since they don't buffer.
+  // For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
+  // offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()

Review Comment:
   The comment references 'shouldFlushStagedWal()' but the actual method name 
in ReplicationSourceShipper is 'shouldPersistLogPosition()'. This inconsistency 
will confuse developers trying to understand the interaction between these 
components.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -98,6 +103,10 @@ public final void run() {
     LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
     // Loop until we close down
     while (isActive()) {
+      // check if flush needed for WAL backup, this is need for timeout based flush

Review Comment:
   The comment has a typo: 'need' should be 'needed'. Should read "this is 
needed for timeout based flush".
   ```suggestion
         // check if flush needed for WAL backup, this is needed for timeout based flush
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
