adoroszlai commented on code in PR #4599:
URL: https://github.com/apache/ozone/pull/4599#discussion_r1173439504
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java:
##########
@@ -217,7 +217,18 @@ private List<DatanodeDetails> getSources(
}
})
.filter(r -> !pendingDeletion.contains(r.getDatanodeDetails()))
- .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
+ .collect(Collectors.toList());
+
+ // We should replicate only the max available sequence ID, as replicas with
+ // earlier sequence IDs may be stale copies.
+ long maxSequenceId = availableSources.stream()
+ .map(r -> {
+ Long seqId = r.getSequenceId();
+ return seqId == null ? Long.valueOf(0L) : seqId;
+ }).max(Long::compareTo).orElse(0L);
+
+ return availableSources.stream()
+ .filter(r -> r.getSequenceId() == maxSequenceId)
Review Comment:
This comparison can still throw a `NullPointerException` if `r.getSequenceId()`
returns `null`: comparing a `Long` with the primitive `long maxSequenceId` using
`==` auto-unboxes the `Long`, and unboxing `null` throws.
```
java.lang.NullPointerException
    at org.apache.hadoop.hdds.scm.container.replication.RatisUnderReplicationHandler.lambda$getSources$5(RatisUnderReplicationHandler.java:231)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.hadoop.hdds.scm.container.replication.RatisUnderReplicationHandler.getSources(RatisUnderReplicationHandler.java:233)
```
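For reference, the failure mode can be reproduced in isolation. This is a minimal sketch, not code from the PR: `UnboxingDemo`, `matches`, and `throwsNpe` are hypothetical names, and `matches` only mirrors the shape of the filter lambda (a nullable `Long` compared to a primitive `long`).

```java
public class UnboxingDemo {
    // Same shape as the filter in the diff: nullable Long on the left,
    // primitive long on the right.
    static boolean matches(Long seqId, long max) {
        // The compiler unboxes seqId via seqId.longValue();
        // if seqId is null, that call throws NullPointerException.
        return seqId == max;
    }

    // Helper that reports whether the comparison throws an NPE.
    static boolean throwsNpe(Long seqId, long max) {
        try {
            matches(seqId, max);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(5L, 5L));     // prints "true"
        System.out.println(throwsNpe(null, 5L)); // prints "true"
    }
}
```

Filtering out `null` before the comparison, as in the suggestion, avoids the unboxing of `null` entirely.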
The following is safe (tested with all non-nulls, mixed, all nulls):
```suggestion
    OptionalLong maxSequenceId = availableSources.stream()
        .filter(r -> r.getSequenceId() != null)
        .mapToLong(ContainerReplica::getSequenceId)
        .max();
    Stream<ContainerReplica> replicaStream = availableSources.stream();
    if (maxSequenceId.isPresent()) {
      replicaStream = replicaStream
          .filter(r -> r.getSequenceId() != null)
          .filter(r -> r.getSequenceId() == maxSequenceId.getAsLong());
    }
    return replicaStream
```
Note: this needs new imports to compile, most likely `java.util.OptionalLong`
and `java.util.stream.Stream`, unless the file already has them.
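The three cases mentioned above (all non-nulls, mixed, all nulls) can be checked with a standalone sketch along these lines. It is an illustration under assumptions, not the handler code: `Replica` is a hypothetical stand-in for `ContainerReplica`, `selectSources` is a made-up method name, and the final `map`/`collect` step is only there to make the result easy to inspect.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MaxSequenceIdSketch {
    // Hypothetical stand-in for ContainerReplica; only the nullable
    // sequence ID matters for this example.
    static final class Replica {
        private final String name;
        private final Long sequenceId;

        Replica(String name, Long sequenceId) {
            this.name = name;
            this.sequenceId = sequenceId;
        }

        String getName() { return name; }
        Long getSequenceId() { return sequenceId; }
    }

    // Keeps only replicas with the highest non-null sequence ID.
    // If no replica has a sequence ID, all replicas are kept.
    static List<String> selectSources(List<Replica> availableSources) {
        OptionalLong maxSequenceId = availableSources.stream()
            .filter(r -> r.getSequenceId() != null)
            .mapToLong(Replica::getSequenceId)
            .max();
        Stream<Replica> replicaStream = availableSources.stream();
        if (maxSequenceId.isPresent()) {
            replicaStream = replicaStream
                .filter(r -> r.getSequenceId() != null) // guard before unboxing
                .filter(r -> r.getSequenceId() == maxSequenceId.getAsLong());
        }
        return replicaStream.map(Replica::getName).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // All non-null: only replicas at the max ID remain.
        System.out.println(selectSources(Arrays.asList(
            new Replica("a", 1L), new Replica("b", 3L), new Replica("c", 3L))));
        // Mixed: null IDs are dropped, max of the rest wins.
        System.out.println(selectSources(Arrays.asList(
            new Replica("a", null), new Replica("b", 2L), new Replica("c", 1L))));
        // All null: nothing to compare, so every replica is kept.
        System.out.println(selectSources(Arrays.asList(
            new Replica("a", null), new Replica("b", null))));
    }
}
```

One behavior worth noting: when every sequence ID is `null`, the stream is left unfiltered, so all replicas remain candidates rather than none.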
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]