This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 522d312  [GOBBLIN-1423] Add config to Event reporter to control queue capacity…
522d312 is described below

commit 522d31205cba743209437976946adf7bf0c4f1be
Author: vbohra <[email protected]>
AuthorDate: Fri Apr 16 14:23:25 2021 -0700

    [GOBBLIN-1423] Add config to Event reporter to control queue capacity…
    
    Closes #3259 from vikrambohra/GOBBLIN-1423
---
 .../gobblin/metrics/reporter/EventReporter.java    | 38 +++++++++++++++++++---
 .../gobblin/metrics/KafkaReportingFormats.java     |  9 +++--
 .../gobblin/metrics/kafka/KafkaEventReporter.java  |  9 +++--
 .../reporter/KeyValueEventObjectReporter.java      | 11 +------
 .../metrics/reporter/KafkaEventReporterTest.java   | 25 +++++++++++++-
 5 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
index 70e581c..83c6a33 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.metrics.reporter;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.Closeable;
@@ -45,11 +46,15 @@ import com.codahale.metrics.Timer;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.notification.EventNotification;
@@ -74,14 +79,28 @@ public abstract class EventReporter extends 
ScheduledReporter implements Closeab
   protected static final String METRIC_KEY_PREFIX = "gobblin.metrics";
   protected static final String EVENTS_QUALIFIER = "events";
   private static final Logger LOGGER = 
LoggerFactory.getLogger(EventReporter.class);
-  private static final int QUEUE_CAPACITY = 100;
+  public static final int DEFAULT_QUEUE_CAPACITY = 100;
+  public static final String QUEUE_CAPACITY_KEY = 
ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + 
".queue.capacity";
+  public static final int DEFAULT_QUEUE_OFFER_TIMEOUT_SECS = 10;
+  public static final String QUEUE_OFFER_TIMOUT_SECS_KEY = 
ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX + 
".queue.offer.timeout.secs";
   private static final String NULL_STRING = "null";
 
   private final MetricContext metricContext;
   private final BlockingQueue<GobblinTrackingEvent> reportingQueue;
+  @Getter
+  private final int queueCapacity;
+  @Getter
+  private final int queueOfferTimeoutSecs;
   private final ExecutorService immediateReportExecutor;
   private final UUID notificationTargetKey;
   protected final Closer closer;
+  protected final Config config;
+  private static final Config FALLBACK = ConfigFactory.parseMap(
+      ImmutableMap.<String, Object>builder()
+          .put(QUEUE_CAPACITY_KEY, DEFAULT_QUEUE_CAPACITY)
+          .put(QUEUE_OFFER_TIMOUT_SECS_KEY, DEFAULT_QUEUE_OFFER_TIMEOUT_SECS)
+          .build());
+
 
   public EventReporter(Builder builder) {
     super(builder.context, builder.name, builder.filter, builder.rateUnit, 
builder.durationUnit);
@@ -101,7 +120,11 @@ public abstract class EventReporter extends 
ScheduledReporter implements Closeab
         return null;
       }
     });
-    this.reportingQueue = Queues.newLinkedBlockingQueue(QUEUE_CAPACITY);
+
+    this.config = builder.config.withFallback(FALLBACK);
+    this.queueCapacity = this.config.getInt(QUEUE_CAPACITY_KEY);
+    this.queueOfferTimeoutSecs = 
this.config.getInt(QUEUE_OFFER_TIMOUT_SECS_KEY);
+    this.reportingQueue = Queues.newLinkedBlockingQueue(this.queueCapacity);
   }
 
   /**
@@ -120,11 +143,11 @@ public abstract class EventReporter extends 
ScheduledReporter implements Closeab
    * @param event {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to 
add to queue.
    */
   public void addEventToReportingQueue(GobblinTrackingEvent event) {
-    if (this.reportingQueue.size() > QUEUE_CAPACITY * 2 / 3) {
+    if (this.reportingQueue.size() > this.queueCapacity * 2 / 3) {
       immediatelyScheduleReport();
     }
     try {
-      if (!this.reportingQueue.offer(sanitizeEvent(event), 10, 
TimeUnit.SECONDS)) {
+      if (!this.reportingQueue.offer(sanitizeEvent(event), 
this.queueOfferTimeoutSecs, TimeUnit.SECONDS)) {
         log.error("Enqueuing of event {} at reporter with class {} timed out. 
Sending of events is probably stuck.",
             event, this.getClass().getCanonicalName());
       }
@@ -189,6 +212,7 @@ public abstract class EventReporter extends 
ScheduledReporter implements Closeab
     protected MetricFilter filter;
     protected TimeUnit rateUnit;
     protected TimeUnit durationUnit;
+    protected Config config;
 
     protected Builder(MetricContext context) {
       this.context = context;
@@ -196,6 +220,12 @@ public abstract class EventReporter extends 
ScheduledReporter implements Closeab
       this.rateUnit = TimeUnit.SECONDS;
       this.durationUnit = TimeUnit.MILLISECONDS;
       this.filter = MetricFilter.ALL;
+      this.config = ConfigFactory.empty();
+    }
+
+    public T withConfig(Config config) {
+      this.config = (config == null) ? ConfigFactory.empty() : config;
+      return self();
     }
 
     protected abstract T self();
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
index 094bb83..2186911 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
@@ -82,12 +82,13 @@ public enum KafkaReportingFormats {
       builder.withPusherClassName(pusherClassName);
 
       Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      builder.withConfig(allConfig);
       // the kafka configuration is composed of the metrics reporting specific 
keys with a fallback to the shared
       // kafka config
       Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig, 
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
           .withFallback(ConfigUtils.getConfigOrEmpty(allConfig, 
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
 
-      builder.withConfig(kafkaConfig);
+      builder.withKafkaConfig(kafkaConfig);
 
       return builder.build(brokers, topic);
     }
@@ -124,12 +125,13 @@ public enum KafkaReportingFormats {
       builder.withPusherClassName(pusherClassName);
 
       Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      builder.withConfig(allConfig);
       // the kafka configuration is composed of the metrics reporting specific 
keys with a fallback to the shared
       // kafka config
       Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig, 
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
           .withFallback(ConfigUtils.getConfigOrEmpty(allConfig, 
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
 
-      builder.withConfig(kafkaConfig);
+      builder.withKafkaConfig(kafkaConfig);
 
       return builder.build(brokers, topic);
     }
@@ -153,12 +155,13 @@ public enum KafkaReportingFormats {
       builder.withPusherClassName(pusherClassName);
 
       Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      builder.withConfig(allConfig);
       // the kafka configuration is composed of the metrics reporting specific 
keys with a fallback to the shared
       // kafka config
       Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig, 
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
           .withFallback(ConfigUtils.getConfigOrEmpty(allConfig, 
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
 
-      builder.withConfig(kafkaConfig);
+      builder.withKafkaConfig(kafkaConfig);
 
       return builder.build(brokers, topic);
     }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
index 8c5d3be..061c1da 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
@@ -33,7 +33,6 @@ import 
org.apache.gobblin.metrics.reporter.util.AvroSerializer;
 import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 
-
 /**
  * Reports {@link GobblinTrackingEvent} to a Kafka topic serialized as JSON.
  */
@@ -52,7 +51,7 @@ public class KafkaEventReporter extends EventReporter {
       this.kafkaPusher = builder.kafkaPusher.get();
     } else {
         String pusherClassName = 
builder.pusherClassName.or(PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
-        this.kafkaPusher = PusherUtils.getPusher(pusherClassName, 
builder.brokers, builder.topic, builder.config);
+        this.kafkaPusher = PusherUtils.getPusher(pusherClassName, 
builder.brokers, builder.topic, builder.kafkaConfig);
     }
     this.closer.register(this.kafkaPusher);
   }
@@ -122,7 +121,7 @@ public class KafkaEventReporter extends EventReporter {
     protected String brokers;
     protected String topic;
     protected Optional<Pusher> kafkaPusher;
-    protected Optional<Config> config = Optional.absent();
+    protected Optional<Config> kafkaConfig = Optional.absent();
     protected Optional<String> pusherClassName = Optional.absent();
 
     protected Builder(MetricContext context) {
@@ -141,8 +140,8 @@ public class KafkaEventReporter extends EventReporter {
     /**
      * Set additional configuration.
      */
-    public T withConfig(Config config) {
-      this.config = Optional.of(config);
+    public T withKafkaConfig(Config config) {
+      this.kafkaConfig = Optional.of(config);
       return self();
     }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
index b97e48c..cd81d61 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
@@ -63,7 +63,7 @@ public class KeyValueEventObjectReporter extends 
EventReporter {
   public KeyValueEventObjectReporter(Builder builder) {
     super(builder);
 
-    Config config = builder.config.get();
+    Config config = builder.config;
     Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, 
PUSHER_CONFIG).withFallback(config);
     String pusherClassName =
         ConfigUtils.getString(config, PUSHER_CLASS, 
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
@@ -132,7 +132,6 @@ public class KeyValueEventObjectReporter extends 
EventReporter {
 
     protected String brokers;
     protected String topic;
-    protected Optional<Config> config = Optional.absent();
     protected Optional<Map<String, String>> namespaceOverride = 
Optional.absent();
 
     public Builder(MetricContext context) {
@@ -144,14 +143,6 @@ public class KeyValueEventObjectReporter extends 
EventReporter {
       return this;
     }
 
-    /**
-     * Set additional configuration.
-     */
-    public Builder withConfig(Config config) {
-      this.config = Optional.of(config);
-      return self();
-    }
-
     public Builder namespaceOverride(Optional<Map<String, String>> 
namespaceOverride) {
       this.namespaceOverride = namespaceOverride;
       return self();
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
index add42f4..1ca9f57 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
@@ -24,7 +24,10 @@ import java.util.Map;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
@@ -54,7 +57,6 @@ public class KafkaEventReporterTest {
 
     MockKafkaPusher pusher = new MockKafkaPusher();
     KafkaEventReporter kafkaReporter = getBuilder(context, 
pusher).build("localhost:0000", "topic");
-
     String namespace = "gobblin.metrics.test";
     String eventName = "testEvent";
 
@@ -136,6 +138,27 @@ public class KafkaEventReporterTest {
     Assert.assertEquals(retrievedEvent.getMetadata().get(tag2), value2);
   }
 
+  @Test
+  public void testEventReporterConfigs() throws IOException {
+
+    MetricContext context = MetricContext.builder("context").build();
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaEventReporter kafkaReporter = getBuilder(context, 
pusher).build("localhost:0000", "topic");
+    Assert.assertEquals(kafkaReporter.getQueueCapacity(), 
EventReporter.DEFAULT_QUEUE_CAPACITY);
+    Assert.assertEquals(kafkaReporter.getQueueOfferTimeoutSecs(), 
EventReporter.DEFAULT_QUEUE_OFFER_TIMEOUT_SECS);
+
+    Config config = ConfigFactory.parseMap(
+        ImmutableMap.<String, Object>builder()
+            .put(EventReporter.QUEUE_CAPACITY_KEY, 200)
+            .put(EventReporter.QUEUE_OFFER_TIMOUT_SECS_KEY, 5)
+            .build());
+
+    kafkaReporter = getBuilder(context, 
pusher).withConfig(config).build("localhost:0000", "topic");
+    Assert.assertEquals(kafkaReporter.getQueueCapacity(), 200);
+    Assert.assertEquals(kafkaReporter.getQueueOfferTimeoutSecs(), 5);
+  }
+
   /**
    * Extract the next metric from the Kafka iterator
    * Assumes existence of the metric has already been checked.

Reply via email to