aajisaka commented on a change in pull request #3793:
URL: https://github.com/apache/hadoop/pull/3793#discussion_r768404741



##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
##########
@@ -59,16 +65,79 @@ public TimelineServiceV1Publisher() {
   }
 
   private TimelineClient client;
+  private LinkedBlockingQueue<TimelineEntity> entityQueue;
+  private ExecutorService sendEventThreadPool;
+  private int dispatcherPoolSize;
+  private int dispatcherBatchSize;
+  private int putEventInterval;
+  private boolean isTimeLineServerBatchEnabled;
+  private volatile boolean stopped = false;
+  private PutEventThread putEventThread;
+  private Object sendEntityLock;
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    isTimeLineServerBatchEnabled =
+        conf.getBoolean(
+            YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
+            
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED);
+    if (isTimeLineServerBatchEnabled) {
+      putEventInterval =
+          
conf.getInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+              
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL)
+              * 1000;
+      dispatcherPoolSize = conf.getInt(
+          YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+          YarnConfiguration.
+              DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
+      dispatcherBatchSize = conf.getInt(
+          
YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
+          YarnConfiguration.
+              DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);
+      putEventThread = new PutEventThread();
+      sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
+      entityQueue = new LinkedBlockingQueue<>(dispatcherBatchSize + 1);
+      sendEntityLock = new Object();
+      LOG.info("Timeline service v1 batch publishing enabled");
+    } else {
+      LOG.info("Timeline service v1 batch publishing disabled");
+    }
     client = TimelineClient.createTimelineClient();
     addIfService(client);
     super.serviceInit(conf);
     getDispatcher().register(SystemMetricsEventType.class,
         new TimelineV1EventHandler());
   }
 
+  protected void serviceStart() throws Exception {
+    if (isTimeLineServerBatchEnabled) {
+      stopped = false;
+      putEventThread.start();
+    }
+    super.serviceStart();
+  }
+
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+    if (isTimeLineServerBatchEnabled) {
+      stopped = true;
+      putEventThread.interrupt();
+      try {
+        putEventThread.join();
+        SendEntity task = new SendEntity();
+        if (!task.buffer.isEmpty()) {
+          LOG.info(String.format("Initiating final putEntities, remaining 
entities left in entityQueue: %d", task.buffer.size()));

Review comment:
       This line can be simplified
   ```suggestion
             LOG.info("Initiating final putEntities, remaining entities left in 
entityQueue: {}", task.buffer.size());
   ```

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
##########
@@ -374,16 +443,62 @@ private static TimelineEntity 
createContainerEntity(ContainerId containerId) {
   }
 
   private void putEntity(TimelineEntity entity) {
-    try {
+    if (isTimeLineServerBatchEnabled) {
+      try {
+        entityQueue.put(entity);
+        if (entityQueue.size() > dispatcherBatchSize) {
+          SendEntity task = null;
+          synchronized (sendEntityLock) {
+            if (entityQueue.size() > dispatcherBatchSize) {
+              task = new SendEntity();
+            }
+          }
+          if (task != null) {
+            sendEventThreadPool.submit(task);
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity batch  [ " + 
entity.getEntityType() + ","
+            + entity.getEntityId() + " ] ", e);
+      }
+    }
+    else {

Review comment:
       ```suggestion
       } else {
   ```

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
##########
@@ -374,16 +443,62 @@ private static TimelineEntity 
createContainerEntity(ContainerId containerId) {
   }
 
   private void putEntity(TimelineEntity entity) {
-    try {
+    if (isTimeLineServerBatchEnabled) {
+      try {
+        entityQueue.put(entity);
+        if (entityQueue.size() > dispatcherBatchSize) {
+          SendEntity task = null;
+          synchronized (sendEntityLock) {
+            if (entityQueue.size() > dispatcherBatchSize) {
+              task = new SendEntity();
+            }
+          }
+          if (task != null) {
+            sendEventThreadPool.submit(task);
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity batch  [ " + 
entity.getEntityType() + ","
+            + entity.getEntityId() + " ] ", e);
+      }
+    }
+    else {
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Publishing the entity " + entity.getEntityId()
+              + ", JSON-style content: "
+              + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+        }
+        client.putEntities(entity);
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity [ " + entity.getEntityType() + 
","
+            + entity.getEntityId() + " ] ", e);
+      }
+    }
+  }
+
+  private class SendEntity implements Runnable {
+
+    private ArrayList<TimelineEntity> buffer;
+
+    public SendEntity(){
+      buffer = new ArrayList();
+      entityQueue.drainTo(buffer);
+    }
+
+    @Override
+    public void run() {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Publishing the entity " + entity.getEntityId()
-            + ", JSON-style content: "
-            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+        LOG.debug(String.format("Number of timeline entities being sent in 
batch: %d", buffer.size()));

Review comment:
       ```suggestion
           LOG.debug("Number of timeline entities being sent in batch: {}", 
buffer.size());
   ```

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
##########
@@ -374,16 +443,62 @@ private static TimelineEntity 
createContainerEntity(ContainerId containerId) {
   }
 
   private void putEntity(TimelineEntity entity) {
-    try {
+    if (isTimeLineServerBatchEnabled) {
+      try {
+        entityQueue.put(entity);
+        if (entityQueue.size() > dispatcherBatchSize) {
+          SendEntity task = null;
+          synchronized (sendEntityLock) {
+            if (entityQueue.size() > dispatcherBatchSize) {
+              task = new SendEntity();
+            }
+          }
+          if (task != null) {
+            sendEventThreadPool.submit(task);
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity batch  [ " + 
entity.getEntityType() + ","
+            + entity.getEntityId() + " ] ", e);
+      }
+    }
+    else {
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Publishing the entity " + entity.getEntityId()
+              + ", JSON-style content: "
+              + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+        }
+        client.putEntities(entity);
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity [ " + entity.getEntityType() + 
","
+            + entity.getEntityId() + " ] ", e);
+      }
+    }
+  }
+
+  private class SendEntity implements Runnable {
+
+    private ArrayList<TimelineEntity> buffer;
+
+    public SendEntity(){

Review comment:
       Would you add a whitespace between `()` and `{`?

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
##########
@@ -408,4 +523,48 @@ public void handle(TimelineV1PublishEvent event) {
       putEntity(event.getEntity());
     }
   }
+
+  private class PutEventThread extends Thread {
+    public PutEventThread() {
+      super("PutEventThread");
+    }
+
+    @Override
+    public void run() {
+      LOG.info("System metrics publisher will put events every " +
+          String.valueOf(putEventInterval) + " milliseconds");
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        if (System.currentTimeMillis() % putEventInterval >= 1000) {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            LOG.warn(SystemMetricsPublisher.class.getName()
+                + " is interrupted. Exiting.");
+            break;
+          }
+          continue;
+        }
+        SendEntity task = null;
+        synchronized (sendEntityLock) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating SendEntity task in PutEventThread");
+          }
+          task = new SendEntity();
+        }
+        if (task != null) {
+          sendEventThreadPool.submit(task);
+        }
+        try {
+          // sleep added to avoid multiple SendEntity task within a single 
interval.
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.warn(SystemMetricsPublisher.class.getName()
+              + " is interrupted. Exiting.");
+          break;
+        }
+      }
+    }
+  }
 }
+
+

Review comment:
       These changes are not needed

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
##########
@@ -59,16 +65,79 @@ public TimelineServiceV1Publisher() {
   }
 
   private TimelineClient client;
+  private LinkedBlockingQueue<TimelineEntity> entityQueue;
+  private ExecutorService sendEventThreadPool;
+  private int dispatcherPoolSize;
+  private int dispatcherBatchSize;
+  private int putEventInterval;
+  private boolean isTimeLineServerBatchEnabled;
+  private volatile boolean stopped = false;
+  private PutEventThread putEventThread;
+  private Object sendEntityLock;
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    isTimeLineServerBatchEnabled =
+        conf.getBoolean(
+            YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
+            
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED);
+    if (isTimeLineServerBatchEnabled) {
+      putEventInterval =
+          
conf.getInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+              
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL)
+              * 1000;
+      dispatcherPoolSize = conf.getInt(
+          YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+          YarnConfiguration.
+              DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
+      dispatcherBatchSize = conf.getInt(
+          
YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
+          YarnConfiguration.
+              DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);

Review comment:
       Would you validate these parameters in case they are set to negative 
value?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to