This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fa8494cd1d HDDS-5331. Recon: Trigger PipelineSyncTask when DN becomes
stale and ContainerHealthTask when DN becomes dead. (#4011)
fa8494cd1d is described below
commit fa8494cd1d31f501985f6b62d7b2f3673ac3f705
Author: devmadhuu <[email protected]>
AuthorDate: Thu Dec 22 02:25:11 2022 +0530
HDDS-5331. Recon: Trigger PipelineSyncTask when DN becomes stale and
ContainerHealthTask when DN becomes dead. (#4011)
---
.../ozone/recon/fsck/ContainerHealthTask.java | 49 +++++++++++-------
.../hadoop/ozone/recon/scm/PipelineSyncTask.java | 33 ++++++++----
.../ozone/recon/scm/ReconDeadNodeHandler.java | 12 ++++-
.../ozone/recon/scm/ReconStaleNodeHandler.java | 58 ++++++++++++++++++++++
.../scm/ReconStorageContainerManagerFacade.java | 35 +++++++------
5 files changed, 143 insertions(+), 44 deletions(-)
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index 4badac672e..59ca2ffd1a 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -56,6 +58,8 @@ public class ContainerHealthTask extends ReconScmTask {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerHealthTask.class);
+ private ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
private StorageContainerServiceProvider scmClient;
private ContainerManager containerManager;
private ContainerHealthSchemaManager containerHealthSchemaManager;
@@ -79,26 +83,11 @@ public class ContainerHealthTask extends ReconScmTask {
}
@Override
- public synchronized void run() {
+ public void run() {
try {
while (canRun()) {
- wait(interval);
- long start = Time.monotonicNow();
- long currentTime = System.currentTimeMillis();
- long existingCount = processExistingDBRecords(currentTime);
- LOG.info("Container Health task thread took {} milliseconds to" +
- " process {} existing database records.",
- Time.monotonicNow() - start, existingCount);
- start = Time.monotonicNow();
- final List<ContainerInfo> containers =
containerManager.getContainers();
- containers.stream()
- .filter(c -> !processedContainers.contains(c))
- .forEach(c -> processContainer(c, currentTime));
- recordSingleRunCompletion();
- LOG.info("Container Health task thread took {} milliseconds for" +
- " processing {} containers.", Time.monotonicNow() - start,
- containers.size());
- processedContainers.clear();
+ triggerContainerHealthCheck();
+ Thread.sleep(interval);
}
} catch (Throwable t) {
LOG.error("Exception in Missing Container task Thread.", t);
@@ -108,6 +97,30 @@ public class ContainerHealthTask extends ReconScmTask {
}
}
+ /**
+  * Runs one container health pass under the task's write lock.
+  *
+  * <p>First processes the existing database records via
+  * {@code processExistingDBRecords}, then calls {@code processContainer}
+  * for every container returned by the container manager that is not in
+  * {@code processedContainers}. It records a single run completion and
+  * logs the time taken by each phase.
+  *
+  * <p>It is called from the periodic {@link #run()} loop and also on demand
+  * (for example by {@code ReconDeadNodeHandler}). The write lock keeps those
+  * callers from running passes concurrently.
+  */
+ public void triggerContainerHealthCheck() {
+   lock.writeLock().lock();
+   try {
+     long start = Time.monotonicNow();
+     long currentTime = System.currentTimeMillis();
+     long existingCount = processExistingDBRecords(currentTime);
+     LOG.info("Container Health task thread took {} milliseconds to" +
+         " process {} existing database records.",
+         Time.monotonicNow() - start, existingCount);
+     start = Time.monotonicNow();
+     final List<ContainerInfo> containers = containerManager.getContainers();
+     containers.stream()
+         .filter(c -> !processedContainers.contains(c))
+         .forEach(c -> processContainer(c, currentTime));
+     recordSingleRunCompletion();
+     LOG.info("Container Health task thread took {} milliseconds for" +
+         " processing {} containers.", Time.monotonicNow() - start,
+         containers.size());
+   } finally {
+     // Reset the per-pass bookkeeping even when the pass fails. On-demand
+     // callers catch and log exceptions, so a failed pass must not leave
+     // stale entries that would make the next pass's filter skip containers.
+     // The clear happens before unlocking so it stays under the lock.
+     processedContainers.clear();
+     lock.writeLock().unlock();
+   }
+ }
+
private ContainerHealthStatus setCurrentContainer(long recordId)
throws ContainerNotFoundException {
ContainerInfo container =
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java
index 43bad418b7..b802efcf48 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/PipelineSyncTask.java
@@ -22,6 +22,9 @@ import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -49,6 +52,8 @@ public class PipelineSyncTask extends ReconScmTask {
private StorageContainerServiceProvider scmClient;
private ReconPipelineManager reconPipelineManager;
private ReconNodeManager nodeManager;
+
+ private ReadWriteLock lock = new ReentrantReadWriteLock(true);
private final long interval;
public PipelineSyncTask(ReconPipelineManager pipelineManager,
@@ -64,17 +69,11 @@ public class PipelineSyncTask extends ReconScmTask {
}
@Override
- protected synchronized void run() {
+ public void run() {
try {
while (canRun()) {
- long start = Time.monotonicNow();
- List<Pipeline> pipelinesFromScm = scmClient.getPipelines();
- reconPipelineManager.initializePipelines(pipelinesFromScm);
- syncOperationalStateOnDeadNodes();
- LOG.info("Pipeline sync Thread took {} milliseconds.",
- Time.monotonicNow() - start);
- recordSingleRunCompletion();
- wait(interval);
+ triggerPipelineSyncTask();
+ Thread.sleep(interval);
}
} catch (Throwable t) {
LOG.error("Exception in Pipeline sync Thread.", t);
@@ -84,6 +83,22 @@ public class PipelineSyncTask extends ReconScmTask {
}
}
+ /**
+  * Performs one pipeline synchronization pass under the task's write lock.
+  *
+  * <p>Fetches the pipeline list from SCM and hands it to the Recon pipeline
+  * manager, then syncs the operational state of dead nodes. It logs the
+  * elapsed time and records a single run completion. Both the periodic
+  * {@link #run()} loop and the node-state event handlers call this method,
+  * and the lock serializes them.
+  *
+  * @throws IOException if fetching or initializing pipelines fails
+  * @throws TimeoutException propagated from the underlying calls
+  * @throws NodeNotFoundException propagated from the dead-node state sync
+  */
+ public void triggerPipelineSyncTask()
+     throws IOException, TimeoutException, NodeNotFoundException {
+   lock.writeLock().lock();
+   try {
+     final long syncStartMillis = Time.monotonicNow();
+     reconPipelineManager.initializePipelines(scmClient.getPipelines());
+     syncOperationalStateOnDeadNodes();
+     final long elapsedMillis = Time.monotonicNow() - syncStartMillis;
+     LOG.info("Pipeline sync Thread took {} milliseconds.", elapsedMillis);
+     recordSingleRunCompletion();
+   } finally {
+     lock.writeLock().unlock();
+   }
+ }
+
/**
* For every dead node in Recon, update Operational state with that on SCM
* if different.
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDeadNodeHandler.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDeadNodeHandler.java
index bc1b43a8a4..b64244a124 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDeadNodeHandler.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDeadNodeHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.recon.fsck.ContainerHealthTask;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,15 +41,20 @@ public class ReconDeadNodeHandler extends DeadNodeHandler {
private static final Logger LOG =
LoggerFactory.getLogger(ReconDeadNodeHandler.class);
-
private StorageContainerServiceProvider scmClient;
+ private ContainerHealthTask containerHealthTask;
+ private PipelineSyncTask pipelineSyncTask;
public ReconDeadNodeHandler(NodeManager nodeManager,
PipelineManager pipelineManager,
ContainerManager containerManager,
- StorageContainerServiceProvider scmClient) {
+ StorageContainerServiceProvider scmClient,
+ ContainerHealthTask containerHealthTask, // health check triggered from onMessage()
+ PipelineSyncTask pipelineSyncTask) { // pipeline sync triggered from onMessage()
super(nodeManager, pipelineManager, containerManager);
this.scmClient = scmClient;
+ this.containerHealthTask = containerHealthTask;
+ this.pipelineSyncTask = pipelineSyncTask;
}
@Override
@@ -71,6 +77,8 @@ public class ReconDeadNodeHandler extends DeadNodeHandler {
LOG.warn("Node {} has reached DEAD state, but SCM does not have " +
"information about it.", datanodeDetails);
}
+ containerHealthTask.triggerContainerHealthCheck();
+ pipelineSyncTask.triggerPipelineSyncTask();
} catch (Exception ioEx) {
LOG.error("Error trying to verify Node operational state from SCM.",
ioEx);
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStaleNodeHandler.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStaleNodeHandler.java
new file mode 100644
index 0000000000..998f063924
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStaleNodeHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.scm;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recon's handling of a datanode that becomes STALE.
+ *
+ * <p>Runs the standard {@link StaleNodeHandler} logic first. It then triggers
+ * an immediate {@link PipelineSyncTask} pass, so Recon's pipeline view is
+ * re-read from SCM without waiting for the next scheduled sync.
+ */
+public class ReconStaleNodeHandler extends StaleNodeHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReconStaleNodeHandler.class);
+
+  private final PipelineSyncTask pipelineSyncTask;
+
+  /**
+   * @param nodeManager node manager passed to {@link StaleNodeHandler}
+   * @param pipelineManager pipeline manager passed to {@link StaleNodeHandler}
+   * @param conf configuration passed to {@link StaleNodeHandler}
+   * @param pipelineSyncTask task triggered each time a node becomes stale
+   */
+  public ReconStaleNodeHandler(NodeManager nodeManager,
+      PipelineManager pipelineManager,
+      OzoneConfiguration conf,
+      PipelineSyncTask pipelineSyncTask) {
+    super(nodeManager, pipelineManager, conf);
+    this.pipelineSyncTask = pipelineSyncTask;
+  }
+
+  /**
+   * Delegates to {@link StaleNodeHandler#onMessage} and then triggers a
+   * pipeline sync.
+   *
+   * <p>Sync failures are logged rather than propagated. The periodic
+   * {@code PipelineSyncTask} loop performs the same sync again on its next
+   * interval.
+   */
+  @Override
+  public void onMessage(final DatanodeDetails datanodeDetails,
+      final EventPublisher publisher) {
+    super.onMessage(datanodeDetails, publisher);
+    try {
+      pipelineSyncTask.triggerPipelineSyncTask();
+    } catch (Exception exp) {
+      // The trailing Throwable is logged with its stack trace by SLF4J.
+      LOG.error("Error trying to trigger pipeline sync task for stale "
+          + "node {}.", datanodeDetails, exp);
+    }
+  }
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 51499a0d6c..b93e04cbdd 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -197,10 +197,26 @@ public class ReconStorageContainerManagerFacade
PipelineActionHandler pipelineActionHandler =
new PipelineActionHandler(pipelineManager, scmContext, conf);
+ ReconTaskConfig reconTaskConfig = conf.getObject(ReconTaskConfig.class);
+ PipelineSyncTask pipelineSyncTask = new PipelineSyncTask(
+ pipelineManager,
+ nodeManager,
+ scmServiceProvider,
+ reconTaskStatusDao,
+ reconTaskConfig);
+ ContainerHealthTask containerHealthTask = new ContainerHealthTask(
+ containerManager,
+ scmServiceProvider,
+ reconTaskStatusDao, containerHealthSchemaManager,
+ containerPlacementPolicy,
+ reconTaskConfig);
+
StaleNodeHandler staleNodeHandler =
- new StaleNodeHandler(nodeManager, pipelineManager, conf);
+ new ReconStaleNodeHandler(nodeManager, pipelineManager,
+ conf, pipelineSyncTask);
DeadNodeHandler deadNodeHandler = new ReconDeadNodeHandler(nodeManager,
- pipelineManager, containerManager, scmServiceProvider);
+ pipelineManager, containerManager,
+ scmServiceProvider, containerHealthTask, pipelineSyncTask);
ContainerReportHandler containerReportHandler =
new ReconContainerReportHandler(nodeManager, containerManager);
@@ -268,19 +284,8 @@ public class ReconStorageContainerManagerFacade
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
- ReconTaskConfig reconTaskConfig = conf.getObject(ReconTaskConfig.class);
- reconScmTasks.add(new PipelineSyncTask(
- pipelineManager,
- nodeManager,
- scmServiceProvider,
- reconTaskStatusDao,
- reconTaskConfig));
- reconScmTasks.add(new ContainerHealthTask(
- containerManager,
- scmServiceProvider,
- reconTaskStatusDao, containerHealthSchemaManager,
- containerPlacementPolicy,
- reconTaskConfig));
+ reconScmTasks.add(pipelineSyncTask);
+ reconScmTasks.add(containerHealthTask);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]