Apache9 commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r2802689035


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +272,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    if (accumulatedSizeSinceLastUpdate == 0 || lastShippedBatch == null) {
+      return false;
+    }
+
+    // Default behaviour to update offset immediately after replicate()
+    if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == 
Long.MAX_VALUE) {

Review Comment:
   This is a bit strange...
   
   If offsetUpdateSizeThresholdBytes is -1, then the below 
accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes will always 
returns true, so we do not need this check here?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -105,10 +130,17 @@ public final void run() {
         sleepForRetries("Replication is disabled", sleepForRetries, 1, 
maxRetriesMultiplier);
         continue;
       }
+      // check time-based offset persistence
+      if (shouldPersistLogPosition()) {

Review Comment:
   So this wants to solve the problem that, since now we may not always persist 
the offset and shipping, it is possible that we shipped out a batch, and then 
did not get a new batch for a very long time, so if we do not add a check here, 
there is no way for us to persist the offset?
   
   I think we also need to change the poll timeout below, we should not wait 
longer than the remaining time of update offset interval?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -90,6 +99,22 @@ public ReplicationSourceShipper(Configuration conf, String 
walGroupId, Replicati
       this.conf.getInt("replication.source.getEntries.timeout", 
DEFAULT_TIMEOUT);
     this.shipEditsTimeout = 
this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
       HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+    ReplicationPeer peer = source.getPeer();
+    if (peer != null && peer.getPeerConfig() != null) {

Review Comment:
   The conf here already contains all the configurations in peer config. You 
can see the implementation of ReplicationPeers for more details, we will create 
a CompoundConfiguration and apply the peerConfig.getConfiguration() to it.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -154,15 +196,16 @@ protected void postFinish() {
   private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
-    if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
-      return;
-    }
     int currentSize = (int) entryBatch.getHeapSize();
     source.getSourceMetrics()
       .setTimeStampNextToReplicate(entries.get(entries.size() - 
1).getKey().getWriteTime());
     while (isActive()) {
       try {
+        if (entries.isEmpty()) {

Review Comment:
   Why move this here?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -154,15 +196,16 @@ protected void postFinish() {
   private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
-    if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
-      return;
-    }
     int currentSize = (int) entryBatch.getHeapSize();
     source.getSourceMetrics()
       .setTimeStampNextToReplicate(entries.get(entries.size() - 
1).getKey().getWriteTime());
     while (isActive()) {
       try {
+        if (entries.isEmpty()) {

Review Comment:
   OK, I guess I know why you move this here, in the past updateLogPosition can 
not fail(it will lead to an abort so we can assume it never fails), but now 
since we have introduced a callback method, it could fail.
   
   Then I do not think this is the correct way to deal with this. Consider the 
S3 based solution, if you fail to commit the file on S3, then the correct way 
is to send the data again? But here we just retry committing... I think we 
should restart from the previous persist offset and send data again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to