This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 45c900d5d9 HDDS-12619. Optimize Recon OM Container Mismatch API (#8101)
45c900d5d9 is described below

commit 45c900d5d9d20e271fdb01cbd1814ce40c68ce47
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Wed Mar 19 18:54:56 2025 -0700

    HDDS-12619. Optimize Recon OM Container Mismatch API (#8101)
---
 .../apache/hadoop/ozone/util/SeekableIterator.java |  28 +++++
 .../hadoop/ozone/recon/api/ContainerEndpoint.java  | 126 +++++++++------------
 .../recon/spi/ReconContainerMetadataManager.java   |   4 +
 .../impl/ReconContainerMetadataManagerImpl.java    | 116 ++++++++++++-------
 .../TestReconContainerMetadataManagerImpl.java     |   4 +-
 5 files changed, 160 insertions(+), 118 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/SeekableIterator.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/SeekableIterator.java
new file mode 100644
index 0000000000..fb424b28ca
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/SeekableIterator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} that may hold resources until it is closed.
+ */
+public interface SeekableIterator<K, E> extends ClosableIterator<E> {
+  void seek(K position) throws IOException;
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
index 041bcc8e6b..b38e7138a1 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
@@ -29,8 +29,9 @@
 import java.io.IOException;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +51,6 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -80,11 +80,13 @@
 import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
 import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.util.SeekableIterator;
 import 
org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
 import 
org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Endpoint for querying keys that belong to a container.
  */
@@ -585,109 +587,87 @@ public Response getContainerMisMatchInsights(
 
     List<ContainerDiscrepancyInfo> containerDiscrepancyInfoList =
         new ArrayList<>();
-    try {
-      Map<Long, ContainerMetadata> omContainers =
-          reconContainerMetadataManager.getContainers(-1, -1);
-      List<Long> scmNonDeletedContainers =
-          containerManager.getContainers().stream()
-              .filter(containerInfo -> containerInfo.getState() !=
-                  HddsProtos.LifeCycleState.DELETED)
-              .map(containerInfo -> containerInfo.getContainerID())
-              .collect(Collectors.toList());
-      DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase());
-
+    Long minContainerID = prevKey + 1;
+    Iterator<ContainerInfo> scmNonDeletedContainers =
+            containerManager.getContainers().stream()
+                    .filter(containerInfo -> (containerInfo.getContainerID() 
>= minContainerID))
+                    .filter(containerInfo -> containerInfo.getState() != 
HddsProtos.LifeCycleState.DELETED)
+                    
.sorted(Comparator.comparingLong(ContainerInfo::getContainerID)).iterator();
+    ContainerInfo scmContainerInfo = scmNonDeletedContainers.hasNext() ?
+            scmNonDeletedContainers.next() : null;
+    DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase());
+    try (SeekableIterator<Long, ContainerMetadata> omContainers =
+                 reconContainerMetadataManager.getContainersIterator()) {
+      omContainers.seek(minContainerID);
+      ContainerMetadata containerMetadata = omContainers.hasNext() ? 
omContainers.next() : null;
       switch (dataFilter) {
-
       case SCM:
-        List<Map.Entry<Long, ContainerMetadata>> notSCMContainers =
-            omContainers.entrySet().stream()
-                .filter(
-                    containerMetadataEntry -> 
!scmNonDeletedContainers.contains(
-                        containerMetadataEntry.getKey()))
-                .collect(Collectors.toList());
-
-        if (prevKey > 0) {
-          int index = 0;
-          while (index < notSCMContainers.size() &&
-              notSCMContainers.get(index).getKey() <= prevKey) {
-            index++;
-          }
-          if (index < notSCMContainers.size()) {
-            notSCMContainers = notSCMContainers.subList(index,
-                Math.min(index + limit, notSCMContainers.size()));
+        List<ContainerMetadata> notSCMContainers = new ArrayList<>();
+        while (containerMetadata != null && notSCMContainers.size() < limit) {
+          Long omContainerID = containerMetadata.getContainerID();
+          Long scmContainerID = scmContainerInfo == null ? null : 
scmContainerInfo.getContainerID();
+          if (omContainerID.equals(scmContainerID)) {
+            containerMetadata = omContainers.hasNext() ? omContainers.next() : 
null;
+            scmContainerInfo = scmNonDeletedContainers.hasNext() ? 
scmNonDeletedContainers.next() : null;
+          } else if (scmContainerID == null || 
omContainerID.compareTo(scmContainerID) < 0) {
+            notSCMContainers.add(containerMetadata);
+            containerMetadata = omContainers.hasNext() ? omContainers.next() : 
null;
           } else {
-            notSCMContainers = Collections.emptyList();
+            scmContainerInfo = scmNonDeletedContainers.hasNext() ? 
scmNonDeletedContainers.next() : null;
           }
-        } else {
-          notSCMContainers = notSCMContainers.subList(0,
-              Math.min(limit, notSCMContainers.size()));
         }
 
         notSCMContainers.forEach(nonSCMContainer -> {
           ContainerDiscrepancyInfo containerDiscrepancyInfo =
               new ContainerDiscrepancyInfo();
-          containerDiscrepancyInfo.setContainerID(nonSCMContainer.getKey());
+          
containerDiscrepancyInfo.setContainerID(nonSCMContainer.getContainerID());
           containerDiscrepancyInfo.setNumberOfKeys(
-              nonSCMContainer.getValue().getNumberOfKeys());
+              nonSCMContainer.getNumberOfKeys());
           containerDiscrepancyInfo.setPipelines(
-              nonSCMContainer.getValue().getPipelines());
+              nonSCMContainer.getPipelines());
           containerDiscrepancyInfo.setExistsAt("OM");
           containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
         });
         break;
 
       case OM:
-        List<Long> nonOMContainers = scmNonDeletedContainers.stream()
-            .filter(containerId -> !omContainers.containsKey(containerId))
-            .collect(Collectors.toList());
-
-        if (prevKey > 0) {
-          int index = 0;
-          while (index < nonOMContainers.size() &&
-              nonOMContainers.get(index) <= prevKey) {
-            index++;
-          }
-          if (index < nonOMContainers.size()) {
-            nonOMContainers = nonOMContainers.subList(index,
-                Math.min(index + limit, nonOMContainers.size()));
+        List<ContainerInfo> nonOMContainers = new ArrayList<>();
+        while (scmContainerInfo != null && nonOMContainers.size() < limit) {
+          Long omContainerID = containerMetadata == null ? null : 
containerMetadata.getContainerID();
+          Long scmContainerID = scmContainerInfo.getContainerID();
+          if (scmContainerID.equals(omContainerID)) {
+            scmContainerInfo = scmNonDeletedContainers.hasNext() ? 
scmNonDeletedContainers.next() : null;
+            containerMetadata = omContainers.hasNext() ? omContainers.next() : 
null;
+          } else if (omContainerID == null || 
scmContainerID.compareTo(omContainerID) < 0) {
+            nonOMContainers.add(scmContainerInfo);
+            scmContainerInfo = scmNonDeletedContainers.hasNext() ? 
scmNonDeletedContainers.next() : null;
           } else {
-            nonOMContainers = Collections.emptyList();
+            //Seeking directly to SCM containerId sequential read is just 
wasteful here if there are too many values
+            // to be read in b/w omContainerID & scmContainerID since 
(omContainerId<scmContainerID)
+            omContainers.seek(scmContainerID);
+            containerMetadata = omContainers.hasNext() ? omContainers.next() : 
null;
           }
-        } else {
-          nonOMContainers = nonOMContainers.subList(0,
-              Math.min(limit, nonOMContainers.size()));
         }
 
         List<Pipeline> pipelines = new ArrayList<>();
-        nonOMContainers.forEach(nonOMContainerId -> {
-          boolean containerExistsInScm = true;
-          ContainerDiscrepancyInfo containerDiscrepancyInfo =
-              new ContainerDiscrepancyInfo();
-          containerDiscrepancyInfo.setContainerID(nonOMContainerId);
+        nonOMContainers.forEach(containerInfo -> {
+          ContainerDiscrepancyInfo containerDiscrepancyInfo = new 
ContainerDiscrepancyInfo();
+          
containerDiscrepancyInfo.setContainerID(containerInfo.getContainerID());
           containerDiscrepancyInfo.setNumberOfKeys(0);
           PipelineID pipelineID = null;
           try {
-            pipelineID = containerManager.getContainer(
-                ContainerID.valueOf(nonOMContainerId)).getPipelineID();
+            pipelineID = containerInfo.getPipelineID();
             if (pipelineID != null) {
               pipelines.add(pipelineManager.getPipeline(pipelineID));
             }
-          } catch (ContainerNotFoundException e) {
-            containerExistsInScm = false;
-            LOG.warn("Container {} not found in SCM: {}", nonOMContainerId,
-                e);
           } catch (PipelineNotFoundException e) {
             LOG.debug(
                 "Pipeline not found for container: {} and pipelineId: {}",
-                nonOMContainerId, pipelineID, e);
-          }
-          // The container might have been deleted in SCM after the call to
-          // get the list of containers
-          if (containerExistsInScm) {
-            containerDiscrepancyInfo.setPipelines(pipelines);
-            containerDiscrepancyInfo.setExistsAt("SCM");
-            containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
+                containerInfo, pipelineID, e);
           }
+          containerDiscrepancyInfo.setPipelines(pipelines);
+          containerDiscrepancyInfo.setExistsAt("SCM");
+          containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
         });
         break;
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java
index 24605c95a0..1400279d12 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
 import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
 import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory;
+import org.apache.hadoop.ozone.util.SeekableIterator;
 
 /**
  * The Recon Container DB Service interface.
@@ -186,6 +187,9 @@ Map<ContainerKeyPrefix, Integer> getKeyPrefixesForContainer(
   Map<Long, ContainerMetadata> getContainers(int limit, long prevContainer)
       throws IOException;
 
+
+  SeekableIterator<Long, ContainerMetadata> getContainersIterator() throws 
IOException;
+
   /**
    * Delete an entry in the container DB.
    *
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java
index 27567333d9..17cb793985 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java
@@ -26,6 +26,7 @@
 
 import jakarta.annotation.Nonnull;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -38,6 +39,7 @@
 import javax.inject.Singleton;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
@@ -54,6 +56,7 @@
 import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory;
 import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistoryList;
 import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.util.SeekableIterator;
 import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
 import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats;
 import org.jooq.Configuration;
@@ -431,53 +434,80 @@ public Map<Long, ContainerMetadata> getContainers(int limit,
                                                     long prevContainer)
       throws IOException {
     Map<Long, ContainerMetadata> containers = new LinkedHashMap<>();
-    try (
-        TableIterator<ContainerKeyPrefix,
-            ? extends KeyValue<ContainerKeyPrefix, Integer>>
-            containerIterator = containerKeyTable.iterator()) {
-      ContainerKeyPrefix seekKey;
-      if (prevContainer > 0L) {
-        seekKey = ContainerKeyPrefix.get(prevContainer);
-        KeyValue<ContainerKeyPrefix,
-            Integer> seekKeyValue = containerIterator.seek(seekKey);
-        // Check if RocksDB was able to correctly seek to the given
-        // prevContainer containerId. If not, then return empty result
-        if (seekKeyValue != null &&
-            seekKeyValue.getKey().getContainerId() != prevContainer) {
-          return containers;
-        } else {
-          // seek to the prevContainer+1 containerID to start scan
-          seekKey = ContainerKeyPrefix.get(prevContainer + 1);
-          containerIterator.seek(seekKey);
-        }
+    try (SeekableIterator<Long, ContainerMetadata> containerIterator = 
getContainersIterator()) {
+      containerIterator.seek(prevContainer + 1);
+      while (containerIterator.hasNext() && ((limit < 0) || containers.size() 
< limit)) {
+        ContainerMetadata containerMetadata = containerIterator.next();
+        containers.put(containerMetadata.getContainerID(), containerMetadata);
       }
-      while (containerIterator.hasNext()) {
-        KeyValue<ContainerKeyPrefix, Integer> keyValue =
-            containerIterator.next();
-        ContainerKeyPrefix containerKeyPrefix = keyValue.getKey();
-        Long containerID = containerKeyPrefix.getContainerId();
-        Integer numberOfKeys = keyValue.getValue();
-        List<Pipeline> pipelines =
-            getPipelines(containerKeyPrefix);
-
-        // break the loop if limit has been reached
-        // and one more new entity needs to be added to the containers map
-        if (containers.size() == limit &&
-            !containers.containsKey(containerID)) {
-          break;
-        }
+    }
+    return containers;
+  }
+
+  @Override
+  public SeekableIterator<Long, ContainerMetadata> getContainersIterator()
+          throws IOException {
+    return new ContainerMetadataIterator();
+  }
+
+  private class ContainerMetadataIterator implements SeekableIterator<Long, 
ContainerMetadata> {
+    private TableIterator<ContainerKeyPrefix, ? extends 
KeyValue<ContainerKeyPrefix, Integer>> containerIterator;
+    private KeyValue<ContainerKeyPrefix, Integer> currentKey;
+
+    ContainerMetadataIterator()
+            throws IOException {
+      containerIterator = containerKeyTable.iterator();
+      currentKey = containerIterator.hasNext() ? containerIterator.next() : 
null;
+    }
 
-        // initialize containerMetadata with 0 as number of keys.
-        containers.computeIfAbsent(containerID, ContainerMetadata::new);
-        // increment number of keys for the containerID
-        ContainerMetadata containerMetadata = containers.get(containerID);
-        containerMetadata.setNumberOfKeys(containerMetadata.getNumberOfKeys() +
-            numberOfKeys);
-        containerMetadata.setPipelines(pipelines);
-        containers.put(containerID, containerMetadata);
+    @Override
+    public void seek(Long containerID) throws IOException {
+      ContainerKeyPrefix seekKey = ContainerKeyPrefix.get(containerID);
+      containerIterator.seek(seekKey);
+      currentKey = containerIterator.hasNext() ? containerIterator.next() : 
null;
+    }
+
+    @Override
+    public void close() {
+      try {
+        containerIterator.close();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return currentKey != null;
+    }
+
+    @Override
+    public ContainerMetadata next() {
+      try {
+        if (currentKey == null) {
+          return null;
+        }
+        Map<PipelineID, Pipeline> pipelines = new HashMap<>();
+        ContainerMetadata containerMetadata = new 
ContainerMetadata(currentKey.getKey().getContainerId());
+        do {
+          ContainerKeyPrefix containerKeyPrefix = this.currentKey.getKey();
+          
containerMetadata.setNumberOfKeys(containerMetadata.getNumberOfKeys() + 1);
+          getPipelines(containerKeyPrefix).forEach(pipeline -> {
+            pipelines.putIfAbsent(pipeline.getId(), pipeline);
+          });
+          if (containerIterator.hasNext()) {
+            currentKey = containerIterator.next();
+          } else {
+            currentKey = null;
+          }
+        } while (currentKey != null &&
+                currentKey.getKey().getContainerId() == 
containerMetadata.getContainerID());
+        containerMetadata.setPipelines(new ArrayList<>(pipelines.values()));
+        return containerMetadata;
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
       }
     }
-    return containers;
   }
 
   @Nonnull
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java
index dbac3e5ee5..a3b5cef123 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java
@@ -365,8 +365,8 @@ public void testGetContainersWithPrevContainer() throws Exception {
         reconContainerMetadataManager.getContainers(-1, 0L);
     assertEquals(2, containerMap.size());
 
-    assertEquals(3, containerMap.get(containerId).getNumberOfKeys());
-    assertEquals(3, containerMap.get(nextContainerId).getNumberOfKeys());
+    assertEquals(2, containerMap.get(containerId).getNumberOfKeys());
+    assertEquals(1, containerMap.get(nextContainerId).getNumberOfKeys());
 
     // test if limit works
     containerMap = reconContainerMetadataManager.getContainers(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org

Reply via email to