ashutoshcipher commented on a change in pull request #3793:
URL: https://github.com/apache/hadoop/pull/3793#discussion_r770301824
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
##########
@@ -59,16 +65,79 @@ public TimelineServiceV1Publisher() {
}
private TimelineClient client;
+ private LinkedBlockingQueue<TimelineEntity> entityQueue;
+ private ExecutorService sendEventThreadPool;
+ private int dispatcherPoolSize;
+ private int dispatcherBatchSize;
+ private int putEventInterval;
+ private boolean isTimeLineServerBatchEnabled;
+ private volatile boolean stopped = false;
+ private PutEventThread putEventThread;
+ private Object sendEntityLock;
@Override
protected void serviceInit(Configuration conf) throws Exception {
+ isTimeLineServerBatchEnabled =
+ conf.getBoolean(
+ YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
+
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED);
+ if (isTimeLineServerBatchEnabled) {
+ putEventInterval =
+
conf.getInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+
YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL)
+ * 1000;
+ dispatcherPoolSize = conf.getInt(
+ YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+ YarnConfiguration.
+ DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
+ dispatcherBatchSize = conf.getInt(
+
YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
+ YarnConfiguration.
+ DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);
+ putEventThread = new PutEventThread();
+ sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
+ entityQueue = new LinkedBlockingQueue<>(dispatcherBatchSize + 1);
+ sendEntityLock = new Object();
+ LOG.info("Timeline service v1 batch publishing enabled");
+ } else {
+ LOG.info("Timeline service v1 batch publishing disabled");
+ }
client = TimelineClient.createTimelineClient();
addIfService(client);
super.serviceInit(conf);
getDispatcher().register(SystemMetricsEventType.class,
new TimelineV1EventHandler());
}
+ protected void serviceStart() throws Exception {
+ if (isTimeLineServerBatchEnabled) {
+ stopped = false;
+ putEventThread.start();
+ }
+ super.serviceStart();
+ }
+
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ if (isTimeLineServerBatchEnabled) {
+ stopped = true;
+ putEventThread.interrupt();
+ try {
+ putEventThread.join();
+ SendEntity task = new SendEntity();
+ if (!task.buffer.isEmpty()) {
+ LOG.info(String.format("Initiating final putEntities, remaining
entities left in entityQueue: %d", task.buffer.size()));
Review comment:
Will do
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
##########
@@ -374,16 +443,62 @@ private static TimelineEntity
createContainerEntity(ContainerId containerId) {
}
private void putEntity(TimelineEntity entity) {
- try {
+ if (isTimeLineServerBatchEnabled) {
+ try {
+ entityQueue.put(entity);
+ if (entityQueue.size() > dispatcherBatchSize) {
+ SendEntity task = null;
+ synchronized (sendEntityLock) {
+ if (entityQueue.size() > dispatcherBatchSize) {
+ task = new SendEntity();
+ }
+ }
+ if (task != null) {
+ sendEventThreadPool.submit(task);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error when publishing entity batch [ " +
entity.getEntityType() + ","
+ + entity.getEntityId() + " ] ", e);
+ }
+ }
+ else {
Review comment:
Will do
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
##########
@@ -374,16 +443,62 @@ private static TimelineEntity
createContainerEntity(ContainerId containerId) {
}
private void putEntity(TimelineEntity entity) {
- try {
+ if (isTimeLineServerBatchEnabled) {
+ try {
+ entityQueue.put(entity);
+ if (entityQueue.size() > dispatcherBatchSize) {
+ SendEntity task = null;
+ synchronized (sendEntityLock) {
+ if (entityQueue.size() > dispatcherBatchSize) {
+ task = new SendEntity();
+ }
+ }
+ if (task != null) {
+ sendEventThreadPool.submit(task);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error when publishing entity batch [ " +
entity.getEntityType() + ","
+ + entity.getEntityId() + " ] ", e);
+ }
+ }
+ else {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Publishing the entity " + entity.getEntityId()
+ + ", JSON-style content: "
+ + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ }
+ client.putEntities(entity);
+ } catch (Exception e) {
+ LOG.error("Error when publishing entity [ " + entity.getEntityType() +
","
+ + entity.getEntityId() + " ] ", e);
+ }
+ }
+ }
+
+ private class SendEntity implements Runnable {
+
+ private ArrayList<TimelineEntity> buffer;
+
+ public SendEntity(){
Review comment:
yes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]