ArafatKhan2198 commented on code in PR #4764:
URL: https://github.com/apache/ozone/pull/4764#discussion_r1230782758


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.common.collect.Iterators;
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
+import org.jooq.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import java.util.Map.Entry;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
+import static org.jooq.impl.DSL.currentTimestamp;
+import static org.jooq.impl.DSL.select;
+import static org.jooq.impl.DSL.using;
+
+/**
+ * Class to iterate over the OM DB and store the total counts of volumes,
+ * buckets, keys, open keys, deleted keys, etc.
+ */
+public class OmTableInsightTask implements ReconOmTask {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OmTableInsightTask.class);
+
+  private GlobalStatsDao globalStatsDao;
+  private Configuration sqlConfiguration;
+  private ReconOMMetadataManager reconOMMetadataManager;
+
+  @Inject
+  public OmTableInsightTask(GlobalStatsDao globalStatsDao,
+                            Configuration sqlConfiguration,
+                            ReconOMMetadataManager reconOMMetadataManager) {
+    this.globalStatsDao = globalStatsDao;
+    this.sqlConfiguration = sqlConfiguration;
+    this.reconOMMetadataManager = reconOMMetadataManager;
+  }
+
+  /**
+   * Iterates the rows of each table in the OM snapshot DB and calculates the
+   * counts and sizes for table data.
+   *
+   * For tables that require data size calculation
+   * (as returned by getTablesToCalculateSize), both the number of
+   * records (count) and total data size of the records are calculated.
+   * For all other tables, only the count of records is calculated.
+   *
+   * @param omMetadataManager OM Metadata instance.
+   * @return Pair
+   */
+  @Override
+  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+    HashMap<String, Long> objectCountMap = initializeCountMap();
+    HashMap<String, Long> unReplicatedSizeCountMap = initializeSizeMap(false);
+    HashMap<String, Long> replicatedSizeCountMap = initializeSizeMap(true);
+
+    for (String tableName : getTaskTables()) {
+      Table table = omMetadataManager.getTable(tableName);
+      if (table == null) {
+        LOG.error("Table " + tableName + " not found in OM Metadata.");
+        return new ImmutablePair<>(getTaskName(), false);
+      }
+
+      try (
+          TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator
+              = table.iterator()) {
+        if (getTablesToCalculateSize().contains(tableName)) {
+          Triple<Long, Long, Long> details = getTableSizeAndCount(iterator);
+          objectCountMap.put(getTableCountKeyFromTable(tableName),
+              details.getLeft());
+          unReplicatedSizeCountMap.put(
+              getUnReplicatedSizeKeyFromTable(tableName), details.getMiddle());
+          replicatedSizeCountMap.put(getReplicatedSizeKeyFromTable(tableName),
+              details.getRight());
+        } else {
+          long count = Iterators.size(iterator);
+          objectCountMap.put(getTableCountKeyFromTable(tableName), count);
+        }
+      } catch (IOException ioEx) {
+        LOG.error("Unable to populate Table Count in Recon DB.", ioEx);
+        return new ImmutablePair<>(getTaskName(), false);
+      }
+    }
+    // Write the data to the DB
+    if (!objectCountMap.isEmpty()) {
+      writeDataToDB(objectCountMap);
+    }
+    if (!unReplicatedSizeCountMap.isEmpty()) {
+      writeDataToDB(unReplicatedSizeCountMap);
+    }
+    if (!replicatedSizeCountMap.isEmpty()) {
+      writeDataToDB(replicatedSizeCountMap);
+    }
+
+    LOG.info("Completed a 'reprocess' run of OmTableInsightTask.");
+    return new ImmutablePair<>(getTaskName(), true);
+  }
+
+  /**
+   * Returns a triple with the total count of records (left), total 
unreplicated
+   * size (middle), and total replicated size (right) in the given iterator.
+   * Increments count for each record and adds the dataSize if a record's value
+   * is an instance of OmKeyInfo. If the iterator is null, returns (0, 0, 0).
+   *
+   * @param iterator The iterator over the table to be iterated.
+   * @return A Triple with three Long values representing the count,
+   *         unreplicated size and replicated size.
+   * @throws IOException If an I/O error occurs during the iterator traversal.
+   */
+  private Triple<Long, Long, Long> getTableSizeAndCount(
+      TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator)
+      throws IOException {
+    long count = 0;
+    long unReplicatedSize = 0;
+    long replicatedSize = 0;
+
+    if (iterator != null) {
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, ?> kv = iterator.next();
+        if (kv != null && kv.getValue() != null) {
+          if (kv.getValue() instanceof OmKeyInfo) {
+            OmKeyInfo omKeyInfo = (OmKeyInfo) kv.getValue();
+            unReplicatedSize += omKeyInfo.getDataSize();
+            replicatedSize += omKeyInfo.getReplicatedSize();
+          }
+          count++;  // Increment count for each row
+        }
+      }
+    }
+
+    return Triple.of(count, unReplicatedSize, replicatedSize);
+  }
+
+  /**
+   * Returns a collection of table names that require data size calculation.
+   */
+  public Collection<String> getTablesToCalculateSize() {
+    List<String> taskTables = new ArrayList<>();
+    taskTables.add(OPEN_KEY_TABLE);
+    taskTables.add(OPEN_FILE_TABLE);
+    return taskTables;
+  }
+
+  @Override
+  public String getTaskName() {
+    return "OmTableInsightTask";
+  }
+
+  public Collection<String> getTaskTables() {
+    return new ArrayList<>(reconOMMetadataManager.listTableNames());
+  }
+
+  /**
+   * Read the update events and update the count and sizes of respective object
+   * (volume, bucket, key etc.) based on the action (put or delete).
+   *
+   * @param events Update events - PUT, DELETE and UPDATE.
+   * @return Pair
+   */
+  @Override
+  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+    Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    HashMap<String, Long> objectCountMap = initializeCountMap();
+    HashMap<String, Long> unreplicatedSizeCountMap = initializeSizeMap(false);
+    HashMap<String, Long> replicatedSizeCountMap = initializeSizeMap(true);
+    final Collection<String> taskTables = getTaskTables();
+    final Collection<String> sizeRelatedTables = getTablesToCalculateSize();
+
+    while (eventIterator.hasNext()) {
+      OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
+      String tableName = omdbUpdateEvent.getTable();
+
+      if (!taskTables.contains(tableName)) {
+        continue;
+      }
+
+      String countKey = getTableCountKeyFromTable(tableName);
+      String unReplicatedSizeKey =
+          getUnReplicatedSizeKeyFromTable(tableName);
+      String replicatedSizeKey =
+          getReplicatedSizeKeyFromTable(tableName);
+
+      try {
+        switch (omdbUpdateEvent.getAction()) {
+        case PUT:
+          objectCountMap.computeIfPresent(countKey, (k, count) -> count + 1L);

Review Comment:
   Thanks for confirming @sumitagrawl I'll add an UPDATE event to the task!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to