This is an automated email from the ASF dual-hosted git repository.

jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 32cb1f8  TEZ-4048. Make proto history logger queue size configurable
32cb1f8 is described below

commit 32cb1f8e06b08ccd339b0972e0fe3f11859c024c
Author: Prasanth Jayachandran <[email protected]>
AuthorDate: Mon Mar 4 12:26:05 2019 -0600

    TEZ-4048. Make proto history logger queue size configurable
    
    Signed-off-by: Jonathan Eagles <[email protected]>
    (cherry picked from commit 8bc2e3703ed3682d48a3f781828a4391c8d14e3a)
---
 .../src/main/java/org/apache/tez/dag/api/TezConfiguration.java | 10 ++++++++++
 .../dag/history/logging/proto/ProtoHistoryLoggingService.java  |  9 ++++++---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8360240..a206bb7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1526,6 +1526,16 @@ public class TezConfiguration extends Configuration {
   public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT 
= 60L;
 
   /**
+   * Int value. Maximum queue size for proto history event logger.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE =
+      TEZ_PREFIX + "history.logging.queue.size";
+  public static final int TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE_DEFAULT = 
100000;
+
+
+  /**
    * Boolean value. Set this to true, if the underlying file system does not 
support flush (Ex: s3).
    * The dag submitted, initialized and started events are written into a file 
and closed. The rest
    * of the events are written into another file.
diff --git 
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
 
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
index d2e0b4d..008b05d 100644
--- 
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
+++ 
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
@@ -52,8 +52,7 @@ public class ProtoHistoryLoggingService extends 
HistoryLoggingService {
       new HistoryEventProtoConverter();
   private boolean loggingDisabled = false;
 
-  private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
-      new LinkedBlockingQueue<>(10000);
+  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
   private Thread eventHandlingThread;
   private final AtomicBoolean stopped = new AtomicBoolean(false);
 
@@ -81,7 +80,11 @@ public class ProtoHistoryLoggingService extends 
HistoryLoggingService {
         TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
     splitDagStartEvents = 
conf.getBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START,
         TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START_DEFAULT);
-    LOG.info("Inited ProtoHistoryLoggingService");
+    final int queueSize = 
conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE,
+        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE_DEFAULT);
+    eventQueue = new LinkedBlockingQueue<>(queueSize);
+    LOG.info("Inited ProtoHistoryLoggingService. loggingDisabled: {} 
splitDagStartEvents: {} queueSize: {}",
+        loggingDisabled, splitDagStartEvents, queueSize);
   }
 
   @Override

Reply via email to