This is an automated email from the ASF dual-hosted git repository. jonwei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 85391e9 fix opentsdb emitter always be running and fail sending tags whose value contains colon (#6251) 85391e9 is described below commit 85391e9fb3cef355dacbbf5c443773fb7aab445a Author: QiuMM <csurj...@gmail.com> AuthorDate: Sat Sep 15 03:14:15 2018 +0800 fix opentsdb emitter always be running and fail sending tags whose value contains colon (#6251) * fix opentsdb emitter always be running * check if emitter started * add more details about consumeDelay in doc * fix possible thread unsafe * fix fail sending tags whose value contain colon --- .../extensions-contrib/opentsdb-emitter.md | 7 +- .../druid/emitter/opentsdb/EventConverter.java | 12 ++- .../druid/emitter/opentsdb/OpentsdbEmitter.java | 26 +++++- .../emitter/opentsdb/OpentsdbEmitterConfig.java | 15 +++ .../druid/emitter/opentsdb/OpentsdbSender.java | 103 ++++++++++++++------- .../druid/emitter/opentsdb/EventConverterTest.java | 10 +- .../opentsdb/OpentsdbEmitterConfigTest.java | 2 +- .../druid/emitter/opentsdb/OpentsdbSenderTest.java | 2 +- 8 files changed, 129 insertions(+), 48 deletions(-) diff --git a/docs/content/development/extensions-contrib/opentsdb-emitter.md b/docs/content/development/extensions-contrib/opentsdb-emitter.md index dafdfd0..f11a121 100644 --- a/docs/content/development/extensions-contrib/opentsdb-emitter.md +++ b/docs/content/development/extensions-contrib/opentsdb-emitter.md @@ -8,7 +8,7 @@ To use this extension, make sure to [include](../../operations/including-extensi ## Introduction -This extension emits druid metrics to [OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP. And this emitter only emits service metric events to OpenTSDB (See http://druid.io/docs/latest/operations/metrics.html for a list of metrics). +This extension emits druid metrics to [OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP (Using `Jersey client`). And this emitter only emits service metric events to OpenTSDB (See http://druid.io/docs/latest/operations/metrics.html for a list of metrics). ## Configuration @@ -18,10 +18,11 @@ All the configuration parameters for the opentsdb emitter are under `druid.emitt |--------|-----------|---------|-------| |`druid.emitter.opentsdb.host`|The host of the OpenTSDB server.|yes|none| |`druid.emitter.opentsdb.port`|The port of the OpenTSDB server.|yes|none| -|`druid.emitter.opentsdb.connectionTimeout`|Connection timeout(in milliseconds).|no|2000| -|`druid.emitter.opentsdb.readTimeout`|Read timeout(in milliseconds).|no|2000| +|`druid.emitter.opentsdb.connectionTimeout`|`Jersey client` connection timeout(in milliseconds).|no|2000| +|`druid.emitter.opentsdb.readTimeout`|`Jersey client` read timeout(in milliseconds).|no|2000| |`druid.emitter.opentsdb.flushThreshold`|Queue flushing threshold.(Events will be sent as one batch)|no|100| |`druid.emitter.opentsdb.maxQueueSize`|Maximum size of the queue used to buffer events.|no|1000| +|`druid.emitter.opentsdb.consumeDelay`|Queue consuming delay(in milliseconds). Actually, we use `ScheduledExecutorService` to schedule consuming events, so this `consumeDelay` means the delay between the termination of one execution and the commencement of the next. If your druid nodes produce metric events fast, then you should decrease this `consumeDelay` or increase the `maxQueueSize`.|no|10000| |`druid.emitter.opentsdb.metricMapPath`|JSON file defining the desired metrics and dimensions for every Druid metric|no|./src/main/resources/defaultMetrics.json| ### Druid to OpenTSDB Event Converter diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java index 3745dd0..c290457 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java +++ b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java @@ -39,6 +39,8 @@ public class EventConverter { private static final Logger log = new Logger(EventConverter.class); private static final Pattern WHITESPACE = Pattern.compile("[\\s]+"); + private static final String COLON = ":"; + private static final String DEFAULT_COLON_REPLACEMENT = "_"; private final Map<String, Set<String>> metricMap; @@ -72,15 +74,19 @@ public class EventConverter Number value = serviceMetricEvent.getValue(); Map<String, Object> tags = new HashMap<>(); - String service = serviceMetricEvent.getService(); - String host = serviceMetricEvent.getHost(); + String service = serviceMetricEvent.getService().replaceAll(COLON, DEFAULT_COLON_REPLACEMENT); + String host = serviceMetricEvent.getHost().replaceAll(COLON, DEFAULT_COLON_REPLACEMENT); tags.put("service", service); tags.put("host", host); Map<String, Object> userDims = serviceMetricEvent.getUserDims(); for (String dim : metricMap.get(metric)) { if (userDims.containsKey(dim)) { - tags.put(dim, userDims.get(dim)); + Object dimValue = userDims.get(dim); + if (dimValue instanceof String) { + dimValue = ((String) dimValue).replaceAll(COLON, DEFAULT_COLON_REPLACEMENT); + } + tags.put(dim, dimValue); } } diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java index 0fa5e60..515c297 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java +++ b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java @@ -20,17 +20,21 @@ package org.apache.druid.emitter.opentsdb; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import java.util.concurrent.atomic.AtomicBoolean; + public class OpentsdbEmitter implements Emitter { private static final Logger log = new Logger(OpentsdbEmitter.class); private final OpentsdbSender sender; private final EventConverter converter; + private final AtomicBoolean started = new AtomicBoolean(false); public OpentsdbEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper) { @@ -40,7 +44,8 @@ public class OpentsdbEmitter implements Emitter config.getConnectionTimeout(), config.getReadTimeout(), config.getFlushThreshold(), - config.getMaxQueueSize() + config.getMaxQueueSize(), + config.getConsumeDelay() ); this.converter = new EventConverter(mapper, config.getMetricMapPath()); } @@ -48,11 +53,21 @@ public class OpentsdbEmitter implements Emitter @Override public void start() { + synchronized (started) { + if (!started.get()) { + log.info("Starting Opentsdb Emitter."); + sender.start(); + started.set(true); + } + } } @Override public void emit(Event event) { + if (!started.get()) { + throw new ISE("WTF emit was called while service is not started yet"); + } if (event instanceof ServiceMetricEvent) { OpentsdbEvent opentsdbEvent = converter.convert((ServiceMetricEvent) event); if (opentsdbEvent != null) { @@ -69,12 +84,17 @@ public class OpentsdbEmitter implements Emitter @Override public void flush() { - sender.flush(); + if (started.get()) { + sender.flush(); + } } @Override public void close() { - sender.close(); + if (started.get()) { + sender.close(); + started.set(false); + } } } diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java index f953c48..fd230b2 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java +++ b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java @@ -27,6 +27,7 @@ public class OpentsdbEmitterConfig { private static final int DEFAULT_FLUSH_THRESHOLD = 100; private static final int DEFAULT_MAX_QUEUE_SIZE = 1000; + private static final long DEFAULT_CONSUME_DELAY_MILLIS = 10000; private static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 2000; private static final int DEFAULT_READ_TIMEOUT_MILLIS = 2000; @@ -49,6 +50,9 @@ public class OpentsdbEmitterConfig private final int maxQueueSize; @JsonProperty + private final long consumeDelay; + + @JsonProperty private final String metricMapPath; @JsonCreator @@ -59,6 +63,7 @@ public class OpentsdbEmitterConfig @JsonProperty("readTimeout") Integer readTimeout, @JsonProperty("flushThreshold") Integer flushThreshold, @JsonProperty("maxQueueSize") Integer maxQueueSize, + @JsonProperty("consumeDelay") Long consumeDelay, @JsonProperty("metricMapPath") String metricMapPath ) { @@ -71,6 +76,7 @@ public class OpentsdbEmitterConfig (readTimeout == null || readTimeout < 0) ? DEFAULT_READ_TIMEOUT_MILLIS : readTimeout; this.flushThreshold = (flushThreshold == null || flushThreshold < 0) ? DEFAULT_FLUSH_THRESHOLD : flushThreshold; this.maxQueueSize = (maxQueueSize == null || maxQueueSize < 0) ? DEFAULT_MAX_QUEUE_SIZE : maxQueueSize; + this.consumeDelay = (consumeDelay == null || consumeDelay < 0) ? DEFAULT_CONSUME_DELAY_MILLIS : consumeDelay; this.metricMapPath = metricMapPath; } @@ -104,6 +110,9 @@ public class OpentsdbEmitterConfig if (maxQueueSize != that.maxQueueSize) { return false; } + if (consumeDelay != that.consumeDelay) { + return false; + } return metricMapPath != null ? metricMapPath.equals(that.metricMapPath) : that.metricMapPath == null; } @@ -117,6 +126,7 @@ public class OpentsdbEmitterConfig result = 31 * result + readTimeout; result = 31 * result + flushThreshold; result = 31 * result + maxQueueSize; + result = 31 * result + (int) consumeDelay; result = 31 * result + (metricMapPath != null ? metricMapPath.hashCode() : 0); return result; } @@ -151,6 +161,11 @@ public class OpentsdbEmitterConfig return maxQueueSize; } + public long getConsumeDelay() + { + return consumeDelay; + } + public String getMetricMapPath() { return metricMapPath; diff --git a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java index 2f93d7c..8f7e3f7 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java +++ b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java @@ -20,6 +20,7 @@ package org.apache.druid.emitter.opentsdb; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.WebResource; import org.apache.druid.java.util.common.logger.Logger; @@ -29,8 +30,10 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class OpentsdbSender @@ -40,28 +43,40 @@ public class OpentsdbSender */ private static final String PATH = "/api/put"; private static final Logger log = new Logger(OpentsdbSender.class); + private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min private final AtomicLong countLostEvents = new AtomicLong(0); private final int flushThreshold; - private final List<OpentsdbEvent> events; private final BlockingQueue<OpentsdbEvent> eventQueue; + private final ScheduledExecutorService scheduler; + private final EventConsumer eventConsumer; + private final long consumeDelay; private final Client client; private final WebResource webResource; - private final ExecutorService executor = Executors.newFixedThreadPool(1); - private volatile boolean running = true; - public OpentsdbSender(String host, int port, int connectionTimeout, int readTimeout, int flushThreshold, int maxQueueSize) + public OpentsdbSender( + String host, + int port, + int connectionTimeout, + int readTimeout, + int flushThreshold, + int maxQueueSize, + long consumeDelay + ) { this.flushThreshold = flushThreshold; - events = new ArrayList<>(flushThreshold); + this.consumeDelay = consumeDelay; eventQueue = new ArrayBlockingQueue<>(maxQueueSize); + scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("OpentsdbEventSender-%s") + .build()); + eventConsumer = new EventConsumer(); client = Client.create(); client.setConnectTimeout(connectionTimeout); client.setReadTimeout(readTimeout); webResource = client.resource("http://" + host + ":" + port + PATH); - - executor.execute(new EventConsumer()); } public void enqueue(OpentsdbEvent event) @@ -76,46 +91,70 @@ public class OpentsdbSender } } + public void start() + { + scheduler.scheduleWithFixedDelay( + eventConsumer, + consumeDelay, + consumeDelay, + TimeUnit.MILLISECONDS + ); + } + public void flush() { - sendEvents(); + try { + EventConsumer flushConsumer = new EventConsumer(); + Future future = scheduler.schedule(flushConsumer, 0, TimeUnit.MILLISECONDS); + future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS); + // send remaining events which size may less than flushThreshold + eventConsumer.sendEvents(); + flushConsumer.sendEvents(); + } + catch (Exception e) { + log.warn(e, e.getMessage()); + } } public void close() { flush(); client.destroy(); - running = false; - executor.shutdown(); + scheduler.shutdown(); } - private void sendEvents() + private class EventConsumer implements Runnable { - if (!events.isEmpty()) { - try { - webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post(); - } - catch (Exception e) { - log.error(e, "send to opentsdb server failed"); - } - finally { - events.clear(); - } + private final List<OpentsdbEvent> events; + + public EventConsumer() + { + events = new ArrayList<>(flushThreshold); } - } - private class EventConsumer implements Runnable - { @Override public void run() { - while (running) { - if (!eventQueue.isEmpty()) { - OpentsdbEvent event = eventQueue.poll(); - events.add(event); - if (events.size() >= flushThreshold) { - sendEvents(); - } + while (!eventQueue.isEmpty() && !scheduler.isShutdown()) { + OpentsdbEvent event = eventQueue.poll(); + events.add(event); + if (events.size() >= flushThreshold) { + sendEvents(); + } + } + } + + public void sendEvents() + { + if (!events.isEmpty()) { + try { + webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post(); + } + catch (Exception e) { + log.error(e, "error occurred when sending metrics to opentsdb server."); + } + finally { + events.clear(); } } } diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java index da3b37d..7b3f205 100644 --- a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java +++ b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java @@ -54,15 +54,15 @@ public class EventConverterTest { DateTime dateTime = DateTimes.nowUtc(); ServiceMetricEvent configuredEvent = new ServiceMetricEvent.Builder() - .setDimension("dataSource", "data-source") + .setDimension("dataSource", "foo:bar") .setDimension("type", "groupBy") .build(dateTime, "query/time", 10) - .build("broker", "brokerHost1"); + .build("druid:broker", "127.0.0.1:8080"); Map<String, Object> expectedTags = new HashMap<>(); - expectedTags.put("service", "broker"); - expectedTags.put("host", "brokerHost1"); - expectedTags.put("dataSource", "data-source"); + expectedTags.put("service", "druid_broker"); + expectedTags.put("host", "127.0.0.1_8080"); + expectedTags.put("dataSource", "foo_bar"); expectedTags.put("type", "groupBy"); OpentsdbEvent opentsdbEvent = converter.convert(configuredEvent); diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java index b263316..11ccb9d 100644 --- a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java +++ b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java @@ -39,7 +39,7 @@ public class OpentsdbEmitterConfigTest @Test public void testSerDeserOpentsdbEmitterConfig() throws Exception { - OpentsdbEmitterConfig opentsdbEmitterConfig = new OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, null); + OpentsdbEmitterConfig opentsdbEmitterConfig = new OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, 10000L, null); String opentsdbEmitterConfigString = mapper.writeValueAsString(opentsdbEmitterConfig); OpentsdbEmitterConfig expectedOpentsdbEmitterConfig = mapper.reader(OpentsdbEmitterConfig.class) .readValue(opentsdbEmitterConfigString); diff --git a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java index d28f04d..79c8ae5 100644 --- a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java +++ b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java @@ -27,7 +27,7 @@ public class OpentsdbSenderTest @Test public void testUrl() { - OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 100, 1000); + OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 100, 1000, 10000L); String expectedUrl = "http://localhost:9999/api/put"; Assert.assertEquals(expectedUrl, sender.getWebResource().getURI().toString()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org