This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70edbb5  AMBARI-23541. Sometimes host checks never complete. 
(mpapirkovskyy)
70edbb5 is described below

commit 70edbb5cbfcd6c4ff83589650f715ae9c1b92790
Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org>
AuthorDate: Wed Apr 11 20:19:13 2018 +0300

    AMBARI-23541. Sometimes host checks never complete. (mpapirkovskyy)
---
 .../ambari/server/agent/AgentReportsProcessor.java | 61 +++++++++++-----------
 1 file changed, 31 insertions(+), 30 deletions(-)

diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
index 88c2665..ad5c6aa 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
@@ -17,11 +17,11 @@
  */
 package org.apache.ambari.server.agent;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.configuration.Configuration;
@@ -37,12 +37,15 @@ import com.google.inject.persist.UnitOfWork;
 public class AgentReportsProcessor {
   private static final Logger LOG = 
LoggerFactory.getLogger(AgentReportsProcessor.class);
 
-  private ScheduledExecutorService executor;
+  private final int poolSize;
 
-  private ConcurrentLinkedQueue<AgentReport> agentReportsQueue = new 
ConcurrentLinkedQueue<>();
+  private final List<ExecutorService> executors;
 
   public void addAgentReport(AgentReport agentReport) {
-    agentReportsQueue.add(agentReport);
+    int hash = agentReport.getHostName().hashCode();
+    hash = hash == Integer.MIN_VALUE ? 0 : hash;
+    int executorNumber = Math.abs(hash) % poolSize;
+    executors.get(executorNumber).execute(new 
AgentReportProcessingTask(agentReport));
   }
 
   @Inject
@@ -55,40 +58,38 @@ public class AgentReportsProcessor {
   public AgentReportsProcessor(Configuration configuration) {
 
     ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat("agent-report-processor-%d").build();
-    int poolSize = configuration.getAgentsReportThreadPoolSize();
-    executor = Executors.newScheduledThreadPool(poolSize, threadFactory);
-    for (int i=0; i< poolSize; i++) {
-      executor.scheduleAtFixedRate(new AgentReportProcessingTask(),
-          configuration.getAgentsReportProcessingStartTimeout(),
-          configuration.getAgentsReportProcessingPeriod(), TimeUnit.SECONDS);
+    poolSize = configuration.getAgentsReportThreadPoolSize();
+    executors = new ArrayList<>();
+    for (int i = 0; i < poolSize; i++) {
+      executors.add(Executors.newSingleThreadExecutor(threadFactory));
     }
   }
 
   private class AgentReportProcessingTask implements Runnable {
 
+    private final AgentReport agentReport;
+
+    public AgentReportProcessingTask(AgentReport agentReport) {
+      this.agentReport = agentReport;
+    }
+
     @Override
     public void run() {
       try {
         unitOfWork.begin();
-        while (true) {
-          AgentReport agentReport = agentReportsQueue.poll();
-          if (agentReport == null) {
-            break;
-          }
-          String hostName = agentReport.getHostName();
-          try {
-
-            //TODO rewrite with polymorphism usage.
-            if (agentReport.getCommandReports() != null) {
-              hh.handleCommandReportStatus(agentReport.getCommandReports(), 
hostName);
-            } else if (agentReport.getComponentStatuses() != null) {
-              
hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName);
-            } else if (agentReport.getHostStatusReport() != null) {
-              hh.handleHostReportStatus(agentReport.getHostStatusReport(), 
hostName);
-            }
-          } catch (AmbariException e) {
-            LOG.error("Error processing agent reports", e);
+        String hostName = agentReport.getHostName();
+        try {
+
+          //TODO rewrite with polymorphism usage.
+          if (agentReport.getCommandReports() != null) {
+            hh.handleCommandReportStatus(agentReport.getCommandReports(), 
hostName);
+          } else if (agentReport.getComponentStatuses() != null) {
+            hh.handleComponentReportStatus(agentReport.getComponentStatuses(), 
hostName);
+          } else if (agentReport.getHostStatusReport() != null) {
+            hh.handleHostReportStatus(agentReport.getHostStatusReport(), 
hostName);
           }
+        } catch (AmbariException e) {
+          LOG.error("Error processing agent reports", e);
         }
       } finally {
         unitOfWork.end();

-- 
To stop receiving notification emails like this one, please contact
mpapirkovs...@apache.org.

Reply via email to