Apache9 commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r2736040903
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +247,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}
+ private boolean shouldPersistLogPosition() {
+ ReplicationEndpoint endpoint = source.getReplicationEndpoint();
Review Comment:
We should read these thresholds from configuration values here instead of
getting them from ReplicationEndpoint. The default configuration values can
keep the old behavior for normal replication.
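
A minimal sketch of what configuration-driven thresholds could look like. The class name, the two configuration keys, and the default flush interval are hypothetical and not taken from the PR. A plain `Map` stands in for the Hadoop `Configuration` object so the snippet is self-contained; in the real code the lookups would presumably go through `conf.getLong(key, default)`. The size default of -1 mirrors the `maxBufferSize == -1` branch in the diff, which persists whenever anything is staged.

```java
import java.util.Map;

// Hypothetical helper: reads offset-persistence thresholds from configuration
// instead of asking the ReplicationEndpoint for them.
final class OffsetPersistPolicy {
  // Assumed key names, for illustration only.
  static final String MAX_BUFFER_SIZE_KEY = "hbase.replication.shipper.offset.persist.size";
  static final String MAX_FLUSH_INTERVAL_KEY = "hbase.replication.shipper.offset.persist.interval.ms";

  // -1 means "persist whenever something is staged", matching the -1 branch in the diff.
  static final long DEFAULT_MAX_BUFFER_SIZE = -1L;
  // Assumed default: the time threshold never fires on its own.
  static final long DEFAULT_MAX_FLUSH_INTERVAL_MS = Long.MAX_VALUE;

  private final long maxBufferSize;
  private final long maxFlushIntervalMs;

  OffsetPersistPolicy(Map<String, String> conf) {
    this.maxBufferSize = getLong(conf, MAX_BUFFER_SIZE_KEY, DEFAULT_MAX_BUFFER_SIZE);
    this.maxFlushIntervalMs = getLong(conf, MAX_FLUSH_INTERVAL_KEY, DEFAULT_MAX_FLUSH_INTERVAL_MS);
  }

  private static long getLong(Map<String, String> conf, String key, long defaultValue) {
    String value = conf.get(key);
    return value == null ? defaultValue : Long.parseLong(value.trim());
  }

  // Same decision shape as shouldPersistLogPosition() in the diff, minus the batch null check.
  boolean shouldPersist(long stagedWalSize, long nowMs, long lastFlushMs) {
    if (stagedWalSize == 0) {
      return false; // nothing staged, nothing to persist
    }
    if (maxBufferSize == -1) {
      return true; // default: persist whenever something is staged
    }
    return stagedWalSize >= maxBufferSize || nowMs - lastFlushMs >= maxFlushIntervalMs;
  }
}
```

With an empty configuration the policy returns true whenever anything is staged, so only endpoints that opt in through configuration would see size-based or time-based batching.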
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +247,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}
+ private boolean shouldPersistLogPosition() {
+ ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+ long maxBufferSize = endpoint.getMaxBufferSize();
+ if (stagedWalSize == 0 || lastShippedBatch == null) {
+ return false;
+ }
+ if (maxBufferSize == -1) {
+ return true;
+ }
+ return stagedWalSize >= maxBufferSize
+ || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
+ }
+
+ private void persistLogPosition() throws IOException {
+ if (lastShippedBatch == null) {
Review Comment:
Since we could accumulate several batches in the loop above, a null batch
does not mean that we haven't shipped anything out, right? Why do we just
return here if lastShippedBatch is null?
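
One possible reading of this concern, sketched under assumptions: the persist decision is driven by the accumulated staged size, and a missing position while bytes are staged is treated as a broken invariant rather than a silent return. `StagedOffsetTracker` and its methods are hypothetical names, and a `LongConsumer` stands in for whatever actually stores the offset.

```java
import java.util.function.LongConsumer;

// Hypothetical tracker: accumulates progress across several shipped batches.
final class StagedOffsetTracker {
  private long stagedBytes;        // bytes shipped since the last persist
  private long lastPosition = -1L; // end position of the most recent shipped batch

  // Called once per shipped batch; several calls may happen between two persists.
  void stage(long endPosition, long bytes) {
    lastPosition = endPosition;
    stagedBytes += bytes;
  }

  boolean hasPending() {
    return stagedBytes > 0;
  }

  // Persists the latest position if anything is staged; returns whether it did.
  boolean persist(LongConsumer offsetStore) {
    if (stagedBytes == 0) {
      return false; // genuinely nothing shipped since the last persist
    }
    if (lastPosition < 0) {
      // Staged bytes without a position would mean lost progress, so fail loudly.
      throw new IllegalStateException("staged " + stagedBytes + " bytes but no position recorded");
    }
    offsetStore.accept(lastPosition);
    stagedBytes = 0;
    return true;
  }
}
```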
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -98,6 +103,14 @@ public final void run() {
LOG.info("Running ReplicationSourceShipper Thread for wal group: {}",
this.walGroupId);
// Loop until we close down
while (isActive()) {
+ // Whether to persist replication offsets based on size/time thresholds
+ if (shouldPersistLogPosition()) {
+ try {
+ persistLogPosition();
+ } catch (IOException e) {
+ LOG.warn("Exception while persisting replication state", e);
Review Comment:
Just logging a warning is not enough to handle this exception. Typically we
should restart from the last persisted offset and replicate again.
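
A rough sketch of the "restart from the last persisted offset" idea, not the actual HBase recovery path. `ShipperOffsets`, `OffsetStore`, and `persistOrRewind` are hypothetical names. On a failed persist, the in-memory shipped position is rewound to the last position known to be durable, so the caller re-reads and re-ships from there.

```java
import java.io.IOException;

// Hypothetical bookkeeping for one WAL group.
final class ShipperOffsets {
  // Stand-in for the durable offset storage; may fail with IOException.
  interface OffsetStore {
    void save(long position) throws IOException;
  }

  private long persistedPosition; // last position known to be durable
  private long shippedPosition;   // last position shipped but maybe not yet persisted

  ShipperOffsets(long initialPosition) {
    this.persistedPosition = initialPosition;
    this.shippedPosition = initialPosition;
  }

  void recordShipped(long position) {
    shippedPosition = position;
  }

  // Tries to persist the shipped position and returns the position to resume reading from.
  long persistOrRewind(OffsetStore store) {
    try {
      store.save(shippedPosition);
      persistedPosition = shippedPosition;
    } catch (IOException e) {
      // Persist failed: drop the unpersisted progress and replicate again
      // from the last durable offset.
      shippedPosition = persistedPosition;
    }
    return shippedPosition;
  }
}
```

Re-shipping after a rewind means the peer can receive some edits twice, so this approach assumes the endpoint tolerates duplicate delivery.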