smengcl commented on code in PR #9656:
URL: https://github.com/apache/ozone/pull/9656#discussion_r2723993260
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java:
##########
@@ -247,39 +247,166 @@ private void submitSnapshotPurgeRequest(List<String>
purgeSnapshotKeys) {
}
}
- private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
- List<SnapshotMoveKeyInfos>
deletedKeys,
- List<HddsProtos.KeyValue>
renamedList,
- List<SnapshotMoveKeyInfos>
dirsToMove) {
-
- SnapshotMoveTableKeysRequest.Builder moveDeletedKeysBuilder =
SnapshotMoveTableKeysRequest.newBuilder()
+ /**
+ * Submits a single batch of snapshot move requests.
+ *
+ * @param snapInfo The snapshot being processed
+ * @param deletedKeys List of deleted keys to move
+ * @param renamedList List of renamed keys
+ * @param dirsToMove List of deleted directories to move
+ * @return true if submission was successful, false otherwise
+ */
+ private boolean submitSingleSnapshotMoveBatch(SnapshotInfo snapInfo,
+ List<SnapshotMoveKeyInfos>
deletedKeys,
+ List<HddsProtos.KeyValue>
renamedList,
+ List<SnapshotMoveKeyInfos>
dirsToMove) {
+ SnapshotMoveTableKeysRequest.Builder moveDeletedKeys =
SnapshotMoveTableKeysRequest.newBuilder()
.setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId()));
- SnapshotMoveTableKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
- .addAllDeletedKeys(deletedKeys)
- .addAllRenamedKeys(renamedList)
- .addAllDeletedDirs(dirsToMove)
- .build();
- if (isBufferLimitCrossed(ratisByteLimit, 0,
moveDeletedKeys.getSerializedSize())) {
- int remaining = MIN_ERR_LIMIT_PER_TASK;
- deletedKeys = deletedKeys.subList(0, Math.min(remaining,
deletedKeys.size()));
- remaining -= deletedKeys.size();
- renamedList = renamedList.subList(0, Math.min(remaining,
renamedList.size()));
- remaining -= renamedList.size();
- dirsToMove = dirsToMove.subList(0, Math.min(remaining,
dirsToMove.size()));
- moveDeletedKeys = moveDeletedKeysBuilder
- .addAllDeletedKeys(deletedKeys)
- .addAllRenamedKeys(renamedList)
- .addAllDeletedDirs(dirsToMove)
- .build();
+ if (!deletedKeys.isEmpty()) {
+ moveDeletedKeys.addAllDeletedKeys(deletedKeys);
+ }
+
+ if (!renamedList.isEmpty()) {
+ moveDeletedKeys.addAllRenamedKeys(renamedList);
+ }
+
+ if (!dirsToMove.isEmpty()) {
+ moveDeletedKeys.addAllDeletedDirs(dirsToMove);
}
OMRequest omRequest = OMRequest.newBuilder()
.setCmdType(Type.SnapshotMoveTableKeys)
- .setSnapshotMoveTableKeysRequest(moveDeletedKeys)
+ .setSnapshotMoveTableKeysRequest(moveDeletedKeys.build())
.setClientId(clientId.toString())
.build();
- submitOMRequest(omRequest);
+
+ try {
+ OzoneManagerProtocolProtos.OMResponse response =
submitRequest(omRequest);
+ if (response == null || !response.getSuccess()) {
+ LOG.error("SnapshotMoveTableKeys request failed. Will retry in the
next run.");
+ return false;
+ }
+ return true;
+ } catch (ServiceException e) {
+ LOG.error("SnapshotMoveTableKeys request failed. Will retry in the
next run", e);
+ return false;
+ }
+ }
+
+ /**
+ * Submits snapshot move requests with batching to respect the Ratis
buffer limit.
+ * This method progressively builds batches while checking size limits
before adding entries.
+ *
+ * @param snapInfo The snapshot being processed
+ * @param deletedKeys List of deleted keys to move
+ * @param renamedList List of renamed keys
+ * @param dirsToMove List of deleted directories to move
+ * @return The number of entries successfully submitted
+ */
+ private int submitSnapshotMoveDeletedKeysWithBatching(SnapshotInfo
snapInfo,
+
List<SnapshotMoveKeyInfos> deletedKeys,
+
List<HddsProtos.KeyValue> renamedList,
+
List<SnapshotMoveKeyInfos> dirsToMove) {
+ List<SnapshotMoveKeyInfos> currentDeletedKeys = new ArrayList<>();
+ List<HddsProtos.KeyValue> currentRenamedKeys = new ArrayList<>();
+ List<SnapshotMoveKeyInfos> currentDeletedDirs = new ArrayList<>();
+ long batchBytes = 0;
+ int totalSubmitted = 0;
+ int batchCount = 0;
+
+ for (SnapshotMoveKeyInfos key : deletedKeys) {
+ int keySize = key.getSerializedSize();
+
+ // If adding this key would exceed the limit, flush the current batch
first
+ if (batchBytes + keySize > ratisByteLimit &&
!currentDeletedKeys.isEmpty()) {
+ batchCount++;
+ LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys,
{} renamedKeys, {} deletedDirs, " +
+ "size: {} bytes", batchCount, snapInfo.getTableKey(),
currentDeletedKeys.size(),
+ currentRenamedKeys.size(), currentDeletedDirs.size(),
batchBytes);
+
+ if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys,
currentRenamedKeys, currentDeletedDirs)) {
+ return totalSubmitted;
+ }
+
+ totalSubmitted += currentDeletedKeys.size();
+ currentDeletedKeys.clear();
+ batchBytes = 0;
+ }
+
+ currentDeletedKeys.add(key);
+ batchBytes += keySize;
+ }
+
+ for (HddsProtos.KeyValue renameKey : renamedList) {
+ int keySize = renameKey.getSerializedSize();
+
+ // If adding this key would exceed the limit, flush the current batch
first
+ if (batchBytes + keySize > ratisByteLimit &&
+ (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty())) {
+ batchCount++;
+ LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys,
{} renamedKeys, {} deletedDirs, " +
+ "size: {} bytes", batchCount, snapInfo.getTableKey(),
currentDeletedKeys.size(),
+ currentRenamedKeys.size(), currentDeletedDirs.size(),
batchBytes);
+
+ if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys,
currentRenamedKeys, currentDeletedDirs)) {
+ return totalSubmitted;
+ }
+
+ totalSubmitted += currentDeletedKeys.size() +
currentRenamedKeys.size();
+ currentDeletedKeys.clear();
+ currentRenamedKeys.clear();
+ batchBytes = 0;
+ }
+
+ currentRenamedKeys.add(renameKey);
+ batchBytes += keySize;
+ }
+
+ for (SnapshotMoveKeyInfos dir : dirsToMove) {
+ int dirSize = dir.getSerializedSize();
+
+ // If adding this dir would exceed the limit, flush the current batch
first
+ if (batchBytes + dirSize > ratisByteLimit &&
+ (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() ||
!currentDeletedDirs.isEmpty())) {
+ batchCount++;
+ LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys,
{} renamedKeys, {} deletedDirs, " +
+ "size: {} bytes", batchCount, snapInfo.getTableKey(),
currentDeletedKeys.size(),
+ currentRenamedKeys.size(), currentDeletedDirs.size(),
batchBytes);
+
+ if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys,
currentRenamedKeys, currentDeletedDirs)) {
+ return totalSubmitted;
+ }
+
+ totalSubmitted += currentDeletedKeys.size() +
currentRenamedKeys.size() + currentDeletedDirs.size();
+ currentDeletedKeys.clear();
+ currentRenamedKeys.clear();
+ currentDeletedDirs.clear();
+ batchBytes = 0;
+ }
+
+ currentDeletedDirs.add(dir);
+ batchBytes += dirSize;
Review Comment:
`batchBytes` is not accounting for `fromSnapshotID` serialized size and the
size of enclosing `OMRequest`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]