jon-wei closed pull request #6251: fix opentsdb emitter always be running  and 
fail sending tags whose value contains colon
URL: https://github.com/apache/incubator-druid/pull/6251
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/content/development/extensions-contrib/opentsdb-emitter.md 
b/docs/content/development/extensions-contrib/opentsdb-emitter.md
index dafdfd07552..f11a1219247 100644
--- a/docs/content/development/extensions-contrib/opentsdb-emitter.md
+++ b/docs/content/development/extensions-contrib/opentsdb-emitter.md
@@ -8,7 +8,7 @@ To use this extension, make sure to 
[include](../../operations/including-extensi
 
 ## Introduction
 
-This extension emits druid metrics to 
[OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP. And this emitter 
only emits service metric events to OpenTSDB (See 
http://druid.io/docs/latest/operations/metrics.html for a list of metrics).
+This extension emits druid metrics to 
[OpenTSDB](https://github.com/OpenTSDB/opentsdb) over HTTP (Using `Jersey 
client`). And this emitter only emits service metric events to OpenTSDB (See 
http://druid.io/docs/latest/operations/metrics.html for a list of metrics).
 
 ## Configuration
 
@@ -18,10 +18,11 @@ All the configuration parameters for the opentsdb emitter 
are under `druid.emitt
 |--------|-----------|---------|-------|
 |`druid.emitter.opentsdb.host`|The host of the OpenTSDB server.|yes|none|
 |`druid.emitter.opentsdb.port`|The port of the OpenTSDB server.|yes|none|
-|`druid.emitter.opentsdb.connectionTimeout`|Connection timeout(in 
milliseconds).|no|2000|
-|`druid.emitter.opentsdb.readTimeout`|Read timeout(in milliseconds).|no|2000|  
+|`druid.emitter.opentsdb.connectionTimeout`|`Jersey client` connection 
timeout(in milliseconds).|no|2000|
+|`druid.emitter.opentsdb.readTimeout`|`Jersey client` read timeout(in 
milliseconds).|no|2000|
 |`druid.emitter.opentsdb.flushThreshold`|Queue flushing threshold.(Events will 
be sent as one batch)|no|100|
 |`druid.emitter.opentsdb.maxQueueSize`|Maximum size of the queue used to 
buffer events.|no|1000|
+|`druid.emitter.opentsdb.consumeDelay`|Queue consuming delay(in milliseconds). 
Actually, we use `ScheduledExecutorService` to schedule consuming events, so 
this `consumeDelay` means the delay between the termination of one execution 
and the commencement of the next. If your druid nodes produce metric events 
fast, then you should decrease this `consumeDelay` or increase the 
`maxQueueSize`.|no|10000|
 |`druid.emitter.opentsdb.metricMapPath`|JSON file defining the desired metrics 
and dimensions for every Druid 
metric|no|./src/main/resources/defaultMetrics.json|
 
 ### Druid to OpenTSDB Event Converter
diff --git 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
index 3745dd0dee7..c29045722ab 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/EventConverter.java
@@ -39,6 +39,8 @@
 {
   private static final Logger log = new Logger(EventConverter.class);
   private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");
+  private static final String COLON = ":";
+  private static final String DEFAULT_COLON_REPLACEMENT = "_";
 
   private final Map<String, Set<String>> metricMap;
 
@@ -72,15 +74,19 @@ public OpentsdbEvent convert(ServiceMetricEvent 
serviceMetricEvent)
     Number value = serviceMetricEvent.getValue();
 
     Map<String, Object> tags = new HashMap<>();
-    String service = serviceMetricEvent.getService();
-    String host = serviceMetricEvent.getHost();
+    String service = serviceMetricEvent.getService().replaceAll(COLON, 
DEFAULT_COLON_REPLACEMENT);
+    String host = serviceMetricEvent.getHost().replaceAll(COLON, 
DEFAULT_COLON_REPLACEMENT);
     tags.put("service", service);
     tags.put("host", host);
 
     Map<String, Object> userDims = serviceMetricEvent.getUserDims();
     for (String dim : metricMap.get(metric)) {
       if (userDims.containsKey(dim)) {
-        tags.put(dim, userDims.get(dim));
+        Object dimValue = userDims.get(dim);
+        if (dimValue instanceof String) {
+          dimValue = ((String) dimValue).replaceAll(COLON, 
DEFAULT_COLON_REPLACEMENT);
+        }
+        tags.put(dim, dimValue);
       }
     }
 
diff --git 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
index 0fa5e601007..515c297a99a 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitter.java
@@ -20,17 +20,21 @@
 package org.apache.druid.emitter.opentsdb;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public class OpentsdbEmitter implements Emitter
 {
   private static final Logger log = new Logger(OpentsdbEmitter.class);
 
   private final OpentsdbSender sender;
   private final EventConverter converter;
+  private final AtomicBoolean started = new AtomicBoolean(false);
 
   public OpentsdbEmitter(OpentsdbEmitterConfig config, ObjectMapper mapper)
   {
@@ -40,7 +44,8 @@ public OpentsdbEmitter(OpentsdbEmitterConfig config, 
ObjectMapper mapper)
         config.getConnectionTimeout(),
         config.getReadTimeout(),
         config.getFlushThreshold(),
-        config.getMaxQueueSize()
+        config.getMaxQueueSize(),
+        config.getConsumeDelay()
     );
     this.converter = new EventConverter(mapper, config.getMetricMapPath());
   }
@@ -48,11 +53,21 @@ public OpentsdbEmitter(OpentsdbEmitterConfig config, 
ObjectMapper mapper)
   @Override
   public void start()
   {
+    synchronized (started) {
+      if (!started.get()) {
+        log.info("Starting Opentsdb Emitter.");
+        sender.start();
+        started.set(true);
+      }
+    }
   }
 
   @Override
   public void emit(Event event)
   {
+    if (!started.get()) {
+      throw new ISE("WTF emit was called while service is not started yet");
+    }
     if (event instanceof ServiceMetricEvent) {
       OpentsdbEvent opentsdbEvent = converter.convert((ServiceMetricEvent) 
event);
       if (opentsdbEvent != null) {
@@ -69,12 +84,17 @@ public void emit(Event event)
   @Override
   public void flush()
   {
-    sender.flush();
+    if (started.get()) {
+      sender.flush();
+    }
   }
 
   @Override
   public void close()
   {
-    sender.close();
+    if (started.get()) {
+      sender.close();
+      started.set(false);
+    }
   }
 }
diff --git 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
index f953c480aef..fd230b21bb8 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfig.java
@@ -27,6 +27,7 @@
 {
   private static final int DEFAULT_FLUSH_THRESHOLD = 100;
   private static final int DEFAULT_MAX_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_CONSUME_DELAY_MILLIS = 10000;
   private static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 2000;
   private static final int DEFAULT_READ_TIMEOUT_MILLIS = 2000;
 
@@ -48,6 +49,9 @@
   @JsonProperty
   private final int maxQueueSize;
 
+  @JsonProperty
+  private final long consumeDelay;
+
   @JsonProperty
   private final String metricMapPath;
 
@@ -59,6 +63,7 @@ public OpentsdbEmitterConfig(
       @JsonProperty("readTimeout") Integer readTimeout,
       @JsonProperty("flushThreshold") Integer flushThreshold,
       @JsonProperty("maxQueueSize") Integer maxQueueSize,
+      @JsonProperty("consumeDelay") Long consumeDelay,
       @JsonProperty("metricMapPath") String metricMapPath
   )
   {
@@ -71,6 +76,7 @@ public OpentsdbEmitterConfig(
         (readTimeout == null || readTimeout < 0) ? DEFAULT_READ_TIMEOUT_MILLIS 
: readTimeout;
     this.flushThreshold = (flushThreshold == null || flushThreshold < 0) ? 
DEFAULT_FLUSH_THRESHOLD : flushThreshold;
     this.maxQueueSize = (maxQueueSize == null || maxQueueSize < 0) ? 
DEFAULT_MAX_QUEUE_SIZE : maxQueueSize;
+    this.consumeDelay = (consumeDelay == null || consumeDelay < 0) ? 
DEFAULT_CONSUME_DELAY_MILLIS : consumeDelay;
     this.metricMapPath = metricMapPath;
   }
 
@@ -104,6 +110,9 @@ public boolean equals(Object o)
     if (maxQueueSize != that.maxQueueSize) {
       return false;
     }
+    if (consumeDelay != that.consumeDelay) {
+      return false;
+    }
     return metricMapPath != null ? metricMapPath.equals(that.metricMapPath)
                                  : that.metricMapPath == null;
   }
@@ -117,6 +126,7 @@ public int hashCode()
     result = 31 * result + readTimeout;
     result = 31 * result + flushThreshold;
     result = 31 * result + maxQueueSize;
+    result = 31 * result + (int) consumeDelay;
     result = 31 * result + (metricMapPath != null ? metricMapPath.hashCode() : 
0);
     return result;
   }
@@ -151,6 +161,11 @@ public int getMaxQueueSize()
     return maxQueueSize;
   }
 
+  public long getConsumeDelay()
+  {
+    return consumeDelay;
+  }
+
   public String getMetricMapPath()
   {
     return metricMapPath;
diff --git 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
index 2f93d7c2d12..8f7e3f7a506 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/main/java/org/apache/druid/emitter/opentsdb/OpentsdbSender.java
@@ -20,6 +20,7 @@
 package org.apache.druid.emitter.opentsdb;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.WebResource;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -29,8 +30,10 @@
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class OpentsdbSender
@@ -40,28 +43,40 @@
    */
   private static final String PATH = "/api/put";
   private static final Logger log = new Logger(OpentsdbSender.class);
+  private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
 
   private final AtomicLong countLostEvents = new AtomicLong(0);
   private final int flushThreshold;
-  private final List<OpentsdbEvent> events;
   private final BlockingQueue<OpentsdbEvent> eventQueue;
+  private final ScheduledExecutorService scheduler;
+  private final EventConsumer eventConsumer;
+  private final long consumeDelay;
   private final Client client;
   private final WebResource webResource;
-  private final ExecutorService executor = Executors.newFixedThreadPool(1);
-  private volatile boolean running = true;
 
-  public OpentsdbSender(String host, int port, int connectionTimeout, int 
readTimeout, int flushThreshold, int maxQueueSize)
+  public OpentsdbSender(
+      String host,
+      int port,
+      int connectionTimeout,
+      int readTimeout,
+      int flushThreshold,
+      int maxQueueSize,
+      long consumeDelay
+  )
   {
     this.flushThreshold = flushThreshold;
-    events = new ArrayList<>(flushThreshold);
+    this.consumeDelay = consumeDelay;
     eventQueue = new ArrayBlockingQueue<>(maxQueueSize);
+    scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("OpentsdbEventSender-%s")
+        .build());
+    eventConsumer = new EventConsumer();
 
     client = Client.create();
     client.setConnectTimeout(connectionTimeout);
     client.setReadTimeout(readTimeout);
     webResource = client.resource("http://"; + host + ":" + port + PATH);
-
-    executor.execute(new EventConsumer());
   }
 
   public void enqueue(OpentsdbEvent event)
@@ -76,46 +91,70 @@ public void enqueue(OpentsdbEvent event)
     }
   }
 
+  public void start()
+  {
+    scheduler.scheduleWithFixedDelay(
+        eventConsumer,
+        consumeDelay,
+        consumeDelay,
+        TimeUnit.MILLISECONDS
+    );
+  }
+
   public void flush()
   {
-    sendEvents();
+    try {
+      EventConsumer flushConsumer = new EventConsumer();
+      Future future = scheduler.schedule(flushConsumer, 0, 
TimeUnit.MILLISECONDS);
+      future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+      // send remaining events which size may less than flushThreshold
+      eventConsumer.sendEvents();
+      flushConsumer.sendEvents();
+    }
+    catch (Exception e) {
+      log.warn(e, e.getMessage());
+    }
   }
 
   public void close()
   {
     flush();
     client.destroy();
-    running = false;
-    executor.shutdown();
+    scheduler.shutdown();
   }
 
-  private void sendEvents()
+  private class EventConsumer implements Runnable
   {
-    if (!events.isEmpty()) {
-      try {
-        webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post();
-      }
-      catch (Exception e) {
-        log.error(e, "send to opentsdb server failed");
-      }
-      finally {
-        events.clear();
-      }
+    private final List<OpentsdbEvent> events;
+
+    public EventConsumer()
+    {
+      events = new ArrayList<>(flushThreshold);
     }
-  }
 
-  private class EventConsumer implements Runnable
-  {
     @Override
     public void run()
     {
-      while (running) {
-        if (!eventQueue.isEmpty()) {
-          OpentsdbEvent event = eventQueue.poll();
-          events.add(event);
-          if (events.size() >= flushThreshold) {
-            sendEvents();
-          }
+      while (!eventQueue.isEmpty() && !scheduler.isShutdown()) {
+        OpentsdbEvent event = eventQueue.poll();
+        events.add(event);
+        if (events.size() >= flushThreshold) {
+          sendEvents();
+        }
+      }
+    }
+
+    public void sendEvents()
+    {
+      if (!events.isEmpty()) {
+        try {
+          webResource.entity(events, MediaType.APPLICATION_JSON_TYPE).post();
+        }
+        catch (Exception e) {
+          log.error(e, "error occurred when sending metrics to opentsdb 
server.");
+        }
+        finally {
+          events.clear();
         }
       }
     }
diff --git 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
index da3b37d4d41..7b3f205c141 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/EventConverterTest.java
@@ -54,15 +54,15 @@ public void testConvert()
   {
     DateTime dateTime = DateTimes.nowUtc();
     ServiceMetricEvent configuredEvent = new ServiceMetricEvent.Builder()
-        .setDimension("dataSource", "data-source")
+        .setDimension("dataSource", "foo:bar")
         .setDimension("type", "groupBy")
         .build(dateTime, "query/time", 10)
-        .build("broker", "brokerHost1");
+        .build("druid:broker", "127.0.0.1:8080");
 
     Map<String, Object> expectedTags = new HashMap<>();
-    expectedTags.put("service", "broker");
-    expectedTags.put("host", "brokerHost1");
-    expectedTags.put("dataSource", "data-source");
+    expectedTags.put("service", "druid_broker");
+    expectedTags.put("host", "127.0.0.1_8080");
+    expectedTags.put("dataSource", "foo_bar");
     expectedTags.put("type", "groupBy");
 
     OpentsdbEvent opentsdbEvent = converter.convert(configuredEvent);
diff --git 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
index b2633164394..11ccb9de571 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbEmitterConfigTest.java
@@ -39,7 +39,7 @@ public void setUp()
   @Test
   public void testSerDeserOpentsdbEmitterConfig() throws Exception
   {
-    OpentsdbEmitterConfig opentsdbEmitterConfig = new 
OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, null);
+    OpentsdbEmitterConfig opentsdbEmitterConfig = new 
OpentsdbEmitterConfig("localhost", 9999, 2000, 2000, 200, 2000, 10000L, null);
     String opentsdbEmitterConfigString = 
mapper.writeValueAsString(opentsdbEmitterConfig);
     OpentsdbEmitterConfig expectedOpentsdbEmitterConfig = 
mapper.reader(OpentsdbEmitterConfig.class)
                                                                 
.readValue(opentsdbEmitterConfigString);
diff --git 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
index d28f04d526e..79c8ae52ce4 100644
--- 
a/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
+++ 
b/extensions-contrib/opentsdb-emitter/src/test/java/org/apache/druid/emitter/opentsdb/OpentsdbSenderTest.java
@@ -27,7 +27,7 @@
   @Test
   public void testUrl()
   {
-    OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 
100, 1000);
+    OpentsdbSender sender = new OpentsdbSender("localhost", 9999, 2000, 2000, 
100, 1000, 10000L);
     String expectedUrl = "http://localhost:9999/api/put";;
     Assert.assertEquals(expectedUrl, 
sender.getWebResource().getURI().toString());
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to