abhishekrb19 commented on code in PR #14281:
URL: https://github.com/apache/druid/pull/14281#discussion_r1198427132
##########
.idea/misc.xml:
##########
@@ -40,6 +40,11 @@
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
+ <option name="ignoredFiles">
+ <set>
+ <option value="$PROJECT_DIR$/integration-tests/pom.xml" />
Review Comment:
@harinirajendran, it looks like these changes accidentally slipped in?
##########
docs/development/extensions-contrib/kafka-emitter.md:
##########
@@ -36,20 +36,25 @@ to monitor the status of your Druid cluster with this
extension.
All the configuration parameters for the Kafka emitter are under
`druid.emitter.kafka`.
-|property|description|required?|default|
-|--------|-----------|---------|-------|
-|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker.
(`[hostname:port],[hostname:port]...`)|yes|none|
-|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to
emit service metric.|yes|none|
-|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to
emit alert.|yes|none|
-|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to
emit request logs. If left empty then request logs will not be sent to the
Kafka topic.|no|none|
-|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user
want to set additional properties to Kafka producer.|no|none|
-|`druid.emitter.kafka.clusterName`|Optional value to specify name of your
druid cluster. It can help make groups in your monitoring environment. |no|none|
+| Property | Description
| Required | Default |
+|----------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------|
+| `druid.emitter.kafka.bootstrap.servers` | Comma-separated Kafka
broker. (`[hostname:port],[hostname:port]...`)
| yes | none |
+| `druid.emitter.kafka.event.types` | Comma-separated event
types. <br/>Supported types are `alerts`, `metrics`, `requests`, and
`segmentMetadata`.
| no | `["metrics", "alerts"]` |
+| `druid.emitter.kafka.metric.topic` | Kafka topic name for
emitter's target to emit service metric. If `event.types` contains `metrics`,
this field cannot be empty.
| no | none |
+| `druid.emitter.kafka.alert.topic` | Kafka topic name for
emitter's target to emit alerts. If `event.types` contains `alerts`, this field
cannot empty. |
no | none |
+| `druid.emitter.kafka.request.topic` | Kafka topic name for
emitter's target to emit request logs. If `event.types` contains `requests`,
this field cannot be empty.
| no | none |
+| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for
emitter's target to emit segments related metadata. If `event.types` contains
`segmentMetadata`, this field cannot be empty.
| no | none |
+| `druid.emitter.kafka.producer.config` | JSON formatted
configuration to set additional properties to Kafka producer.
| no
| none |
+| `druid.emitter.kafka.clusterName` | Optional value to
specify the name of your Druid cluster. It can help make groups in your
monitoring environment.
| no | none |
### Example
```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
-druid.emitter.kafka.metric.topic=druid-metric
+druid.emitter.kafka.event.types=["alerts", "requests", "segmentMetadata"]
Review Comment:
We might as well add metrics to the list and keep the metric topic
configuration :)
##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java:
##########
@@ -105,7 +111,7 @@ protected Producer<String, String> setKafkaProducer()
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
config.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
Review Comment:
I think this would be a breaking change for existing consumers that expect
string types.
##########
extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java:
##########
@@ -77,19 +84,31 @@ public void testKafkaEmitter() throws InterruptedException
).build("service", "host")
);
- int totalEvents = serviceMetricEvents.size() + alertEvents.size() +
requestLogEvents.size();
+ final List<SegmentMetadataEvent> segmentMetadataEvents = ImmutableList.of(
+ new SegmentMetadataEvent(
+ "dummy_datasource",
+ DateTimes.of("2001-01-01T00:00:00.000Z"),
Review Comment:
👍
##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java:
##########
@@ -21,53 +21,115 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import javax.annotation.Nullable;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
public class KafkaEmitterConfig
{
+ public enum EventType
+ {
+ METRICS,
+ ALERTS,
+ REQUESTS,
+ SEGMENTMETADATA {
+ @Override
+ public String toString()
+ {
+ return "segmentMetadata";
+ }
+ };
+
+ @JsonValue
+ @Override
+ public String toString()
+ {
+ return StringUtils.toLowerCase(this.name());
+ }
+ @JsonCreator
+ public static EventType fromString(String name)
+ {
+ return valueOf(StringUtils.toUpperCase(name));
+ }
+ }
+
+ public static final Set<EventType> DEFAULT_EVENT_TYPES =
ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
private final String bootstrapServers;
- @JsonProperty("metric.topic")
+ @Nullable @JsonProperty("event.types")
+ private final Set<EventType> eventTypes;
+ @Nullable @JsonProperty("metric.topic")
private final String metricTopic;
- @JsonProperty("alert.topic")
+ @Nullable @JsonProperty("alert.topic")
private final String alertTopic;
@Nullable @JsonProperty("request.topic")
private final String requestTopic;
+ @Nullable @JsonProperty("segmentMetadata.topic")
+ private final String segmentMetadataTopic;
@JsonProperty
private final String clusterName;
@JsonProperty("producer.config")
- private Map<String, String> kafkaProducerConfig;
+ private final Map<String, String> kafkaProducerConfig;
@JsonCreator
public KafkaEmitterConfig(
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String
bootstrapServers,
- @JsonProperty("metric.topic") String metricTopic,
- @JsonProperty("alert.topic") String alertTopic,
+ @Nullable @JsonProperty("event.types") Set<EventType> eventTypes,
+ @Nullable @JsonProperty("metric.topic") String metricTopic,
+ @Nullable @JsonProperty("alert.topic") String alertTopic,
@Nullable @JsonProperty("request.topic") String requestTopic,
+ @Nullable @JsonProperty("segmentMetadata.topic") String
segmentMetadataTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") @Nullable Map<String, String>
kafkaProducerConfig
)
{
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers,
"bootstrap.servers can not be null");
- this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic
can not be null");
- this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can
not be null");
- this.requestTopic = requestTopic;
+ this.eventTypes = validateEventTypes(eventTypes, requestTopic);
+
+ this.metricTopic = this.eventTypes.contains(EventType.METRICS) ?
Preconditions.checkNotNull(metricTopic, "metric.topic can not be null") : null;
+ this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ?
Preconditions.checkNotNull(alertTopic, "alert.topic can not be null") : null;
+ this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ?
Preconditions.checkNotNull(requestTopic, "request.topic can not be null") :
null;
+ this.segmentMetadataTopic =
this.eventTypes.contains(EventType.SEGMENTMETADATA) ?
Preconditions.checkNotNull(segmentMetadataTopic, "segmentMetadata.topic can not
be null") : null;
this.clusterName = clusterName;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of()
: kafkaProducerConfig;
}
+ private Set<EventType> validateEventTypes(Set<EventType> eventTypes, String
requestTopic)
Review Comment:
Should we rename this function since it's not really validating the supplied
event types? Or maybe have a default jackson getter for the event types
property that bakes in the backwards compatibility logic?
##########
extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java:
##########
@@ -21,53 +21,115 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import javax.annotation.Nullable;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
public class KafkaEmitterConfig
{
+ public enum EventType
+ {
+ METRICS,
+ ALERTS,
+ REQUESTS,
+ SEGMENTMETADATA {
+ @Override
+ public String toString()
+ {
+ return "segmentMetadata";
+ }
+ };
+
+ @JsonValue
+ @Override
+ public String toString()
+ {
+ return StringUtils.toLowerCase(this.name());
+ }
+ @JsonCreator
+ public static EventType fromString(String name)
+ {
+ return valueOf(StringUtils.toUpperCase(name));
+ }
+ }
+
+ public static final Set<EventType> DEFAULT_EVENT_TYPES =
ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
private final String bootstrapServers;
- @JsonProperty("metric.topic")
+ @Nullable @JsonProperty("event.types")
+ private final Set<EventType> eventTypes;
+ @Nullable @JsonProperty("metric.topic")
private final String metricTopic;
- @JsonProperty("alert.topic")
+ @Nullable @JsonProperty("alert.topic")
private final String alertTopic;
@Nullable @JsonProperty("request.topic")
private final String requestTopic;
+ @Nullable @JsonProperty("segmentMetadata.topic")
+ private final String segmentMetadataTopic;
@JsonProperty
private final String clusterName;
@JsonProperty("producer.config")
- private Map<String, String> kafkaProducerConfig;
+ private final Map<String, String> kafkaProducerConfig;
@JsonCreator
public KafkaEmitterConfig(
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String
bootstrapServers,
- @JsonProperty("metric.topic") String metricTopic,
- @JsonProperty("alert.topic") String alertTopic,
+ @Nullable @JsonProperty("event.types") Set<EventType> eventTypes,
+ @Nullable @JsonProperty("metric.topic") String metricTopic,
+ @Nullable @JsonProperty("alert.topic") String alertTopic,
@Nullable @JsonProperty("request.topic") String requestTopic,
+ @Nullable @JsonProperty("segmentMetadata.topic") String
segmentMetadataTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") @Nullable Map<String, String>
kafkaProducerConfig
)
{
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers,
"bootstrap.servers can not be null");
- this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic
can not be null");
- this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can
not be null");
- this.requestTopic = requestTopic;
+ this.eventTypes = validateEventTypes(eventTypes, requestTopic);
+
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]