This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 85391e9 fix opentsdb emitter always be running and fail sending tags
whose value contains colon (#6251)
85391e9 is described below
commit 85391e9fb3cef355dacbbf5c443773fb7aab445a
Author: QiuMM <[email protected]>
AuthorDate: Sat Sep 15 03:14:15 2018 +0800
fix opentsdb emitter always be running and fail sending tags whose value
contains colon (#6251)
* fix opentsdb emitter always be running
* check if emitter started
* add more details about consumeDelay in doc
* fix possible thread unsafe
* fix fail sending tags whose value contain colon
---
.../extensions-contrib/opentsdb-emitter.md | 7 +-
.../druid/emitter/opentsdb/EventConverter.java | 12 ++-
.../druid/emitter/opentsdb/OpentsdbEmitter.java | 26 +++++-
.../emitter/opentsdb/OpentsdbEmitterConfig.java | 15 +++
.../druid/emitter/opentsdb/OpentsdbSender.java | 103 ++++++++++++++-------
.../druid/emitter/opentsdb/EventConverterTest.java | 10 +-
.../opentsdb/OpentsdbEmitterConfigTest.java | 2 +-
.../druid/emitter/opentsdb/OpentsdbSenderTest.java | 2 +-
8 files changed, 129 insertions(+), 48 deletions(-)
diff --git a/docs/content/development/extensions-contrib/opentsdb-emitter.md
b/docs/content/development/extensions-contrib/opentsdb-emitter.md
index dafdfd0..f11a121 100644
--- a/docs/content/development/extensions-contrib/opentsdb-emitter.md
+++ b/docs/content/development/extensions-contrib/opentsdb-emitter.md
@@ -8,7 +8,7 @@ To use this extension, make sure to
[include](../../operations/including-extensi
## Introduction
-This extension emits druid metrics to
[OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP. And this emitter
only emits service metric events to OpenTSDB (See
http://druid.io/docs/latest/operations/metrics.html for a list of metrics).
+This extension emits druid metrics to
[OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP (Using `Jersey
client`). And this emitter only emits service metric events to OpenTSDB (See
http://druid.io/docs/latest/operations/metrics.html for a list of metrics).
## Configuration
@@ -18,10 +18,11 @@ All the configuration parameters for the opentsdb emitter
are under `druid.emitt
|--------|-----------|---------|-------|
|`druid.emitter.opentsdb.host`|The host of the OpenTSDB server.|yes|none|
|`druid.emitter.opentsdb.port`|The port of the OpenTSDB server.|yes|none|
-|`druid.emitter.opentsdb.connectionTimeout`|Connection timeout(in
milliseconds).|no|2000|
-|`druid.emitter.opentsdb.readTimeout`|Read timeout(in milliseconds).|no|2000|
+|`druid.emitter.opentsdb.connectionTimeout`|`Jersey client` connection
timeout(in milliseconds).|no|2000|
+|`druid.emitter.opentsdb.readTimeout`|`Jersey client` read timeout(in
milliseconds).|no|2000|
|`druid.emitter.opentsdb.flushThreshold`|Queue flushing threshold.(Events will
be sent as one batch)|no|100|
|`druid.emitter.opentsdb.maxQueueSize`|Maximum size of the queue used to
buffer events.|no|1000|
+|`druid.emitter.opentsdb.consumeDelay`|Queue consuming delay(in milliseconds).
Actually, we use `ScheduledExecutorService` to schedule consuming events, so
this `consumeDelay` means the delay between the termination of one execution
and the commencement of the next. If your druid nodes produce metric events
fast, then you should decrease this `consumeDelay` or increase the
`maxQueueSize`.|no|10000|
|`druid.emitter.opentsdb.metricMapPath`|JSON file defining the desired metrics
and dimensions for every Druid
metric|no|./src/main/resources/defaultMetrics.json|
### Druid to OpenTSDB Event Converter
diff --git
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
index 3745dd0..c290457 100644
---
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
+++
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
@@ -39,6 +39,8 @@ public class EventConverter
{
private static final Logger log = new Logger(EventConverter.class);
private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
+ private static final String COLON = ":";
+ private static final String DEFAULT_COLON_REPLACEMENT = "_";
private final Map<String, Set<String>> metricMap;
@@ -72,15 +74,19 @@ public class EventConverter
Number value = serviceMetricEvent.getValue();
Map<String, Object> tags = new HashMap<>();
- String service = serviceMetricEvent.getService();
- String host = serviceMetricEvent.getHost();
+ String service = serviceMetricEvent.getService().replaceAll(COLON,
DEFAULT_COLON_REPLACEMENT);
+ String host = serviceMetricEvent.getHost().replaceAll(COLON,
DEFAULT_COLON_REPLACEMENT);
tags.put("service", service);
tags.put("host", host);
Map<String, Object> userDims = serviceMetricEvent.getUserDims();
for (String dim : metricMap.get(metric)) {
if (userDims.containsKey(dim)) {
- tags.put(dim, userDims.get(dim));
+ Object dimValue = userDims.get(dim);
+ if (dimValue instanceof String) {
+ dimValue = ((String) dimValue).replaceAll(COLON,
DEFAULT_COLON_REPLACEMENT);
+ }
+ tags.put(dim, dimValue);
}
}
diff --git
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
index 0fa5e60..515c297 100644
---
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
+++
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
@@ -20,17 +20,21 @@
package org.apache.druid.emitter.opentsdb;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class OpentsdbEmitter implements Emitter
{
private static final Logger log = new Logger(OpentsdbEmitter.class);
private final OpentsdbSender sender;
private final EventConverter converter;
+ private final AtomicBoolean started = new AtomicBoolean(false);
public OpentsdbEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper)
{
@@ -40,7 +44,8 @@ public class OpentsdbEmitter implements Emitter
config.getConnectionTimeout(),
config.getReadTimeout(),
config.getFlushThreshold(),
- config.getMaxQueueSize()
+ config.getMaxQueueSize(),
+ config.getConsumeDelay()
);
this.converter = new EventConverter(mapper, config.getMetricMapPath());
}
@@ -48,11 +53,21 @@ public class OpentsdbEmitter implements Emitter
@Override
public void start()
{
+ synchronized (started) {
+ if (!started.get()) {
+ log.info("Starting Opentsdb Emitter.");
+ sender.start();
+ started.set(true);
+ }
+ }
}
@Override
public void emit(Event event)
{
+ if (!started.get()) {
+ throw new ISE("WTF emit was called while service is not started yet");
+ }
if (event instanceof ServiceMetricEvent) {
OpentsdbEvent opentsdbEvent = converter.convert((ServiceMetricEvent)
event);
if (opentsdbEvent != null) {
@@ -69,12 +84,17 @@ public class OpentsdbEmitter implements Emitter
@Override
public void flush()
{
- sender.flush();
+ if (started.get()) {
+ sender.flush();
+ }
}
@Override
public void close()
{
- sender.close();
+ if (started.get()) {
+ sender.close();
+ started.set(false);
+ }
}
}
diff --git
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
index f953c48..fd230b2 100644
---
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
+++
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
@@ -27,6 +27,7 @@ public class OpentsdbEmitterConfig
{
private static final int DEFAULT_FLUSH_THRESHOLD = 100;
private static final int DEFAULT_MAX_QUEUE_SIZE = 1000;
+ private static final long DEFAULT_CONSUME_DELAY_MILLIS = 10000;
private static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 2000;
private static final int DEFAULT_READ_TIMEOUT_MILLIS = 2000;
@@ -49,6 +50,9 @@ public class OpentsdbEmitterConfig
private final int maxQueueSize;
@JsonProperty
+ private final long consumeDelay;
+
+ @JsonProperty
private final String metricMapPath;
@JsonCreator
@@ -59,6 +63,7 @@ public class OpentsdbEmitterConfig
@JsonProperty("readTimeout") Integer readTimeout,
@JsonProperty("flushThreshold") Integer flushThreshold,
@JsonProperty("maxQueueSize") Integer maxQueueSize,
+ @JsonProperty("consumeDelay") Long consumeDelay,
@JsonProperty("metricMapPath") String metricMapPath
)
{
@@ -71,6 +76,7 @@ public class OpentsdbEmitterConfig
(readTimeout == null || readTimeout < 0) ? DEFAULT_READ_TIMEOUT_MILLIS
: readTimeout;
this.flushThreshold = (flushThreshold == null || flushThreshold < 0) ?
DEFAULT_FLUSH_THRESHOLD : flushThreshold;
this.maxQueueSize = (maxQueueSize == null || maxQueueSize < 0) ?
DEFAULT_MAX_QUEUE_SIZE : maxQueueSize;
+ this.consumeDelay = (consumeDelay == null || consumeDelay < 0) ?
DEFAULT_CONSUME_DELAY_MILLIS : consumeDelay;
this.metricMapPath = metricMapPath;
}
@@ -104,6 +110,9 @@ public class OpentsdbEmitterConfig
if (maxQueueSize != that.maxQueueSize) {
return false;
}
+ if (consumeDelay != that.consumeDelay) {
+ return false;
+ }
return metricMapPath != null ? metricMapPath.equals(that.metricMapPath)
: that.metricMapPath == null;
}
@@ -117,6 +126,7 @@ public class OpentsdbEmitterConfig
result = 31 * result + readTimeout;
result = 31 * result + flushThreshold;
result = 31 * result + maxQueueSize;
+ result = 31 * result + (int) consumeDelay;
result = 31 * result + (metricMapPath != null ? metricMapPath.hashCode() :
0);
return result;
}
@@ -151,6 +161,11 @@ public class OpentsdbEmitterConfig
return maxQueueSize;
}
+ public long getConsumeDelay()
+ {
+ return consumeDelay;
+ }
+
public String getMetricMapPath()
{
return metricMapPath;
diff --git
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
index 2f93d7c..8f7e3f7 100644
---
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
+++
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
@@ -20,6 +20,7 @@
package org.apache.druid.emitter.opentsdb;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
import org.apache.druid.java.util.common.logger.Logger;
@@ -29,8 +30,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class OpentsdbSender
@@ -40,28 +43,40 @@ public class OpentsdbSender
*/
private static final String PATH = "/api/put";
private static final Logger log = new Logger(OpentsdbSender.class);
+ private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
private final AtomicLong countLostEvents = new AtomicLong(0);
private final int flushThreshold;
- private final List<OpentsdbEvent> events;
private final BlockingQueue<OpentsdbEvent> eventQueue;
+ private final ScheduledExecutorService scheduler;
+ private final EventConsumer eventConsumer;
+ private final long consumeDelay;
private final Client client;
private final WebResource webResource;
- private final ExecutorService executor = Executors.newFixedThreadPool(1);
- private volatile boolean running = true;
- public OpentsdbSender(String host, int port, int connectionTimeout, int
readTimeout, int flushThreshold, int maxQueueSize)
+ public OpentsdbSender(
+ String host,
+ int port,
+ int connectionTimeout,
+ int readTimeout,
+ int flushThreshold,
+ int maxQueueSize,
+ long consumeDelay
+ )
{
this.flushThreshold = flushThreshold;
- events = new ArrayList<>(flushThreshold);
+ this.consumeDelay = consumeDelay;
eventQueue = new ArrayBlockingQueue<>(maxQueueSize);
+ scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("OpentsdbEventSender-%s")
+ .build());
+ eventConsumer = new EventConsumer();
client = Client.create();
client.setConnectTimeout(connectionTimeout);
client.setReadTimeout(readTimeout);
webResource = client.resource("http://" + host + ":" + port + PATH);
-
- executor.execute(new EventConsumer());
}
public void enqueue(OpentsdbEvent event)
@@ -76,46 +91,70 @@ public class OpentsdbSender
}
}
+ public void start()
+ {
+ scheduler.scheduleWithFixedDelay(
+ eventConsumer,
+ consumeDelay,
+ consumeDelay,
+ TimeUnit.MILLISECONDS
+ );
+ }
+
public void flush()
{
- sendEvents();
+ try {
+ EventConsumer flushConsumer = new EventConsumer();
+ Future future = scheduler.schedule(flushConsumer, 0,
TimeUnit.MILLISECONDS);
+ future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+ // send remaining events which size may less than flushThreshold
+ eventConsumer.sendEvents();
+ flushConsumer.sendEvents();
+ }
+ catch (Exception e) {
+ log.warn(e, e.getMessage());
+ }
}
public void close()
{
flush();
client.destroy();
- running = false;
- executor.shutdown();
+ scheduler.shutdown();
}
- private void sendEvents()
+ private class EventConsumer implements Runnable
{
- if (!events.isEmpty()) {
- try {
- webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post();
- }
- catch (Exception e) {
- log.error(e, "send to opentsdb server failed");
- }
- finally {
- events.clear();
- }
+ private final List<OpentsdbEvent> events;
+
+ public EventConsumer()
+ {
+ events = new ArrayList<>(flushThreshold);
}
- }
- private class EventConsumer implements Runnable
- {
@Override
public void run()
{
- while (running) {
- if (!eventQueue.isEmpty()) {
- OpentsdbEvent event = eventQueue.poll();
- events.add(event);
- if (events.size() >= flushThreshold) {
- sendEvents();
- }
+ while (!eventQueue.isEmpty() && !scheduler.isShutdown()) {
+ OpentsdbEvent event = eventQueue.poll();
+ events.add(event);
+ if (events.size() >= flushThreshold) {
+ sendEvents();
+ }
+ }
+ }
+
+ public void sendEvents()
+ {
+ if (!events.isEmpty()) {
+ try {
+ webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post();
+ }
+ catch (Exception e) {
+ log.error(e, "error occurred when sending metrics to opentsdb
server.");
+ }
+ finally {
+ events.clear();
}
}
}
diff --git
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
index da3b37d..7b3f205 100644
---
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
+++
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
@@ -54,15 +54,15 @@ public class EventConverterTest
{
DateTime dateTime = DateTimes.nowUtc();
ServiceMetricEvent configuredEvent = new ServiceMetricEvent.Builder()
- .setDimension("dataSource", "data-source")
+ .setDimension("dataSource", "foo:bar")
.setDimension("type", "groupBy")
.build(dateTime, "query/time", 10)
- .build("broker", "brokerHost1");
+ .build("druid:broker", "127.0.0.1:8080");
Map<String, Object> expectedTags = new HashMap<>();
- expectedTags.put("service", "broker");
- expectedTags.put("host", "brokerHost1");
- expectedTags.put("dataSource", "data-source");
+ expectedTags.put("service", "druid_broker");
+ expectedTags.put("host", "127.0.0.1_8080");
+ expectedTags.put("dataSource", "foo_bar");
expectedTags.put("type", "groupBy");
OpentsdbEvent opentsdbEvent = converter.convert(configuredEvent);
diff --git
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
index b263316..11ccb9d 100644
---
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
+++
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
@@ -39,7 +39,7 @@ public class OpentsdbEmitterConfigTest
@Test
public void testSerDeserOpentsdbEmitterConfig() throws Exception
{
- OpentsdbEmitterConfig opentsdbEmitterConfig = new
OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, null);
+ OpentsdbEmitterConfig opentsdbEmitterConfig = new
OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, 10000L, null);
String opentsdbEmitterConfigString =
mapper.writeValueAsString(opentsdbEmitterConfig);
OpentsdbEmitterConfig expectedOpentsdbEmitterConfig =
mapper.reader(OpentsdbEmitterConfig.class)
.readValue(opentsdbEmitterConfigString);
diff --git
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
index d28f04d..79c8ae5 100644
---
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
+++
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
@@ -27,7 +27,7 @@ public class OpentsdbSenderTest
@Test
public void testUrl()
{
- OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000,
100, 1000);
+ OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000,
100, 1000, 10000L);
String expectedUrl = "http://localhost:9999/api/put";
Assert.assertEquals(expectedUrl,
sender.getWebResource().getURI().toString());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]