This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70edbb5 AMBARI-23541. Sometimes host checks never complete.
(mpapirkovskyy)
70edbb5 is described below
commit 70edbb5cbfcd6c4ff83589650f715ae9c1b92790
Author: Myroslav Papirkovskyi <[email protected]>
AuthorDate: Wed Apr 11 20:19:13 2018 +0300
AMBARI-23541. Sometimes host checks never complete. (mpapirkovskyy)
---
.../ambari/server/agent/AgentReportsProcessor.java | 61 +++++++++++-----------
1 file changed, 31 insertions(+), 30 deletions(-)
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
index 88c2665..ad5c6aa 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
@@ -17,11 +17,11 @@
*/
package org.apache.ambari.server.agent;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.configuration.Configuration;
@@ -37,12 +37,15 @@ import com.google.inject.persist.UnitOfWork;
public class AgentReportsProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(AgentReportsProcessor.class);
- private ScheduledExecutorService executor;
+ private final int poolSize;
- private ConcurrentLinkedQueue<AgentReport> agentReportsQueue = new
ConcurrentLinkedQueue<>();
+ private final List<ExecutorService> executors;
public void addAgentReport(AgentReport agentReport) {
- agentReportsQueue.add(agentReport);
+ int hash = agentReport.getHostName().hashCode();
+ hash = hash == Integer.MIN_VALUE ? 0 : hash;
+ int executorNumber = Math.abs(hash) % poolSize;
+ executors.get(executorNumber).execute(new
AgentReportProcessingTask(agentReport));
}
@Inject
@@ -55,40 +58,38 @@ public class AgentReportsProcessor {
public AgentReportsProcessor(Configuration configuration) {
ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat("agent-report-processor-%d").build();
- int poolSize = configuration.getAgentsReportThreadPoolSize();
- executor = Executors.newScheduledThreadPool(poolSize, threadFactory);
- for (int i=0; i< poolSize; i++) {
- executor.scheduleAtFixedRate(new AgentReportProcessingTask(),
- configuration.getAgentsReportProcessingStartTimeout(),
- configuration.getAgentsReportProcessingPeriod(), TimeUnit.SECONDS);
+ poolSize = configuration.getAgentsReportThreadPoolSize();
+ executors = new ArrayList<>();
+ for (int i = 0; i < poolSize; i++) {
+ executors.add(Executors.newSingleThreadExecutor(threadFactory));
}
}
private class AgentReportProcessingTask implements Runnable {
+ private final AgentReport agentReport;
+
+ public AgentReportProcessingTask(AgentReport agentReport) {
+ this.agentReport = agentReport;
+ }
+
@Override
public void run() {
try {
unitOfWork.begin();
- while (true) {
- AgentReport agentReport = agentReportsQueue.poll();
- if (agentReport == null) {
- break;
- }
- String hostName = agentReport.getHostName();
- try {
-
- //TODO rewrite with polymorphism usage.
- if (agentReport.getCommandReports() != null) {
- hh.handleCommandReportStatus(agentReport.getCommandReports(),
hostName);
- } else if (agentReport.getComponentStatuses() != null) {
-
hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName);
- } else if (agentReport.getHostStatusReport() != null) {
- hh.handleHostReportStatus(agentReport.getHostStatusReport(),
hostName);
- }
- } catch (AmbariException e) {
- LOG.error("Error processing agent reports", e);
+ String hostName = agentReport.getHostName();
+ try {
+
+ //TODO rewrite with polymorphism usage.
+ if (agentReport.getCommandReports() != null) {
+ hh.handleCommandReportStatus(agentReport.getCommandReports(),
hostName);
+ } else if (agentReport.getComponentStatuses() != null) {
+ hh.handleComponentReportStatus(agentReport.getComponentStatuses(),
hostName);
+ } else if (agentReport.getHostStatusReport() != null) {
+ hh.handleHostReportStatus(agentReport.getHostStatusReport(),
hostName);
}
+ } catch (AmbariException e) {
+ LOG.error("Error processing agent reports", e);
}
} finally {
unitOfWork.end();
--
To stop receiving notification emails like this one, please contact
[email protected].