smengcl commented on a change in pull request #1670:
URL: https://github.com/apache/ozone/pull/1670#discussion_r551519149
##########
File path:
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerSchemaManager.java
##########
@@ -113,40 +142,121 @@ public void
insertUnhealthyContainerRecords(List<UnhealthyContainers> recs) {
unhealthyContainersDao.insert(recs);
}
- public void upsertContainerHistory(long containerID, String datanode,
- long time) {
- DSLContext dslContext = containerSchemaDefinition.getDSLContext();
- Record2<Long, String> recordToFind =
- dslContext.newRecord(
- CONTAINER_HISTORY.CONTAINER_ID,
- CONTAINER_HISTORY.DATANODE_HOST).value1(containerID).value2(datanode);
- ContainerHistory newRecord = new ContainerHistory();
- newRecord.setContainerId(containerID);
- newRecord.setDatanodeHost(datanode);
- newRecord.setLastReportTimestamp(time);
- ContainerHistory record = containerHistoryDao.findById(recordToFind);
- if (record != null) {
- newRecord.setFirstReportTimestamp(record.getFirstReportTimestamp());
- containerHistoryDao.update(newRecord);
- } else {
- newRecord.setFirstReportTimestamp(time);
- containerHistoryDao.insert(newRecord);
+ public void upsertContainerHistory(long containerID, UUID uuid, long time) {
+ Map<UUID, ContainerReplicaHistory> tsMap;
+ try {
+ tsMap = dbServiceProvider.getContainerReplicaHistoryMap(containerID);
+ ContainerReplicaHistory ts = tsMap.get(uuid);
+ if (ts == null) {
+ // New entry
+ tsMap.put(uuid, new ContainerReplicaHistory(uuid, time, time));
+ } else {
+ // Entry exists, update last seen time and put it back to DB.
+ ts.setLastSeenTime(time);
+ }
+ dbServiceProvider.storeContainerReplicaHistoryMap(containerID, tsMap);
+ } catch (IOException e) {
+ LOG.debug("Error on DB operations. {}", e.getMessage());
}
}
public List<ContainerHistory> getAllContainerHistory(long containerID) {
- return containerHistoryDao.fetchByContainerId(containerID);
+ // First, get the existing entries from DB
+ Map<UUID, ContainerReplicaHistory> resMap;
+ try {
+ resMap = dbServiceProvider.getContainerReplicaHistoryMap(containerID);
+ } catch (IOException ex) {
+ resMap = new HashMap<>();
+ LOG.debug("Unable to retrieve container replica history from RDB.");
+ }
+
+ // Then, update the entries with the latest in-memory info, if available
+ if (replicaHistoryMap != null) {
+ Map<UUID, ContainerReplicaHistory> replicaLastSeenMap =
+ replicaHistoryMap.get(containerID);
+ if (replicaLastSeenMap != null) {
+ Map<UUID, ContainerReplicaHistory> finalResMap = resMap;
+ replicaLastSeenMap.forEach((k, v) ->
+ finalResMap.merge(k, v, (old, latest) -> latest));
+ resMap = finalResMap;
+ }
+ }
+
+ // Finally, convert map to list for output
+ List<ContainerHistory> resList = new ArrayList<>();
+ for (Map.Entry<UUID, ContainerReplicaHistory> entry : resMap.entrySet()) {
+ final UUID uuid = entry.getKey();
+ String hostname = "N/A";
+ // Attempt to retrieve hostname from NODES table
+ if (nodeDB != null) {
+ try {
+ DatanodeDetails dnDetails = nodeDB.get(uuid);
+ if (dnDetails != null) {
+ hostname = dnDetails.getHostName();
+ }
+ } catch (IOException ex) {
+ LOG.debug("Unable to retrieve from NODES table of node {}. {}",
+ uuid, ex.getMessage());
+ }
+ }
+ final long firstSeenTime = entry.getValue().getFirstSeenTime();
+ final long lastSeenTime = entry.getValue().getLastSeenTime();
+ resList.add(new ContainerHistory(containerID, uuid.toString(), hostname,
+ firstSeenTime, lastSeenTime));
+ }
+ return resList;
}
public List<ContainerHistory> getLatestContainerHistory(long containerID,
int limit) {
- DSLContext dslContext = containerSchemaDefinition.getDSLContext();
- // Get container history sorted in descending order of last report
timestamp
- return dslContext.select()
- .from(CONTAINER_HISTORY)
- .where(CONTAINER_HISTORY.CONTAINER_ID.eq(containerID))
- .orderBy(CONTAINER_HISTORY.LAST_REPORT_TIMESTAMP.desc())
- .limit(limit)
- .fetchInto(ContainerHistory.class);
+ List<ContainerHistory> res = getAllContainerHistory(containerID);
+ res.sort(comparingLong(ContainerHistory::getLastSeenTime).reversed());
+ return res.stream().limit(limit).collect(Collectors.toList());
+ }
+
+ /**
+ * Should only be called once during ReconContainerManager init.
+ */
+ public void setReplicaHistoryMap(
+ Map<Long, Map<UUID, ContainerReplicaHistory>> replicaHistoryMap) {
+ this.replicaHistoryMap = replicaHistoryMap;
+ }
+
+ /**
+ * Should only be called once during ReconContainerManager init.
+ */
+ public void setScmDBStore(DBStore scmDBStore) {
+ this.scmDBStore = scmDBStore;
+ try {
+ this.nodeDB = ReconSCMDBDefinition.NODES.getTable(scmDBStore);
+ } catch (IOException ex) {
+ LOG.debug("Failed to get NODES table. {}", ex.getMessage());
+ }
+ }
+
+ /**
+ * Flush the container replica history in-memory map to DB.
+ * @param clearMap true to clear the in-memory map after flushing completes.
+ */
+ public void flushReplicaHistoryMapToDB(boolean clearMap) {
+ if (replicaHistoryMap == null) {
+ return;
+ }
+ synchronized (replicaHistoryMap) {
+ try {
+ for (Map.Entry<Long, Map<UUID, ContainerReplicaHistory>> entry :
+ replicaHistoryMap.entrySet()) {
+ final long containerId = entry.getKey();
+ final Map<UUID, ContainerReplicaHistory> map = entry.getValue();
+ dbServiceProvider.storeContainerReplicaHistoryMap(containerId, map);
Review comment:
Good idea. I'll try
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]