This is an automated email from the ASF dual-hosted git repository.
devesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 7ee78a270f4 HDDS-14730. Update Recon container sync to use container IDs (#9842)
7ee78a270f4 is described below
commit 7ee78a270f416a4e0f6b04e2780aee1e12374f15
Author: Jason O'Sullivan <[email protected]>
AuthorDate: Wed Mar 25 14:57:09 2026 +0000
HDDS-14730. Update Recon container sync to use container IDs (#9842)
---
.../protocol/StorageContainerLocationProtocol.java | 4 +-
...inerLocationProtocolClientSideTranslatorPB.java | 29 +++-
.../src/main/proto/ScmAdminProtocol.proto | 14 ++
.../hdds/scm/container/ContainerManager.java | 21 ++-
.../hdds/scm/container/ContainerManagerImpl.java | 8 +
.../hdds/scm/container/ContainerStateManager.java | 9 +
.../scm/container/ContainerStateManagerImpl.java | 7 +
.../scm/container/states/ContainerStateMap.java | 14 ++
...inerLocationProtocolServerSideTranslatorPB.java | 33 ++++
.../hdds/scm/server/SCMClientProtocolServer.java | 14 +-
.../org/apache/hadoop/ozone/audit/SCMAction.java | 3 +-
.../scm/container/TestContainerStateManager.java | 31 ++++
.../container/states/TestContainerStateMap.java | 76 +++++++++
.../scm/server/TestSCMClientProtocolServer.java | 31 +++-
.../hadoop/ozone/TestOzoneConfigurationFields.java | 1 +
.../hadoop/ozone/recon/ReconServerConfigKeys.java | 32 ++++
.../scm/ReconStorageContainerManagerFacade.java | 77 +--------
.../recon/scm/ReconStorageContainerSyncHelper.java | 113 +++++++++++++
.../recon/spi/StorageContainerServiceProvider.java | 8 +-
.../impl/StorageContainerServiceProviderImpl.java | 9 +-
.../scm/TestReconStorageContainerSyncHelper.java | 185 +++++++++++++++++++++
21 files changed, 621 insertions(+), 98 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index b057efb1ff8..32da41f8f17 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -505,8 +505,8 @@ StatusAndMessages queryUpgradeFinalizationProgress(
long getContainerCount(HddsProtos.LifeCycleState state)
throws IOException;
- List<ContainerInfo> getListOfContainers(
- long startContainerID, int count, HddsProtos.LifeCycleState state)
+ List<ContainerID> getListOfContainerIDs(
+ ContainerID startContainerID, int count, HddsProtos.LifeCycleState state)
throws IOException;
DecommissionScmResponseProto decommissionScm(
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 01e3d709cf0..ca942465770 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -36,6 +36,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -109,6 +110,8 @@
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SafeModeRuleStatusProto;
@@ -1250,10 +1253,30 @@ public void close() {
}
@Override
- public List<ContainerInfo> getListOfContainers(
- long startContainerID, int count, HddsProtos.LifeCycleState state)
+ public List<ContainerID> getListOfContainerIDs(
+ ContainerID startContainerID, int count, HddsProtos.LifeCycleState state)
throws IOException {
- return listContainer(startContainerID, count,
state).getContainerInfoList();
+ Preconditions.checkState(startContainerID.getId() >= 0,
+ "Container ID cannot be negative.");
+ Preconditions.checkState(count > 0,
+ "Container count must be greater than 0.");
+ SCMListContainerIDsRequestProto.Builder builder =
SCMListContainerIDsRequestProto
+ .newBuilder();
+ builder.setStartContainerID(startContainerID.getProtobuf());
+ builder.setCount(count);
+ builder.setTraceID(TracingUtil.exportCurrentSpan());
+ if (state != null) {
+ builder.setState(state);
+ }
+ SCMListContainerIDsRequestProto request = builder.build();
+ SCMListContainerIDsResponseProto response =
+ submitRequest(Type.ListContainerIDs,
+ builder1 -> builder1.setScmListContainerIDsRequest(request))
+ .getScmListContainerIDsResponse();
+ return response.getContainerIDsList()
+ .stream()
+ .map(ContainerID::getFromProtobuf)
+ .collect(Collectors.toList());
}
@Override
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 455f048d8b5..b6508ca9688 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -87,6 +87,7 @@ message ScmContainerLocationRequest {
optional ContainerBalancerStatusInfoRequestProto
containerBalancerStatusInfoRequest = 48;
optional ReconcileContainerRequestProto reconcileContainerRequest = 49;
optional GetDeletedBlocksTxnSummaryRequestProto
getDeletedBlocksTxnSummaryRequest = 50;
+ optional SCMListContainerIDsRequestProto scmListContainerIDsRequest = 51;
}
message ScmContainerLocationResponse {
@@ -145,6 +146,7 @@ message ScmContainerLocationResponse {
optional ContainerBalancerStatusInfoResponseProto
containerBalancerStatusInfoResponse = 48;
optional ReconcileContainerResponseProto reconcileContainerResponse = 49;
optional GetDeletedBlocksTxnSummaryResponseProto
getDeletedBlocksTxnSummaryResponse = 50;
+ optional SCMListContainerIDsResponseProto scmListContainerIDsResponse = 51;
enum Status {
OK = 1;
@@ -202,6 +204,7 @@ enum Type {
GetContainerBalancerStatusInfo = 44;
ReconcileContainer = 45;
GetDeletedBlocksTransactionSummary = 46;
+ ListContainerIDs = 47;
}
/**
@@ -291,6 +294,17 @@ message GetExistContainerWithPipelinesInBatchResponseProto
{
repeated ContainerWithPipeline containerWithPipelines = 1;
}
+message SCMListContainerIDsRequestProto {
+ required uint32 count = 1;
+ optional ContainerID startContainerID = 2;
+ optional LifeCycleState state = 3;
+ optional string traceID = 4;
+}
+
+message SCMListContainerIDsResponseProto {
+ repeated ContainerID containerIDs = 1;
+}
+
message SCMListContainerRequestProto {
required uint32 count = 1;
optional uint64 startContainerID = 2;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 36753202ec9..3c5706cc0fb 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -55,16 +55,33 @@ default List<ContainerInfo> getContainers() {
return getContainers(ContainerID.valueOf(0), Integer.MAX_VALUE);
}
+ /**
+ * Returns container IDs under certain conditions.
+ * Search container IDs from start ID(inclusive),
+ * The max size of the searching range cannot exceed the
+ * value of count.
+ *
+ * @param startID start containerID, >=0,
+ * start searching at the head if 0.
+ * @param count count must be >= 0
+ * Usually the count will be replaced with a very big
+ * value instead of being unlimited in case the db is very big.
+ * @param state container state
+ *
+ * @return a list of container IDs.
+ */
+ List<ContainerID> getContainerIDs(ContainerID startID, int count,
LifeCycleState state);
+
/**
* Returns containers under certain conditions.
- * Search container IDs from start ID(exclusive),
+ * Search container IDs from start ID(inclusive),
* The max size of the searching range cannot exceed the
* value of count.
*
* @param startID start containerID, >=0,
* start searching at the head if 0.
* @param count count must be >= 0
- * Usually the count will be replace with a very big
+ * Usually the count will be replaced with a very big
* value instead of being unlimited in case the db is very big.
*
* @return a list of container.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index f77bf86cec1..432c9890e98 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -141,6 +141,14 @@ public List<ContainerInfo> getContainers(ReplicationType
type) {
return containerStateManager.getContainerInfos(type);
}
+ @Override
+ public List<ContainerID> getContainerIDs(final ContainerID startID,
+ final int count,
+ final LifeCycleState state) {
+ scmContainerManagerMetrics.incNumListContainersOps();
+ return containerStateManager.getContainerIDs(state, startID, count);
+ }
+
@Override
public List<ContainerInfo> getContainers(final ContainerID startID,
final int count) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 3809db1cd33..0d66027480d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -103,6 +103,15 @@ public interface ContainerStateManager {
*/
boolean contains(ContainerID containerID);
+ /**
+ * Get {@link ContainerID}s for the given state.
+ *
+ * @param start the start {@link ContainerID} (inclusive)
+ * @param count the size limit
+ * @return a list of {@link ContainerID};
+ */
+ List<ContainerID> getContainerIDs(LifeCycleState state, ContainerID start,
int count);
+
/**
* Get {@link ContainerInfo}s.
*
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index dc5afd43a20..5c95aff5c19 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -266,6 +266,13 @@ private void initialize() throws IOException {
return actions;
}
+ @Override
+ public List<ContainerID> getContainerIDs(LifeCycleState state, ContainerID
start, int count) {
+ try (AutoCloseableLock ignored = readLock()) {
+ return containers.getContainerIDs(state, start, count);
+ }
+ }
+
@Override
public List<ContainerInfo> getContainerInfos(ContainerID start, int count) {
try (AutoCloseableLock ignored = readLock()) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index b1ff5f4ae48..4dd93aef747 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -260,6 +260,20 @@ public void updateState(ContainerID containerID,
LifeCycleState currentState,
currentInfo.setState(newState);
}
+ /**
+ *
+ * @param state the state of the containers
+ * @param start the start id
+ * @param count the maximum size of the returned list
+ * @return a list of sorted {@link ContainerID}s
+ */
+ public List<ContainerID> getContainerIDs(LifeCycleState state, ContainerID
start, int count) {
+ Preconditions.assertTrue(count >= 0, "count < 0");
+ return lifeCycleStateMap.tailMap(state, start).keySet().stream()
+ .limit(count)
+ .collect(Collectors.toList());
+ }
+
public List<ContainerInfo> getContainerInfos(ContainerID start, int count) {
return containerMap.getInfos(start, count);
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 2765cabec03..dd18ad68d13 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -117,6 +117,8 @@
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SafeModeRuleStatusProto;
@@ -752,6 +754,12 @@ public ScmContainerLocationResponse processRequest(
.setStatus(Status.OK)
.setReconcileContainerResponse(reconcileContainer(request.getReconcileContainerRequest()))
.build();
+ case ListContainerIDs:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+
.setScmListContainerIDsResponse(listContainerIDs(request.getScmListContainerIDsRequest()))
+ .build();
default:
throw new IllegalArgumentException(
"Unknown command type: " + request.getCmdType());
@@ -1401,4 +1409,29 @@ public ReconcileContainerResponseProto
reconcileContainer(ReconcileContainerRequ
return ReconcileContainerResponseProto.getDefaultInstance();
}
+ public SCMListContainerIDsResponseProto listContainerIDs(
+ SCMListContainerIDsRequestProto request) throws IOException {
+ ContainerID startContainerID = ContainerID.valueOf(0);
+
+ if (request.hasStartContainerID()) {
+ startContainerID =
ContainerID.valueOf(request.getStartContainerID().getId());
+ }
+
+ HddsProtos.LifeCycleState state = null;
+ if (request.hasState()) {
+ state = request.getState();
+ }
+
+ SCMListContainerIDsResponseProto.Builder builder =
+ SCMListContainerIDsResponseProto.newBuilder();
+
+ List<ContainerID> containerIDs = impl.getListOfContainerIDs(
+ startContainerID, request.getCount(), state);
+
+ containerIDs.stream()
+ .map(ContainerID::getProtobuf)
+ .forEach(builder::addContainerIDs);
+
+ return builder.build();
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index f500dc82330..b161e0e84d7 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1502,7 +1502,7 @@ public long getContainerCount(HddsProtos.LifeCycleState
state)
auditMap.put("state", String.valueOf(state));
try {
- long count = scm.getContainerManager().getContainers(state).size();
+ long count = scm.getContainerManager().getContainerStateCount(state);
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.GET_CONTAINER_COUNT, auditMap));
return count;
@@ -1514,8 +1514,8 @@ public long getContainerCount(HddsProtos.LifeCycleState
state)
}
@Override
- public List<ContainerInfo> getListOfContainers(
- long startContainerID, int count, HddsProtos.LifeCycleState state)
+ public List<ContainerID> getListOfContainerIDs(
+ ContainerID startContainerID, int count, HddsProtos.LifeCycleState state)
throws IOException {
final Map<String, String> auditMap = Maps.newHashMap();
@@ -1523,14 +1523,14 @@ public List<ContainerInfo> getListOfContainers(
auditMap.put("count", String.valueOf(count));
auditMap.put("state", String.valueOf(state));
try {
- List<ContainerInfo> results = scm.getContainerManager().getContainers(
- ContainerID.valueOf(startContainerID), count, state);
+ List<ContainerID> results = scm.getContainerManager().getContainerIDs(
+ startContainerID, count, state);
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
- SCMAction.LIST_CONTAINER, auditMap));
+ SCMAction.LIST_CONTAINER_IDS, auditMap));
return results;
} catch (Exception ex) {
AUDIT.logReadFailure(buildAuditMessageForFailure(
- SCMAction.LIST_CONTAINER, auditMap, ex));
+ SCMAction.LIST_CONTAINER_IDS, auditMap, ex));
throw ex;
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index 52cd943c4db..b7acb40d7ac 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -68,7 +68,8 @@ public enum SCMAction implements AuditAction {
QUERY_NODE,
GET_PIPELINE,
RECONCILE_CONTAINER,
- GET_DELETED_BLOCK_SUMMARY;
+ GET_DELETED_BLOCK_SUMMARY,
+ LIST_CONTAINER_IDS;
@Override
public String getAction() {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index 5c3035f28fc..182a589382a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -371,6 +371,37 @@ private void verifyContainerState(ContainerID containerId,
assertEquals(expectedState,
containerManager.getContainer(containerId).getState());
}
+ @Test
+ public void testGetContainerIDs() throws IOException {
+ ContainerInfo openContainerInfo = new ContainerInfo.Builder()
+ .setContainerID(1)
+ .setState(HddsProtos.LifeCycleState.OPEN)
+ .setSequenceId(100L)
+ .setOwner("scm")
+ .setPipelineID(PipelineID.randomId())
+ .setReplicationConfig(
+ RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE))
+ .build();
+
+ ContainerInfo closedContainerInfo = new ContainerInfo.Builder()
+ .setContainerID(2)
+ .setState(HddsProtos.LifeCycleState.CLOSED)
+ .setSequenceId(200L)
+ .setOwner("scm")
+ .setPipelineID(PipelineID.randomId())
+ .setReplicationConfig(
+ RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE))
+ .build();
+
+ containerStateManager.addContainer(openContainerInfo.getProtobuf());
+ containerStateManager.addContainer(closedContainerInfo.getProtobuf());
+
+ assertEquals(1, containerStateManager.getContainerIDs(
+ HddsProtos.LifeCycleState.CLOSED, ContainerID.MIN, 10).size());
+ }
+
@Test
public void testSequenceIdOnStateUpdate() throws Exception {
ContainerID containerID = ContainerID.valueOf(3L);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java
new file mode 100644
index 00000000000..c38c3c211bd
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerStateMap.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.states;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.junit.jupiter.api.Test;
+
+class TestContainerStateMap {
+
+ @Test
+ void testGetContainerIDs() {
+ ContainerStateMap map = new ContainerStateMap();
+ List<ContainerInfo> containerInfos = containerInfos();
+ // initialize map
+ containerInfos.forEach(map::addContainer);
+
+ assertEquals(4, map.getContainerIDs(OPEN, ContainerID.MIN,
containerInfos.size()).size());
+ assertEquals(4, map.getContainerIDs(CLOSED, ContainerID.MIN,
containerInfos.size()).size());
+
+ // verify pagination: start ID is inclusive, results are in ascending ID order
+ assertEquals(Arrays.asList(ContainerID.valueOf(2), ContainerID.valueOf(7),
+ ContainerID.valueOf(8)), map.getContainerIDs(CLOSED, ContainerID.MIN, 3));
+ assertEquals(Arrays.asList(ContainerID.valueOf(7), ContainerID.valueOf(8),
+ ContainerID.valueOf(9)), map.getContainerIDs(CLOSED, ContainerID.valueOf(7), 3));
+ }
+
+ private List<ContainerInfo> containerInfos() {
+ return Arrays.asList(
+ buildContainerInfo(1, OPEN),
+ buildContainerInfo(2, CLOSED),
+ buildContainerInfo(3, QUASI_CLOSED),
+ buildContainerInfo(4, DELETED),
+ buildContainerInfo(5, OPEN),
+ buildContainerInfo(6, OPEN),
+ buildContainerInfo(7, CLOSED),
+ buildContainerInfo(8, CLOSED),
+ buildContainerInfo(9, CLOSED),
+ buildContainerInfo(10, OPEN)
+ );
+ }
+
+ private ContainerInfo buildContainerInfo(long containerID,
HddsProtos.LifeCycleState state) {
+ return new ContainerInfo.Builder()
+ .setContainerID(containerID)
+ .setState(state)
+ .setReplicationConfig(StandaloneReplicationConfig.getInstance(THREE))
+ .build();
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
index 9402218014c..7d2f399d1fa 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java
@@ -17,10 +17,12 @@
package org.apache.hadoop.hdds.scm.server;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -33,6 +35,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
@@ -130,12 +133,27 @@ public void testScmListContainer() throws Exception {
SCMClientProtocolServer scmServer =
new SCMClientProtocolServer(new OzoneConfiguration(),
mockStorageContainerManager(), mock(ReconfigurationHandler.class));
+ try {
+ assertEquals(10, scmServer.listContainer(1, 10,
+ null, HddsProtos.ReplicationType.RATIS,
null).getContainerInfoList().size());
+ // Test call from a legacy client, which uses a different method of
listContainer
+ assertEquals(10, scmServer.listContainer(1, 10, null,
+ HddsProtos.ReplicationFactor.THREE).getContainerInfoList().size());
+ } finally {
+ scmServer.stop();
+ }
+ }
- assertEquals(10, scmServer.listContainer(1, 10,
- null, HddsProtos.ReplicationType.RATIS,
null).getContainerInfoList().size());
- // Test call from a legacy client, which uses a different method of
listContainer
- assertEquals(10, scmServer.listContainer(1, 10, null,
- HddsProtos.ReplicationFactor.THREE).getContainerInfoList().size());
+ @Test
+ public void testScmGetContainerCount() throws IOException {
+ SCMClientProtocolServer scmServer =
+ new SCMClientProtocolServer(new OzoneConfiguration(),
+ mockStorageContainerManager(), mock(ReconfigurationHandler.class));
+ try {
+ assertEquals(10, scmServer.getContainerCount(CLOSED));
+ } finally {
+ scmServer.stop();
+ }
}
private StorageContainerManager mockStorageContainerManager() {
@@ -145,11 +163,12 @@ private StorageContainerManager
mockStorageContainerManager() {
}
ContainerManagerImpl containerManager = mock(ContainerManagerImpl.class);
when(containerManager.getContainers()).thenReturn(infos);
+
when(containerManager.getContainerStateCount(any(LifeCycleState.class))).thenReturn(infos.size());
StorageContainerManager storageContainerManager =
mock(StorageContainerManager.class);
when(storageContainerManager.getContainerManager()).thenReturn(containerManager);
SCMNodeDetails scmNodeDetails = mock(SCMNodeDetails.class);
- when(scmNodeDetails.getClientProtocolServerAddress()).thenReturn(new
InetSocketAddress("localhost", 9876));
+ when(scmNodeDetails.getClientProtocolServerAddress()).thenReturn(new
InetSocketAddress("localhost", 0));
when(scmNodeDetails.getClientProtocolServerAddressKey()).thenReturn("test");
when(storageContainerManager.getScmNodeDetails()).thenReturn(scmNodeDetails);
return storageContainerManager;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 98ccd8fac8b..be9f1ac00e9 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -113,6 +113,7 @@ private void addPropertiesNotInXml() {
ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DELAY,
ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM,
+ ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE,
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
OMConfigKeys.OZONE_OM_HA_PREFIX,
OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index a57095d2e4c..b4da42d8f03 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -221,6 +221,38 @@ public final class ReconServerConfigKeys {
"ozone.recon.dn.metrics.collection.timeout";
public static final String OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT
= "10m";
+ /**
+ * Application-level ceiling on the number of ContainerIDs fetched from SCM
+ * per RPC call during container sync. The effective batch size is
+ * {@code min(this value, ipc.maximum.data.length / 12,
totalContainerCount)},
+ * so raising this above the default is only meaningful if
+ * {@code ipc.maximum.data.length} has also been raised from its default.
+ *
+ * <p><b>Recon wire cost</b>: each ContainerID is ~12 bytes on the wire, so
+ * the default 1,000,000 produces ~12 MB per RPC.
+ *
+ * <p><b>Recon JVM heap</b>: each deserialized {@code ContainerID} object
+ * occupies ~32 bytes, so the default batch requires ~32 MB of heap on Recon.
+ * Reduce this value on memory-constrained Recon nodes.
+ *
+ * <p><b>SCM-side pressure</b>: on each RPC call SCM holds its container
+ * state read lock (a fair {@link
java.util.concurrent.locks.ReentrantReadWriteLock})
+ * for the full duration of streaming N entries from its in-memory
+ * {@link java.util.TreeMap} and collecting them into a response list.
+ * Because the lock is fair, any concurrent write (container allocation,
+ * state transition) queuing for the write lock will be blocked for the
+ * entire batch duration — and new reads queue behind that waiting writer.
+ * Larger batches therefore increase worst-case container-allocation latency
+ * on SCM during sync. On write-heavy SCM nodes, prefer smaller batches with
+ * more calls over fewer large batches.
+ *
+ * <p>Default: 1,000,000 (~12 MB wire, ~32 MB JVM heap per batch on Recon;
+ * 4 calls for a 4 M-container cluster)
+ */
+ public static final String OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE =
+ "ozone.recon.scm.container.id.batch.size";
+ public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT =
1_000_000;
+
/**
* Private constructor for utility class.
*/
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index bc6d4943ecd..278bac0011d 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -62,18 +62,15 @@
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
-import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
@@ -140,9 +137,6 @@ public class ReconStorageContainerManagerFacade
private static final Logger LOG = LoggerFactory
.getLogger(ReconStorageContainerManagerFacade.class);
- public static final long CONTAINER_METADATA_SIZE = 1 * 1024 * 1024L;
- private static final String IPC_MAXIMUM_DATA_LENGTH =
"ipc.maximum.data.length";
- private static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 128 * 1024 * 1024;
private final OzoneConfiguration ozoneConfiguration;
private final ReconDatanodeProtocolServer datanodeProtocolServer;
@@ -173,6 +167,7 @@ public class ReconStorageContainerManagerFacade
private AtomicBoolean isSyncDataFromSCMRunning;
private final String threadNamePrefix;
+ private final ReconStorageContainerSyncHelper containerSyncHelper;
// To Do :- Refactor the constructor in a separate JIRA
@Inject
@@ -385,6 +380,12 @@ public
ReconStorageContainerManagerFacade(OzoneConfiguration conf,
reconSafeModeMgrTask = new ReconSafeModeMgrTask(
containerManager, nodeManager, safeModeManager,
reconTaskConfig, ozoneConfiguration);
+
+ containerSyncHelper = new ReconStorageContainerSyncHelper(
+ scmServiceProvider,
+ ozoneConfiguration,
+ containerManager
+ );
}
/**
@@ -566,73 +567,13 @@ public void updateReconSCMDBWithNewSnapshot() throws
IOException {
}
}
- public boolean syncWithSCMContainerInfo()
- throws IOException {
+ /**
+ * Runs a CLOSED-container sync from SCM through
+ * {@link ReconStorageContainerSyncHelper}, unless a sync is already marked
+ * as running.
+ *
+ * <p>NOTE(review): this method sets {@code isSyncDataFromSCMRunning} to true
+ * but does not clear it. Confirm that the caller resets the flag after each
+ * run; otherwise every later call returns {@code false} without syncing.
+ *
+ * @return {@code false} if a sync is already marked as running, otherwise
+ * the result of
+ * {@link ReconStorageContainerSyncHelper#syncWithSCMContainerInfo()}
+ */
+ public boolean syncWithSCMContainerInfo() {
if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
- try {
- List<ContainerInfo> containers = containerManager.getContainers();
-
- long totalContainerCount = scmServiceProvider.getContainerCount(
- HddsProtos.LifeCycleState.CLOSED);
- long containerCountPerCall =
- getContainerCountPerCall(totalContainerCount);
- long startContainerId = 1;
- long retrievedContainerCount = 0;
- if (totalContainerCount > 0) {
- while (retrievedContainerCount < totalContainerCount) {
- List<ContainerInfo> listOfContainers = scmServiceProvider.
- getListOfContainers(startContainerId,
- Long.valueOf(containerCountPerCall).intValue(),
- HddsProtos.LifeCycleState.CLOSED);
- if (null != listOfContainers && !listOfContainers.isEmpty()) {
- LOG.info("Got list of containers from SCM : " +
- listOfContainers.size());
- listOfContainers.forEach(containerInfo -> {
- long containerID = containerInfo.getContainerID();
- boolean isContainerPresentAtRecon =
- containers.contains(containerInfo);
- if (!isContainerPresentAtRecon) {
- try {
- ContainerWithPipeline containerWithPipeline =
- scmServiceProvider.getContainerWithPipeline(
- containerID);
- containerManager.addNewContainer(containerWithPipeline);
- } catch (IOException e) {
- LOG.error("Could not get container with pipeline " +
- "for container : {}", containerID);
- }
- }
- });
- startContainerId = listOfContainers.get(
- listOfContainers.size() - 1).getContainerID() + 1;
- } else {
- LOG.info("No containers found at SCM in CLOSED state");
- return false;
- }
- retrievedContainerCount += containerCountPerCall;
- }
- }
- } catch (IOException e) {
- LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
- return false;
- }
+ return containerSyncHelper.syncWithSCMContainerInfo();
} else {
LOG.debug("SCM DB sync is already running.");
return false;
}
- return true;
- }
-
- private long getContainerCountPerCall(long totalContainerCount) {
- // Assumption of size of 1 container info object here is 1 MB
- long containersMetaDataTotalRpcRespSizeMB =
- CONTAINER_METADATA_SIZE * totalContainerCount;
- long hadoopRPCSize = ozoneConfiguration.getInt(IPC_MAXIMUM_DATA_LENGTH,
IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
- long containerCountPerCall = containersMetaDataTotalRpcRespSizeMB <=
- hadoopRPCSize ? totalContainerCount :
- Math.round(Math.floor(
- hadoopRPCSize / (double) CONTAINER_METADATA_SIZE));
- return containerCountPerCall;
}
private void deleteOldSCMDB() throws IOException {
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java
new file mode 100644
index 00000000000..c8d940aa835
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerSyncHelper.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.scm;
+
+import static
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
+import static
org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Syncs CLOSED containers from SCM into Recon. It pages through the IDs of
+ * all CLOSED containers known to SCM and registers with the
+ * {@link ReconContainerManager} every one that Recon does not yet track.
+ */
+class ReconStorageContainerSyncHelper {
+
+  // Estimated serialized size of one ContainerID entry in the list response,
+  // used to derive how many IDs fit within ipc.maximum.data.length.
+  // Protobuf encodes int64 as a varint, not as a fixed 8 bytes. 12 bytes
+  // covers any non-negative id if each entry is a length-delimited message
+  // holding one int64 field (1-byte tag + 1-byte length + 1-byte inner tag
+  // + <= 9 varint bytes). NOTE(review): confirm against the ContainerID proto.
+  private static final long CONTAINER_ID_PROTO_SIZE_BYTES = 12;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(ReconStorageContainerSyncHelper.class);
+
+  private final StorageContainerServiceProvider scmServiceProvider;
+  private final OzoneConfiguration ozoneConfiguration;
+  private final ReconContainerManager containerManager;
+
+  ReconStorageContainerSyncHelper(
+      StorageContainerServiceProvider scmServiceProvider,
+      OzoneConfiguration ozoneConfiguration,
+      ReconContainerManager containerManager) {
+    this.scmServiceProvider = scmServiceProvider;
+    this.ozoneConfiguration = ozoneConfiguration;
+    this.containerManager = containerManager;
+  }
+
+  /**
+   * Adds to Recon every CLOSED container that SCM lists but Recon does not
+   * yet track.
+   *
+   * <p>IDs are paged from SCM starting at container ID 1. Each later page
+   * starts just after the last ID of the previous page. Paging stops when
+   * either of these happens:
+   * <ul>
+   *   <li>the number of IDs actually received reaches the CLOSED count that
+   *   SCM reported up front;</li>
+   *   <li>SCM returns an empty page after at least one non-empty page.</li>
+   * </ul>
+   * Failures to fetch or add an individual container are logged and skipped.
+   *
+   * @return {@code false} if SCM reports CLOSED containers but returns an
+   *     empty first page, or if any exception escapes the sync;
+   *     {@code true} otherwise
+   */
+  public boolean syncWithSCMContainerInfo() {
+    try {
+      long totalContainerCount = scmServiceProvider.getContainerCount(
+          HddsProtos.LifeCycleState.CLOSED);
+      // Bounded by the RPC-derived limit (<= Integer.MAX_VALUE / 12), so the
+      // conversion cannot overflow.
+      int containerCountPerCall =
+          Math.toIntExact(getContainerCountPerCall(totalContainerCount));
+      ContainerID startContainerId = ContainerID.valueOf(1);
+      long retrievedContainerCount = 0;
+      while (retrievedContainerCount < totalContainerCount) {
+        List<ContainerID> listOfContainers =
+            scmServiceProvider.getListOfContainerIDs(startContainerId,
+                containerCountPerCall, HddsProtos.LifeCycleState.CLOSED);
+        if (listOfContainers == null || listOfContainers.isEmpty()) {
+          if (retrievedContainerCount == 0) {
+            LOG.info("No containers found at SCM in CLOSED state");
+            return false;
+          }
+          // Presumably some containers left the CLOSED state after the count
+          // was read. Everything SCM still lists has been processed.
+          LOG.debug("SCM returned no CLOSED containers from {} onwards after"
+                  + " {} of {} expected; ending sync.", startContainerId,
+              retrievedContainerCount, totalContainerCount);
+          break;
+        }
+        LOG.info("Got list of containers from SCM : {}",
+            listOfContainers.size());
+        listOfContainers.forEach(this::addContainerIfMissing);
+        ContainerID lastContainerId =
+            listOfContainers.get(listOfContainers.size() - 1);
+        startContainerId = ContainerID.valueOf(lastContainerId.getId() + 1);
+        // Count what SCM actually returned, not what was requested. Otherwise
+        // a short page would end the loop before all containers are seen.
+        retrievedContainerCount += listOfContainers.size();
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Fetches {@code containerID} (with its pipeline) from SCM and adds it to
+   * Recon if Recon does not already track it. An IOException is logged with
+   * its cause and is not propagated, so one bad container does not abort the
+   * sync.
+   */
+  private void addContainerIfMissing(ContainerID containerID) {
+    if (containerManager.containerExist(containerID)) {
+      return;
+    }
+    try {
+      ContainerWithPipeline containerWithPipeline =
+          scmServiceProvider.getContainerWithPipeline(containerID.getId());
+      containerManager.addNewContainer(containerWithPipeline);
+    } catch (IOException e) {
+      LOG.error("Could not get container with pipeline " +
+          "for container : {}", containerID, e);
+    }
+  }
+
+  /**
+   * Returns how many container IDs to request per RPC. The result is the
+   * smallest of three limits:
+   * <ul>
+   *   <li>the configured {@code ozone.recon.scm.container.id.batch.size};</li>
+   *   <li>the number of IDs that fit in {@code ipc.maximum.data.length};</li>
+   *   <li>{@code totalContainerCount}.</li>
+   * </ul>
+   * A non-positive configured batch size is replaced by the default, and the
+   * RPC-derived limit is never below 1. The result is therefore at least 1
+   * whenever {@code totalContainerCount} is positive.
+   */
+  private long getContainerCountPerCall(long totalContainerCount) {
+    long hadoopRPCSize = ozoneConfiguration.getInt(
+        IPC_MAXIMUM_DATA_LENGTH, IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+    long countByRpcLimit =
+        Math.max(1L, hadoopRPCSize / CONTAINER_ID_PROTO_SIZE_BYTES);
+    long countByBatchLimit = ozoneConfiguration.getLong(
+        OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE,
+        OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT);
+    if (countByBatchLimit <= 0) {
+      LOG.warn("Invalid value {} for {}; using default {}.",
+          countByBatchLimit, OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE,
+          OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT);
+      countByBatchLimit = OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT;
+    }
+
+    long batchSize = Math.min(countByRpcLimit, countByBatchLimit);
+    return Math.min(totalContainerCount, batchSize);
+  }
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
index 412bd302766..9e73c30edb8 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
@@ -20,7 +20,7 @@
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -80,15 +80,15 @@ List<ContainerWithPipeline>
getExistContainerWithPipelinesInBatch(
DBCheckpoint getSCMDBSnapshot();
/**
- * Get the list of containers from SCM. This is a RPC call.
+ * Get the list of container IDs from SCM. This is an RPC call.
+ *
+ * <p>NOTE(review): Recon's container sync pages through results by passing
+ * the last returned ID + 1 as the next {@code startContainerID}. That assumes
+ * the start ID is inclusive and results are in ascending ID order; confirm
+ * against the SCM implementation.
*
* @param startContainerID the start container id
* @param count the number of containers to return
* @param state the containers in given state to be returned
- * @return the list of containers from SCM in a given state
+ * @return the list of container IDs from SCM in a given state
* @throws IOException
*/
- List<ContainerInfo> getListOfContainers(long startContainerID,
+ List<ContainerID> getListOfContainerIDs(ContainerID startContainerID,
int count,
HddsProtos.LifeCycleState state)
throws IOException;
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
index edd1c1f702b..6d4e3104234 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
@@ -35,7 +35,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.ha.InterSCMGrpcClient;
import org.apache.hadoop.hdds.scm.ha.SCMSnapshotDownloader;
@@ -185,10 +185,9 @@ private RocksDBCheckpoint getRocksDBCheckpoint(String
snapshotFileName, File tar
}
@Override
- public List<ContainerInfo> getListOfContainers(
- long startContainerID, int count, HddsProtos.LifeCycleState state)
+ public List<ContainerID> getListOfContainerIDs(
+ ContainerID startContainerID, int count, HddsProtos.LifeCycleState state)
throws IOException {
+ // Thin pass-through to the SCM client RPC; no local filtering or caching.
- return scmClient.getListOfContainers(startContainerID, count, state);
+ return scmClient.getListOfContainerIDs(startContainerID, count, state);
}
-
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java
new file mode 100644
index 00000000000..9ba0d85a931
--- /dev/null
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconStorageContainerSyncHelper.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.scm;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link ReconStorageContainerSyncHelper}, using a mocked SCM
+ * service provider and a mocked Recon container manager.
+ */
+class TestReconStorageContainerSyncHelper {
+
+  private final StorageContainerServiceProvider mockScmServiceProvider =
+      mock(StorageContainerServiceProvider.class);
+
+  private final ReconContainerManager mockContainerManager =
+      mock(ReconContainerManager.class);
+
+  // Default configuration: the small clusters in these tests fit in one page.
+  private final ReconStorageContainerSyncHelper syncHelper =
+      new ReconStorageContainerSyncHelper(
+          mockScmServiceProvider, new OzoneConfiguration(),
+          mockContainerManager);
+
+  /** Builds a CLOSED, STANDALONE/ONE container with no pipeline attached. */
+  private static ContainerWithPipeline closedContainerWithPipeline(long id) {
+    ContainerInfo info = new ContainerInfo.Builder()
+        .setContainerID(id)
+        .setState(CLOSED)
+        .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
+        .setOwner("test")
+        .build();
+    return new ContainerWithPipeline(info, null);
+  }
+
+  @Test
+  void testContainerMissingFromReconIsAdded() throws Exception {
+    ContainerID cid = ContainerID.valueOf(42L);
+    ContainerWithPipeline cwp = closedContainerWithPipeline(42L);
+
+    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(1L);
+    when(mockScmServiceProvider.getListOfContainerIDs(
+        eq(ContainerID.valueOf(1L)), eq(1), eq(CLOSED)))
+        .thenReturn(Collections.singletonList(cid));
+    when(mockContainerManager.containerExist(cid)).thenReturn(false);
+    when(mockScmServiceProvider.getContainerWithPipeline(42L)).thenReturn(cwp);
+
+    assertTrue(syncHelper.syncWithSCMContainerInfo());
+    verify(mockScmServiceProvider).getContainerWithPipeline(42L);
+    verify(mockContainerManager).addNewContainer(cwp);
+  }
+
+  @Test
+  void testContainerMissingFromReconIsAddedWhenMultiplePages()
+      throws Exception {
+    // Force containerCountPerCall = 2 via the batch size config.
+    OzoneConfiguration pagedConf = new OzoneConfiguration();
+    pagedConf.setLong(OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE, 2L);
+    ReconStorageContainerSyncHelper pagedHelper =
+        new ReconStorageContainerSyncHelper(
+            mockScmServiceProvider, pagedConf, mockContainerManager);
+
+    // Page 1: containers 1 and 2 (both missing from Recon).
+    ContainerID cid1 = ContainerID.valueOf(1L);
+    ContainerID cid2 = ContainerID.valueOf(2L);
+    ContainerWithPipeline cwp1 = closedContainerWithPipeline(1L);
+    ContainerWithPipeline cwp2 = closedContainerWithPipeline(2L);
+    // Page 2: container 3 (already in Recon).
+    ContainerID cid3 = ContainerID.valueOf(3L);
+
+    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(3L);
+    when(mockScmServiceProvider.getListOfContainerIDs(
+        eq(ContainerID.valueOf(1L)), eq(2), eq(CLOSED)))
+        .thenReturn(Arrays.asList(cid1, cid2));
+    when(mockScmServiceProvider.getListOfContainerIDs(
+        eq(ContainerID.valueOf(3L)), eq(2), eq(CLOSED)))
+        .thenReturn(Collections.singletonList(cid3));
+
+    when(mockContainerManager.containerExist(cid1)).thenReturn(false);
+    when(mockContainerManager.containerExist(cid2)).thenReturn(false);
+    when(mockContainerManager.containerExist(cid3)).thenReturn(true);
+    when(mockScmServiceProvider.getContainerWithPipeline(1L)).thenReturn(cwp1);
+    when(mockScmServiceProvider.getContainerWithPipeline(2L)).thenReturn(cwp2);
+
+    assertTrue(pagedHelper.syncWithSCMContainerInfo());
+    // Page 1: both missing containers were added.
+    verify(mockContainerManager).addNewContainer(cwp1);
+    verify(mockContainerManager).addNewContainer(cwp2);
+    // Page 2: the container already in Recon was neither fetched nor added.
+    verify(mockScmServiceProvider, never()).getContainerWithPipeline(3L);
+    verify(mockContainerManager, never()).addNewContainer(
+        argThat(cwp -> cwp.getContainerInfo().getContainerID() == 3L));
+    // Both pages were fetched; page 2 starts after the last ID of page 1.
+    verify(mockScmServiceProvider).getListOfContainerIDs(
+        eq(ContainerID.valueOf(1L)), eq(2), eq(CLOSED));
+    verify(mockScmServiceProvider).getListOfContainerIDs(
+        eq(ContainerID.valueOf(3L)), eq(2), eq(CLOSED));
+  }
+
+  @Test
+  void testContainerAlreadyInReconIsSkipped() throws Exception {
+    ContainerID cid = ContainerID.valueOf(7L);
+
+    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(1L);
+    when(mockScmServiceProvider.getListOfContainerIDs(
+        eq(ContainerID.valueOf(1L)), eq(1), eq(CLOSED)))
+        .thenReturn(Collections.singletonList(cid));
+    when(mockContainerManager.containerExist(cid)).thenReturn(true);
+
+    assertTrue(syncHelper.syncWithSCMContainerInfo());
+    verify(mockScmServiceProvider, never())
+        .getContainerWithPipeline(anyLong());
+    verify(mockContainerManager, never()).addNewContainer(any());
+  }
+
+  @Test
+  void testZeroClosedContainersReturnsTrue() throws Exception {
+    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(0L);
+
+    assertTrue(syncHelper.syncWithSCMContainerInfo());
+    verifyNoInteractions(mockContainerManager);
+    verify(mockScmServiceProvider, never())
+        .getListOfContainerIDs(any(), any(Integer.class), any());
+  }
+
+  @Test
+  void testEmptyListFromSCMReturnsFalse() throws Exception {
+    when(mockScmServiceProvider.getContainerCount(CLOSED)).thenReturn(1L);
+    when(mockScmServiceProvider.getListOfContainerIDs(
+        eq(ContainerID.valueOf(1L)), eq(1), eq(CLOSED)))
+        .thenReturn(Collections.emptyList());
+
+    assertFalse(syncHelper.syncWithSCMContainerInfo());
+    verifyNoInteractions(mockContainerManager);
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]