This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 537f044a7b HDDS-8585. Recon - API for Number of open keys and amount
of data mapped to such keys. (#4764)
537f044a7b is described below
commit 537f044a7b1f90fed77d063370024ee2296fec2c
Author: Arafat2198 <[email protected]>
AuthorDate: Fri Jun 16 18:57:31 2023 +0530
HDDS-8585. Recon - API for Number of open keys and amount of data mapped to
such keys. (#4764)
---
.../apache/hadoop/ozone/recon/ReconConstants.java | 4 +
.../hadoop/ozone/recon/ReconControllerModule.java | 4 +-
.../ozone/recon/api/ClusterStateEndpoint.java | 14 +-
.../ozone/recon/api/OMDBInsightEndpoint.java | 211 ++++++++----
.../recon/api/types/KeyInsightInfoResponse.java | 43 ++-
.../ozone/recon/tasks/OmTableInsightTask.java | 377 +++++++++++++++++++++
.../hadoop/ozone/recon/tasks/TableCountTask.java | 213 ------------
.../ozone/recon/OMMetadataManagerTestUtils.java | 68 ++++
.../hadoop/ozone/recon/api/TestEndpoints.java | 8 +-
.../ozone/recon/api/TestOmDBInsightEndPoint.java | 138 +++++++-
.../ozone/recon/tasks/TestOmTableInsightTask.java | 308 +++++++++++++++++
.../ozone/recon/tasks/TestTableCountTask.java | 173 ----------
12 files changed, 1079 insertions(+), 482 deletions(-)
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index c2c76bab97..c27964e373 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -36,10 +36,14 @@ public final class ReconConstants {
public static final String RECON_SCM_SNAPSHOT_DB = "scm.snapshot.db";
// By default, limit the number of results returned
+ public static final String DEFAULT_OPEN_KEY_INCLUDE_NON_FSO = "false";
+ public static final String DEFAULT_OPEN_KEY_INCLUDE_FSO = "false";
public static final String DEFAULT_FETCH_COUNT = "1000";
public static final String DEFAULT_BATCH_NUMBER = "1";
public static final String RECON_QUERY_BATCH_PARAM = "batchNum";
public static final String RECON_QUERY_PREVKEY = "prevKey";
+ public static final String RECON_OPEN_KEY_INCLUDE_NON_FSO = "includeNonFso";
+ public static final String RECON_OPEN_KEY_INCLUDE_FSO = "includeFso";
public static final String PREV_CONTAINER_ID_DEFAULT_VALUE = "0";
public static final String PREV_DELETED_BLOCKS_TRANSACTION_ID_DEFAULT_VALUE =
"0";
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 1283e26de2..bb7ba4954d 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask;
import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl;
-import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
+import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.ClientId;
import org.hadoop.ozone.recon.codegen.ReconSqlDbConfig;
@@ -128,7 +128,7 @@ public class ReconControllerModule extends AbstractModule {
Multibinder.newSetBinder(binder(), ReconOmTask.class);
taskBinder.addBinding().to(ContainerKeyMapperTask.class);
taskBinder.addBinding().to(FileSizeCountTask.class);
- taskBinder.addBinding().to(TableCountTask.class);
+ taskBinder.addBinding().to(OmTableInsightTask.class);
taskBinder.addBinding().to(NSSummaryTask.class);
}
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
index 752c720950..bc87c402eb 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
@@ -30,7 +30,7 @@ import
org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
import org.apache.hadoop.ozone.recon.scm.ReconPipelineManager;
-import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
+import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
@@ -124,21 +124,21 @@ public class ClusterStateEndpoint {
ClusterStateResponse.Builder builder = ClusterStateResponse.newBuilder();
GlobalStats volumeRecord = globalStatsDao.findById(
- TableCountTask.getRowKeyFromTable(VOLUME_TABLE));
+ OmTableInsightTask.getTableCountKeyFromTable(VOLUME_TABLE));
GlobalStats bucketRecord = globalStatsDao.findById(
- TableCountTask.getRowKeyFromTable(BUCKET_TABLE));
+ OmTableInsightTask.getTableCountKeyFromTable(BUCKET_TABLE));
// Keys from OBJECT_STORE buckets.
GlobalStats keyRecord = globalStatsDao.findById(
- TableCountTask.getRowKeyFromTable(KEY_TABLE));
+ OmTableInsightTask.getTableCountKeyFromTable(KEY_TABLE));
// Keys from FILE_SYSTEM_OPTIMIZED buckets
GlobalStats fileRecord = globalStatsDao.findById(
- TableCountTask.getRowKeyFromTable(FILE_TABLE));
+ OmTableInsightTask.getTableCountKeyFromTable(FILE_TABLE));
// Keys from the DeletedTable
GlobalStats deletedKeyRecord = globalStatsDao.findById(
- TableCountTask.getRowKeyFromTable(DELETED_TABLE));
+ OmTableInsightTask.getTableCountKeyFromTable(DELETED_TABLE));
// Directories from the DeletedDirectoryTable
GlobalStats deletedDirRecord = globalStatsDao.findById(
- TableCountTask.getRowKeyFromTable(DELETED_DIR_TABLE));
+ OmTableInsightTask.getTableCountKeyFromTable(DELETED_DIR_TABLE));
if (volumeRecord != null) {
builder.setVolumes(volumeRecord.getValue());
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
index 51fba63b73..b662dd7c17 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.recon.api;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -30,6 +31,11 @@ import
org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
@@ -42,11 +48,20 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
import static org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_FETCH_COUNT;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_LIMIT;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;
+import static
org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_FSO;
+import static
org.apache.hadoop.ozone.recon.ReconConstants.DEFAULT_OPEN_KEY_INCLUDE_NON_FSO;
+import static
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_FSO;
+import static
org.apache.hadoop.ozone.recon.ReconConstants.RECON_OPEN_KEY_INCLUDE_NON_FSO;
+
/**
* Endpoint to get following key level info under OM DB Insight page of Recon.
@@ -69,84 +84,112 @@ public class OMDBInsightEndpoint {
private ReconContainerMetadataManager reconContainerMetadataManager;
private final ReconOMMetadataManager omMetadataManager;
private final ReconContainerManager containerManager;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OMDBInsightEndpoint.class);
+ private final GlobalStatsDao globalStatsDao;
@Inject
public OMDBInsightEndpoint(OzoneStorageContainerManager reconSCM,
- ReconOMMetadataManager omMetadataManager) {
+ ReconOMMetadataManager omMetadataManager,
+ GlobalStatsDao globalStatsDao) {
this.containerManager =
(ReconContainerManager) reconSCM.getContainerManager();
this.omMetadataManager = omMetadataManager;
+ this.globalStatsDao = globalStatsDao;
}
/**
* This method retrieves set of keys/files which are open.
*
* @return the http json response wrapped in below format:
+ *
* {
- * replicatedTotal: 13824,
- * unreplicatedTotal: 4608,
- * entities: [
+ * "keysSummary": {
+ * "totalUnreplicatedDataSize": 2147483648,
+ * "totalReplicatedDataSize": 2147483648,
+ * "totalOpenKeys": 8
+ * },
+ * "lastKey": "/-4611686018427388160/-9223372036854775552/-922777620354",
+ * "replicatedTotal": 2147483648,
+ * "unreplicatedTotal": 2147483648,
+ * "fso": [
* {
- * path: “/vol1/bucket1/key1”,
- * keyState: “Open”,
- * inStateSince: 1667564193026,
- * size: 1024,
- * replicatedSize: 3072,
- * unreplicatedSize: 1024,
- * replicationType: RATIS,
- * replicationFactor: THREE
- * }.
- * {
- * path: “/vol1/bucket1/key2”,
- * keyState: “Open”,
- * inStateSince: 1667564193026,
- * size: 512,
- * replicatedSize: 1536,
- * unreplicatedSize: 512,
- * replicationType: RATIS,
- * replicationFactor: THREE
- * }.
+ * "key": "/-4611686018427388160/-9223372036/-922337203977722380527",
+ * "path": "239",
+ * "inStateSince": 1686156886632,
+ * "size": 268435456,
+ * "replicatedSize": 268435456,
+ * "replicationInfo": {
+ * "replicationFactor": "ONE",
+ * "requiredNodes": 1,
+ * "replicationType": "RATIS"
+ * }
+ * },
* {
- * path: “/vol1/fso-bucket/dir1/file1”,
- * keyState: “Open”,
- * inStateSince: 1667564193026,
- * size: 1024,
- * replicatedSize: 3072,
- * unreplicatedSize: 1024,
- * replicationType: RATIS,
- * replicationFactor: THREE
- * }.
+ * "key": "/-4611686018427388160/-9223372036854775552/0397777586240",
+ * "path": "244",
+ * "inStateSince": 1686156887186,
+ * "size": 268435456,
+ * "replicatedSize": 268435456,
+ * "replicationInfo": {
+ * "replicationFactor": "ONE",
+ * "requiredNodes": 1,
+ * "replicationType": "RATIS"
+ * }
+ * }
+ * ],
+ * "nonFSO": [
* {
- * path: “/vol1/fso-bucket/dir1/dir2/file2”,
- * keyState: “Open”,
- * inStateSince: 1667564193026,
- * size: 2048,
- * replicatedSize: 6144,
- * unreplicatedSize: 2048,
- * replicationType: RATIS,
- * replicationFactor: THREE
+ * "key": "/vol1/bucket1/object1",
+ * "path": "239",
+ * "inStateSince": 1686156886632,
+ * "size": 268435456,
+ * "replicatedSize": 268435456,
+ * "replicationInfo": {
+ * "replicationFactor": "ONE",
+ * "requiredNodes": 1,
+ * "replicationType": "RATIS"
+ * }
* }
- * ]
+ * ],
+ * "status": "OK"
* }
*/
+
@GET
@Path("/open")
public Response getOpenKeyInfo(
@DefaultValue(DEFAULT_FETCH_COUNT) @QueryParam(RECON_QUERY_LIMIT)
- int limit,
+ int limit,
@DefaultValue(StringUtils.EMPTY) @QueryParam(RECON_QUERY_PREVKEY)
- String prevKey) {
+ String prevKey,
+ @DefaultValue(DEFAULT_OPEN_KEY_INCLUDE_FSO)
+ @QueryParam(RECON_OPEN_KEY_INCLUDE_FSO)
+ boolean includeFso,
+ @DefaultValue(DEFAULT_OPEN_KEY_INCLUDE_NON_FSO)
+ @QueryParam(RECON_OPEN_KEY_INCLUDE_NON_FSO)
+ boolean includeNonFso) {
KeyInsightInfoResponse openKeyInsightInfo = new KeyInsightInfoResponse();
List<KeyEntityInfo> nonFSOKeyInfoList =
openKeyInsightInfo.getNonFSOKeyInfoList();
+
+ // Create a HashMap for the keysSummary
+ Map<String, Long> keysSummary = new HashMap<>();
boolean skipPrevKeyDone = false;
boolean isLegacyBucketLayout = true;
boolean recordsFetchedLimitReached = false;
+
String lastKey = "";
List<KeyEntityInfo> fsoKeyInfoList =
openKeyInsightInfo.getFsoKeyInfoList();
- for (BucketLayout layout : Arrays.asList(BucketLayout.LEGACY,
- BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
+ for (BucketLayout layout : Arrays.asList(
+ BucketLayout.LEGACY, BucketLayout.FILE_SYSTEM_OPTIMIZED)) {
isLegacyBucketLayout = (layout == BucketLayout.LEGACY);
+ // Skip bucket iteration based on parameters includeFso and includeNonFso
+ if ((!includeFso && !isLegacyBucketLayout) ||
+ (!includeNonFso && isLegacyBucketLayout)) {
+ continue;
+ }
+
Table<String, OmKeyInfo> openKeyTable =
omMetadataManager.getOpenKeyTable(layout);
try (
@@ -184,11 +227,11 @@ public class OMDBInsightEndpoint {
keyEntityInfo.setSize(omKeyInfo.getDataSize());
keyEntityInfo.setReplicatedSize(omKeyInfo.getReplicatedSize());
keyEntityInfo.setReplicationConfig(omKeyInfo.getReplicationConfig());
- openKeyInsightInfo.setUnreplicatedTotal(
- openKeyInsightInfo.getUnreplicatedTotal() +
+ openKeyInsightInfo.setUnreplicatedDataSize(
+ openKeyInsightInfo.getUnreplicatedDataSize() +
keyEntityInfo.getSize());
- openKeyInsightInfo.setReplicatedTotal(
- openKeyInsightInfo.getReplicatedTotal() +
+ openKeyInsightInfo.setReplicatedDataSize(
+ openKeyInsightInfo.getReplicatedDataSize() +
keyEntityInfo.getReplicatedSize());
boolean added =
isLegacyBucketLayout ? nonFSOKeyInfoList.add(keyEntityInfo) :
@@ -211,10 +254,53 @@ public class OMDBInsightEndpoint {
break;
}
}
+ // Populate the keysSummary map
+ createKeysSummaryForOpenKey(keysSummary);
+
+ openKeyInsightInfo.setKeysSummary(keysSummary);
+
openKeyInsightInfo.setLastKey(lastKey);
return Response.ok(openKeyInsightInfo).build();
}
+ /**
+ * Creates a keys summary for open keys and updates the provided
+ * keysSummary map. Calculates the total number of open keys, replicated
+ * data size, and unreplicated data size.
+ *
+ * @param keysSummary A map to store the keys summary information.
+ */
+ private void createKeysSummaryForOpenKey(
+ Map<String, Long> keysSummary) {
+ Long replicatedSizeOpenKey = getValueFromId(globalStatsDao.findById(
+ OmTableInsightTask.getReplicatedSizeKeyFromTable(OPEN_KEY_TABLE)));
+ Long replicatedSizeOpenFile = getValueFromId(globalStatsDao.findById(
+ OmTableInsightTask.getReplicatedSizeKeyFromTable(OPEN_FILE_TABLE)));
+ Long unreplicatedSizeOpenKey = getValueFromId(globalStatsDao.findById(
+ OmTableInsightTask.getUnReplicatedSizeKeyFromTable(OPEN_KEY_TABLE)));
+ Long unreplicatedSizeOpenFile = getValueFromId(globalStatsDao.findById(
+ OmTableInsightTask.getUnReplicatedSizeKeyFromTable(OPEN_FILE_TABLE)));
+ Long openKeyCountForKeyTable = getValueFromId(globalStatsDao.findById(
+ OmTableInsightTask.getTableCountKeyFromTable(OPEN_KEY_TABLE)));
+ Long openKeyCountForFileTable = getValueFromId(globalStatsDao.findById(
+ OmTableInsightTask.getTableCountKeyFromTable(OPEN_FILE_TABLE)));
+
+ // Calculate the total number of open keys
+ keysSummary.put("totalOpenKeys",
+ openKeyCountForKeyTable + openKeyCountForFileTable);
+ // Calculate the total replicated and unreplicated sizes
+ keysSummary.put("totalReplicatedDataSize",
+ replicatedSizeOpenKey + replicatedSizeOpenFile);
+ keysSummary.put("totalUnreplicatedDataSize",
+ unreplicatedSizeOpenKey + unreplicatedSizeOpenFile);
+
+ }
+
+ private Long getValueFromId(GlobalStats record) {
+ // If the record is null, return 0
+ return record != null ? record.getValue() : 0L;
+ }
+
private void getPendingForDeletionKeyInfo(
int limit,
String prevKey,
@@ -271,8 +357,9 @@ public class OMDBInsightEndpoint {
}
}
- /** This method retrieves set of keys/files pending for deletion.
- *
+ /**
+ * This method retrieves set of keys/files pending for deletion.
+ * <p>
* limit - limits the number of key/files returned.
* prevKey - E.g. /vol1/bucket1/key1, this will skip keys till it
* seeks correctly to the given prevKey.
@@ -379,11 +466,11 @@ public class OMDBInsightEndpoint {
keyEntityInfo.setSize(omKeyInfo.getDataSize());
keyEntityInfo.setReplicatedSize(omKeyInfo.getReplicatedSize());
keyEntityInfo.setReplicationConfig(omKeyInfo.getReplicationConfig());
- pendingForDeletionKeyInfo.setUnreplicatedTotal(
- pendingForDeletionKeyInfo.getUnreplicatedTotal() +
+ pendingForDeletionKeyInfo.setUnreplicatedDataSize(
+ pendingForDeletionKeyInfo.getUnreplicatedDataSize() +
keyEntityInfo.getSize());
- pendingForDeletionKeyInfo.setReplicatedTotal(
- pendingForDeletionKeyInfo.getReplicatedTotal() +
+ pendingForDeletionKeyInfo.setReplicatedDataSize(
+ pendingForDeletionKeyInfo.getReplicatedDataSize() +
keyEntityInfo.getReplicatedSize());
deletedDirInfoList.add(keyEntityInfo);
if (deletedDirInfoList.size() == limit) {
@@ -470,12 +557,18 @@ public class OMDBInsightEndpoint {
KeyInsightInfoResponse deletedKeyAndDirInsightInfo,
RepeatedOmKeyInfo repeatedOmKeyInfo) {
repeatedOmKeyInfo.getOmKeyInfoList().forEach(omKeyInfo -> {
- deletedKeyAndDirInsightInfo.setUnreplicatedTotal(
- deletedKeyAndDirInsightInfo.getUnreplicatedTotal() +
+ deletedKeyAndDirInsightInfo.setUnreplicatedDataSize(
+ deletedKeyAndDirInsightInfo.getUnreplicatedDataSize() +
omKeyInfo.getDataSize());
- deletedKeyAndDirInsightInfo.setReplicatedTotal(
- deletedKeyAndDirInsightInfo.getReplicatedTotal() +
+ deletedKeyAndDirInsightInfo.setReplicatedDataSize(
+ deletedKeyAndDirInsightInfo.getReplicatedDataSize() +
omKeyInfo.getReplicatedSize());
});
}
+
+ @VisibleForTesting
+ public GlobalStatsDao getDao() {
+ return this.globalStatsDao;
+ }
+
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/KeyInsightInfoResponse.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/KeyInsightInfoResponse.java
index 18da6b438e..425454ffcc 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/KeyInsightInfoResponse.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/KeyInsightInfoResponse.java
@@ -23,25 +23,31 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* HTTP Response wrapped for keys insights.
*/
public class KeyInsightInfoResponse {
+ /** Keys summary. Includes aggregated information about the keys. */
+ @JsonProperty("keysSummary")
+ private Map<String, Long> keysSummary;
+
/** last key sent. */
@JsonProperty("lastKey")
private String lastKey;
/** Amount of data mapped to all keys and files in
* a cluster across all DNs. */
- @JsonProperty("replicatedTotal")
- private long replicatedTotal;
+ @JsonProperty("replicatedDataSize")
+ private long replicatedDataSize;
/** Amount of data mapped to all keys and files on a single DN. */
- @JsonProperty("unreplicatedTotal")
- private long unreplicatedTotal;
+ @JsonProperty("unreplicatedDataSize")
+ private long unreplicatedDataSize;
/** List of all non-fso keys. */
@JsonProperty("nonFSO")
@@ -69,12 +75,21 @@ public class KeyInsightInfoResponse {
public KeyInsightInfoResponse() {
responseCode = ResponseStatus.OK;
lastKey = "";
- replicatedTotal = 0L;
- unreplicatedTotal = 0L;
+ replicatedDataSize = 0L;
+ unreplicatedDataSize = 0L;
nonFSOKeyInfoList = new ArrayList<>();
fsoKeyInfoList = new ArrayList<>();
repeatedOmKeyInfoList = new ArrayList<>();
deletedDirInfoList = new ArrayList<>();
+ keysSummary = new HashMap<>();
+ }
+
+ public Map<String, Long> getKeysSummary() {
+ return keysSummary;
+ }
+
+ public void setKeysSummary(Map<String, Long> keysSummary) {
+ this.keysSummary = keysSummary;
}
public String getLastKey() {
@@ -85,20 +100,20 @@ public class KeyInsightInfoResponse {
this.lastKey = lastKey;
}
- public long getReplicatedTotal() {
- return replicatedTotal;
+ public long getReplicatedDataSize() {
+ return replicatedDataSize;
}
- public void setReplicatedTotal(long replicatedTotal) {
- this.replicatedTotal = replicatedTotal;
+ public void setReplicatedDataSize(long replicatedDataSize) {
+ this.replicatedDataSize = replicatedDataSize;
}
- public long getUnreplicatedTotal() {
- return unreplicatedTotal;
+ public long getUnreplicatedDataSize() {
+ return unreplicatedDataSize;
}
- public void setUnreplicatedTotal(long unreplicatedTotal) {
- this.unreplicatedTotal = unreplicatedTotal;
+ public void setUnreplicatedDataSize(long unreplicatedDataSize) {
+ this.unreplicatedDataSize = unreplicatedDataSize;
}
public List<KeyEntityInfo> getNonFSOKeyInfoList() {
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
new file mode 100644
index 0000000000..fb4e44126f
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.common.collect.Iterators;
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
+import org.jooq.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import java.util.Map.Entry;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+import static org.jooq.impl.DSL.currentTimestamp;
+import static org.jooq.impl.DSL.select;
+import static org.jooq.impl.DSL.using;
+
+/**
+ * Class to iterate over the OM DB and store the total counts of volumes,
+ * buckets, keys, open keys, deleted keys, etc.
+ */
+public class OmTableInsightTask implements ReconOmTask {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OmTableInsightTask.class);
+
+ private GlobalStatsDao globalStatsDao;
+ private Configuration sqlConfiguration;
+ private ReconOMMetadataManager reconOMMetadataManager;
+
+ @Inject
+ public OmTableInsightTask(GlobalStatsDao globalStatsDao,
+ Configuration sqlConfiguration,
+ ReconOMMetadataManager reconOMMetadataManager) {
+ this.globalStatsDao = globalStatsDao;
+ this.sqlConfiguration = sqlConfiguration;
+ this.reconOMMetadataManager = reconOMMetadataManager;
+ }
+
+ /**
+ * Iterates the rows of each table in the OM snapshot DB and calculates the
+ * counts and sizes for table data.
+ *
+ * For tables that require data size calculation
+ * (as returned by getTablesToCalculateSize), both the number of
+ * records (count) and total data size of the records are calculated.
+ * For all other tables, only the count of records is calculated.
+ *
+ * @param omMetadataManager OM Metadata instance.
+ * @return Pair
+ */
+ @Override
+ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ HashMap<String, Long> objectCountMap = initializeCountMap();
+ HashMap<String, Long> unReplicatedSizeCountMap = initializeSizeMap(false);
+ HashMap<String, Long> replicatedSizeCountMap = initializeSizeMap(true);
+
+ for (String tableName : getTaskTables()) {
+ Table table = omMetadataManager.getTable(tableName);
+ if (table == null) {
+ LOG.error("Table " + tableName + " not found in OM Metadata.");
+ return new ImmutablePair<>(getTaskName(), false);
+ }
+
+ try (
+ TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator
+ = table.iterator()) {
+ if (getTablesToCalculateSize().contains(tableName)) {
+ Triple<Long, Long, Long> details = getTableSizeAndCount(iterator);
+ objectCountMap.put(getTableCountKeyFromTable(tableName),
+ details.getLeft());
+ unReplicatedSizeCountMap.put(
+ getUnReplicatedSizeKeyFromTable(tableName), details.getMiddle());
+ replicatedSizeCountMap.put(getReplicatedSizeKeyFromTable(tableName),
+ details.getRight());
+ } else {
+ long count = Iterators.size(iterator);
+ objectCountMap.put(getTableCountKeyFromTable(tableName), count);
+ }
+ } catch (IOException ioEx) {
+ LOG.error("Unable to populate Table Count in Recon DB.", ioEx);
+ return new ImmutablePair<>(getTaskName(), false);
+ }
+ }
+ // Write the data to the DB
+ if (!objectCountMap.isEmpty()) {
+ writeDataToDB(objectCountMap);
+ }
+ if (!unReplicatedSizeCountMap.isEmpty()) {
+ writeDataToDB(unReplicatedSizeCountMap);
+ }
+ if (!replicatedSizeCountMap.isEmpty()) {
+ writeDataToDB(replicatedSizeCountMap);
+ }
+
+ LOG.info("Completed a 'reprocess' run of OmTableInsightTask.");
+ return new ImmutablePair<>(getTaskName(), true);
+ }
+
+ /**
+ * Returns a triple with the total count of records (left), total
unreplicated
+ * size (middle), and total replicated size (right) in the given iterator.
+ * Increments count for each record and adds the dataSize if a record's value
+ * is an instance of OmKeyInfo. If the iterator is null, returns (0, 0, 0).
+ *
+ * @param iterator The iterator over the table to be iterated.
+ * @return A Triple with three Long values representing the count,
+ * unreplicated size and replicated size.
+ * @throws IOException If an I/O error occurs during the iterator traversal.
+ */
+ private Triple<Long, Long, Long> getTableSizeAndCount(
+ TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator)
+ throws IOException {
+ long count = 0;
+ long unReplicatedSize = 0;
+ long replicatedSize = 0;
+
+ if (iterator != null) {
+ while (iterator.hasNext()) {
+ Table.KeyValue<String, ?> kv = iterator.next();
+ if (kv != null && kv.getValue() != null) {
+ if (kv.getValue() instanceof OmKeyInfo) {
+ OmKeyInfo omKeyInfo = (OmKeyInfo) kv.getValue();
+ unReplicatedSize += omKeyInfo.getDataSize();
+ replicatedSize += omKeyInfo.getReplicatedSize();
+ }
+ count++; // Increment count for each row
+ }
+ }
+ }
+
+ return Triple.of(count, unReplicatedSize, replicatedSize);
+ }
+
+ /**
+ * Returns a collection of table names that require data size calculation.
+ */
+ public Collection<String> getTablesToCalculateSize() {
+ List<String> taskTables = new ArrayList<>();
+ taskTables.add(OPEN_KEY_TABLE);
+ taskTables.add(OPEN_FILE_TABLE);
+ return taskTables;
+ }
+
+ @Override
+ public String getTaskName() {
+ return "OmTableInsightTask";
+ }
+
+ public Collection<String> getTaskTables() {
+ return new ArrayList<>(reconOMMetadataManager.listTableNames());
+ }
+
+ /**
+ * Read the update events and update the count and sizes of respective object
+ * (volume, bucket, key etc.) based on the action (put or delete).
+ *
+ * @param events Update events - PUT, DELETE and UPDATE.
+ * @return Pair
+ */
+ @Override
+ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+ HashMap<String, Long> objectCountMap = initializeCountMap();
+ HashMap<String, Long> unreplicatedSizeCountMap = initializeSizeMap(false);
+ HashMap<String, Long> replicatedSizeCountMap = initializeSizeMap(true);
+ final Collection<String> taskTables = getTaskTables();
+ final Collection<String> sizeRelatedTables = getTablesToCalculateSize();
+
+ while (eventIterator.hasNext()) {
+ OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
+ String tableName = omdbUpdateEvent.getTable();
+
+ if (!taskTables.contains(tableName)) {
+ continue;
+ }
+
+ String countKey = getTableCountKeyFromTable(tableName);
+ String unReplicatedSizeKey =
+ getUnReplicatedSizeKeyFromTable(tableName);
+ String replicatedSizeKey =
+ getReplicatedSizeKeyFromTable(tableName);
+
+ try {
+ switch (omdbUpdateEvent.getAction()) {
+ case PUT:
+ objectCountMap.computeIfPresent(countKey, (k, count) -> count + 1L);
+
+ // Compute unreplicated and replicated sizes for size-related tables
+ if (sizeRelatedTables.contains(tableName) &&
+ omdbUpdateEvent.getValue() instanceof OmKeyInfo) {
+ OmKeyInfo omKeyInfo = (OmKeyInfo) omdbUpdateEvent.getValue();
+ unreplicatedSizeCountMap.computeIfPresent(unReplicatedSizeKey,
+ (k, size) -> size + omKeyInfo.getDataSize());
+ replicatedSizeCountMap.computeIfPresent(replicatedSizeKey,
+ (k, size) -> size + omKeyInfo.getReplicatedSize());
+ }
+ break;
+
+ case DELETE:
+ if (omdbUpdateEvent.getValue() != null) {
+ objectCountMap.computeIfPresent(countKey,
+ (k, count) -> count > 0 ? count - 1L : 0L);
+
+ // Compute unreplicated and replicated sizes for size-related
tables
+ if (sizeRelatedTables.contains(tableName) &&
+ omdbUpdateEvent.getValue() instanceof OmKeyInfo) {
+ OmKeyInfo omKeyInfo = (OmKeyInfo) omdbUpdateEvent.getValue();
+ unreplicatedSizeCountMap.computeIfPresent(unReplicatedSizeKey,
+ (k, size) ->
+ size > omKeyInfo.getDataSize() ?
+ size - omKeyInfo.getDataSize() : 0L);
+ replicatedSizeCountMap.computeIfPresent(replicatedSizeKey,
+ (k, size) ->
+ size > omKeyInfo.getReplicatedSize() ?
+ size - omKeyInfo.getReplicatedSize() : 0L);
+ }
+ }
+ break;
+ case UPDATE:
+ if (omdbUpdateEvent.getValue() instanceof OmKeyInfo &&
+ sizeRelatedTables.contains(tableName) &&
+ omdbUpdateEvent.getOldValue() != null) {
+ OmKeyInfo oldKeyInfo = (OmKeyInfo) omdbUpdateEvent.getOldValue();
+ OmKeyInfo newKeyInfo = (OmKeyInfo) omdbUpdateEvent.getValue();
+ // Update key size by subtracting the oldSize and adding newSize
+ unreplicatedSizeCountMap.computeIfPresent(unReplicatedSizeKey,
+ (k, size) -> size - oldKeyInfo.getDataSize() +
+ newKeyInfo.getDataSize());
+ replicatedSizeCountMap.computeIfPresent(replicatedSizeKey,
+ (k, size) -> size - oldKeyInfo.getReplicatedSize() +
+ newKeyInfo.getReplicatedSize());
+ } else if (omdbUpdateEvent.getValue() != null) {
+ LOG.warn("Update event does not have the old Key Info for {}.",
+ omdbUpdateEvent.getKey());
+ }
+ break;
+ default:
+ LOG.trace("Skipping DB update event : Table: {}, Action: {}",
+ tableName, omdbUpdateEvent.getAction());
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Unexpected exception while processing the table {}, Action: {}",
+ tableName, omdbUpdateEvent.getAction(), e);
+ return new ImmutablePair<>(getTaskName(), false);
+ }
+ }
+
+ if (!objectCountMap.isEmpty()) {
+ writeDataToDB(objectCountMap);
+ }
+ if (!unreplicatedSizeCountMap.isEmpty()) {
+ writeDataToDB(unreplicatedSizeCountMap);
+ }
+ if (!replicatedSizeCountMap.isEmpty()) {
+ writeDataToDB(replicatedSizeCountMap);
+ }
+
+ LOG.info("Completed a 'process' run of OmTableInsightTask.");
+ return new ImmutablePair<>(getTaskName(), true);
+ }
+
+
+ private void writeDataToDB(Map<String, Long> dataMap) {
+ List<GlobalStats> insertGlobalStats = new ArrayList<>();
+ List<GlobalStats> updateGlobalStats = new ArrayList<>();
+
+ for (Entry<String, Long> entry : dataMap.entrySet()) {
+ Timestamp now =
+ using(sqlConfiguration).fetchValue(select(currentTimestamp()));
+ GlobalStats record = globalStatsDao.fetchOneByKey(entry.getKey());
+ GlobalStats newRecord
+ = new GlobalStats(entry.getKey(), entry.getValue(), now);
+
+ // Insert a new record for key if it does not exist
+ if (record == null) {
+ insertGlobalStats.add(newRecord);
+ } else {
+ updateGlobalStats.add(newRecord);
+ }
+ }
+
+ globalStatsDao.insert(insertGlobalStats);
+ globalStatsDao.update(updateGlobalStats);
+ }
+
+ private HashMap<String, Long> initializeCountMap() {
+ Collection<String> tables = getTaskTables();
+ HashMap<String, Long> objectCountMap = new HashMap<>(tables.size());
+ for (String tableName : tables) {
+ String key = getTableCountKeyFromTable(tableName);
+ objectCountMap.put(key, getValueForKey(key));
+ }
+ return objectCountMap;
+ }
+
+ /**
+ * Initializes a size map with the replicated or unreplicated sizes for the
+ * tables to calculate size.
+ *
+ * @return The size map containing the size counts for each table.
+ */
+ private HashMap<String, Long> initializeSizeMap(boolean replicated) {
+ Collection<String> tables = getTablesToCalculateSize();
+ HashMap<String, Long> sizeCountMap = new HashMap<>(tables.size());
+ for (String tableName : tables) {
+ String key = replicated ? getReplicatedSizeKeyFromTable(tableName) :
+ getUnReplicatedSizeKeyFromTable(tableName);
+ sizeCountMap.put(key, getValueForKey(key));
+ }
+ return sizeCountMap;
+ }
+
+ public static String getTableCountKeyFromTable(String tableName) {
+ return tableName + "TableCount";
+ }
+
+ public static String getReplicatedSizeKeyFromTable(String tableName) {
+ return tableName + "ReplicatedDataSize";
+ }
+
+ public static String getUnReplicatedSizeKeyFromTable(String tableName) {
+ return tableName + "UnReplicatedDataSize";
+ }
+
+ /**
+ * Get the value stored for the given key from the Global Stats table.
+ * Return 0 if the record is not found.
+ *
+ * @param key Key in the Global Stats table
+ * @return The value associated with the key
+ */
+ private long getValueForKey(String key) {
+ GlobalStats record = globalStatsDao.fetchOneByKey(key);
+
+ return (record == null) ? 0L : record.getValue();
+ }
+
+}
+
+
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/TableCountTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/TableCountTask.java
deleted file mode 100644
index 1027eb952d..0000000000
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/TableCountTask.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.recon.tasks;
-
-import com.google.inject.Inject;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
-import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
-import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
-import org.jooq.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import static org.jooq.impl.DSL.currentTimestamp;
-import static org.jooq.impl.DSL.select;
-import static org.jooq.impl.DSL.using;
-
-/**
- * Class to iterate over the OM DB and store the total counts of volumes,
- * buckets, keys, open keys, deleted keys, etc.
- */
-public class TableCountTask implements ReconOmTask {
- private static final Logger LOG =
- LoggerFactory.getLogger(TableCountTask.class);
-
- private GlobalStatsDao globalStatsDao;
- private Configuration sqlConfiguration;
- private ReconOMMetadataManager reconOMMetadataManager;
-
- @Inject
- public TableCountTask(GlobalStatsDao globalStatsDao,
- Configuration sqlConfiguration,
- ReconOMMetadataManager reconOMMetadataManager) {
- this.globalStatsDao = globalStatsDao;
- this.sqlConfiguration = sqlConfiguration;
- this.reconOMMetadataManager = reconOMMetadataManager;
- }
-
- /**
- * Iterate the rows of each table in OM snapshot DB and calculate the
- * counts for each table.
- *
- * @param omMetadataManager OM Metadata instance.
- * @return Pair
- */
- @Override
- public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
- HashMap<String, Long> objectCountMap = initializeCountMap();
- for (String tableName : getTaskTables()) {
- Table table = omMetadataManager.getTable(tableName);
- try (TableIterator keyIter = table.iterator()) {
- long count = getCount(keyIter);
- objectCountMap.put(getRowKeyFromTable(tableName), count);
- } catch (IOException ioEx) {
- LOG.error("Unable to populate Table Count in Recon DB.", ioEx);
- return new ImmutablePair<>(getTaskName(), false);
- }
- }
- writeCountsToDB(objectCountMap);
- LOG.info("Completed a 'reprocess' run of TableCountTask.");
- return new ImmutablePair<>(getTaskName(), true);
- }
-
- private long getCount(Iterator iterator) {
- long count = 0L;
- while (iterator.hasNext()) {
- count++;
- iterator.next();
- }
- return count;
- }
-
- @Override
- public String getTaskName() {
- return "TableCountTask";
- }
-
- public Collection<String> getTaskTables() {
- return new ArrayList<>(reconOMMetadataManager.listTableNames());
- }
-
- /**
- * Read the update events and update the count of respective object
- * (volume, bucket, key etc.) based on the action (put or delete).
- *
- * @param events Update events - PUT, DELETE and UPDATE.
- * @return Pair
- */
- @Override
- public Pair<String, Boolean> process(OMUpdateEventBatch events) {
- Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
- HashMap<String, Long> objectCountMap = initializeCountMap();
- final Collection<String> taskTables = getTaskTables();
-
- while (eventIterator.hasNext()) {
- OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
- // Filter event inside process method to avoid duping
- if (!taskTables.contains(omdbUpdateEvent.getTable())) {
- continue;
- }
- String rowKey = getRowKeyFromTable(omdbUpdateEvent.getTable());
- try {
- switch (omdbUpdateEvent.getAction()) {
- case PUT:
- objectCountMap.computeIfPresent(rowKey, (k, count) -> count + 1L);
- break;
-
- case DELETE:
- // if value is null, it means that the volume / bucket / key
- // is already deleted and does not exist in the OM database anymore.
- if (omdbUpdateEvent.getValue() != null) {
- String key = getRowKeyFromTable(omdbUpdateEvent.getTable());
- objectCountMap.computeIfPresent(key,
- (k, count) -> count > 0 ? count - 1L : 0L);
- }
- break;
-
- default: LOG.trace("Skipping DB update event : Table: {}, Action: {}",
- omdbUpdateEvent.getTable(), omdbUpdateEvent.getAction());
- }
- } catch (Exception e) {
- LOG.error("Unexpected exception while processing the table {}, " +
- "Action: {}", omdbUpdateEvent.getTable(),
- omdbUpdateEvent.getAction(), e);
- return new ImmutablePair<>(getTaskName(), false);
- }
- }
- writeCountsToDB(objectCountMap);
-
- LOG.info("Completed a 'process' run of TableCountTask.");
- return new ImmutablePair<>(getTaskName(), true);
- }
-
- private void writeCountsToDB(Map<String, Long> objectCountMap) {
- List<GlobalStats> insertGlobalStats = new ArrayList<>();
- List<GlobalStats> updateGlobalStats = new ArrayList<>();
-
- for (Entry<String, Long> entry: objectCountMap.entrySet()) {
- Timestamp now =
- using(sqlConfiguration).fetchValue(select(currentTimestamp()));
- GlobalStats record = globalStatsDao.fetchOneByKey(entry.getKey());
- GlobalStats newRecord
- = new GlobalStats(entry.getKey(), entry.getValue(), now);
-
- // Insert a new record for key if it does not exist
- if (record == null) {
- insertGlobalStats.add(newRecord);
- } else {
- updateGlobalStats.add(newRecord);
- }
- }
-
- globalStatsDao.insert(insertGlobalStats);
- globalStatsDao.update(updateGlobalStats);
- }
-
- private HashMap<String, Long> initializeCountMap() {
- Collection<String> tables = getTaskTables();
- HashMap<String, Long> objectCountMap = new HashMap<>(tables.size());
- for (String tableName: tables) {
- String key = getRowKeyFromTable(tableName);
- objectCountMap.put(key, getCountForKey(key));
- }
- return objectCountMap;
- }
-
- public static String getRowKeyFromTable(String tableName) {
- return tableName + "Count";
- }
-
- /**
- * Get the count stored for the given key from Global Stats table.
- * Return 0 if record not found.
- *
- * @param key Key in the global stats table
- * @return count
- */
- private long getCountForKey(String key) {
- GlobalStats record = globalStatsDao.fetchOneByKey(key);
-
- return (record == null) ? 0L : record.getValue();
- }
-}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
index 395e8e1737..838703bb59 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/OMMetadataManagerTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.recon;
import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
@@ -37,6 +38,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -260,6 +262,72 @@ public final class OMMetadataManagerTestUtils {
.build());
}
+ /**
+ * Write an open key to OM instance optimized for File System.
+ *
+ * @throws IOException while writing.
+ */
+ @SuppressWarnings("checkstyle:parameternumber")
+ public static void writeOpenFileToOm(OMMetadataManager omMetadataManager,
+ String keyName,
+ String bucketName,
+ String volName,
+ String fileName,
+ long objectId,
+ long parentObjectId,
+ long bucketObjectId,
+ long volumeObjectId,
+ List<OmKeyLocationInfoGroup> locationVersions,
+ long dataSize)
+ throws IOException {
+
+ String openKey = omMetadataManager.getOzonePathKey(volumeObjectId,
+ bucketObjectId, parentObjectId, fileName);
+
+ OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+ .setBucketName(bucketName)
+ .setVolumeName(volName)
+ .setKeyName(keyName)
+ .setDataSize(dataSize)
+ .setOmKeyLocationInfos(locationVersions)
+ .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
+ .setObjectID(objectId)
+ .setParentObjectID(parentObjectId)
+ .build();
+
+ omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+ .put(openKey, omKeyInfo);
+ }
+
+ /**
+ * Write an open key to OM instance with any other BucketLayout.
+ *
+ * @throws IOException while writing.
+ */
+ public static void writeOpenKeyToOm(OMMetadataManager omMetadataManager,
+ String keyName,
+ String bucketName,
+ String volName,
+ List<OmKeyLocationInfoGroup>
locationVersions,
+ long dataSize)
+ throws IOException {
+
+ String openKey =
+ omMetadataManager.getOzoneKey(volName, bucketName, keyName);
+ OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+ .setBucketName(bucketName)
+ .setVolumeName(volName)
+ .setKeyName(keyName)
+ .setDataSize(dataSize)
+ .setOmKeyLocationInfos(locationVersions)
+ .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
+ .build();
+
+ omMetadataManager.getOpenKeyTable(BucketLayout.LEGACY)
+ .put(openKey, omKeyInfo);
+ }
+
+
/**
* Writes deleted key information to the Ozone Manager metadata table.
* @param omMetadataManager the Ozone Manager metadata manager
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index 45639409bf..232eb7bcc3 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -76,7 +76,7 @@ import
org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import
org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountTask;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
-import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
+import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.LambdaTestUtils;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
@@ -141,7 +141,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
private ReconOMMetadataManager reconOMMetadataManager;
private FileSizeCountTask fileSizeCountTask;
private ContainerSizeCountTask containerSizeCountTask;
- private TableCountTask tableCountTask;
+ private OmTableInsightTask omTableInsightTask;
private ReconStorageContainerManagerFacade reconScm;
private boolean isSetupDone = false;
private String pipelineId;
@@ -254,7 +254,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
utilizationSchemaDefinition);
fileSizeCountTask =
new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
- tableCountTask = new TableCountTask(
+ omTableInsightTask = new OmTableInsightTask(
globalStatsDao, sqlConfiguration, reconOMMetadataManager);
containerHealthSchemaManager =
reconTestInjector.getInstance(ContainerHealthSchemaManager.class);
@@ -635,7 +635,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
// check volume, bucket and key count after running table count task
Pair<String, Boolean> result =
- tableCountTask.reprocess(reconOMMetadataManager);
+ omTableInsightTask.reprocess(reconOMMetadataManager);
assertTrue(result.getRight());
response = clusterStateEndpoint.getClusterState();
clusterStateResponse = (ClusterStateResponse) response.getEntity();
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
index 71fe6185f7..1dd5121563 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOmDBInsightEndPoint.java
@@ -33,6 +33,7 @@ import
org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.recon.ReconTestInjector;
import org.apache.hadoop.ozone.recon.api.types.KeyInsightInfoResponse;
+import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.scm.ReconPipelineManager;
@@ -42,16 +43,18 @@ import
org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import
org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
-import org.junit.rules.TemporaryFolder;
import javax.ws.rs.core.Response;
+import java.sql.Timestamp;
+import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
@@ -67,10 +70,8 @@ import static org.mockito.Mockito.when;
/**
* Unit test for OmDBInsightEndPoint.
*/
-public class TestOmDBInsightEndPoint {
+public class TestOmDBInsightEndPoint extends AbstractReconSqlDBTest {
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
private OzoneStorageContainerManager ozoneStorageContainerManager;
private ReconContainerMetadataManager reconContainerMetadataManager;
private OMMetadataManager omMetadataManager;
@@ -201,7 +202,8 @@ public class TestOmDBInsightEndPoint {
reconOMMetadataManager.getOpenKeyTable(getBucketLayout())
.get("/sampleVol/bucketOne/key_one");
Assertions.assertEquals("key_one", omKeyInfo1.getKeyName());
- Response openKeyInfoResp = omdbInsightEndpoint.getOpenKeyInfo(-1, "");
+ Response openKeyInfoResp =
+ omdbInsightEndpoint.getOpenKeyInfo(-1, "", true, true);
KeyInsightInfoResponse keyInsightInfoResp =
(KeyInsightInfoResponse) openKeyInfoResp.getEntity();
Assertions.assertNotNull(keyInsightInfoResp);
@@ -209,6 +211,39 @@ public class TestOmDBInsightEndPoint {
keyInsightInfoResp.getNonFSOKeyInfoList().get(0).getPath());
}
+ @Test
+ public void testKeysSummaryAttribute() {
+ Timestamp now = new Timestamp(System.currentTimeMillis());
+ GlobalStatsDao statsDao = omdbInsightEndpoint.getDao();
+ // Insert records for replicated and unreplicated data sizes
+ GlobalStats newRecord =
+ new GlobalStats("openFileTableReplicatedDataSize", 30L, now);
+ statsDao.insert(newRecord);
+ newRecord = new GlobalStats("openKeyTableReplicatedDataSize", 30L, now);
+ statsDao.insert(newRecord);
+ newRecord = new GlobalStats("openFileTableUnReplicatedDataSize", 10L, now);
+ statsDao.insert(newRecord);
+ newRecord = new GlobalStats("openKeyTableUnReplicatedDataSize", 10L, now);
+ statsDao.insert(newRecord);
+
+ // Insert records for table counts
+ newRecord = new GlobalStats("openKeyTableTableCount", 3L, now);
+ statsDao.insert(newRecord);
+ newRecord = new GlobalStats("openFileTableTableCount", 3L, now);
+ statsDao.insert(newRecord);
+
+ // Call the API to get the response
+ Response openKeyInfoResp =
+ omdbInsightEndpoint.getOpenKeyInfo(-1, "", true, true);
+ KeyInsightInfoResponse keyInsightInfoResp =
+ (KeyInsightInfoResponse) openKeyInfoResp.getEntity();
+ Assertions.assertNotNull(keyInsightInfoResp);
+ Map<String, Long> summary = keyInsightInfoResp.getKeysSummary();
+ Assertions.assertEquals(60L, summary.get("totalReplicatedDataSize"));
+ Assertions.assertEquals(20L, summary.get("totalUnreplicatedDataSize"));
+ Assertions.assertEquals(6L, summary.get("totalOpenKeys"));
+ }
+
@Test
public void testGetOpenKeyInfoLimitParam() throws Exception {
OmKeyInfo omKeyInfo1 =
@@ -224,7 +259,8 @@ public class TestOmDBInsightEndPoint {
.put("/sampleVol/bucketOne/key_two", omKeyInfo2);
reconOMMetadataManager.getOpenKeyTable(getBucketLayout())
.put("/sampleVol/bucketOne/key_three", omKeyInfo3);
- Response openKeyInfoResp = omdbInsightEndpoint.getOpenKeyInfo(2, "");
+ Response openKeyInfoResp =
+ omdbInsightEndpoint.getOpenKeyInfo(2, "", true, true);
KeyInsightInfoResponse keyInsightInfoResp =
(KeyInsightInfoResponse) openKeyInfoResp.getEntity();
Assertions.assertNotNull(keyInsightInfoResp);
@@ -236,7 +272,7 @@ public class TestOmDBInsightEndPoint {
Assertions.assertEquals("key_three",
keyInsightInfoResp.getNonFSOKeyInfoList().get(1).getPath());
- openKeyInfoResp = omdbInsightEndpoint.getOpenKeyInfo(3, "");
+ openKeyInfoResp = omdbInsightEndpoint.getOpenKeyInfo(3, "", true, true);
keyInsightInfoResp =
(KeyInsightInfoResponse) openKeyInfoResp.getEntity();
Assertions.assertNotNull(keyInsightInfoResp);
@@ -249,6 +285,87 @@ public class TestOmDBInsightEndPoint {
keyInsightInfoResp.getNonFSOKeyInfoList().get(1).getPath());
}
+ @Test
+ public void testGetOpenKeyInfoWithIncludeFsoAndIncludeNonFsoParams()
+ throws Exception {
+ OmKeyInfo omKeyInfo1 =
+ getOmKeyInfo("sampleVol", "non_fso_Bucket", "non_fso_key1", true);
+ OmKeyInfo omKeyInfo2 =
+ getOmKeyInfo("sampleVol", "non_fso_Bucket", "non_fso_key2", true);
+ OmKeyInfo omKeyInfo3 =
+ getOmKeyInfo("sampleVol", "non_fso_Bucket", "non_fso_key3", true);
+
+ reconOMMetadataManager.getOpenKeyTable(getBucketLayout())
+ .put("/sampleVol/non_fso_Bucket/non_fso_key1", omKeyInfo1);
+ reconOMMetadataManager.getOpenKeyTable(getBucketLayout())
+ .put("/sampleVol/non_fso_Bucket/non_fso_key2", omKeyInfo2);
+ reconOMMetadataManager.getOpenKeyTable(getBucketLayout())
+ .put("/sampleVol/non_fso_Bucket/non_fso_key3", omKeyInfo3);
+
+ omKeyInfo1 =
+ getOmKeyInfo("sampleVol", "fso_Bucket", "fso_key1", false);
+ omKeyInfo2 =
+ getOmKeyInfo("sampleVol", "fso_Bucket", "fso_key2", false);
+ omKeyInfo3 =
+ getOmKeyInfo("sampleVol", "fso_Bucket", "fso_key3", false);
+ OmKeyInfo omKeyInfo4 =
+ getOmKeyInfo("sampleVol", "fso_Bucket", "fso_key4", false);
+
+ reconOMMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+ .put("/sampleVol/fso_Bucket/fso_key1", omKeyInfo1);
+ reconOMMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+ .put("/sampleVol/fso_Bucket/fso_key2", omKeyInfo2);
+ reconOMMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+ .put("/sampleVol/fso_Bucket/fso_key3", omKeyInfo3);
+ reconOMMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+ .put("/sampleVol/fso_Bucket/fso_key4", omKeyInfo4);
+
+ // CASE 1 :- Display only FSO keys in response
+ // includeFsoKeys=true, includeNonFsoKeys=false
+ Response openKeyInfoResp =
+ omdbInsightEndpoint.getOpenKeyInfo(10, "", true, false);
+ KeyInsightInfoResponse keyInsightInfoResp =
+ (KeyInsightInfoResponse) openKeyInfoResp.getEntity();
+ Assertions.assertNotNull(keyInsightInfoResp);
+ Assertions.assertEquals(4,
+ keyInsightInfoResp.getFsoKeyInfoList().size());
+ Assertions.assertEquals(0,
+ keyInsightInfoResp.getNonFSOKeyInfoList().size());
+
+ // CASE 2 :- Display only Non-FSO keys in response
+ // includeFsoKeys=false, includeNonFsoKeys=true
+ openKeyInfoResp =
+ omdbInsightEndpoint.getOpenKeyInfo(10, "", false, true);
+ keyInsightInfoResp = (KeyInsightInfoResponse) openKeyInfoResp.getEntity();
+ Assertions.assertNotNull(keyInsightInfoResp);
+ Assertions.assertEquals(0,
+ keyInsightInfoResp.getFsoKeyInfoList().size());
+ Assertions.assertEquals(3,
+ keyInsightInfoResp.getNonFSOKeyInfoList().size());
+
+ // CASE 3 :- Display both FSO and Non-FSO keys in response
+ // includeFsoKeys=true, includeNonFsoKeys=true
+ openKeyInfoResp =
+ omdbInsightEndpoint.getOpenKeyInfo(10, "", true, true);
+ keyInsightInfoResp = (KeyInsightInfoResponse) openKeyInfoResp.getEntity();
+ Assertions.assertNotNull(keyInsightInfoResp);
+ Assertions.assertEquals(4,
+ keyInsightInfoResp.getFsoKeyInfoList().size());
+ Assertions.assertEquals(3,
+ keyInsightInfoResp.getNonFSOKeyInfoList().size());
+
+ // CASE 4 :- Don't Display both FSO and Non-FSO keys in response
+ // includeFsoKeys=false, includeNonFsoKeys=false
+ openKeyInfoResp =
+ omdbInsightEndpoint.getOpenKeyInfo(10, "", false, false);
+ keyInsightInfoResp = (KeyInsightInfoResponse) openKeyInfoResp.getEntity();
+ Assertions.assertNotNull(keyInsightInfoResp);
+ Assertions.assertEquals(0,
+ keyInsightInfoResp.getFsoKeyInfoList().size());
+ Assertions.assertEquals(0,
+ keyInsightInfoResp.getNonFSOKeyInfoList().size());
+ }
+
@Test
public void testGetOpenKeyInfoPrevKeyParam() throws Exception {
OmKeyInfo omKeyInfo1 =
@@ -265,7 +382,8 @@ public class TestOmDBInsightEndPoint {
reconOMMetadataManager.getOpenKeyTable(getBucketLayout())
.put("/sampleVol/bucketOne/key_three", omKeyInfo3);
Response openKeyInfoResp =
- omdbInsightEndpoint.getOpenKeyInfo(-1, "/sampleVol/bucketOne/key_one");
+ omdbInsightEndpoint.getOpenKeyInfo(-1, "/sampleVol/bucketOne/key_one",
+ true, true);
KeyInsightInfoResponse keyInsightInfoResp =
(KeyInsightInfoResponse) openKeyInfoResp.getEntity();
Assertions.assertNotNull(keyInsightInfoResp);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
new file mode 100644
index 0000000000..04fd55d58c
--- /dev/null
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import
org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMUpdateEventBuilder;
+
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.jooq.DSLContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeOpenKeyToOm;
+import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeOpenFileToOm;
+import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
+import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
+import static
org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
+import static
org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
+import static
org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
+import static
org.hadoop.ozone.recon.schema.tables.GlobalStatsTable.GLOBAL_STATS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for Object Count Task.
+ */
+public class TestOmTableInsightTask extends AbstractReconSqlDBTest {
+
+ private GlobalStatsDao globalStatsDao;
+ private OmTableInsightTask omTableInsightTask;
+ private DSLContext dslContext;
+ private boolean isSetupDone = false;
+ private ReconOMMetadataManager reconOMMetadataManager;
+
+ private void initializeInjector() throws IOException {
+ reconOMMetadataManager = getTestReconOmMetadataManager(
+ initializeNewOmMetadataManager(temporaryFolder.newFolder()),
+ temporaryFolder.newFolder());
+ globalStatsDao = getDao(GlobalStatsDao.class);
+ omTableInsightTask = new OmTableInsightTask(
+ globalStatsDao, getConfiguration(), reconOMMetadataManager);
+ dslContext = getDslContext();
+ }
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ // The following setup runs only once
+ if (!isSetupDone) {
+ initializeInjector();
+ isSetupDone = true;
+ }
+ // Truncate table before running each test
+ dslContext.truncate(GLOBAL_STATS);
+ }
+
+ @Test
+ public void testReprocessForCount() throws Exception {
+ OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+
+ // Mock 5 rows in each table and test the count
+ for (String tableName : omTableInsightTask.getTaskTables()) {
+ TypedTable<String, Object> table = mock(TypedTable.class);
+ TypedTable.TypedTableIterator mockIter = mock(TypedTable
+ .TypedTableIterator.class);
+ when(table.iterator()).thenReturn(mockIter);
+ when(omMetadataManager.getTable(tableName)).thenReturn(table);
+ when(mockIter.hasNext())
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false);
+ TypedTable.TypedKeyValue mockKeyValue =
+ mock(TypedTable.TypedKeyValue.class);
+ when(mockKeyValue.getValue()).thenReturn(mock(OmKeyInfo.class));
+ when(mockIter.next()).thenReturn(mockKeyValue);
+ }
+
+ Pair<String, Boolean> result =
+ omTableInsightTask.reprocess(omMetadataManager);
+ assertTrue(result.getRight());
+
+ assertEquals(5L, getCountForTable(KEY_TABLE));
+ assertEquals(5L, getCountForTable(VOLUME_TABLE));
+ assertEquals(5L, getCountForTable(BUCKET_TABLE));
+ assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
+ assertEquals(5L, getCountForTable(DELETED_TABLE));
+ }
+
+
+ @Test
+ public void testReprocessForSize() throws Exception {
+ // Populate the OpenKeys table in OM DB
+ writeOpenKeyToOm(reconOMMetadataManager,
+ "key1", "Bucket1", "Volume1", null, 1L);
+ writeOpenKeyToOm(reconOMMetadataManager,
+ "key1", "Bucket2", "Volume2", null, 2L);
+ writeOpenKeyToOm(reconOMMetadataManager,
+ "key1", "Bucket3", "Volume3", null, 3L);
+
+ // Populate the OpenFile table in OM DB
+ writeOpenFileToOm(reconOMMetadataManager,
+ "file1", "Bucket1", "Volume1", "file1", 1, 0, 1, 1, null, 1L);
+ writeOpenFileToOm(reconOMMetadataManager,
+ "file2", "Bucket2", "Volume2", "file2", 2, 0, 2, 2, null, 2L);
+ writeOpenFileToOm(reconOMMetadataManager,
+ "file3", "Bucket3", "Volume3", "file3", 3, 0, 3, 3, null, 3L);
+
+ Pair<String, Boolean> result =
+ omTableInsightTask.reprocess(reconOMMetadataManager);
+ assertTrue(result.getRight());
+ assertEquals(3L, getCountForTable(OPEN_KEY_TABLE));
+ assertEquals(3L, getCountForTable(OPEN_FILE_TABLE));
+ // Test for both replicated and unreplicated size for OPEN_KEY_TABLE
+ assertEquals(6L, getUnReplicatedSizeForTable(OPEN_KEY_TABLE));
+ assertEquals(18L, getReplicatedSizeForTable(OPEN_KEY_TABLE));
+ // Test for both replicated and unreplicated size for OPEN_FILE_TABLE
+ assertEquals(6L, getUnReplicatedSizeForTable(OPEN_FILE_TABLE));
+ assertEquals(18L, getReplicatedSizeForTable(OPEN_FILE_TABLE));
+ }
+
+
+ @Test
+ public void testProcessForCount() {
+ ArrayList<OMDBUpdateEvent> events = new ArrayList<>();
+ // Create 5 put, 1 delete and 1 update event for each table
+ for (String tableName : omTableInsightTask.getTaskTables()) {
+ for (int i = 0; i < 5; i++) {
+ events.add(getOMUpdateEvent("item" + i, null, tableName, PUT, null));
+ }
+ // for delete event, if value is set to null, the counter will not be
+ // decremented. This is because the value will be null if item does not
+ // exist in the database and there is no need to delete.
+ events.add(getOMUpdateEvent("item0", mock(OmKeyInfo.class), tableName,
+ DELETE, null));
+ events.add(getOMUpdateEvent("item1", null, tableName, UPDATE, null));
+ }
+ OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(events);
+ omTableInsightTask.process(omUpdateEventBatch);
+
+ // Verify 4 items in each table. (5 puts - 1 delete + 0 update)
+ assertEquals(4L, getCountForTable(KEY_TABLE));
+ assertEquals(4L, getCountForTable(VOLUME_TABLE));
+ assertEquals(4L, getCountForTable(BUCKET_TABLE));
+ assertEquals(4L, getCountForTable(OPEN_KEY_TABLE));
+ assertEquals(4L, getCountForTable(DELETED_TABLE));
+
+ // add a new key and simulate delete on non-existing item (value: null)
+ ArrayList<OMDBUpdateEvent> newEvents = new ArrayList<>();
+ for (String tableName : omTableInsightTask.getTaskTables()) {
+ newEvents.add(getOMUpdateEvent("item5", null, tableName, PUT, null));
+ // This delete event should be a noop since value is null
+ newEvents.add(getOMUpdateEvent("item0", null, tableName, DELETE, null));
+ }
+
+ omUpdateEventBatch = new OMUpdateEventBatch(newEvents);
+ omTableInsightTask.process(omUpdateEventBatch);
+
+ // Verify 5 items in each table. (1 new put + 0 delete)
+ assertEquals(5L, getCountForTable(KEY_TABLE));
+ assertEquals(5L, getCountForTable(VOLUME_TABLE));
+ assertEquals(5L, getCountForTable(BUCKET_TABLE));
+ assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
+ assertEquals(5L, getCountForTable(DELETED_TABLE));
+ }
+
+ @Test
+ public void testProcessForSize() {
+ // Prepare mock data size
+ Long sizeToBeReturned = 1000L;
+ OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
+ when(omKeyInfo.getDataSize()).thenReturn(sizeToBeReturned);
+ when(omKeyInfo.getReplicatedSize()).thenReturn(sizeToBeReturned * 3);
+
+ // Test PUT events
+ ArrayList<OMDBUpdateEvent> putEvents = new ArrayList<>();
+ for (String tableName : omTableInsightTask.getTablesToCalculateSize()) {
+ for (int i = 0; i < 5; i++) {
+ putEvents.add(
+ getOMUpdateEvent("item" + i, omKeyInfo, tableName, PUT, null));
+ }
+ }
+ OMUpdateEventBatch putEventBatch = new OMUpdateEventBatch(putEvents);
+ omTableInsightTask.process(putEventBatch);
+
+ // After 5 PUTs, size should be 5 * 1000 = 5000 for each size-related table
+ for (String tableName : omTableInsightTask.getTablesToCalculateSize()) {
+ assertEquals(5000L, getUnReplicatedSizeForTable(tableName));
+ assertEquals(15000L, getReplicatedSizeForTable(tableName));
+ }
+
+ // Test DELETE events
+ ArrayList<OMDBUpdateEvent> deleteEvents = new ArrayList<>();
+ for (String tableName : omTableInsightTask.getTablesToCalculateSize()) {
+ // Delete "item0"
+ deleteEvents.add(
+ getOMUpdateEvent("item0", omKeyInfo, tableName, DELETE, null));
+ }
+ OMUpdateEventBatch deleteEventBatch = new OMUpdateEventBatch(deleteEvents);
+ omTableInsightTask.process(deleteEventBatch);
+
+ // After deleting "item0", size should be 4 * 1000 = 4000
+ for (String tableName : omTableInsightTask.getTablesToCalculateSize()) {
+ assertEquals(4000L, getUnReplicatedSizeForTable(tableName));
+ assertEquals(12000L, getReplicatedSizeForTable(tableName));
+ }
+
+ // Test UPDATE events
+ ArrayList<OMDBUpdateEvent> updateEvents = new ArrayList<>();
+ Long newSizeToBeReturned = 2000L;
+ for (String tableName : omTableInsightTask.getTablesToCalculateSize()) {
+ // Update "item1" with a new size
+ OmKeyInfo newKeyInfo = mock(OmKeyInfo.class);
+ when(newKeyInfo.getDataSize()).thenReturn(newSizeToBeReturned);
+ when(newKeyInfo.getReplicatedSize()).thenReturn(newSizeToBeReturned * 3);
+ updateEvents.add(
+ getOMUpdateEvent("item1", newKeyInfo, tableName, UPDATE, omKeyInfo));
+ }
+ OMUpdateEventBatch updateEventBatch = new OMUpdateEventBatch(updateEvents);
+ omTableInsightTask.process(updateEventBatch);
+
+ // After updating "item1", size should be 4000 - 1000 + 2000 = 5000
+ // presentValue - oldValue + newValue = updatedValue
+ for (String tableName : omTableInsightTask.getTablesToCalculateSize()) {
+ assertEquals(5000L, getUnReplicatedSizeForTable(tableName));
+ assertEquals(15000L, getReplicatedSizeForTable(tableName));
+ }
+ }
+
+
+ private OMDBUpdateEvent getOMUpdateEvent(
+ String name, Object value,
+ String table,
+ OMDBUpdateEvent.OMDBUpdateAction action,
+ Object oldValue) {
+ return new OMUpdateEventBuilder()
+ .setAction(action)
+ .setKey(name)
+ .setValue(value)
+ .setTable(table)
+ .setOldValue(oldValue)
+ .build();
+ }
+
+ private long getCountForTable(String tableName) {
+ String key = OmTableInsightTask.getTableCountKeyFromTable(tableName);
+ return globalStatsDao.findById(key).getValue();
+ }
+
+ private long getUnReplicatedSizeForTable(String tableName) {
+ String key = OmTableInsightTask.getUnReplicatedSizeKeyFromTable(tableName);
+ return globalStatsDao.findById(key).getValue();
+ }
+
+ private long getReplicatedSizeForTable(String tableName) {
+ String key = OmTableInsightTask.getReplicatedSizeKeyFromTable(tableName);
+ return globalStatsDao.findById(key).getValue();
+ }
+
+ private OmKeyInfo getOmKeyInfo(String volumeName, String bucketName,
+ String keyName, boolean isFile) {
+ return new OmKeyInfo.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setFile(isFile)
+ .setReplicationConfig(StandaloneReplicationConfig
+ .getInstance(HddsProtos.ReplicationFactor.ONE))
+ .setDataSize(100L)
+ .build();
+ }
+}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestTableCountTask.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestTableCountTask.java
deleted file mode 100644
index fb400f6417..0000000000
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestTableCountTask.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.recon.tasks;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hdds.utils.db.TypedTable;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
-import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
-import
org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMUpdateEventBuilder;
-
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
-import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
-import org.jooq.DSLContext;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
-import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
-import static
org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
-import static
org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
-import static
org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
-import static
org.hadoop.ozone.recon.schema.tables.GlobalStatsTable.GLOBAL_STATS;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Unit test for Object Count Task.
- */
-public class TestTableCountTask extends AbstractReconSqlDBTest {
-
- private GlobalStatsDao globalStatsDao;
- private TableCountTask tableCountTask;
- private DSLContext dslContext;
- private boolean isSetupDone = false;
-
- private void initializeInjector() throws IOException {
- ReconOMMetadataManager omMetadataManager = getTestReconOmMetadataManager(
- initializeNewOmMetadataManager(temporaryFolder.newFolder()),
- temporaryFolder.newFolder());
- globalStatsDao = getDao(GlobalStatsDao.class);
- tableCountTask = new TableCountTask(globalStatsDao, getConfiguration(),
- omMetadataManager);
- dslContext = getDslContext();
- }
-
- @BeforeEach
- public void setUp() throws IOException {
- // The following setup runs only once
- if (!isSetupDone) {
- initializeInjector();
- isSetupDone = true;
- }
- // Truncate table before running each test
- dslContext.truncate(GLOBAL_STATS);
- }
-
- @Test
- public void testReprocess() throws Exception {
- OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
- // Mock 5 rows in each table and test the count
- for (String tableName: tableCountTask.getTaskTables()) {
- TypedTable<String, Object> table = mock(TypedTable.class);
- TypedTable.TypedTableIterator mockIter = mock(TypedTable
- .TypedTableIterator.class);
- when(table.iterator()).thenReturn(mockIter);
- when(omMetadataManager.getTable(tableName)).thenReturn(table);
- when(mockIter.hasNext())
- .thenReturn(true)
- .thenReturn(true)
- .thenReturn(true)
- .thenReturn(true)
- .thenReturn(true)
- .thenReturn(false);
- }
-
- Pair<String, Boolean> result = tableCountTask.reprocess(omMetadataManager);
- assertTrue(result.getRight());
-
- assertEquals(5L, getCountForTable(KEY_TABLE));
- assertEquals(5L, getCountForTable(VOLUME_TABLE));
- assertEquals(5L, getCountForTable(BUCKET_TABLE));
- assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
- assertEquals(5L, getCountForTable(DELETED_TABLE));
- }
-
- @Test
- public void testProcess() {
- ArrayList<OMDBUpdateEvent> events = new ArrayList<>();
- // Create 5 put, 1 delete and 1 update event for each table
- for (String tableName: tableCountTask.getTaskTables()) {
- for (int i = 0; i < 5; i++) {
- events.add(getOMUpdateEvent("item" + i, null, tableName, PUT));
- }
- // for delete event, if value is set to null, the counter will not be
- // decremented. This is because the value will be null if item does not
- // exist in the database and there is no need to delete.
- events.add(getOMUpdateEvent("item0", mock(OmKeyInfo.class), tableName,
- DELETE));
- events.add(getOMUpdateEvent("item1", null, tableName, UPDATE));
- }
- OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(events);
- tableCountTask.process(omUpdateEventBatch);
-
- // Verify 4 items in each table. (5 puts - 1 delete + 0 update)
- assertEquals(4L, getCountForTable(KEY_TABLE));
- assertEquals(4L, getCountForTable(VOLUME_TABLE));
- assertEquals(4L, getCountForTable(BUCKET_TABLE));
- assertEquals(4L, getCountForTable(OPEN_KEY_TABLE));
- assertEquals(4L, getCountForTable(DELETED_TABLE));
-
- // add a new key and simulate delete on non-existing item (value: null)
- ArrayList<OMDBUpdateEvent> newEvents = new ArrayList<>();
- for (String tableName: tableCountTask.getTaskTables()) {
- newEvents.add(getOMUpdateEvent("item5", null, tableName, PUT));
- // This delete event should be a noop since value is null
- newEvents.add(getOMUpdateEvent("item0", null, tableName, DELETE));
- }
-
- omUpdateEventBatch = new OMUpdateEventBatch(newEvents);
- tableCountTask.process(omUpdateEventBatch);
-
- // Verify 5 items in each table. (1 new put + 0 delete)
- assertEquals(5L, getCountForTable(KEY_TABLE));
- assertEquals(5L, getCountForTable(VOLUME_TABLE));
- assertEquals(5L, getCountForTable(BUCKET_TABLE));
- assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
- assertEquals(5L, getCountForTable(DELETED_TABLE));
- }
-
- private OMDBUpdateEvent getOMUpdateEvent(String name, Object value,
- String table,
- OMDBUpdateEvent.OMDBUpdateAction action) {
- return new OMUpdateEventBuilder()
- .setAction(action)
- .setKey(name)
- .setValue(value)
- .setTable(table)
- .build();
- }
-
- private long getCountForTable(String tableName) {
- String key = TableCountTask.getRowKeyFromTable(tableName);
- return globalStatsDao.findById(key).getValue();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]