This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 11a8624ef17 allow for kafka-emitter to have extra dimensions be set
for each event it emits (#15845)
11a8624ef17 is described below
commit 11a8624ef17dc6b7c264ef3ec162485fa21dce69
Author: Tom <[email protected]>
AuthorDate: Thu Feb 8 22:55:24 2024 -0800
allow for kafka-emitter to have extra dimensions be set for each event it
emits (#15845)
* allow for kafka-emitter to have extra dimensions be set for each event it
emits
* fix checkstyle issue in kafkaemitterconfig
* make changes to fix docs, and clean up copy-paste error in #toString()
* undo formatting to markdown table
* add more branches so test passes
* fix checkstyle issue
---
.../development/extensions-contrib/kafka-emitter.md | 2 ++
.../apache/druid/emitter/kafka/KafkaEmitter.java | 21 ++++++++++++++++-----
.../druid/emitter/kafka/KafkaEmitterConfig.java | 21 +++++++++++++++++----
.../druid/emitter/kafka/KafkaEmitterConfigTest.java | 5 ++++-
.../druid/emitter/kafka/KafkaEmitterTest.java | 2 +-
5 files changed, 40 insertions(+), 11 deletions(-)
diff --git a/docs/development/extensions-contrib/kafka-emitter.md
b/docs/development/extensions-contrib/kafka-emitter.md
index 40b63ca73af..a3641e89cec 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -46,6 +46,7 @@ All the configuration parameters for the Kafka emitter are
under `druid.emitter.
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for
emitter's target to emit segment metadata. If `event.types` contains
`segment_metadata`, this field cannot be empty. | no | none
|
| `druid.emitter.kafka.producer.config` | JSON configuration to
set additional properties to Kafka producer.
| no | none |
| `druid.emitter.kafka.clusterName` | Optional value to
specify the name of your Druid cluster. It can help make groups in your
monitoring environment. | no | none
|
+| `druid.emitter.kafka.extra.dimensions` | Optional JSON configuration to
specify a map of extra string dimensions for the events emitted. These can help
make groups in your monitoring environment. | no | none |
### Example
@@ -57,5 +58,6 @@ druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.request.topic=druid-request-logs
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.producer.config={"max.block.ms":10000}
+druid.emitter.kafka.extra.dimensions={"region":"us-east-1","environment":"preProd"}
```
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 661543f707c..a0cb1a9afe1 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -198,11 +198,7 @@ public class KafkaEmitter implements Emitter
if (event != null) {
try {
EventMap map = event.toMap();
- if (config.getClusterName() != null) {
- map = map.asBuilder()
- .put("clusterName", config.getClusterName())
- .build();
- }
+ map = addExtraDimensionsToEvent(map);
String resultJson = jsonMapper.writeValueAsString(map);
@@ -239,6 +235,21 @@ public class KafkaEmitter implements Emitter
}
}
+ private EventMap addExtraDimensionsToEvent(EventMap map)
+ {
+ if (config.getClusterName() != null || config.getExtraDimensions() !=
null) {
+ EventMap.Builder eventMapBuilder = map.asBuilder();
+ if (config.getClusterName() != null) {
+ eventMapBuilder.put("clusterName", config.getClusterName());
+ }
+ if (config.getExtraDimensions() != null) {
+ eventMapBuilder.putAll(config.getExtraDimensions());
+ }
+ map = eventMapBuilder.build();
+ }
+ return map;
+ }
+
@Override
public void flush()
{
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index c7038079aa4..86f636e853d 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -62,7 +62,7 @@ public class KafkaEmitterConfig
public static final Set<EventType> DEFAULT_EVENT_TYPES =
ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
private final String bootstrapServers;
- @Nullable @JsonProperty("event.types")
+ @Nonnull @JsonProperty("event.types")
private final Set<EventType> eventTypes;
@Nullable @JsonProperty("metric.topic")
private final String metricTopic;
@@ -72,8 +72,10 @@ public class KafkaEmitterConfig
private final String requestTopic;
@Nullable @JsonProperty("segmentMetadata.topic")
private final String segmentMetadataTopic;
- @JsonProperty
+ @Nullable @JsonProperty
private final String clusterName;
+ @Nullable @JsonProperty("extra.dimensions")
+ private final Map<String, String> extraDimensions;
@JsonProperty("producer.config")
private final Map<String, String> kafkaProducerConfig;
@JsonProperty("producer.hiddenProperties")
@@ -87,7 +89,8 @@ public class KafkaEmitterConfig
@Nullable @JsonProperty("alert.topic") String alertTopic,
@Nullable @JsonProperty("request.topic") String requestTopic,
@Nullable @JsonProperty("segmentMetadata.topic") String
segmentMetadataTopic,
- @JsonProperty("clusterName") String clusterName,
+ @Nullable @JsonProperty("clusterName") String clusterName,
+ @Nullable @JsonProperty("extra.dimensions") Map<String, String>
extraDimensions,
@JsonProperty("producer.config") @Nullable Map<String, String>
kafkaProducerConfig,
@JsonProperty("producer.hiddenProperties") @Nullable
DynamicConfigProvider<String> kafkaProducerSecrets
)
@@ -99,10 +102,12 @@ public class KafkaEmitterConfig
this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ?
Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can
not be null") : null;
this.segmentMetadataTopic =
this.eventTypes.contains(EventType.SEGMENT_METADATA) ?
Preconditions.checkNotNull(segmentMetadataTopic,
"druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
this.clusterName = clusterName;
+ this.extraDimensions = extraDimensions;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of()
: kafkaProducerConfig;
this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new
MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
}
+ @Nonnull
private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes,
String requestTopic)
{
// Unless explicitly overridden, kafka emitter will always emit metrics
and alerts
@@ -143,12 +148,18 @@ public class KafkaEmitterConfig
return alertTopic;
}
- @JsonProperty
+ @Nullable @JsonProperty
public String getClusterName()
{
return clusterName;
}
+ @Nullable
+ public Map<String, String> getExtraDimensions()
+ {
+ return extraDimensions;
+ }
+
@Nullable
public String getRequestTopic()
{
@@ -228,6 +239,7 @@ public class KafkaEmitterConfig
result = 31 * result + (getRequestTopic() != null ?
getRequestTopic().hashCode() : 0);
result = 31 * result + (getSegmentMetadataTopic() != null ?
getSegmentMetadataTopic().hashCode() : 0);
result = 31 * result + (getClusterName() != null ?
getClusterName().hashCode() : 0);
+ result = 31 * result + (getExtraDimensions() != null ?
getExtraDimensions().hashCode() : 0);
result = 31 * result + getKafkaProducerConfig().hashCode();
result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode();
return result;
@@ -244,6 +256,7 @@ public class KafkaEmitterConfig
", request.topic='" + requestTopic + '\'' +
", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
", clusterName='" + clusterName + '\'' +
+ ", extra.dimensions='" + extraDimensions + '\'' +
", producer.config=" + kafkaProducerConfig + '\'' +
", producer.hiddenProperties=" + kafkaProducerSecrets +
'}';
diff --git
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 603c8e6701b..fb10868d362 100644
---
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -58,6 +58,7 @@ public class KafkaEmitterConfigTest
"requestTest",
"metadataTest",
"clusterNameTest",
+ ImmutableMap.of("env", "preProd"),
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build(),
DEFAULT_PRODUCER_SECRETS
@@ -79,6 +80,7 @@ public class KafkaEmitterConfigTest
null,
"metadataTest",
"clusterNameTest",
+ null,
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build(),
DEFAULT_PRODUCER_SECRETS
@@ -102,6 +104,7 @@ public class KafkaEmitterConfigTest
null,
"metadataTest",
"clusterNameTest",
+ null,
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build(),
DEFAULT_PRODUCER_SECRETS
@@ -117,7 +120,7 @@ public class KafkaEmitterConfigTest
{
KafkaEmitterConfig kafkaEmitterConfig = new
KafkaEmitterConfig("localhost:9092", null, "metricTest",
"alertTest", null, "metadataTest",
-
"clusterNameTest", null, null
+
"clusterNameTest", null, null, null
);
try {
@SuppressWarnings("unused")
diff --git
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index 1c30bee12fa..228982b6cd7 100644
---
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -106,7 +106,7 @@ public class KafkaEmitterTest
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JodaModule());
final KafkaEmitter kafkaEmitter = new KafkaEmitter(
- new KafkaEmitterConfig("", eventsType, "metrics", "alerts",
requestTopic, "metadata", "test-cluster", null, null),
+ new KafkaEmitterConfig("", eventsType, "metrics", "alerts",
requestTopic, "metadata", "test-cluster", ImmutableMap.of("clusterId",
"cluster-101"), null, null),
mapper
)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]