This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 522d312 [GOBBLIN-1423] Add config to Event reporter to control queue
capacity…
522d312 is described below
commit 522d31205cba743209437976946adf7bf0c4f1be
Author: vbohra <[email protected]>
AuthorDate: Fri Apr 16 14:23:25 2021 -0700
[GOBBLIN-1423] Add config to Event reporter to control queue capacity…
Closes #3259 from vikrambohra/GOBBLIN-1423
---
.../gobblin/metrics/reporter/EventReporter.java | 38 +++++++++++++++++++---
.../gobblin/metrics/KafkaReportingFormats.java | 9 +++--
.../gobblin/metrics/kafka/KafkaEventReporter.java | 9 +++--
.../reporter/KeyValueEventObjectReporter.java | 11 +------
.../metrics/reporter/KafkaEventReporterTest.java | 25 +++++++++++++-
5 files changed, 69 insertions(+), 23 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
index 70e581c..83c6a33 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.metrics.reporter;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
@@ -45,11 +46,15 @@ import com.codahale.metrics.Timer;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.notification.EventNotification;
@@ -74,14 +79,28 @@ public abstract class EventReporter extends
ScheduledReporter implements Closeab
protected static final String METRIC_KEY_PREFIX = "gobblin.metrics";
protected static final String EVENTS_QUALIFIER = "events";
private static final Logger LOGGER =
LoggerFactory.getLogger(EventReporter.class);
- private static final int QUEUE_CAPACITY = 100;
+ public static final int DEFAULT_QUEUE_CAPACITY = 100;
+ public static final String QUEUE_CAPACITY_KEY =
ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX +
".queue.capacity";
+ public static final int DEFAULT_QUEUE_OFFER_TIMEOUT_SECS = 10;
+ public static final String QUEUE_OFFER_TIMOUT_SECS_KEY =
ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX +
".queue.offer.timeout.secs";
private static final String NULL_STRING = "null";
private final MetricContext metricContext;
private final BlockingQueue<GobblinTrackingEvent> reportingQueue;
+ @Getter
+ private final int queueCapacity;
+ @Getter
+ private final int queueOfferTimeoutSecs;
private final ExecutorService immediateReportExecutor;
private final UUID notificationTargetKey;
protected final Closer closer;
+ protected final Config config;
+ private static final Config FALLBACK = ConfigFactory.parseMap(
+ ImmutableMap.<String, Object>builder()
+ .put(QUEUE_CAPACITY_KEY, DEFAULT_QUEUE_CAPACITY)
+ .put(QUEUE_OFFER_TIMOUT_SECS_KEY, DEFAULT_QUEUE_OFFER_TIMEOUT_SECS)
+ .build());
+
public EventReporter(Builder builder) {
super(builder.context, builder.name, builder.filter, builder.rateUnit,
builder.durationUnit);
@@ -101,7 +120,11 @@ public abstract class EventReporter extends
ScheduledReporter implements Closeab
return null;
}
});
- this.reportingQueue = Queues.newLinkedBlockingQueue(QUEUE_CAPACITY);
+
+ this.config = builder.config.withFallback(FALLBACK);
+ this.queueCapacity = this.config.getInt(QUEUE_CAPACITY_KEY);
+ this.queueOfferTimeoutSecs =
this.config.getInt(QUEUE_OFFER_TIMOUT_SECS_KEY);
+ this.reportingQueue = Queues.newLinkedBlockingQueue(this.queueCapacity);
}
/**
@@ -120,11 +143,11 @@ public abstract class EventReporter extends
ScheduledReporter implements Closeab
* @param event {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to
add to queue.
*/
public void addEventToReportingQueue(GobblinTrackingEvent event) {
- if (this.reportingQueue.size() > QUEUE_CAPACITY * 2 / 3) {
+ if (this.reportingQueue.size() > this.queueCapacity * 2 / 3) {
immediatelyScheduleReport();
}
try {
- if (!this.reportingQueue.offer(sanitizeEvent(event), 10,
TimeUnit.SECONDS)) {
+ if (!this.reportingQueue.offer(sanitizeEvent(event),
this.queueOfferTimeoutSecs, TimeUnit.SECONDS)) {
log.error("Enqueuing of event {} at reporter with class {} timed out.
Sending of events is probably stuck.",
event, this.getClass().getCanonicalName());
}
@@ -189,6 +212,7 @@ public abstract class EventReporter extends
ScheduledReporter implements Closeab
protected MetricFilter filter;
protected TimeUnit rateUnit;
protected TimeUnit durationUnit;
+ protected Config config;
protected Builder(MetricContext context) {
this.context = context;
@@ -196,6 +220,12 @@ public abstract class EventReporter extends
ScheduledReporter implements Closeab
this.rateUnit = TimeUnit.SECONDS;
this.durationUnit = TimeUnit.MILLISECONDS;
this.filter = MetricFilter.ALL;
+ this.config = ConfigFactory.empty();
+ }
+
+ public T withConfig(Config config) {
+ this.config = (config == null) ? ConfigFactory.empty() : config;
+ return self();
}
protected abstract T self();
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
index 094bb83..2186911 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
@@ -82,12 +82,13 @@ public enum KafkaReportingFormats {
builder.withPusherClassName(pusherClassName);
Config allConfig = ConfigUtils.propertiesToConfig(properties);
+ builder.withConfig(allConfig);
// the kafka configuration is composed of the metrics reporting specific
keys with a fallback to the shared
// kafka config
Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
.withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
- builder.withConfig(kafkaConfig);
+ builder.withKafkaConfig(kafkaConfig);
return builder.build(brokers, topic);
}
@@ -124,12 +125,13 @@ public enum KafkaReportingFormats {
builder.withPusherClassName(pusherClassName);
Config allConfig = ConfigUtils.propertiesToConfig(properties);
+ builder.withConfig(allConfig);
// the kafka configuration is composed of the metrics reporting specific
keys with a fallback to the shared
// kafka config
Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
.withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
- builder.withConfig(kafkaConfig);
+ builder.withKafkaConfig(kafkaConfig);
return builder.build(brokers, topic);
}
@@ -153,12 +155,13 @@ public enum KafkaReportingFormats {
builder.withPusherClassName(pusherClassName);
Config allConfig = ConfigUtils.propertiesToConfig(properties);
+ builder.withConfig(allConfig);
// the kafka configuration is composed of the metrics reporting specific
keys with a fallback to the shared
// kafka config
Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX)
.withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
- builder.withConfig(kafkaConfig);
+ builder.withKafkaConfig(kafkaConfig);
return builder.build(brokers, topic);
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
index 8c5d3be..061c1da 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventReporter.java
@@ -33,7 +33,6 @@ import
org.apache.gobblin.metrics.reporter.util.AvroSerializer;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
-
/**
* Reports {@link GobblinTrackingEvent} to a Kafka topic serialized as JSON.
*/
@@ -52,7 +51,7 @@ public class KafkaEventReporter extends EventReporter {
this.kafkaPusher = builder.kafkaPusher.get();
} else {
String pusherClassName =
builder.pusherClassName.or(PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
- this.kafkaPusher = PusherUtils.getPusher(pusherClassName,
builder.brokers, builder.topic, builder.config);
+ this.kafkaPusher = PusherUtils.getPusher(pusherClassName,
builder.brokers, builder.topic, builder.kafkaConfig);
}
this.closer.register(this.kafkaPusher);
}
@@ -122,7 +121,7 @@ public class KafkaEventReporter extends EventReporter {
protected String brokers;
protected String topic;
protected Optional<Pusher> kafkaPusher;
- protected Optional<Config> config = Optional.absent();
+ protected Optional<Config> kafkaConfig = Optional.absent();
protected Optional<String> pusherClassName = Optional.absent();
protected Builder(MetricContext context) {
@@ -141,8 +140,8 @@ public class KafkaEventReporter extends EventReporter {
/**
* Set additional configuration.
*/
- public T withConfig(Config config) {
- this.config = Optional.of(config);
+ public T withKafkaConfig(Config config) {
+ this.kafkaConfig = Optional.of(config);
return self();
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
index b97e48c..cd81d61 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
@@ -63,7 +63,7 @@ public class KeyValueEventObjectReporter extends
EventReporter {
public KeyValueEventObjectReporter(Builder builder) {
super(builder);
- Config config = builder.config.get();
+ Config config = builder.config;
Config pusherConfig = ConfigUtils.getConfigOrEmpty(config,
PUSHER_CONFIG).withFallback(config);
String pusherClassName =
ConfigUtils.getString(config, PUSHER_CLASS,
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
@@ -132,7 +132,6 @@ public class KeyValueEventObjectReporter extends
EventReporter {
protected String brokers;
protected String topic;
- protected Optional<Config> config = Optional.absent();
protected Optional<Map<String, String>> namespaceOverride =
Optional.absent();
public Builder(MetricContext context) {
@@ -144,14 +143,6 @@ public class KeyValueEventObjectReporter extends
EventReporter {
return this;
}
- /**
- * Set additional configuration.
- */
- public Builder withConfig(Config config) {
- this.config = Optional.of(config);
- return self();
- }
-
public Builder namespaceOverride(Optional<Map<String, String>>
namespaceOverride) {
this.namespaceOverride = namespaceOverride;
return self();
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
index add42f4..1ca9f57 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
@@ -24,7 +24,10 @@ import java.util.Map;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
@@ -54,7 +57,6 @@ public class KafkaEventReporterTest {
MockKafkaPusher pusher = new MockKafkaPusher();
KafkaEventReporter kafkaReporter = getBuilder(context,
pusher).build("localhost:0000", "topic");
-
String namespace = "gobblin.metrics.test";
String eventName = "testEvent";
@@ -136,6 +138,27 @@ public class KafkaEventReporterTest {
Assert.assertEquals(retrievedEvent.getMetadata().get(tag2), value2);
}
+ @Test
+ public void testEventReporterConfigs() throws IOException {
+
+ MetricContext context = MetricContext.builder("context").build();
+
+ MockKafkaPusher pusher = new MockKafkaPusher();
+ KafkaEventReporter kafkaReporter = getBuilder(context,
pusher).build("localhost:0000", "topic");
+ Assert.assertEquals(kafkaReporter.getQueueCapacity(),
EventReporter.DEFAULT_QUEUE_CAPACITY);
+ Assert.assertEquals(kafkaReporter.getQueueOfferTimeoutSecs(),
EventReporter.DEFAULT_QUEUE_OFFER_TIMEOUT_SECS);
+
+ Config config = ConfigFactory.parseMap(
+ ImmutableMap.<String, Object>builder()
+ .put(EventReporter.QUEUE_CAPACITY_KEY, 200)
+ .put(EventReporter.QUEUE_OFFER_TIMOUT_SECS_KEY, 5)
+ .build());
+
+ kafkaReporter = getBuilder(context,
pusher).withConfig(config).build("localhost:0000", "topic");
+ Assert.assertEquals(kafkaReporter.getQueueCapacity(), 200);
+ Assert.assertEquals(kafkaReporter.getQueueOfferTimeoutSecs(), 5);
+ }
+
/**
* Extract the next metric from the Kafka iterator
* Assumes existence of the metric has already been checked.