anmolnar commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r2709926770


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +239,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())

Review Comment:
   I agree with this comment too. You introduced two new properties, `getMaxBufferSize()` and `maxFlushInterval()`, whose default values should mimic the original behavior. You documented this properly in the `ReplicationEndpoint` interface, and it would be useful to check for those defaults explicitly here too.
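
   For illustration, a minimal sketch of what explicit default checks could look like. With the current `>=` comparison, `-1` already makes the size check pass for any non-empty staged batch, and `Long.MAX_VALUE` makes the interval check effectively never fire; spelling those cases out makes the intent visible. This is a self-contained model, not the PR's code: the shipper's fields are passed in as parameters, and `FlushDecisionSketch`, `NO_BUFFER_LIMIT` and `NO_FLUSH_INTERVAL` are hypothetical names.

```java
/**
 * Hypothetical, self-contained model of ReplicationSourceShipper#shouldPersistLogPosition().
 * The shipper's fields are passed in as parameters; constant names are illustrative only.
 */
class FlushDecisionSketch {
  // Defaults documented on ReplicationEndpoint for endpoints that do not buffer.
  static final long NO_BUFFER_LIMIT = -1L;
  static final long NO_FLUSH_INTERVAL = Long.MAX_VALUE;

  static boolean shouldPersistLogPosition(long stagedWalSize, boolean hasShippedBatch,
      long maxBufferSize, long maxFlushIntervalMs, long lastFlushTs, long now) {
    if (stagedWalSize == 0 || !hasShippedBatch) {
      return false;
    }
    // Non-buffering endpoint (default -1): persist after every shipped non-empty batch.
    if (maxBufferSize == NO_BUFFER_LIMIT) {
      return true;
    }
    if (stagedWalSize >= maxBufferSize) {
      return true;
    }
    // Long.MAX_VALUE disables the time-based trigger instead of relying on the subtraction.
    return maxFlushIntervalMs != NO_FLUSH_INTERVAL && now - lastFlushTs >= maxFlushIntervalMs;
  }

  public static void main(String[] args) {
    System.out.println(shouldPersistLogPosition(10, true, NO_BUFFER_LIMIT, NO_FLUSH_INTERVAL, 0, 5)); // true
    System.out.println(shouldPersistLogPosition(10, true, 100, NO_FLUSH_INTERVAL, 0, 5)); // false
    System.out.println(shouldPersistLogPosition(10, true, 100, 1000, 0, 1500)); // true
  }
}
```

   A buffering endpoint would override both methods with its configured values and never hit the sentinel branches.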



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java:
##########
@@ -283,4 +283,29 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
+  // file. So we return config value CONF_BACKUP_MAX_WAL_SIZE for
+  // ContinuousBackupReplicationEndpoint
+  // and -1 for other ReplicationEndpoint since they don't buffer.
+  // For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
+  // offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()

Review Comment:
   @ankitsol It seems that this comment is valid.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +239,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs
+          >= source.getReplicationEndpoint().maxFlushInterval());
+  }
+
+  private void persistLogPosition() {
+    if (lastShippedBatch == null) {
+      return;
+    }
+    if (stagedWalSize > 0) {
+      source.getReplicationEndpoint().beforePersistingReplicationOffset();
+    }
+    stagedWalSize = 0;
+    lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+
+    // Clean up hfile references
+    for (Entry entry : entriesForCleanUpHFileRefs) {
+      try {
+        cleanUpHFileRefs(entry.getEdit());
+      } catch (IOException e) {
+        LOG.warn("{} threw unknown exception:",
+          source.getReplicationEndpoint().getClass().getName(), e);
+      }

Review Comment:
   Agree. Either the exception should be rethrown here, or it doesn't need to be caught at all.
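
   A minimal sketch of the "don't catch" option, modeled in plain Java: the `IOException` propagates out of `persistLogPosition()` and the offset is not advanced. It assumes the offset is persisted after the clean-up loop in the same method, which the excerpt above does not show; `PersistSketch`, `HFileRefCleaner` and `stage()` are hypothetical names, not HBase APIs. Wrapping in `UncheckedIOException` and rethrowing would be the equivalent if the method signature cannot change.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the HFile-reference clean-up step in persistLogPosition().
 * Names are illustrative and are not HBase APIs.
 */
class PersistSketch {
  /** Stand-in for cleanUpHFileRefs(entry.getEdit()). */
  interface HFileRefCleaner {
    void cleanUp(String entry) throws IOException;
  }

  private final List<String> entriesForCleanUp = new ArrayList<>();
  private long persistedOffset = -1L;

  void stage(String entry) {
    entriesForCleanUp.add(entry);
  }

  long persistedOffset() {
    return persistedOffset;
  }

  /** No catch: a clean-up failure propagates and the offset is left unchanged. */
  void persistLogPosition(long offset, HFileRefCleaner cleaner) throws IOException {
    for (String entry : entriesForCleanUp) {
      cleaner.cleanUp(entry);
    }
    entriesForCleanUp.clear();
    persistedOffset = offset;
  }

  public static void main(String[] args) throws IOException {
    PersistSketch sketch = new PersistSketch();
    sketch.stage("entry-1");
    try {
      sketch.persistLogPosition(42L, entry -> {
        throw new IOException("clean-up failed for " + entry);
      });
    } catch (IOException e) {
      System.out.println("persist aborted: " + e.getMessage());
    }
    System.out.println(sketch.persistedOffset()); // -1
    // Retry with a clean-up that succeeds: the staged entry is processed and the offset advances.
    sketch.persistLogPosition(42L, entry -> { });
    System.out.println(sketch.persistedOffset()); // 42
  }
}
```

   Because the staged entries are only cleared after a successful pass, a failed clean-up leaves them in place for the next attempt.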



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java:
##########
@@ -283,4 +283,29 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
+  // file. So we return config value CONF_BACKUP_MAX_WAL_SIZE for
+  // ContinuousBackupReplicationEndpoint
+  // and -1 for other ReplicationEndpoint since they don't buffer.
+  // For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
+  // offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
+  default long getMaxBufferSize() {
+    return -1;
+  }
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
+  // file. So we return config value CONF_STAGED_WAL_FLUSH_INTERVAL for
+  // ContinuousBackupReplicationEndpoint
+  // and Long.MAX_VALUE for other ReplicationEndpoint since they don't buffer.
+  // For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we update replication
+  // offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()

Review Comment:
   Same here.



##########
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperBufferedFlush.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Tests staged WAL flush behavior in ReplicationSourceShipper.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationSourceShipperBufferedFlush {
+
+  static class CountingReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private final AtomicInteger beforePersistCalls = new AtomicInteger();
+
+    @Override
+    public void start() {
+      startAsync().awaitRunning();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync().awaitTerminated();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext ctx) {
+      return true;
+    }
+
+    @Override
+    public void beforePersistingReplicationOffset() {
+      beforePersistCalls.incrementAndGet();
+    }
+
+    @Override
+    public long getMaxBufferSize() {
+      return 1L; // force immediate flush
+    }
+
+    @Override
+    public long maxFlushInterval() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public UUID getPeerUUID() {
+      return null;
+    }
+
+    int getBeforePersistCalls() {
+      return beforePersistCalls.get();
+    }
+  }
+
+  @Test
+  public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();
+    endpoint.start();
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);
+
+    Mockito.when(source.isPeerEnabled()).thenReturn(true);
+    Mockito.when(source.isSourceActive()).thenReturn(true);
+    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
+    Mockito.when(source.getPeerId()).thenReturn("1");
+    Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class));
+
+    WALEntryBatch batch = new WALEntryBatch(1, null);
+    batch.setLastWalPath(new Path("wal"));
+    batch.setLastWalPosition(1L);
+    // no entries, no heap size
+
+    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);
+
+    ReplicationSourceShipper shipper =
+      new ReplicationSourceShipper(conf, "wal-group", source, walReader);
+
+    shipper.start();
+
+    // Allow loop to run
+    Waiter.waitFor(conf, 3000, () -> true);
+
+    shipper.interrupt();

Review Comment:
   Another good suggestion.
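
   Related: `() -> true` is satisfied on its first evaluation, so `Waiter.waitFor(conf, 3000, () -> true)` should return without giving the shipper loop any time to run, and the (presumably zero-count) assertion on `beforePersistingReplicationOffset()` could pass before the batch is even taken. Below is a minimal plain-Java sketch of waiting on observable progress instead; the `waitFor` helper, the queue standing in for `ReplicationSourceWALReader`, and the counters are all hypothetical. In the Mockito-based test itself, something like `Mockito.verify(walReader, Mockito.timeout(3000).atLeast(2)).take()` could play the same role before asserting on `getBeforePersistCalls()`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: wait for observable progress before asserting on a counter. */
class WaitForConditionSketch {
  /** Polls the condition until it holds or the timeout expires; returns the last result. */
  static boolean waitFor(long timeoutMs, long intervalMs, BooleanSupplier condition) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() >= deadline) {
        return false;
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> batches = new ArrayBlockingQueue<>(1);
    batches.add("empty-batch");
    AtomicInteger takeCalls = new AtomicInteger();
    AtomicInteger persistCalls = new AtomicInteger();

    // Stand-in for the shipper thread: takes batches and persists only non-empty ones.
    Thread shipper = new Thread(() -> {
      try {
        while (true) {
          String batch = batches.poll(50, TimeUnit.MILLISECONDS);
          takeCalls.incrementAndGet();
          if (batch != null && !batch.startsWith("empty")) {
            persistCalls.incrementAndGet();
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // exit the loop on interrupt
      }
    });
    shipper.start();

    // A second take means the first (empty) batch has been fully handled.
    boolean progressed = waitFor(3000, 10, () -> takeCalls.get() >= 2);
    shipper.interrupt();
    shipper.join();
    System.out.println(progressed + " " + persistCalls.get()); // true 0
  }
}
```

   Waiting on a positive signal (the batch was consumed) and only then checking the negative one (no persist call) avoids a test that passes simply because nothing has happened yet.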



-- 
This is an automated message from the Apache Git Service.