JacksonYao287 commented on a change in pull request #2172:
URL: https://github.com/apache/ozone/pull/2172#discussion_r619732275



##########
File path: 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
##########
@@ -122,26 +122,87 @@ public void checkAndAddNewContainer(ContainerID 
containerID,
           scmClient.getContainerWithPipeline(containerID.getId());
       LOG.debug("Verified new container from SCM {}, {} ",
           containerID, containerWithPipeline.getPipeline().getId());
-      // If no other client added this, go ahead and add this container.
-      if (!containerExist(containerID)) {
-        addNewContainer(containerID.getId(), containerWithPipeline);
-      }
+      // no need call "containerExist" to check, because
+      // 1 containerExist and addNewContainer can not be atomic
+      // 2 addNewContainer will double check the existence
+      addNewContainer(containerID.getId(), containerWithPipeline);

Review comment:
       Yes, done.

##########
File path: 
hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
##########
@@ -127,6 +129,33 @@ public void testCheckAndAddNewContainer() throws 
IOException {
         getContainerManager().getContainer(containerID).getState());
   }
 
+  @Test
+  public void testCheckAndAddNewContainerBatch() throws IOException {
+    List<ContainerReplicaProto> containerReplicaProtoList = new LinkedList<>();
+    ReconContainerManager containerManager = getContainerManager();
+    for (long i = 200L; i < 300L; i++) {
+      assertFalse(containerManager.containerExist(ContainerID.valueOf(i)));
+      ContainerReplicaProto.Builder ciBuilder =
+          ContainerReplicaProto.newBuilder();
+      ContainerReplicaProto crp = ciBuilder.
+          setContainerID(i).setState(OPEN).build();
+      containerReplicaProtoList.add(crp);

Review comment:
       Added.

##########
File path: 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
##########
@@ -122,26 +122,87 @@ public void checkAndAddNewContainer(ContainerID 
containerID,
           scmClient.getContainerWithPipeline(containerID.getId());
       LOG.debug("Verified new container from SCM {}, {} ",
           containerID, containerWithPipeline.getPipeline().getId());
-      // If no other client added this, go ahead and add this container.
-      if (!containerExist(containerID)) {
-        addNewContainer(containerID.getId(), containerWithPipeline);
-      }
+      // no need call "containerExist" to check, because
+      // 1 containerExist and addNewContainer can not be atomic
+      // 2 addNewContainer will double check the existence

Review comment:
       Thanks for the review! Maybe we can keep it here for now and remove 
it later in another patch.

##########
File path: 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
##########
@@ -122,26 +122,87 @@ public void checkAndAddNewContainer(ContainerID 
containerID,
           scmClient.getContainerWithPipeline(containerID.getId());
       LOG.debug("Verified new container from SCM {}, {} ",
           containerID, containerWithPipeline.getPipeline().getId());
-      // If no other client added this, go ahead and add this container.
-      if (!containerExist(containerID)) {
-        addNewContainer(containerID.getId(), containerWithPipeline);
-      }
+      // no need call "containerExist" to check, because
+      // 1 containerExist and addNewContainer can not be atomic
+      // 2 addNewContainer will double check the existence
+      addNewContainer(containerWithPipeline);
     } else {
-      // Check if container state is not open. In SCM, container state
-      // changes to CLOSING first, and then the close command is pushed down
-      // to Datanodes. Recon 'learns' this from DN, and hence replica state
-      // will move container state to 'CLOSING'.
-      ContainerInfo containerInfo = getContainer(containerID);
-      if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)
-          && !replicaState.equals(ContainerReplicaProto.State.OPEN)
-          && isHealthy(replicaState)) {
-        LOG.info("Container {} has state OPEN, but Replica has State {}.",
-            containerID, replicaState);
-        try {
-          updateContainerState(containerID, FINALIZE);
-        } catch (InvalidStateTransitionException e) {
-          throw new IOException(e);
-        }
+      checkContainerStateAndUpdate(containerID, replicaState);
+    }
+  }
+
+  /**
+   * Check and add new containers in batch if not already present in Recon.
+   *
+   * @param containerReplicaProtoList list of containerReplicaProtos.
+   * @throws IOException on Error.
+   */
+  public void checkAndAddNewContainerBatch(
+      List<ContainerReplicaProto> containerReplicaProtoList)
+      throws IOException {
+    Map<Boolean, List<ContainerReplicaProto>> containers =
+        containerReplicaProtoList.parallelStream()

Review comment:
       Here I want to divide "containerReplicaProtoList" into two parts, and 
both parts are used in the subsequent code. If I used "stream().filter()" 
instead, I would get only one of the two parts, so I would have to call 
"stream().filter()" twice.
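
       To illustrate the one-pass, two-way split described above, here is a 
minimal self-contained sketch. It is not the code in this patch: the plain Long 
IDs and the "known" Set are hypothetical stand-ins for the replica protos and 
for the containerExist() lookup, and it uses Collectors.partitioningBy, a 
standard-library relative of groupingBy that always returns both the true and 
the false list.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionSketch {

  // Splits ids in a single pass: key true -> already known, key false -> new.
  // "known" is a hypothetical stand-in for a containerExist() lookup.
  static Map<Boolean, List<Long>> splitByExistence(
      List<Long> ids, Set<Long> known) {
    return ids.stream()
        .collect(Collectors.partitioningBy(known::contains));
  }

  public static void main(String[] args) {
    Set<Long> known = new HashSet<>(Arrays.asList(2L, 4L));
    Map<Boolean, List<Long>> parts =
        splitByExistence(Arrays.asList(1L, 2L, 3L, 4L), known);
    System.out.println("existing: " + parts.get(true));
    System.out.println("new: " + parts.get(false));
  }
}
```

       Because partitioningBy always populates both keys, the caller would not 
need containsKey() or null checks before using either list.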

##########
File path: 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
##########
@@ -122,26 +122,87 @@ public void checkAndAddNewContainer(ContainerID 
containerID,
           scmClient.getContainerWithPipeline(containerID.getId());
       LOG.debug("Verified new container from SCM {}, {} ",
           containerID, containerWithPipeline.getPipeline().getId());
-      // If no other client added this, go ahead and add this container.
-      if (!containerExist(containerID)) {
-        addNewContainer(containerID.getId(), containerWithPipeline);
-      }
+      // no need call "containerExist" to check, because
+      // 1 containerExist and addNewContainer can not be atomic
+      // 2 addNewContainer will double check the existence
+      addNewContainer(containerWithPipeline);
     } else {
-      // Check if container state is not open. In SCM, container state
-      // changes to CLOSING first, and then the close command is pushed down
-      // to Datanodes. Recon 'learns' this from DN, and hence replica state
-      // will move container state to 'CLOSING'.
-      ContainerInfo containerInfo = getContainer(containerID);
-      if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)
-          && !replicaState.equals(ContainerReplicaProto.State.OPEN)
-          && isHealthy(replicaState)) {
-        LOG.info("Container {} has state OPEN, but Replica has State {}.",
-            containerID, replicaState);
-        try {
-          updateContainerState(containerID, FINALIZE);
-        } catch (InvalidStateTransitionException e) {
-          throw new IOException(e);
-        }
+      checkContainerStateAndUpdate(containerID, replicaState);
+    }
+  }
+
+  /**
+   * Check and add new containers in batch if not already present in Recon.
+   *
+   * @param containerReplicaProtoList list of containerReplicaProtos.
+   * @throws IOException on Error.
+   */
+  public void checkAndAddNewContainerBatch(
+      List<ContainerReplicaProto> containerReplicaProtoList)
+      throws IOException {
+    Map<Boolean, List<ContainerReplicaProto>> containers =
+        containerReplicaProtoList.parallelStream()
+        .collect(Collectors.groupingBy(c ->
+            containerExist(ContainerID.valueOf(c.getContainerID()))));
+
+    List<ContainerReplicaProto> existContainers = null;
+    if (containers.containsKey(true)) {
+      existContainers = containers.get(true);
+    }
+    List<Long> noExistContainers = null;
+    if (containers.containsKey(false)){
+      noExistContainers = containers.get(false).parallelStream().
+          map(ContainerReplicaProto::getContainerID)
+          .collect(Collectors.toList());
+    }
+
+    //for now , if any one container in noExistContainers is not found by SCM,
+    //an IOException will be throw and the whole noExistContainers will be drop.
+    //in some cases,this may slow the process for recon to learn new container,
+    //but it does not matter, just make it simple for the present
+    if (null != noExistContainers) {
+      List<ContainerWithPipeline> verifiedContainerPipeline =
+          scmClient.getContainerWithPipelineBatch(noExistContainers);

Review comment:
       Yes. With the current implementation of 
"getContainerWithPipelineBatch", if any container in noExistContainers is 
not found by SCM, an IOException is thrown and the whole noExistContainers 
batch is dropped.
   I think this has little effect here, so let's keep it simple for now.
   Maybe we can make another patch to rewrite "getContainerWithPipelineBatch" 
in SCM so that it returns only the list of found containers and does not throw 
an exception.
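
   As a rough sketch of the "return only what was found" behaviour proposed 
for a follow-up patch, under stated assumptions: the Map below is a 
hypothetical stand-in for SCM's container table, String stands in for 
ContainerWithPipeline, and the method name is made up for illustration. It is 
not the existing SCM API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LenientBatchLookupSketch {

  // Hypothetical lenient lookup: ids unknown to "scmContainers" are skipped
  // instead of failing the whole batch with an exception.
  static List<String> getFoundContainers(
      Map<Long, String> scmContainers, List<Long> ids) {
    List<String> found = new ArrayList<>();
    for (Long id : ids) {
      String info = scmContainers.get(id);
      if (info != null) {
        found.add(info);
      }
    }
    return found;
  }

  public static void main(String[] args) {
    Map<Long, String> scm = new HashMap<>();
    scm.put(1L, "container-1");
    scm.put(3L, "container-3");
    // Id 2 is unknown; the other two are still returned.
    System.out.println(getFoundContainers(scm, Arrays.asList(1L, 2L, 3L)));
  }
}
```

   With this shape, one missing container would no longer delay Recon from 
learning about the rest of the batch.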

##########
File path: 
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
##########
@@ -122,26 +122,87 @@ public void checkAndAddNewContainer(ContainerID 
containerID,
           scmClient.getContainerWithPipeline(containerID.getId());
       LOG.debug("Verified new container from SCM {}, {} ",
           containerID, containerWithPipeline.getPipeline().getId());
-      // If no other client added this, go ahead and add this container.
-      if (!containerExist(containerID)) {
-        addNewContainer(containerID.getId(), containerWithPipeline);
-      }
+      // no need call "containerExist" to check, because
+      // 1 containerExist and addNewContainer can not be atomic
+      // 2 addNewContainer will double check the existence

Review comment:
       @symious Thanks for the review! "addNewContainer" calls 
"ContainerStateMap#contains", which is exactly what "containerExist" does, 
so the extra check is not needed here.
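
       To make the atomicity point concrete, here is a minimal sketch of why a 
separate existence check before the add buys nothing: the check and the add 
are two steps, so only a check performed inside the add itself can decide who 
wins. The class and method names are hypothetical, and the ConcurrentHashMap 
is only an illustration; this is not a claim about how ContainerStateMap is 
implemented.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AtomicAddSketch {

  // Hypothetical stand-in for the container map: id -> container info.
  private final ConcurrentMap<Long, String> containers =
      new ConcurrentHashMap<>();

  // Check and insert happen as one atomic step inside putIfAbsent.
  // Returns true only for the caller that actually inserted the entry.
  boolean addIfAbsent(long id, String info) {
    return containers.putIfAbsent(id, info) == null;
  }

  String get(long id) {
    return containers.get(id);
  }

  public static void main(String[] args) {
    AtomicAddSketch sketch = new AtomicAddSketch();
    System.out.println(sketch.addIfAbsent(1L, "first"));
    System.out.println(sketch.addIfAbsent(1L, "second"));
    System.out.println(sketch.get(1L));
  }
}
```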




