This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 32cb1f8 TEZ-4048. Make proto history logger queue size configurable
32cb1f8 is described below
commit 32cb1f8e06b08ccd339b0972e0fe3f11859c024c
Author: Prasanth Jayachandran <[email protected]>
AuthorDate: Mon Mar 4 12:26:05 2019 -0600
TEZ-4048. Make proto history logger queue size configurable
Signed-off-by: Jonathan Eagles <[email protected]>
(cherry picked from commit 8bc2e3703ed3682d48a3f781828a4391c8d14e3a)
---
.../src/main/java/org/apache/tez/dag/api/TezConfiguration.java | 10 ++++++++++
.../dag/history/logging/proto/ProtoHistoryLoggingService.java | 9 ++++++---
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8360240..a206bb7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1526,6 +1526,16 @@ public class TezConfiguration extends Configuration {
public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT = 60L;
/**
+ * Int value. Maximum queue size for proto history event logger.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="integer")
+ public static final String TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE =
+ TEZ_PREFIX + "history.logging.queue.size";
+ public static final int TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE_DEFAULT = 100000;
+
+
+ /**
* Boolean value. Set this to true, if the underlying file system does not support flush (Ex: s3).
* The dag submitted, initialized and started events are written into a file and closed. The rest
* of the events are written into another file.
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
index d2e0b4d..008b05d 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
@@ -52,8 +52,7 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
new HistoryEventProtoConverter();
private boolean loggingDisabled = false;
- private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
- new LinkedBlockingQueue<>(10000);
+ private LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
private Thread eventHandlingThread;
private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -81,7 +80,11 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
splitDagStartEvents = conf.getBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START,
TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START_DEFAULT);
- LOG.info("Inited ProtoHistoryLoggingService");
+ final int queueSize = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE,
+ TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE_DEFAULT);
+ eventQueue = new LinkedBlockingQueue<>(queueSize);
+ LOG.info("Inited ProtoHistoryLoggingService. loggingDisabled: {} splitDagStartEvents: {} queueSize: {}",
+ loggingDisabled, splitDagStartEvents, queueSize);
}
@Override