errose28 commented on code in PR #10570:
URL: https://github.com/apache/ozone/pull/10570#discussion_r3484269804


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java:
##########
@@ -33,6 +34,8 @@ public class ReplicationTask extends AbstractReplicationTask {
   public static final String METRIC_NAME = "ContainerReplications";
   public static final String METRIC_DESCRIPTION_SEGMENT = "container replications";
 
+  private int lowestCommonApparentVersion = HDDSVersion.DEFAULT_VERSION.serialize();

Review Comment:
   In the POJOs we should operate on the typed `HDDSVersion` as much as possible and defer `int` usage until serialization is required.
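   A minimal sketch of what that could look like. `Version` is a simplified, hypothetical stand-in for `HDDSVersion`, and the `toProtoField`/`fromProtoField` helpers are illustrative names, not existing Ozone APIs. The POJO only ever holds the typed value; the `int` shows up only when the proto field is written or read:

```java
import java.util.Objects;

final class TypedVersionSketch {

  // Hypothetical stand-in for HDDSVersion; constants and values are illustrative.
  enum Version {
    DEFAULT_VERSION(0),
    NEWER_FEATURE(1);

    private final int value;

    Version(int value) {
      this.value = value;
    }

    int serialize() {
      return value;
    }

    // Assumption: this sketch rejects unknown values. The real HDDSVersion
    // maps them to UNKNOWN_VERSION instead.
    static Version deserialize(int value) {
      for (Version v : values()) {
        if (v.value == value) {
          return v;
        }
      }
      throw new IllegalArgumentException("Unknown version: " + value);
    }
  }

  // Hypothetical command POJO: the field is typed, never a raw int.
  static final class Command {
    private final Version apparentVersion;

    Command(Version apparentVersion) {
      this.apparentVersion = Objects.requireNonNull(apparentVersion);
    }

    Version getApparentVersion() {
      return apparentVersion;
    }

    // The only place the int appears: building the proto message.
    int toProtoField() {
      return apparentVersion.serialize();
    }

    // ...and parsing it back.
    static Command fromProtoField(int raw) {
      return new Command(Version.deserialize(raw));
    }
  }
}
```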



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java:
##########
@@ -84,6 +84,12 @@ public void handle(SCMCommand<?> command, OzoneContainer container,
             downloadReplicator : pushReplicator;
 
     ReplicationTask task = new ReplicationTask(replicateCommand, replicator);
+
+    int peerVersion = replicateCommand.getPeerApparentVersion();
+    int localVersion = context.getParent().getVersionManager()
+        .getApparentVersion().serialize();
+    task.setLowestCommonApparentVersion(Math.min(localVersion, peerVersion));

Review Comment:
   I think we should leave all logic on SCM as the coordinator, with datanodes just following orders. That means SCM would compute the minimum apparent version among the nodes involved in the replication and send it with the command as the definitive version to use.
   
   SCM may have stale information, but this is OK since the apparent version can never go backwards and older apparent versions will remain supported.
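   A rough sketch of the SCM-side computation, assuming the inputs are whatever versions SCM last saw in heartbeats. `Version` and `versionForReplication` are hypothetical names. A stale value can only be lower than the node's real apparent version, and lower versions stay supported, so the minimum is still safe for every participant:

```java
import java.util.Collection;
import java.util.Comparator;

final class ScmReplicationVersionSketch {

  // Hypothetical stand-in for HDDSVersion; names and values are illustrative.
  enum Version {
    DEFAULT_VERSION(0),
    FEATURE_X(1),
    FEATURE_Y(2);

    private final int value;

    Version(int value) {
      this.value = value;
    }

    int serialize() {
      return value;
    }
  }

  /**
   * Returns the lowest of the (possibly stale) last-known versions of all
   * nodes taking part in the replication. SCM sends this with the command.
   */
  static Version versionForReplication(Collection<Version> lastKnownVersions) {
    return lastKnownVersions.stream()
        .min(Comparator.comparingInt(Version::serialize))
        .orElseThrow(() ->
            new IllegalArgumentException("replication has no participants"));
  }
}
```

   For example, a `FEATURE_Y` source pushing to a `FEATURE_X` target would be told to use `FEATURE_X`.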



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java:
##########
@@ -125,6 +128,21 @@ public void setTransferredBytes(long transferredBytes) {
     this.transferredBytes = transferredBytes;
   }
 
+  /**
+   * @return the lowest apparent version supported by both this datanode and
+   *     the replication peer, computed as {@code min(localApparentVersion,
+   *     peerApparentVersion)}. Replicators can use this to gate
+   *     version-dependent protocol features so that the newer side downgrades
+   *     to a behavior the older side understands.
+   */
+  public int getLowestCommonApparentVersion() {

Review Comment:
   Again following the pattern of SCM doing the computation and datanodes following orders: DNs should only know that this is the apparent version they need to use for this replication task, not why it was chosen. This allows us to change the logic for deciding the apparent version on SCM later.
   ```suggestion
     public int getApparentVersion() {
   ```
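   To illustrate the DN side, a sketch of a replicator gating a version-dependent behavior purely on the version it was given. `Version`, `NEW_TRANSFER_FORMAT`, and `chooseTransferFormat` are hypothetical; the point is that nothing here looks at local or peer versions:

```java
final class DatanodeVersionGateSketch {

  // Hypothetical stand-in for HDDSVersion.
  enum Version {
    DEFAULT_VERSION(0),
    NEW_TRANSFER_FORMAT(1); // hypothetical feature version

    private final int value;

    Version(int value) {
      this.value = value;
    }

    boolean isAtLeast(Version other) {
      return value >= other.value;
    }
  }

  /**
   * Picks a behavior from the version SCM assigned to the task. The DN does
   * not re-derive this version, so SCM can change its policy independently.
   */
  static String chooseTransferFormat(Version apparentVersion) {
    return apparentVersion.isAtLeast(Version.NEW_TRANSFER_FORMAT)
        ? "new-format"
        : "legacy-format";
  }
}
```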



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java:
##########
@@ -42,6 +43,7 @@ public final class ReplicateContainerCommand
   private int replicaIndex = 0;
   private ReplicationCommandPriority priority =
       ReplicationCommandPriority.NORMAL;
+  private int peerApparentVersion = HDDSVersion.DEFAULT_VERSION.serialize();

Review Comment:
   ```suggestion
     private HDDSVersion apparentVersion = HDDSVersion.DEFAULT_VERSION;
   ```



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -675,6 +682,36 @@ public void sendDatanodeCommand(SCMCommand<?> command,
         scmDeadlineEpochMs);
   }
 
+  private void enrichReplicateCommandWithPeerVersion(
+      ReplicateContainerCommand cmd) {
+    if (cmd.getTargetDatanode() != null) {
+      cmd.setPeerApparentVersion(
+          lookupApparentVersion(cmd.getTargetDatanode()));
+    } else {
+      int min = cmd.getSourceDatanodes().stream()
+          .mapToInt(this::lookupApparentVersion)
+          .min()
+          .orElse(HDDSVersion.DEFAULT_VERSION.serialize());
+      cmd.setPeerApparentVersion(min);
+    }
+  }
+
+  private int lookupApparentVersion(DatanodeDetails dn) {
+    DatanodeInfo info = nodeManager.getDatanodeInfo(dn);
+    if (info == null) {
+      return HDDSVersion.DEFAULT_VERSION.serialize();

Review Comment:
   This should probably throw a `NodeNotFoundException` instead. We shouldn't proceed with a replication command to a node we don't have info for.
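   Something along these lines, as a sketch only: the map and `register` method are hypothetical stand-ins for `nodeManager`, and `NodeNotFoundException` is declared locally here rather than using Ozone's class:

```java
import java.util.HashMap;
import java.util.Map;

final class LookupOrThrowSketch {

  // Local stand-in for Ozone's NodeNotFoundException.
  static final class NodeNotFoundException extends Exception {
    NodeNotFoundException(String message) {
      super(message);
    }
  }

  // Hypothetical: datanode ID -> last known serialized apparent version.
  private final Map<String, Integer> lastKnownVersions = new HashMap<>();

  void register(String datanodeId, int version) {
    lastKnownVersions.put(datanodeId, version);
  }

  int lookupApparentVersion(String datanodeId) throws NodeNotFoundException {
    Integer version = lastKnownVersions.get(datanodeId);
    if (version == null) {
      // Refuse to build a command for an unknown node instead of
      // silently assuming the default version.
      throw new NodeNotFoundException("No info for datanode " + datanodeId);
    }
    return version;
  }
}
```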



##########
hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto:
##########
@@ -429,6 +429,7 @@ message ReplicateContainerCommandProto {
   optional int32 replicaIndex = 4;
   optional DatanodeDetailsProto target = 5;
   optional ReplicationCommandPriority priority = 6 [default = NORMAL];
+  optional uint32 peerApparentVersion = 7;

Review Comment:
   With the above suggestion, this is the version that should be used to carry out the replication, which may not be the peer's apparent version if the peer's version is higher.
   ```suggestion
     optional uint32 apparentVersion = 7;
   ```



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -664,6 +667,10 @@ public void sendDatanodeCommand(SCMCommand<?> command,
       throws NotLeaderException {
     long datanodeDeadline =
         scmDeadlineEpochMs - rmConf.getDatanodeTimeoutOffset();
+    if (command.getType() == Type.replicateContainerCommand) {
+      enrichReplicateCommandWithPeerVersion(
+          (ReplicateContainerCommand) command);

Review Comment:
   The cast means we are doing this too low in the stack. There are higher-level methods that deal specifically with replication commands where we should add this. We might also want to make `toTarget` take an `HDDSVersion` as a required parameter, and have all tests call `forTest`, which would automatically fill that in with the current version.
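   A hedged sketch of that factory shape. The signatures are simplified and hypothetical: `String` stands in for `DatanodeDetails`, `Version.LATEST` stands in for "the current version", and the real `ReplicateContainerCommand` factories take different arguments:

```java
import java.util.Objects;

final class ReplicateCommandFactorySketch {

  // Hypothetical stand-in for HDDSVersion.
  enum Version {
    DEFAULT_VERSION,
    LATEST
  }

  static final class ReplicateCommand {
    private final long containerId;
    private final String target; // stand-in for DatanodeDetails
    private final Version apparentVersion;

    private ReplicateCommand(long containerId, String target,
        Version apparentVersion) {
      this.containerId = containerId;
      this.target = target;
      this.apparentVersion = apparentVersion;
    }

    /** Production path: the caller must state the version to use. */
    static ReplicateCommand toTarget(long containerId, String target,
        Version apparentVersion) {
      return new ReplicateCommand(containerId, Objects.requireNonNull(target),
          Objects.requireNonNull(apparentVersion, "apparentVersion"));
    }

    /** Test path: fills in the current version automatically. */
    static ReplicateCommand forTest(long containerId) {
      return toTarget(containerId, "test-target", Version.LATEST);
    }

    long getContainerId() {
      return containerId;
    }

    String getTarget() {
      return target;
    }

    Version getApparentVersion() {
      return apparentVersion;
    }
  }
}
```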



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java:
##########
@@ -125,6 +128,21 @@ public void setTransferredBytes(long transferredBytes) {
     this.transferredBytes = transferredBytes;
   }
 
+  /**
+   * @return the lowest apparent version supported by both this datanode and
+   *     the replication peer, computed as {@code min(localApparentVersion,
+   *     peerApparentVersion)}. Replicators can use this to gate
+   *     version-dependent protocol features so that the newer side downgrades
+   *     to a behavior the older side understands.
+   */
+  public int getLowestCommonApparentVersion() {
+    return lowestCommonApparentVersion;
+  }
+
+  public void setLowestCommonApparentVersion(int lowestCommonApparentVersion) {

Review Comment:
   This information should arrive from the `ReplicateContainerCommand` passed 
to the constructor so it can be `final` without a setter.
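   Roughly like this, as a sketch with hypothetical minimal `Command` and `Version` types standing in for `ReplicateContainerCommand` and `HDDSVersion`:

```java
final class FinalVersionTaskSketch {

  // Hypothetical stand-in for HDDSVersion.
  enum Version {
    DEFAULT_VERSION,
    NEWER_FEATURE
  }

  // Hypothetical minimal command carrying the SCM-chosen version.
  static final class Command {
    private final Version apparentVersion;

    Command(Version apparentVersion) {
      this.apparentVersion = apparentVersion;
    }

    Version getApparentVersion() {
      return apparentVersion;
    }
  }

  static final class ReplicationTask {
    // Assigned once from the command; no setter is needed.
    private final Version apparentVersion;

    ReplicationTask(Command command) {
      this.apparentVersion = command.getApparentVersion();
    }

    Version getApparentVersion() {
      return apparentVersion;
    }
  }
}
```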



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java:
##########
@@ -116,4 +139,76 @@ public void testMetrics() {
       metrics.unRegister();
     }
   }
+
+  @Test
+  public void testEffectiveVersionPeerIsOlder() {
+    ArgumentCaptor<ReplicationTask> captor = captureSubmittedTask();
+    when(versionManager.getApparentVersion())
+        .thenReturn(HDDSVersion.STREAM_BLOCK_SUPPORT);
+
+    ReplicateContainerCommandHandler handler =
+        new ReplicateContainerCommandHandler(conf, supervisor,
+            downloadReplicator, pushReplicator);
+
+    DatanodeDetails source = MockDatanodeDetails.randomDatanodeDetails();
+    List<DatanodeDetails> sources = new ArrayList<>();
+    sources.add(source);

Review Comment:
   We can assume that only push replication is supported for now and #10584 
will be done by the time this is merged into master.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -675,6 +682,36 @@ public void sendDatanodeCommand(SCMCommand<?> command,
         scmDeadlineEpochMs);
   }
 
+  private void enrichReplicateCommandWithPeerVersion(
+      ReplicateContainerCommand cmd) {
+    if (cmd.getTargetDatanode() != null) {
+      cmd.setPeerApparentVersion(
+          lookupApparentVersion(cmd.getTargetDatanode()));
+    } else {

Review Comment:
   Let's remove this branch. We can assume #10584 is going to be merged before 
this. With that we can probably collapse this and `lookupApparentVersion` into 
one method.
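   One possible shape for the collapsed method, assuming push-only replication and that SCM takes the minimum over source and target as suggested earlier. All names here are hypothetical; the map stands in for `nodeManager` and `NodeNotFoundException` is a local stand-in:

```java
import java.util.HashMap;
import java.util.Map;

final class PushReplicationVersionSketch {

  // Hypothetical stand-in for HDDSVersion.
  enum Version {
    DEFAULT_VERSION(0),
    FEATURE_X(1),
    FEATURE_Y(2);

    private final int value;

    Version(int value) {
      this.value = value;
    }

    int serialize() {
      return value;
    }
  }

  // Local stand-in for Ozone's NodeNotFoundException.
  static final class NodeNotFoundException extends Exception {
    NodeNotFoundException(String message) {
      super(message);
    }
  }

  // Hypothetical: datanode ID -> last known apparent version.
  private final Map<String, Version> lastKnown = new HashMap<>();

  void report(String datanodeId, Version version) {
    lastKnown.put(datanodeId, version);
  }

  /** Single method: look up both push participants and take the minimum. */
  Version apparentVersionForPush(String source, String target)
      throws NodeNotFoundException {
    Version result = null;
    for (String dn : new String[] {source, target}) {
      Version v = lastKnown.get(dn);
      if (v == null) {
        throw new NodeNotFoundException("No info for datanode " + dn);
      }
      if (result == null || v.serialize() < result.serialize()) {
        result = v;
      }
    }
    return result;
  }
}
```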



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -675,6 +682,36 @@ public void sendDatanodeCommand(SCMCommand<?> command,
         scmDeadlineEpochMs);
   }
 
+  private void enrichReplicateCommandWithPeerVersion(
+      ReplicateContainerCommand cmd) {
+    if (cmd.getTargetDatanode() != null) {
+      cmd.setPeerApparentVersion(
+          lookupApparentVersion(cmd.getTargetDatanode()));
+    } else {
+      int min = cmd.getSourceDatanodes().stream()
+          .mapToInt(this::lookupApparentVersion)
+          .min()
+          .orElse(HDDSVersion.DEFAULT_VERSION.serialize());
+      cmd.setPeerApparentVersion(min);
+    }
+  }
+
+  private int lookupApparentVersion(DatanodeDetails dn) {
+    DatanodeInfo info = nodeManager.getDatanodeInfo(dn);
+    if (info == null) {
+      return HDDSVersion.DEFAULT_VERSION.serialize();
+    }
+    ComponentVersion peerVersion = info.getLastKnownApparentVersion();
+    // A peer reporting a version newer than this SCM can recognize
+    // deserializes to UNKNOWN_VERSION (-1). Treat it as the oldest known
+    // version so the client datanode falls back to the most conservative,
+    // backward-compatible behavior rather than propagating the sentinel.
+    if (peerVersion == null || peerVersion == HDDSVersion.UNKNOWN_VERSION) {
+      return HDDSVersion.DEFAULT_VERSION.serialize();
+    }

Review Comment:
   SCM needs to be upgraded before Datanodes, and Datanodes should be fenced out of the cluster if they are newer. If we somehow make it this far, we should throw to stop any further operations on the illegal newer node.
   
   If `peerVersion` is null we should log a warning, since Datanodes are expected to report this on every heartbeat. Without the warning, an issue with version reporting would make us silently fall back to the lowest version indefinitely.
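   A sketch of the two cases. `java.util.logging` is used only to keep this self-contained (Ozone itself logs through SLF4J), and `IllegalStateException` is a placeholder for whichever exception we settle on; `Version` is a hypothetical stand-in for `HDDSVersion`:

```java
import java.util.logging.Logger;

final class PeerVersionValidationSketch {

  private static final Logger LOG =
      Logger.getLogger(PeerVersionValidationSketch.class.getName());

  // Hypothetical stand-in for HDDSVersion, including its UNKNOWN sentinel.
  enum Version {
    UNKNOWN_VERSION,
    DEFAULT_VERSION,
    FEATURE_X
  }

  static Version validate(String datanodeId, Version lastKnown) {
    if (lastKnown == null) {
      // DNs should report on every heartbeat, so make the fallback visible.
      LOG.warning("Datanode " + datanodeId
          + " has not reported an apparent version; using "
          + Version.DEFAULT_VERSION);
      return Version.DEFAULT_VERSION;
    }
    if (lastKnown == Version.UNKNOWN_VERSION) {
      // A DN newer than SCM should have been fenced out; stop here.
      throw new IllegalStateException("Datanode " + datanodeId
          + " reports a version newer than this SCM supports");
    }
    return lastKnown;
  }
}
```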


