hellobabygogo closed pull request #6655: add ganglia-emitter
URL: https://github.com/apache/incubator-druid/pull/6655
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 1f442b78b4e..b13208d8ab8 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -281,7 +281,9 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-kafka-eight-simple-consumer</argument>
<argument>-c</argument>
-
<argument>org.apache.druid.extensions.contrib:graphite-emitter</argument>
+
<argument>io.druid.extensions.contrib:ganglia-emitter</argument>
+ <argument>-c</argument>
+
<argument>io.druid.extensions.contrib:druid-opentsdb-emitter</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-opentsdb-emitter</argument>
<argument>-c</argument>
diff --git a/extensions-contrib/ganglia-emitter/pom.xml
b/extensions-contrib/ganglia-emitter/pom.xml
new file mode 100644
index 00000000000..5c4f6bf313a
--- /dev/null
+++ b/extensions-contrib/ganglia-emitter/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>io.druid</groupId>
+    <artifactId>druid</artifactId>
+    <version>0.13.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!--
+    The groupId must be declared explicitly: without it the module inherits "io.druid" from the
+    parent, but distribution/pom.xml pulls this extension as
+    "io.druid.extensions.contrib:ganglia-emitter".
+  -->
+  <groupId>io.druid.extensions.contrib</groupId>
+  <artifactId>ganglia-emitter</artifactId>
+  <name>ganglia-emitter</name>
+  <description>Druid emitter extension that sends metrics to Ganglia</description>
+
+  <dependencies>
+    <!-- Provided by the Druid runtime that loads the extension. -->
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-api</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>java-util</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <!-- Bundled with the extension. -->
+    <dependency>
+      <groupId>info.ganglia.gmetric4j</groupId>
+      <artifactId>gmetric4j</artifactId>
+      <version>1.0.3</version>
+    </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>3.1.2</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/DimensionConverter.java
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/DimensionConverter.java
new file mode 100644
index 00000000000..4ad51225463
--- /dev/null
+++
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/DimensionConverter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.emitter.ganglia;
+
+import com.google.common.collect.ImmutableList;
+import io.druid.java.util.common.logger.Logger;
+
+import java.util.Map;
+
+/**
+ * Looks up the {@link GangliaMetric} mapping of a Druid metric and appends the values of the
+ * dimensions listed in that mapping to the builder of the Ganglia metric name.
+ */
+public class DimensionConverter
+{
+  private static final Logger log = new Logger(DimensionConverter.class);
+
+  /**
+   * Finds the mapping for {@code metric} and adds the matching user dimension values to
+   * {@code builder}, in the iteration order of the mapping's dimension set.
+   *
+   * @param service   name of the emitting service, used for the {@code "<service>-<metric>"} fallback key
+   * @param metric    Druid metric name
+   * @param userDims  user dimensions of the event; dimensions that are absent or null are skipped
+   * @param builder   name builder that receives the dimension values
+   * @param metricMap metric name to mapping
+   *
+   * @return the mapping that was used, or {@code null} when the metric has no mapping
+   */
+  public GangliaMetric addFilteredUserDims(
+      String service,
+      String metric,
+      Map<String, Object> userDims,
+      ImmutableList.Builder<String> builder,
+      Map<String, GangliaMetric> metricMap
+  )
+  {
+    // Some metrics are reported under the same name by different services, so when the plain
+    // metric name has no mapping, retry with the name prefixed by the service.
+    GangliaMetric gangliaMetric = metricMap.get(metric);
+    if (gangliaMetric == null) {
+      gangliaMetric = metricMap.get(service + "-" + metric);
+    }
+    if (gangliaMetric == null) {
+      log.debug("No ganglia mapping found for metric [%s] of service [%s]", metric, service);
+      return null;
+    }
+
+    // Format arguments instead of concatenating: this runs for every emitted metric.
+    log.debug("metric: [%s] gangliaMetric.dimensions: %s dim: %s", metric, gangliaMetric.dimensions, userDims);
+
+    if (gangliaMetric.dimensions == null || userDims == null) {
+      return gangliaMetric;
+    }
+    for (String dim : gangliaMetric.dimensions) {
+      final Object dimValue = userDims.get(dim);
+      if (dimValue != null) {
+        builder.add(dimValue.toString());
+      }
+    }
+    return gangliaMetric;
+  }
+}
diff --git
a/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEmitter.java
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEmitter.java
new file mode 100644
index 00000000000..ab52ae9b7dc
--- /dev/null
+++
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEmitter.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.emitter.ganglia;
+
+import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import info.ganglia.gmetric4j.gmetric.GMetric;
+import info.ganglia.gmetric4j.gmetric.GangliaException;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.java.util.emitter.core.Emitter;
+import io.druid.java.util.emitter.core.Event;
+import io.druid.java.util.emitter.service.ServiceMetricEvent;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by yangxuan on 2018/9/6.
+ */
+public class GangliaEmitter implements Emitter
+{
+ private static final Logger log = new Logger(GangliaEmitter.class);
+ private static final String DRUID_METRIC_SEPARATOR = "\\/";
+ private static final String STATSD_SEPARATOR = ":|\\|";
+ private static final String BLANK = "\\s+";
+
+ private final GangliaEmitterConfig config;
+ private final MetricRegistry registry;
+ private GMetric ganglia;
+ private final DimensionConverter converter;
+ private volatile ScheduledExecutorService scheduledExecutor;
+ private final ObjectMapper mapper;
+ private final LinkedBlockingQueue<GangliaEvent> eventsQueue;
+ private AtomicLong countLostEvents = new AtomicLong(0);
+ private final ScheduledExecutorService exec =
Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("GangliaEmitter-%s")
+ .build()); // Thread pool of two in order to schedule flush runnable
+
+ private Map<String, GangliaMetric> metricMap = new HashMap<>();
+
+ public GangliaEmitter(GangliaEmitterConfig config, ObjectMapper mapper)
+ {
+ this.config = config;
+ this.registry = new MetricRegistry();
+ this.converter = new DimensionConverter();
+ this.mapper = mapper;
+ this.eventsQueue = new LinkedBlockingQueue(config.getMaxQueueSize());
+ }
+
+ @Override
+ public void start()
+ {
+ try {
+ String spoof = InetAddress.getLocalHost().getHostAddress() + ":" +
InetAddress.getLocalHost().getCanonicalHostName().split("\\.")[0];
+ log.info("Starting Ganglia Emitter");
+
+ this.ganglia = new GMetric(
+ config.getHostname(),
+ config.getPort(),
+ GMetric.UDPAddressingMode.UNICAST,
+ 0,
+ true,
+ null,
+ spoof
+ );
+
+ //load config
+ scheduledExecutor =
Execs.scheduledSingleThreaded("ganglia_metrics_load__scheduled_%d");
+ scheduledExecutor.scheduleAtFixedRate(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ metricMap = readMap(mapper, config.getDimensionMapPath());
+ }
+ catch (Exception e) {
+ log.error(e.toString());
+ }
+ }
+ }, 0L, config.getLoadPeriod(), TimeUnit.MILLISECONDS
+ );
+
+ exec.scheduleAtFixedRate(
+ new ConsumerRunnable(),
+ config.getFlushPeriod(),
+ config.getFlushPeriod(),
+ TimeUnit.MILLISECONDS
+ );
+ log.info("constructed GMetric. host=[%s] port=[%s]",
config.getHostname(), config.getPort());
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void emit(Event event)
+ {
+ if (event instanceof ServiceMetricEvent) {
+ ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
+ String host = metricEvent.getHost();
+ String service = metricEvent.getService();
+ String metric = metricEvent.getMetric();
+ Map<String, Object> userDims = metricEvent.getUserDims();
+ ImmutableList.Builder<String> nameBuilder = new
ImmutableList.Builder<>();
+ if (config.getIncludeHost()) {
+ nameBuilder.add(host);
+ }
+ nameBuilder.add(service);
+ GangliaMetric gangliaMetric = converter.addFilteredUserDims(service,
metric, userDims, nameBuilder, metricMap);
+ if (gangliaMetric == null) {
+ log.debug("Metric=[%s] has no ganglia type mapping", gangliaMetric);
+ return;
+ }
+ nameBuilder.add(metric);
+ String fullName = Joiner.on(config.getSeparator())
+ .join(nameBuilder.build())
+ .replaceAll(DRUID_METRIC_SEPARATOR,
config.getSeparator())
+ .replaceAll(STATSD_SEPARATOR,
config.getSeparator())
+ .replaceAll(BLANK, config.getBlankHolder());
+ fullName = sanitize(fullName);
+ GangliaEvent gangliaEvent = new GangliaEvent(fullName,
metricEvent.getValue(), service.replaceAll("/", "_"));
+ try {
+ final boolean successful = eventsQueue.offer(
+ gangliaEvent,
+ config.getEmitWaitTime(),
+ TimeUnit.MILLISECONDS
+ );
+ if (!successful) {
+ if (countLostEvents.getAndIncrement() % 1000 == 0) {
+ log.error(
+ "Lost total of [%s] events because of emitter queue is full.
Please increase the capacity or/and the consumer frequency",
+ countLostEvents.get()
+ );
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ log.error(e, "got interrupted with message [%s]", e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public void flush() throws IOException
+ {
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ log.info("Closing Ganglia Emitter");
+ ganglia.close();
+ }
+
+ private String sanitize(String toClean)
+ {
+ return toClean.replaceAll("[^A-Za-z0-9:_.-]+", "");
+ }
+
+ private Map<String, GangliaMetric> readMap(ObjectMapper mapper, String
dimensionMapPath)
+ {
+ try {
+ InputStream is;
+ if (Strings.isNullOrEmpty(dimensionMapPath)) {
+ log.info("Using default ganglia metric dimension and types");
+ is =
this.getClass().getClassLoader().getResourceAsStream("defaultMetricDimensions.json");
+ } else {
+ log.info("Using ganglia metric dimensions at types at [%s]",
dimensionMapPath);
+ is = new FileInputStream(new File(dimensionMapPath));
+ }
+ return mapper.reader(new TypeReference<Map<String, GangliaMetric>>()
+ {
+ }).readValue(is);
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ throw new ISE(e, "Failed to parse ganglia metric dimensions and types");
+ }
+ }
+
+ private class ConsumerRunnable implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ log.info("eventQueu size :" + eventsQueue.size());
+ while (eventsQueue.size() > 0 && !exec.isShutdown()) {
+ try {
+ final GangliaEvent gangliaEvent = eventsQueue.poll(
+ config.getWaitForEventTime(),
+ TimeUnit.MILLISECONDS
+ );
+ if (gangliaEvent != null) {
+ try {
+ ganglia.announce(
+ gangliaEvent.getName(),
+ gangliaEvent.getValue().longValue(),
+ gangliaEvent.getService()
+ );
+ }
+ catch (GangliaException e) {
+ log.error(e, "Dropping event [%s]. Unable to send to Ganglia.",
gangliaEvent);
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ log.debug(e, e.getMessage());
+ log.info("Ganglia connection broken for: %s", e.getMessage());
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEmitterConfig.java
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEmitterConfig.java
new file mode 100644
index 00000000000..21d2f61ae22
--- /dev/null
+++
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEmitterConfig.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.emitter.ganglia;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Configuration of the Ganglia emitter.
+ *
+ * <p>Defaults applied by the constructor when a property is absent: {@code includeHost=false},
+ * {@code separator="."}, {@code blankHolder="-"}, {@code loadPeriod=60000} ms,
+ * {@code flushPeriod=60000} ms, {@code maxQueueSize=Integer.MAX_VALUE},
+ * {@code waitForEventTime=1000} ms, {@code emitWaitTime=0} ms. {@code port} is required;
+ * {@code hostname} and {@code dimensionMapPath} may be null.
+ */
+public class GangliaEmitterConfig
+{
+  private static final long DEFAULT_LOAD_PERIOD = 60 * 1000L; // load every one minute
+  private static final long DEFAULT_FLUSH_PERIOD = 60 * 1000L; // flush every one minute
+  private static final long DEFAULT_GET_TIMEOUT = 1000L; // default wait for get operations on the queue 1 sec
+
+  @JsonProperty
+  private final String hostname;
+  @JsonProperty
+  private final int port;
+  @JsonProperty
+  private final String dimensionMapPath;
+  @JsonProperty
+  private final Boolean includeHost;
+  @JsonProperty
+  private final String separator;
+  @JsonProperty
+  private final String blankHolder;
+  @JsonProperty
+  private final Long loadPeriod;
+  @JsonProperty
+  private final Long flushPeriod;
+  @JsonProperty
+  private final Integer maxQueueSize;
+  // How long the flush task waits for an event to become available on the queue.
+  @JsonProperty
+  private final Long waitForEventTime;
+  // How long emit() waits for space to become available on the queue.
+  @JsonProperty
+  private final Long emitWaitTime;
+
+  /**
+   * @throws NullPointerException if {@code port} is null
+   */
+  @JsonCreator
+  public GangliaEmitterConfig(
+      @JsonProperty("hostname") String hostname,
+      @JsonProperty("port") Integer port,
+      @JsonProperty("dimensionMapPath") String dimensionMapPath,
+      @JsonProperty("includeHost") Boolean includeHost,
+      @JsonProperty("separator") String separator,
+      @JsonProperty("blankHolder") String blankHolder,
+      @JsonProperty("loadPeriod") Long loadPeriod,
+      @JsonProperty("flushPeriod") Long flushPeriod,
+      @JsonProperty("maxQueueSize") Integer maxQueueSize,
+      @JsonProperty("emitWaitTime") Long emitWaitTime,
+      @JsonProperty("waitForEventTime") Long waitForEventTime
+  )
+  {
+    this.hostname = hostname;
+    // Fail with a message instead of an anonymous unboxing NullPointerException.
+    this.port = java.util.Objects.requireNonNull(port, "port can not be null");
+    this.dimensionMapPath = dimensionMapPath;
+    this.includeHost = includeHost != null ? includeHost : false;
+    this.separator = separator != null ? separator : ".";
+    this.blankHolder = blankHolder != null ? blankHolder : "-";
+    this.loadPeriod = loadPeriod == null ? DEFAULT_LOAD_PERIOD : loadPeriod;
+    this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
+    this.maxQueueSize = maxQueueSize == null ? Integer.MAX_VALUE : maxQueueSize;
+    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
+    this.emitWaitTime = emitWaitTime == null ? 0L : emitWaitTime;
+  }
+
+  /**
+   * Two configs are equal when all of their properties are equal. Compares exactly the fields
+   * that {@link #hashCode()} hashes.
+   */
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final GangliaEmitterConfig that = (GangliaEmitterConfig) o;
+    return port == that.port
+           && java.util.Objects.equals(hostname, that.hostname)
+           && java.util.Objects.equals(dimensionMapPath, that.dimensionMapPath)
+           && java.util.Objects.equals(includeHost, that.includeHost)
+           && java.util.Objects.equals(separator, that.separator)
+           && java.util.Objects.equals(blankHolder, that.blankHolder)
+           && java.util.Objects.equals(loadPeriod, that.loadPeriod)
+           && java.util.Objects.equals(flushPeriod, that.flushPeriod)
+           && java.util.Objects.equals(maxQueueSize, that.maxQueueSize)
+           && java.util.Objects.equals(waitForEventTime, that.waitForEventTime)
+           && java.util.Objects.equals(emitWaitTime, that.emitWaitTime);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return java.util.Objects.hash(
+        hostname,
+        port,
+        dimensionMapPath,
+        includeHost,
+        separator,
+        blankHolder,
+        loadPeriod,
+        flushPeriod,
+        maxQueueSize,
+        waitForEventTime,
+        emitWaitTime
+    );
+  }
+
+  @JsonProperty
+  public String getHostname()
+  {
+    return hostname;
+  }
+
+  @JsonProperty
+  public int getPort()
+  {
+    return port;
+  }
+
+  @JsonProperty
+  public String getDimensionMapPath()
+  {
+    return dimensionMapPath;
+  }
+
+  @JsonProperty
+  public Boolean getIncludeHost()
+  {
+    return includeHost;
+  }
+
+  @JsonProperty
+  public String getSeparator()
+  {
+    return separator;
+  }
+
+  @JsonProperty
+  public String getBlankHolder()
+  {
+    return blankHolder;
+  }
+
+  @JsonProperty
+  public Long getLoadPeriod()
+  {
+    return loadPeriod;
+  }
+
+  @JsonProperty
+  public Long getFlushPeriod()
+  {
+    return flushPeriod;
+  }
+
+  @JsonProperty
+  public Integer getMaxQueueSize()
+  {
+    return maxQueueSize;
+  }
+
+  @JsonProperty
+  public Long getWaitForEventTime()
+  {
+    return waitForEventTime;
+  }
+
+  @JsonProperty
+  public Long getEmitWaitTime()
+  {
+    return emitWaitTime;
+  }
+}
diff --git
a/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEmitterModule.java
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEmitterModule.java
new file mode 100644
index 00000000000..6e33a7c48dc
--- /dev/null
+++
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEmitterModule.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.emitter.ganglia;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.name.Named;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.ManageLifecycle;
+import io.druid.initialization.DruidModule;
+import io.druid.java.util.emitter.core.Emitter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Druid module that registers the Ganglia emitter: binds {@link GangliaEmitterConfig} to the
+ * {@code druid.emitter.ganglia} property prefix and provides the emitter under the name
+ * {@code "ganglia"}.
+ */
+public class GangliaEmitterModule implements DruidModule
+{
+  private static final String EMITTER_TYPE = "ganglia";
+
+  /**
+   * This extension contributes no Jackson modules.
+   */
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    // Typed empty list instead of the raw Collections.EMPTY_LIST constant.
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, GangliaEmitterConfig.class);
+  }
+
+  /**
+   * Creates the emitter from the bound configuration and the injected {@link ObjectMapper}.
+   */
+  @Provides
+  @ManageLifecycle
+  @Named(EMITTER_TYPE)
+  public Emitter getEmitter(GangliaEmitterConfig gangliaEmitterConfig, ObjectMapper mapper)
+      throws IOException
+  {
+    return new GangliaEmitter(gangliaEmitterConfig, mapper);
+  }
+}
diff --git
a/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEvent.java
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEvent.java
new file mode 100644
index 00000000000..2a1623ee4b6
--- /dev/null
+++
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.emitter.ganglia;
+
+import com.google.common.base.Preconditions;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Immutable metric sample queued for delivery to Ganglia: the full metric name, its value and
+ * the service it is grouped under.
+ */
+public class GangliaEvent
+{
+  private final String name;
+  private final Number value;
+  private final String service;
+
+  /**
+   * @param name    metric name, must not be null
+   * @param value   metric value; not validated here
+   * @param service service name, must not be null
+   *
+   * @throws NullPointerException if {@code name} or {@code service} is null
+   */
+  GangliaEvent(@NotNull String name, Number value, @NotNull String service)
+  {
+    this.name = Preconditions.checkNotNull(name, "name can not be null");
+    this.value = value;
+    this.service = Preconditions.checkNotNull(service, "service can not be null");
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public Number getValue()
+  {
+    return value;
+  }
+
+  public String getService()
+  {
+    return service;
+  }
+
+  /**
+   * Readable form for log messages that print the event itself.
+   */
+  @Override
+  public String toString()
+  {
+    return "GangliaEvent{name='" + name + "', value=" + value + ", service='" + service + "'}";
+  }
+}
diff --git
a/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaMetric.java
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaMetric.java
new file mode 100644
index 00000000000..e4b8572db28
--- /dev/null
+++
b/extensions-contrib/ganglia-emitter/src/main/java/io/druid/emitter/ganglia/GangliaMetric.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.emitter.ganglia;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.SortedSet;
+
+/**
+ * One entry of the metric mapping file: the user dimensions whose values become part of the
+ * Ganglia metric name, plus a {@code convertRange} flag.
+ */
+public class GangliaMetric
+{
+  /** Dimension names in sorted order; never null. */
+  public final SortedSet<String> dimensions;
+  public final boolean convertRange;
+
+  /**
+   * @param dimensions   dimension names; a missing or null value is treated as an empty set
+   * @param convertRange value of the {@code convertRange} property
+   */
+  @JsonCreator
+  public GangliaMetric(
+      @JsonProperty("dimensions") SortedSet<String> dimensions,
+      @JsonProperty("convertRange") boolean convertRange
+  )
+  {
+    // A mapping entry without "dimensions" must not leave a null for callers to iterate over.
+    this.dimensions = dimensions != null ? dimensions : new java.util.TreeSet<String>();
+    this.convertRange = convertRange;
+  }
+}
diff --git
a/extensions-contrib/ganglia-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
b/extensions-contrib/ganglia-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 00000000000..76d0d8c2b4f
--- /dev/null
+++
b/extensions-contrib/ganglia-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1 @@
+io.druid.emitter.ganglia.GangliaEmitterModule
\ No newline at end of file
diff --git
a/extensions-contrib/ganglia-emitter/src/main/resources/defaultMetricDimensions.json
b/extensions-contrib/ganglia-emitter/src/main/resources/defaultMetricDimensions.json
new file mode 100644
index 00000000000..0b107eeec39
--- /dev/null
+++
b/extensions-contrib/ganglia-emitter/src/main/resources/defaultMetricDimensions.json
@@ -0,0 +1,424 @@
+{
+ "query/time": {
+ "dimensions": [
+ "dataSource",
+ "type"
+ ]
+ },
+ "query/node/time": {
+ "dimensions": [
+ "server"
+ ]
+ },
+ "query/node/ttfb": {
+ "dimensions": [
+ "server"
+ ]
+ },
+ "query/intervalChunk/time": {
+ "dimensions": []
+ },
+ "query/segment/time": {
+ "dimensions": []
+ },
+ "query/wait/time": {
+ "dimensions": []
+ },
+ "segment/scan/pending": {
+ "dimensions": []
+ },
+ "query/segmentAndCache/time": {
+ "dimensions": []
+ },
+ "query/cpu/time": {
+ "dimensions": [
+ "dataSource",
+ "type"
+ ]
+ },
+ "query/cache/delta/numEntries": {
+ "dimensions": []
+ },
+ "query/cache/delta/sizeBytes": {
+ "dimensions": []
+ },
+ "query/cache/delta/hits": {
+ "dimensions": []
+ },
+ "query/cache/delta/misses": {
+ "dimensions": []
+ },
+ "query/cache/delta/evictions": {
+ "dimensions": []
+ },
+ "query/cache/delta/hitRate": {
+ "dimensions": [],
+ "convertRange": true
+ },
+ "query/cache/delta/averageBytes": {
+ "dimensions": []
+ },
+ "query/cache/delta/timeouts": {
+ "dimensions": []
+ },
+ "query/cache/delta/errors": {
+ "dimensions": []
+ },
+ "query/cache/total/numEntries": {
+ "dimensions": []
+ },
+ "query/cache/total/sizeBytes": {
+ "dimensions": []
+ },
+ "query/cache/total/hits": {
+ "dimensions": []
+ },
+ "query/cache/total/misses": {
+ "dimensions": []
+ },
+ "query/cache/total/evictions": {
+ "dimensions": []
+ },
+ "query/cache/total/hitRate": {
+ "dimensions": [],
+ "convertRange": true
+ },
+ "query/cache/total/averageBytes": {
+ "dimensions": []
+ },
+ "query/cache/total/timeouts": {
+ "dimensions": []
+ },
+ "query/cache/total/errors": {
+ "dimensions": []
+ },
+ "ingest/events/thrownAway": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/events/unparseable": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/events/processed": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/rows/output": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/persist/count": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/persist/time": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/persist/cpu": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/persist/backPressure": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/persist/failed": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/handoff/failed": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/merge/time": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "ingest/merge/cpu": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "task/run/time": {
+ "dimensions": [
+ "dataSource",
+ "taskType"
+ ]
+ },
+ "segment/added/bytes": {
+ "dimensions": [
+ "dataSource",
+ "taskType"
+ ]
+ },
+ "segment/moved/bytes": {
+ "dimensions": [
+ "dataSource",
+ "taskType"
+ ]
+ },
+ "segment/nuked/bytes": {
+ "dimensions": [
+ "dataSource",
+ "taskType"
+ ]
+ },
+ "segment/assigned/count": {
+ "dimensions": [
+ "tier"
+ ]
+ },
+ "segment/moved/count": {
+ "dimensions": [
+ "tier"
+ ]
+ },
+ "segment/dropped/count": {
+ "dimensions": [
+ "tier"
+ ]
+ },
+ "segment/deleted/count": {
+ "dimensions": [
+ "tier"
+ ]
+ },
+ "segment/unneeded/count": {
+ "dimensions": [
+ "tier"
+ ]
+ },
+ "segment/cost/raw": {
+ "dimensions": [
+ "tier"
+ ]
+ },
+ "segment/cost/normalization": {
+ "dimensions": [
+ "tier"
+ ]
+ },
+ "segment/cost/normalized": {
+ "dimensions": [
+ "tier"
+ ]
+ },
+ "segment/loadQueue/size": {
+ "dimensions": [
+ "server"
+ ]
+ },
+ "segment/loadQueue/failed": {
+ "dimensions": [
+ "server"
+ ]
+ },
+ "segment/loadQueue/count": {
+ "dimensions": [
+ "server"
+ ]
+ },
+ "segment/dropQueue/count": {
+ "dimensions": [
+ "server"
+ ]
+ },
+ "segment/size": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "segment/overShadowed/count": {
+ "dimensions": []
+ },
+ "segment/max": {
+ "dimensions": []
+ },
+ "segment/used": {
+ "dimensions": [
+ "dataSource",
+ "tier",
+ "priority"
+ ]
+ },
+ "segment/usedPercent": {
+ "dimensions": [
+ "dataSource",
+ "tier",
+ "priority"
+ ],
+ "convertRange": true
+ },
+ "jvm/pool/committed": {
+ "dimensions": [
+ "poolKind",
+ "poolName"
+ ]
+ },
+ "jvm/pool/init": {
+ "dimensions": [
+ "poolKind",
+ "poolName"
+ ]
+ },
+ "jvm/pool/max": {
+ "dimensions": [
+ "poolKind",
+ "poolName"
+ ]
+ },
+ "jvm/pool/used": {
+ "dimensions": [
+ "poolKind",
+ "poolName"
+ ]
+ },
+ "jvm/bufferpool/count": {
+ "dimensions": [
+ "bufferPoolName"
+ ]
+ },
+ "jvm/bufferpool/used": {
+ "dimensions": [
+ "bufferPoolName"
+ ]
+ },
+ "jvm/bufferpool/capacity": {
+ "dimensions": [
+ "bufferPoolName"
+ ]
+ },
+ "jvm/mem/init": {
+ "dimensions": [
+ "memKind"
+ ]
+ },
+ "jvm/mem/max": {
+ "dimensions": [
+ "memKind"
+ ]
+ },
+ "jvm/mem/used": {
+ "dimensions": [
+ "memKind"
+ ]
+ },
+ "jvm/mem/committed": {
+ "dimensions": [
+ "memKind"
+ ]
+ },
+ "jvm/gc/count": {
+ "dimensions": [
+ "gcName"
+ ]
+ },
+ "jvm/gc/time": {
+ "dimensions": [
+ "gcName"
+ ]
+ },
+ "ingest/events/buffered": {
+ "dimensions": [
+ "serviceName",
+ "bufferCapacity"
+ ]
+ },
+ "sys/swap/free": {
+ "dimensions": []
+ },
+ "sys/swap/max": {
+ "dimensions": []
+ },
+ "sys/swap/pageIn": {
+ "dimensions": []
+ },
+ "sys/swap/pageOut": {
+ "dimensions": []
+ },
+ "sys/disk/write/count": {
+ "dimensions": [
+ "fsDevName"
+ ]
+ },
+ "sys/disk/read/count": {
+ "dimensions": [
+ "fsDevName"
+ ]
+ },
+ "sys/disk/write/size": {
+ "dimensions": [
+ "fsDevName"
+ ]
+ },
+ "sys/disk/read/size": {
+ "dimensions": [
+ "fsDevName"
+ ]
+ },
+ "sys/net/write/size": {
+ "dimensions": []
+ },
+ "sys/net/read/size": {
+ "dimensions": []
+ },
+ "sys/fs/used": {
+ "dimensions": [
+ "fsDevName",
+ "fsDirName",
+ "fsTypeName",
+ "fsSysTypeName",
+ "fsOptions"
+ ]
+ },
+ "sys/fs/max": {
+ "dimensions": [
+ "fsDevName",
+ "fsDirName",
+ "fsTypeName",
+ "fsSysTypeName",
+ "fsOptions"
+ ]
+ },
+ "sys/mem/used": {
+ "dimensions": []
+ },
+ "sys/mem/max": {
+ "dimensions": []
+ },
+ "sys/storage/used": {
+ "dimensions": [
+ "fsDirName"
+ ]
+ },
+ "sys/cpu": {
+ "dimensions": [
+ "cpuName",
+ "cpuTime"
+ ]
+ },
+ "coordinator-segment/count": {
+ "dimensions": [
+ "dataSource"
+ ]
+ },
+ "historical-segment/count": {
+ "dimensions": [
+ "dataSource",
+ "tier",
+ "priority"
+ ]
+ }
+}
diff --git
a/extensions-contrib/ganglia-emitter/src/test/java/io/druid/emitter/ganglia/DimensionConverterTest.java
b/extensions-contrib/ganglia-emitter/src/test/java/io/druid/emitter/ganglia/DimensionConverterTest.java
new file mode 100644
index 00000000000..80197086ef8
--- /dev/null
+++
b/extensions-contrib/ganglia-emitter/src/test/java/io/druid/emitter/ganglia/DimensionConverterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.emitter.ganglia;
+
+import com.google.common.collect.ImmutableList;
+import io.druid.emitter.ganglia.DimensionConverter;
+import io.druid.emitter.ganglia.GangliaMetric;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Unit test for {@link DimensionConverter#addFilteredUserDims}. Checks the value returned
+ * for a metric that is missing from the metric map and for one that is present in it,
+ * using the same service name, user dimensions and name builder for both calls.
+ */
+public class DimensionConverterTest
+{
+  @Test
+  public void testConvert() throws Exception
+  {
+    final String service = "127.0.0.1:8090";
+    final String metric = "jvm/gc/count";
+    final DimensionConverter dimensionConverter = new DimensionConverter();
+
+    final Map<String, Object> userDims = new HashMap<>();
+    userDims.put("gcName", "yong");
+    final ImmutableList.Builder<String> nameBuilder = new ImmutableList.Builder<>();
+    nameBuilder.add(service);
+    nameBuilder.add(metric);
+
+    // The map has no entry for the metric yet, so the converter is expected to return null.
+    final Map<String, GangliaMetric> metricMap = new HashMap<>();
+    Assert.assertNull(
+        dimensionConverter.addFilteredUserDims(service, metric, userDims, nameBuilder, metricMap)
+    );
+
+    // After the metric is registered, the returned definition must expose its dimension.
+    final SortedSet<String> dimensions = new TreeSet<>();
+    dimensions.add("gcName");
+    metricMap.put(metric, new GangliaMetric(dimensions, false));
+    final GangliaMetric gangliaMetric =
+        dimensionConverter.addFilteredUserDims(service, metric, userDims, nameBuilder, metricMap);
+    Assert.assertEquals("gcName", gangliaMetric.dimensions.first());
+  }
+}
diff --git a/extensions-contrib/ganglia-emitter/src/test/java/io/druid/emitter/ganglia/GangliaEmitterTest.java b/extensions-contrib/ganglia-emitter/src/test/java/io/druid/emitter/ganglia/GangliaEmitterTest.java
new file mode 100644
index 00000000000..16155a71d08
--- /dev/null
+++ b/extensions-contrib/ganglia-emitter/src/test/java/io/druid/emitter/ganglia/GangliaEmitterTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.emitter.ganglia;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.emitter.ganglia.GangliaEmitter;
+import io.druid.emitter.ganglia.GangliaEmitterConfig;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link GangliaEmitter}; it has no assertions and fails only on an exception.
+ */
+public class GangliaEmitterTest
+{
+  @Test
+  public void testEmitter()
+  {
+    GangliaEmitterConfig config = new GangliaEmitterConfig(
+        "localhost",
+        10090,
+        null,
+        false,
+        null,
+        null,
+        60000L,
+        1000L,
+        600000,
+        1000L,
+        0L
+    );
+    GangliaEmitter emitter = new GangliaEmitter(config, new ObjectMapper());
+    emitter.emit(new ServiceMetricEvent.Builder()
+                     .setDimension("dataSource", "data-source")
+                     .setDimension("type", "groupBy")
+                     .setDimension("interval", "2013/2015")
+                     .setDimension("some_random_dim1", "random_dim_value1")
+                     .setDimension("some_random_dim2", "random_dim_value2")
+                     .setDimension("hasFilters", "no")
+                     .setDimension("duration", "P1D")
+                     .setDimension("remoteAddress", "194.0.90.2")
+                     .setDimension("id", "ID")
+                     .setDimension("context", "{context}")
+                     .build(DateTimes.nowUtc(), "query/time", 10)
+                     .build("broker", "brokerHost1"));
+  }
+}
diff --git a/pom.xml b/pom.xml
index 4c2938ebad2..775ef0b7e1c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
<module>extensions-contrib/opentsdb-emitter</module>
<module>extensions-contrib/materialized-view-maintenance</module>
<module>extensions-contrib/materialized-view-selection</module>
+ <module>extensions-contrib/ganglia-emitter</module>
<!-- distribution packaging -->
<module>distribution</module>
</modules>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]