jon-wei closed pull request #6251: fix opentsdb emitter always be running and fail sending tags whose value contains colon URL: https://github.com/apache/incubator-druid/pull/6251
This PR was merged from a forked repository. Because GitHub hides the original diff of a merged foreign pull request, the diff is reproduced below for provenance: diff --git a/docs/content/development/extensions-contrib/opentsdb-emitter.md b/docs/content/development/extensions-contrib/opentsdb-emitter.md index dafdfd07552..f11a1219247 100644 --- a/docs/content/development/extensions-contrib/opentsdb-emitter.md +++ b/docs/content/development/extensions-contrib/opentsdb-emitter.md @@ -8,7 +8,7 @@ To use this extension, make sure to [include](../../operations/including-extensi ## Introduction -This extension emits druid metrics to [OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP. And this emitter only emits service metric events to OpenTSDB (See http://druid.io/docs/latest/operations/metrics.html for a list of metrics). +This extension emits druid metrics to [OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP (Using `Jersey client`). And this emitter only emits service metric events to OpenTSDB (See http://druid.io/docs/latest/operations/metrics.html for a list of metrics). 
## Configuration @@ -18,10 +18,11 @@ All the configuration parameters for the opentsdb emitter are under `druid.emitt |--------|-----------|---------|-------| |`druid.emitter.opentsdb.host`|The host of the OpenTSDB server.|yes|none| |`druid.emitter.opentsdb.port`|The port of the OpenTSDB server.|yes|none| -|`druid.emitter.opentsdb.connectionTimeout`|Connection timeout(in milliseconds).|no|2000| -|`druid.emitter.opentsdb.readTimeout`|Read timeout(in milliseconds).|no|2000| +|`druid.emitter.opentsdb.connectionTimeout`|`Jersey client` connection timeout(in milliseconds).|no|2000| +|`druid.emitter.opentsdb.readTimeout`|`Jersey client` read timeout(in milliseconds).|no|2000| |`druid.emitter.opentsdb.flushThreshold`|Queue flushing threshold.(Events will be sent as one batch)|no|100| |`druid.emitter.opentsdb.maxQueueSize`|Maximum size of the queue used to buffer events.|no|1000| +|`druid.emitter.opentsdb.consumeDelay`|Queue consuming delay(in milliseconds). Actually, we use `ScheduledExecutorService` to schedule consuming events, so this `consumeDelay` means the delay between the termination of one execution and the commencement of the next. 
If your druid nodes produce metric events fast, then you should decrease this `consumeDelay` or increase the `maxQueueSize`.|no|10000| |`druid.emitter.opentsdb.metricMapPath`|JSON file defining the desired metrics and dimensions for every Druid metric|no|./src/main/resources/defaultMetrics.json| ### Druid to OpenTSDB Event Converter diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java index 3745dd0dee7..c29045722ab 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java +++ b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java @@ -39,6 +39,8 @@ { private static final Logger log = new Logger(EventConverter.class); private static final Pattern WHITESPACE = Pattern.compile("[\\s]+"); + private static final String COLON = ":"; + private static final String DEFAULT_COLON_REPLACEMENT = "_"; private final Map<String, Set<String>> metricMap; @@ -72,15 +74,19 @@ public OpentsdbEvent convert(ServiceMetricEvent serviceMetricEvent) Number value = serviceMetricEvent.getValue(); Map<String, Object> tags = new HashMap<>(); - String service = serviceMetricEvent.getService(); - String host = serviceMetricEvent.getHost(); + String service = serviceMetricEvent.getService().replaceAll(COLON, DEFAULT_COLON_REPLACEMENT); + String host = serviceMetricEvent.getHost().replaceAll(COLON, DEFAULT_COLON_REPLACEMENT); tags.put("service", service); tags.put("host", host); Map<String, Object> userDims = serviceMetricEvent.getUserDims(); for (String dim : metricMap.get(metric)) { if (userDims.containsKey(dim)) { - tags.put(dim, userDims.get(dim)); + Object dimValue = userDims.get(dim); + if (dimValue instanceof String) { + dimValue = ((String) dimValue).replaceAll(COLON, DEFAULT_COLON_REPLACEMENT); + } + tags.put(dim, 
dimValue); } } diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java index 0fa5e601007..515c297a99a 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java +++ b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java @@ -20,17 +20,21 @@ package org.apache.druid.emitter.opentsdb; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import java.util.concurrent.atomic.AtomicBoolean; + public class OpentsdbEmitter implements Emitter { private static final Logger log = new Logger(OpentsdbEmitter.class); private final OpentsdbSender sender; private final EventConverter converter; + private final AtomicBoolean started = new AtomicBoolean(false); public OpentsdbEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper) { @@ -40,7 +44,8 @@ public OpentsdbEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper) config.getConnectionTimeout(), config.getReadTimeout(), config.getFlushThreshold(), - config.getMaxQueueSize() + config.getMaxQueueSize(), + config.getConsumeDelay() ); this.converter = new EventConverter(mapper, config.getMetricMapPath()); } @@ -48,11 +53,21 @@ public OpentsdbEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper) @Override public void start() { + synchronized (started) { + if (!started.get()) { + log.info("Starting Opentsdb Emitter."); + sender.start(); + started.set(true); + } + } } @Override public void emit(Event event) { + if (!started.get()) { + throw new ISE("WTF emit was called 
while service is not started yet"); + } if (event instanceof ServiceMetricEvent) { OpentsdbEvent opentsdbEvent = converter.convert((ServiceMetricEvent) event); if (opentsdbEvent != null) { @@ -69,12 +84,17 @@ public void emit(Event event) @Override public void flush() { - sender.flush(); + if (started.get()) { + sender.flush(); + } } @Override public void close() { - sender.close(); + if (started.get()) { + sender.close(); + started.set(false); + } } } diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java index f953c480aef..fd230b21bb8 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java +++ b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java @@ -27,6 +27,7 @@ { private static final int DEFAULT_FLUSH_THRESHOLD = 100; private static final int DEFAULT_MAX_QUEUE_SIZE = 1000; + private static final long DEFAULT_CONSUME_DELAY_MILLIS = 10000; private static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 2000; private static final int DEFAULT_READ_TIMEOUT_MILLIS = 2000; @@ -48,6 +49,9 @@ @JsonProperty private final int maxQueueSize; + @JsonProperty + private final long consumeDelay; + @JsonProperty private final String metricMapPath; @@ -59,6 +63,7 @@ public OpentsdbEmitterConfig( @JsonProperty("readTimeout") Integer readTimeout, @JsonProperty("flushThreshold") Integer flushThreshold, @JsonProperty("maxQueueSize") Integer maxQueueSize, + @JsonProperty("consumeDelay") Long consumeDelay, @JsonProperty("metricMapPath") String metricMapPath ) { @@ -71,6 +76,7 @@ public OpentsdbEmitterConfig( (readTimeout == null || readTimeout < 0) ? DEFAULT_READ_TIMEOUT_MILLIS : readTimeout; this.flushThreshold = (flushThreshold == null || flushThreshold < 0) ? 
DEFAULT_FLUSH_THRESHOLD : flushThreshold; this.maxQueueSize = (maxQueueSize == null || maxQueueSize < 0) ? DEFAULT_MAX_QUEUE_SIZE : maxQueueSize; + this.consumeDelay = (consumeDelay == null || consumeDelay < 0) ? DEFAULT_CONSUME_DELAY_MILLIS : consumeDelay; this.metricMapPath = metricMapPath; } @@ -104,6 +110,9 @@ public boolean equals(Object o) if (maxQueueSize != that.maxQueueSize) { return false; } + if (consumeDelay != that.consumeDelay) { + return false; + } return metricMapPath != null ? metricMapPath.equals(that.metricMapPath) : that.metricMapPath == null; } @@ -117,6 +126,7 @@ public int hashCode() result = 31 * result + readTimeout; result = 31 * result + flushThreshold; result = 31 * result + maxQueueSize; + result = 31 * result + (int) consumeDelay; result = 31 * result + (metricMapPath != null ? metricMapPath.hashCode() : 0); return result; } @@ -151,6 +161,11 @@ public int getMaxQueueSize() return maxQueueSize; } + public long getConsumeDelay() + { + return consumeDelay; + } + public String getMetricMapPath() { return metricMapPath; diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java index 2f93d7c2d12..8f7e3f7a506 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java +++ b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java @@ -20,6 +20,7 @@ package org.apache.druid.emitter.opentsdb; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.WebResource; import org.apache.druid.java.util.common.logger.Logger; @@ -29,8 +30,10 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; 
-import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class OpentsdbSender @@ -40,28 +43,40 @@ */ private static final String PATH = "/api/put"; private static final Logger log = new Logger(OpentsdbSender.class); + private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min private final AtomicLong countLostEvents = new AtomicLong(0); private final int flushThreshold; - private final List<OpentsdbEvent> events; private final BlockingQueue<OpentsdbEvent> eventQueue; + private final ScheduledExecutorService scheduler; + private final EventConsumer eventConsumer; + private final long consumeDelay; private final Client client; private final WebResource webResource; - private final ExecutorService executor = Executors.newFixedThreadPool(1); - private volatile boolean running = true; - public OpentsdbSender(String host, int port, int connectionTimeout, int readTimeout, int flushThreshold, int maxQueueSize) + public OpentsdbSender( + String host, + int port, + int connectionTimeout, + int readTimeout, + int flushThreshold, + int maxQueueSize, + long consumeDelay + ) { this.flushThreshold = flushThreshold; - events = new ArrayList<>(flushThreshold); + this.consumeDelay = consumeDelay; eventQueue = new ArrayBlockingQueue<>(maxQueueSize); + scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("OpentsdbEventSender-%s") + .build()); + eventConsumer = new EventConsumer(); client = Client.create(); client.setConnectTimeout(connectionTimeout); client.setReadTimeout(readTimeout); webResource = client.resource("http://" + host + ":" + port + PATH); - - executor.execute(new EventConsumer()); } public void enqueue(OpentsdbEvent event) @@ -76,46 +91,70 @@ public void enqueue(OpentsdbEvent event) } } 
+ public void start() + { + scheduler.scheduleWithFixedDelay( + eventConsumer, + consumeDelay, + consumeDelay, + TimeUnit.MILLISECONDS + ); + } + public void flush() { - sendEvents(); + try { + EventConsumer flushConsumer = new EventConsumer(); + Future future = scheduler.schedule(flushConsumer, 0, TimeUnit.MILLISECONDS); + future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS); + // send remaining events which size may less than flushThreshold + eventConsumer.sendEvents(); + flushConsumer.sendEvents(); + } + catch (Exception e) { + log.warn(e, e.getMessage()); + } } public void close() { flush(); client.destroy(); - running = false; - executor.shutdown(); + scheduler.shutdown(); } - private void sendEvents() + private class EventConsumer implements Runnable { - if (!events.isEmpty()) { - try { - webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post(); - } - catch (Exception e) { - log.error(e, "send to opentsdb server failed"); - } - finally { - events.clear(); - } + private final List<OpentsdbEvent> events; + + public EventConsumer() + { + events = new ArrayList<>(flushThreshold); } - } - private class EventConsumer implements Runnable - { @Override public void run() { - while (running) { - if (!eventQueue.isEmpty()) { - OpentsdbEvent event = eventQueue.poll(); - events.add(event); - if (events.size() >= flushThreshold) { - sendEvents(); - } + while (!eventQueue.isEmpty() && !scheduler.isShutdown()) { + OpentsdbEvent event = eventQueue.poll(); + events.add(event); + if (events.size() >= flushThreshold) { + sendEvents(); + } + } + } + + public void sendEvents() + { + if (!events.isEmpty()) { + try { + webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post(); + } + catch (Exception e) { + log.error(e, "error occurred when sending metrics to opentsdb server."); + } + finally { + events.clear(); } } } diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java index da3b37d4d41..7b3f205c141 100644 --- a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java +++ b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java @@ -54,15 +54,15 @@ public void testConvert() { DateTime dateTime = DateTimes.nowUtc(); ServiceMetricEvent configuredEvent = new ServiceMetricEvent.Builder() - .setDimension("dataSource", "data-source") + .setDimension("dataSource", "foo:bar") .setDimension("type", "groupBy") .build(dateTime, "query/time", 10) - .build("broker", "brokerHost1"); + .build("druid:broker", "127.0.0.1:8080"); Map<String, Object> expectedTags = new HashMap<>(); - expectedTags.put("service", "broker"); - expectedTags.put("host", "brokerHost1"); - expectedTags.put("dataSource", "data-source"); + expectedTags.put("service", "druid_broker"); + expectedTags.put("host", "127.0.0.1_8080"); + expectedTags.put("dataSource", "foo_bar"); expectedTags.put("type", "groupBy"); OpentsdbEvent opentsdbEvent = converter.convert(configuredEvent); diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java index b2633164394..11ccb9de571 100644 --- a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java +++ b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java @@ -39,7 +39,7 @@ public void setUp() @Test public void testSerDeserOpentsdbEmitterConfig() throws Exception { - OpentsdbEmitterConfig opentsdbEmitterConfig = new OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, null); + OpentsdbEmitterConfig opentsdbEmitterConfig = 
new OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, 10000L, null); String opentsdbEmitterConfigString = mapper.writeValueAsString(opentsdbEmitterConfig); OpentsdbEmitterConfig expectedOpentsdbEmitterConfig = mapper.reader(OpentsdbEmitterConfig.class) .readValue(opentsdbEmitterConfigString); diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java index d28f04d526e..79c8ae52ce4 100644 --- a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java +++ b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java @@ -27,7 +27,7 @@ @Test public void testUrl() { - OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 100, 1000); + OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 100, 1000, 10000L); String expectedUrl = "http://localhost:9999/api/put"; Assert.assertEquals(expectedUrl, sender.getWebResource().getURI().toString()); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
