Copilot commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r3012357137
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +297,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}
+ private boolean shouldPersistLogPosition() {
+ if (accumulatedSizeSinceLastUpdate == 0 || lastShippedBatch == null) {
+ return false;
+ }
+
+ // Default behaviour to update offset immediately after replicate()
+ if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == Long.MAX_VALUE) {
+ return true;
+ }
+
+ return (accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes)
+ || (EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs);
Review Comment:
`shouldPersistLogPosition` treats `offsetUpdateSizeThresholdBytes == -1` as
a real threshold, so `accumulatedSizeSinceLastUpdate >= -1` will be true
whenever any data has been shipped. This makes it impossible to enable
*time-based only* persistence by setting just
`hbase.replication.shipper.offset.update.interval.ms` (you will still persist
immediately after every batch). Consider treating negative/zero thresholds as
"disabled" for size-based persistence and only applying the size comparison
when the configured threshold is > 0.
```suggestion
// Treat non-positive size thresholds as "disabled" for size-based persistence.
boolean sizeThresholdReached = offsetUpdateSizeThresholdBytes > 0
&& accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes;
boolean timeThresholdReached =
  EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs;
return sizeThresholdReached || timeThresholdReached;
```
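To make the fixed predicate easy to reason about, here is a standalone model of the suggested check (the class and parameter names are illustrative, not part of the patch), showing that a time-only configuration no longer persists after every batch:

```java
// Standalone model of the suggested shouldPersistLogPosition() logic.
// Class and parameter names are illustrative, not part of the HBase patch.
public class OffsetPersistPolicy {

  /**
   * A non-positive size threshold disables the size-based trigger, so a
   * deployment can opt into time-based-only persistence.
   */
  public static boolean shouldPersist(long accumulatedSize, long sizeThresholdBytes,
      long nowMs, long lastUpdateMs, long intervalMs) {
    if (accumulatedSize == 0) {
      return false; // nothing shipped since the last offset update
    }
    boolean sizeThresholdReached =
      sizeThresholdBytes > 0 && accumulatedSize >= sizeThresholdBytes;
    boolean timeThresholdReached = nowMs - lastUpdateMs >= intervalMs;
    return sizeThresholdReached || timeThresholdReached;
  }
}
```

With `sizeThresholdBytes == -1` the size comparison is skipped entirely, so only the elapsed-time check can trigger a persist.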
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -118,11 +154,24 @@ public final void run() {
} else {
shipEdits(entryBatch);
}
Review Comment:
With deferred offset persistence enabled (size/interval thresholds),
receiving `WALEntryBatch.NO_MORE_DATA` can transition the worker to `FINISHED`
without ever calling `persistLogPosition()` for the last shipped batch. This
can drop the final offset update and any deferred HFile-ref cleanup, causing
re-replication/recovery work. Consider persisting (if
`accumulatedSizeSinceLastUpdate > 0`) before calling `noMoreData()` / before
setting state to `FINISHED`.
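A toy model of the proposed shutdown ordering (plain Java; all names are illustrative and this is not the real `ReplicationSourceShipper` API) makes the invariant concrete: any deferred offset is flushed before the state flips to `FINISHED`:

```java
// Toy model of the proposed shutdown ordering. All names are illustrative;
// this is not the real ReplicationSourceShipper API.
public class ShipperShutdownSketch {
  public enum State { RUNNING, FINISHED }

  private State state = State.RUNNING;
  private long accumulatedSizeSinceLastUpdate;
  private long shippedOffset;
  private long persistedOffset;

  /** Ship a batch; with deferred persistence the offset is not written yet. */
  public void ship(long batchSizeBytes) {
    shippedOffset += batchSizeBytes;
    accumulatedSizeSinceLastUpdate += batchSizeBytes;
  }

  private void persistLogPosition() {
    persistedOffset = shippedOffset;
    accumulatedSizeSinceLastUpdate = 0;
  }

  /** Invoked on WALEntryBatch.NO_MORE_DATA: flush the deferred offset first. */
  public void onNoMoreData() {
    if (accumulatedSizeSinceLastUpdate > 0) {
      persistLogPosition(); // otherwise the last batch's position is dropped
    }
    state = State.FINISHED;
  }

  public State getState() { return state; }
  public long getPersistedOffset() { return persistedOffset; }
}
```

Without the flush in `onNoMoreData()`, a worker that ships a final batch and then sees `NO_MORE_DATA` would finish with `persistedOffset` still at its previous value, which is exactly the re-replication hazard described above.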
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -190,13 +252,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
- // Clean up hfile references
- for (Entry entry : entries) {
- cleanUpHFileRefs(entry.getEdit());
- LOG.trace("shipped entry {}: ", entry);
+
+ accumulatedSizeSinceLastUpdate += currentSize;
+ entriesForCleanUpHFileRefs.addAll(entries);
Review Comment:
`entriesForCleanUpHFileRefs.addAll(entries)` retains full `WAL.Entry`
objects across batches until the next offset persist. With large
thresholds/intervals this can hold a large amount of WAL data in memory
(including edits/cells) even though cleanup only needs bulk-load-related
metadata. Consider recording only the minimal information needed for cleanup
(e.g., extracted bulk-load store file refs) or only retaining entries that
actually contain `WALEdit.BULK_LOAD` cells.
```suggestion
// Only retain WAL entries that contain bulk load cells for later HFile ref cleanup
for (Entry entry : entries) {
WALEdit edit = entry.getEdit();
if (edit == null) {
continue;
}
for (Cell cell : edit.getCells()) {
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
entriesForCleanUpHFileRefs.add(entry);
break;
}
}
}
```
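The memory footprint can be reduced even further by extracting the store-file references at ship time and letting the entries themselves become garbage. The sketch below uses simplified stand-ins for `WAL.Entry`/`WALEdit` (the `Entry` type is hypothetical, not an HBase class):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for WAL.Entry/WALEdit: the point is that only the
// bulk-load store file references survive past shipping, not the edits.
public class BulkLoadRefExtractor {

  /** Hypothetical shipped entry: store file refs plus a (large) edit payload. */
  public static class Entry {
    final List<String> bulkLoadStoreFiles;
    final byte[] editPayload; // stands in for the cells we do NOT want to retain

    public Entry(List<String> bulkLoadStoreFiles, int editSizeBytes) {
      this.bulkLoadStoreFiles = bulkLoadStoreFiles;
      this.editPayload = new byte[editSizeBytes];
    }
  }

  /** Collect just the refs; the entries (and their edits) can then be GC'd. */
  public static List<String> extractRefs(List<Entry> shippedEntries) {
    List<String> refs = new ArrayList<>();
    for (Entry e : shippedEntries) {
      refs.addAll(e.bulkLoadStoreFiles);
    }
    return refs;
  }
}
```

Accumulating a `List<String>` of refs between persists keeps memory proportional to the number of bulk loads rather than the total size of shipped WAL edits.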
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java:
##########
@@ -283,4 +283,11 @@ public int getTimeout() {
* @throws IllegalStateException if this service's state isn't FAILED.
*/
Throwable failureCause();
+
+ /**
+ * Hook invoked before persisting replication offsets. Eg: Buffered endpoints can flush/close WALs
Review Comment:
Javadoc typo: use "e.g.," instead of "Eg:".
```suggestion
* Hook invoked before persisting replication offsets, e.g., buffered endpoints can flush/close WALs
```
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java:
##########
@@ -866,4 +866,24 @@ public long getTotalReplicatedEdits() {
long getSleepForRetries() {
return sleepForRetries;
}
+
+ void restartShipper(String walGroupId, ReplicationSourceShipper oldWorker) {
+ workerThreads.compute(walGroupId, (key, current) -> {
+ if (current != oldWorker) {
+ return current; // already replaced
+ }
+
+ LOG.warn("Restarting shipper for walGroupId={}", walGroupId);
+
+ try {
+ ReplicationSourceShipper newWorker = createNewShipper(walGroupId);
+ startShipper(newWorker);
+ return newWorker;
+ } catch (Exception e) {
+ LOG.error("Failed to restart shipper for walGroupId={}", walGroupId, e);
+ return current; // retry later
Review Comment:
If `createNewShipper`/`startShipper` fails, this code returns `current` (the
old worker). But `abortAndRestart` has already interrupted the old worker, so
leaving it in `workerThreads` can permanently stall replication for this
walGroupId (and also blocks `tryStartNewShipper` from creating a new worker
because the map entry is non-null). Consider removing the entry / scheduling a
retry / not interrupting the old worker until the new one is successfully
started.
```suggestion
// Remove the aborted worker so that a future attempt can create a new shipper
return null;
```
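A minimal JDK-only demonstration (no HBase types; all names hypothetical) of why returning `null` from `Map.compute` matters here: `null` removes the mapping, so a later `tryStartNewShipper`-style attempt sees an empty slot instead of a dead worker:

```java
import java.util.concurrent.ConcurrentHashMap;

// JDK-only demo: returning null from Map.compute removes the mapping, freeing
// the walGroupId slot for a future start attempt. Names are hypothetical.
public class RestartSlotDemo {
  static final ConcurrentHashMap<String, String> workerThreads = new ConcurrentHashMap<>();

  /** Failed restart path: drop the already-interrupted worker from the map. */
  public static void onRestartFailure(String walGroupId) {
    workerThreads.compute(walGroupId, (k, current) -> null); // entry removed
  }

  /** Mirrors tryStartNewShipper: only starts when the slot is empty. */
  public static boolean tryStart(String walGroupId, String worker) {
    return workerThreads.putIfAbsent(walGroupId, worker) == null;
  }
}
```

Returning the old worker instead would leave the non-null map entry blocking `putIfAbsent` indefinitely, which is the stall described in the comment.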