wchevreuil commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r2720780067
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +246,53 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}
+ private boolean shouldPersistLogPosition() {
+ ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+ if (!endpoint.isBufferedReplicationEndpoint()) {
+ // Non-buffering endpoints persist immediately
+ return true;
+ }
+ if (stagedWalSize == 0 || lastShippedBatch == null) {
+ return false;
+ }
+ return stagedWalSize >= endpoint.getMaxBufferSize()
+ || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
+ }
+
+ @Nullable
+ // Returns IOException instead of throwing so callers can decide
+ // whether to retry (shipEdits) or best-effort log (run()).
+ private IOException persistLogPosition() {
+ if (lastShippedBatch == null) {
+ return null;
+ }
+
+ ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+
+ if (endpoint.isBufferedReplicationEndpoint() && stagedWalSize > 0) {
+ source.getReplicationEndpoint().beforePersistingReplicationOffset();
+ }
Review Comment:
This should also live in the endpoint.
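Here is a minimal sketch of that idea. It assumes the shipper always calls the hook and the buffered endpoint checks its own staged state. `Endpoint`, `BufferedEndpoint`, `ShipperSketch`, `stage()` and `flushCount()` are hypothetical stand-ins for illustration, not HBase classes:

```java
// Hypothetical, simplified stand-in for ReplicationEndpoint (not the HBase interface).
interface Endpoint {
  // Invoked before every offset persist; non-buffering endpoints do nothing.
  default void beforePersistingReplicationOffset() {
  }
}

// Hypothetical buffered endpoint that owns the "is anything staged?" check,
// so the shipper no longer needs isBufferedReplicationEndpoint() or stagedWalSize.
class BufferedEndpoint implements Endpoint {
  private long stagedBytes = 0;
  private int flushCount = 0;

  // Records bytes written to the endpoint's buffer.
  void stage(long bytes) {
    stagedBytes += bytes;
  }

  @Override
  public void beforePersistingReplicationOffset() {
    if (stagedBytes == 0) {
      return; // nothing buffered, nothing to flush
    }
    flushCount++; // a real endpoint would flush/close its WAL here
    stagedBytes = 0;
  }

  int flushCount() {
    return flushCount;
  }
}

// Hypothetical shipper fragment: it calls the hook unconditionally.
class ShipperSketch {
  private final Endpoint endpoint;

  ShipperSketch(Endpoint endpoint) {
    this.endpoint = endpoint;
  }

  void persistLogPosition() {
    endpoint.beforePersistingReplicationOffset();
    // ... then persist the replication offset ...
  }
}
```

With this shape, the shipper's persist path has no endpoint-type branches left.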
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +246,53 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}
+ private boolean shouldPersistLogPosition() {
+ ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+ if (!endpoint.isBufferedReplicationEndpoint()) {
+ // Non-buffering endpoints persist immediately
+ return true;
+ }
+ if (stagedWalSize == 0 || lastShippedBatch == null) {
+ return false;
+ }
+ return stagedWalSize >= endpoint.getMaxBufferSize()
+ || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
+ }
Review Comment:
This decision should be in the endpoint, as it varies according to the type of
endpoint. Today we basically have two types, buffered and non-buffered. If we
add new endpoint types in the future, we would have to come back here and add
the related logic again.
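Here is a minimal sketch of moving the decision behind the endpoint interface. It assumes a hypothetical `shouldPersistLogPosition(long stagedBytes, long nowMs)` method. The clock is passed in, rather than read from `EnvironmentEdgeManager`, only to keep the sketch testable. `PersistPolicyEndpoint`, `BufferedPolicyEndpoint` and `onFlushed()` are illustrative names, not HBase API:

```java
// Hypothetical, simplified stand-in for ReplicationEndpoint (not the HBase interface).
interface PersistPolicyEndpoint {
  // Non-buffering default: persist the offset after every shipped batch.
  default boolean shouldPersistLogPosition(long stagedBytes, long nowMs) {
    return true;
  }
}

// Hypothetical buffered endpoint carrying the size/time rule currently in the shipper.
class BufferedPolicyEndpoint implements PersistPolicyEndpoint {
  private final long maxBufferSize;
  private final long maxFlushIntervalMs;
  private long lastFlushTs;

  BufferedPolicyEndpoint(long maxBufferSize, long maxFlushIntervalMs, long startTs) {
    this.maxBufferSize = maxBufferSize;
    this.maxFlushIntervalMs = maxFlushIntervalMs;
    this.lastFlushTs = startTs;
  }

  // Records the time of the last flush so the interval check restarts from it.
  void onFlushed(long nowMs) {
    lastFlushTs = nowMs;
  }

  @Override
  public boolean shouldPersistLogPosition(long stagedBytes, long nowMs) {
    if (stagedBytes == 0) {
      return false; // nothing staged since the last flush
    }
    return stagedBytes >= maxBufferSize || nowMs - lastFlushTs >= maxFlushIntervalMs;
  }
}
```

A new endpoint type would then override this one method, and the shipper would not need to change.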
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java:
##########
@@ -283,4 +283,35 @@ public int getTimeout() {
* @throws IllegalStateException if this service's state isn't FAILED.
*/
Throwable failureCause();
+
+ /**
+ * @return true if this endpoint buffers WAL entries and requires explicit flush control before
+ * persisting replication offsets.
+ */
+ default boolean isBufferedReplicationEndpoint() {
+ return false;
+ }
+
+ /**
+ * Maximum WAL size (bytes) to buffer before forcing a flush. Only meaningful when
+ * isBufferedReplicationEndpoint() == true.
+ */
+ default long getMaxBufferSize() {
+ return -1L;
+ }
+
+ /**
+ * Maximum time (ms) to wait before forcing a flush. Only meaningful when
+ * isBufferedReplicationEndpoint() == true.
+ */
+ default long maxFlushInterval() {
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Hook invoked before persisting replication offsets. Buffered endpoints should flush/close WALs
+ * here.
+ */
+ default void beforePersistingReplicationOffset() {
Review Comment:
From the endpoint's point of view, this method just flushes and closes the
given WAL, so let's name it accordingly.
```suggestion
default void flushAndCloseWAL() {
```
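The sketch below shows a buffered endpoint whose hook does exactly that: it flushes and closes its current WAL. `WalFlushingEndpoint`, `FileBufferingEndpoint`, `append()` and `persisted()` are hypothetical, and a `StringWriter` stands in for durable storage. The hook keeps the suggested no-throw signature, so `IOException` is assumed to be wrapped in `UncheckedIOException`:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;

// Hypothetical, simplified stand-in for ReplicationEndpoint (not the HBase interface).
interface WalFlushingEndpoint {
  // Non-buffering endpoints have no WAL of their own, so this is a no-op by default.
  default void flushAndCloseWAL() {
  }
}

// Hypothetical buffered endpoint writing replicated entries to its own WAL.
class FileBufferingEndpoint implements WalFlushingEndpoint {
  private final StringWriter storage = new StringWriter(); // stand-in for durable storage
  private BufferedWriter currentWal; // null while no WAL is open

  void append(String entry) {
    try {
      if (currentWal == null) {
        currentWal = new BufferedWriter(storage); // open a new WAL lazily
      }
      currentWal.write(entry);
      currentWal.write('\n');
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public void flushAndCloseWAL() {
    if (currentWal == null) {
      return; // nothing open, nothing to flush
    }
    try {
      currentWal.flush(); // push buffered entries to storage
      currentWal.close(); // closing a StringWriter is a no-op, so storage stays usable
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      currentWal = null;
    }
  }

  String persisted() {
    return storage.toString();
  }
}
```

Until `flushAndCloseWAL()` runs, appended entries sit in the writer's buffer and are not yet visible in storage. That is why the offset must not be persisted before this hook.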
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -190,13 +204,16 @@ private void shipEdits(WALEntryBatch entryBatch) {
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
- // Clean up hfile references
- for (Entry entry : entries) {
- cleanUpHFileRefs(entry.getEdit());
- LOG.trace("shipped entry {}: ", entry);
+
+ stagedWalSize += currentSize;
+ entriesForCleanUpHFileRefs.addAll(entries);
+ lastShippedBatch = entryBatch;
+ if (shouldPersistLogPosition()) {
Review Comment:
Rather than keeping `stagedWalSize` and `lastShippedBatch` as instance fields of
the shipper, we should just pass the `entryBatch` along to
`shouldPersistLogPosition()` (which, btw, should be defined/implemented in the
endpoints; see my related comment) and `persistLogPosition()`.
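Here is a minimal sketch of that shape. It assumes the endpoint keeps its own staged-size counter and the shipper hands it each shipped batch exactly once. `Batch`, `BatchAwareEndpoint`, `SizeBufferedEndpoint`, `BatchShipper` and `onLogPositionPersisted()` are hypothetical stand-ins for `WALEntryBatch`, `ReplicationEndpoint` and the shipper. Only the size threshold is modeled:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for WALEntryBatch.
final class Batch {
  final long sizeBytes;
  final long lastWalPosition;

  Batch(long sizeBytes, long lastWalPosition) {
    this.sizeBytes = sizeBytes;
    this.lastWalPosition = lastWalPosition;
  }
}

// Hypothetical endpoint contract: decisions are driven by the batch just shipped.
interface BatchAwareEndpoint {
  // Non-buffering default: persist after every batch.
  default boolean shouldPersistLogPosition(Batch batch) {
    return true;
  }

  // Called once the offset for `batch` has been persisted; no-op by default.
  default void onLogPositionPersisted(Batch batch) {
  }
}

// Hypothetical buffered endpoint: the staged-size counter lives here, not in the shipper.
class SizeBufferedEndpoint implements BatchAwareEndpoint {
  private final long maxBufferSize;
  private long stagedBytes = 0;

  SizeBufferedEndpoint(long maxBufferSize) {
    this.maxBufferSize = maxBufferSize;
  }

  @Override
  public boolean shouldPersistLogPosition(Batch batch) {
    stagedBytes += batch.sizeBytes; // assumes exactly one call per shipped batch
    return stagedBytes >= maxBufferSize;
  }

  @Override
  public void onLogPositionPersisted(Batch batch) {
    stagedBytes = 0; // buffer was flushed along with the offset
  }
}

// Hypothetical shipper fragment with no cross-batch fields of its own.
class BatchShipper {
  private final BatchAwareEndpoint endpoint;
  final List<Long> persistedPositions = new ArrayList<>(); // stand-in for offset storage

  BatchShipper(BatchAwareEndpoint endpoint) {
    this.endpoint = endpoint;
  }

  void onBatchShipped(Batch batch) {
    if (endpoint.shouldPersistLogPosition(batch)) {
      persistLogPosition(batch);
    }
  }

  private void persistLogPosition(Batch batch) {
    persistedPositions.add(batch.lastWalPosition);
    endpoint.onLogPositionPersisted(batch);
  }
}
```

The shipper then keeps no cross-batch state. Whatever must be remembered between batches lives in the endpoint that needs it.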