This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0bc9f9f303 #12912 Fix KafkaEmitter not emitting queryType for a native
query (#12915)
0bc9f9f303 is described below
commit 0bc9f9f30395309995a851fb4ed60a0521381b82
Author: Bartosz Mikulski <[email protected]>
AuthorDate: Wed Aug 24 10:37:00 2022 +0200
#12912 Fix KafkaEmitter not emitting queryType for a native query (#12915)
Fixes KafkaEmitter not emitting queryType for a native query. The Event to
JSON serialization was extracted to the external class: EventToJsonSerializer.
This was done to simplify the testing logic for the serialization as well as
to extract the responsibility of serialization to a separate class.
The logic builds ObjectNode incrementally based on the event's toMap() method.
Parsing each entry individually ensures that the Jackson polymorphic
annotations are respected. Not respecting these annotations caused the
queryType to be missing from the output event.
---
.../apache/druid/java/util/emitter/core/Event.java | 4 +-
.../druid/java/util/emitter/core/EventMap.java | 109 +++++++++++++
.../util/emitter/core/EventMapSerializer.java} | 28 ++--
.../java/util/emitter/service/AlertEvent.java | 6 +-
.../util/emitter/service/ServiceMetricEvent.java | 45 +++---
.../druid/java/util/emitter/core/IntEvent.java | 4 +-
.../emitter/service/ServiceMetricEventTest.java | 1 +
.../druid/java/util/emitter/service/UnitEvent.java | 15 +-
.../apache/druid/emitter/kafka/KafkaEmitter.java | 27 ++--
.../opentelemetry/OpenTelemetryEmitterTest.java | 6 +-
.../druid/server/log/DefaultRequestLogEvent.java | 32 ++--
.../druid/server/log/AlertEventSerdeTest.java | 65 ++++++++
.../server/log/DefaultRequestLogEventTest.java | 177 +++++++++++++++++++++
.../server/log/ServiceMetricEventSerdeTest.java | 58 +++++++
14 files changed, 497 insertions(+), 80 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/emitter/core/Event.java
b/core/src/main/java/org/apache/druid/java/util/emitter/core/Event.java
index 37e6e57bf5..1f5a20c618 100644
--- a/core/src/main/java/org/apache/druid/java/util/emitter/core/Event.java
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/Event.java
@@ -19,13 +19,11 @@
package org.apache.druid.java.util.emitter.core;
-import java.util.Map;
-
/**
*/
public interface Event
{
- Map<String, Object> toMap();
+ EventMap toMap();
String getFeed();
}
diff --git
a/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java
b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java
new file mode 100644
index 0000000000..5cf031b2dc
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMap.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.core;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * EventMap is a hash map implementation. It can be safely serialized to JSON
using Jackson serializer as it
+ * respects the polymorphic annotations on entries (unlike standard Map). The
example of polymorphic class is a query
+ * interface, where different native query types are resolved by additional
field called "queryType".
+ * This implementation ensures that the annotations on the values are respected
during serialization.
+ */
+@JsonSerialize(using = EventMapSerializer.class)
+public class EventMap extends HashMap<String, Object>
+{
+ /**
+ * Returns builder with Fluent API to build EventMap instance using method
chaining
+ */
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ /**
+ * Convert this EventMap to a builder. Performs copy of the whole EventMap.
+ */
+ public Builder asBuilder()
+ {
+ return new Builder().putAll(this);
+ }
+
+ public static class Builder
+ {
+
+ private final EventMap map;
+
+ protected Builder()
+ {
+ map = new EventMap();
+ }
+
+ /**
+ * Adds key -> value pair to the map
+ */
+ public Builder put(String key, Object value)
+ {
+ map.put(key, value);
+ return this;
+ }
+
+ /**
+ * Adds key -> value pair to the map only if value is not null
+ */
+ public Builder putNonNull(String key, Object value)
+ {
+ if (value != null) {
+ map.put(key, value);
+ }
+ return this;
+ }
+
+ /**
+ * Adds map entry to the map
+ */
+ public Builder put(Map.Entry<String, Object> entry)
+ {
+ map.put(entry.getKey(), entry.getValue());
+ return this;
+ }
+
+ /**
+ * Adds all key -> value pairs from other map
+ */
+ public Builder putAll(Map<? extends String, ? extends Object> other)
+ {
+ map.putAll(other);
+ return this;
+ }
+
+ /**
+ * Builds and returns the EventMap
+ */
+ public EventMap build()
+ {
+ return map;
+ }
+ }
+
+}
diff --git
a/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java
b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMapSerializer.java
similarity index 62%
copy from
core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java
copy to
core/src/main/java/org/apache/druid/java/util/emitter/core/EventMapSerializer.java
index 3fae083e15..ff1121973e 100644
--- a/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java
+++
b/core/src/main/java/org/apache/druid/java/util/emitter/core/EventMapSerializer.java
@@ -19,26 +19,22 @@
package org.apache.druid.java.util.emitter.core;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
import java.util.Map;
-class IntEvent implements Event
+public class EventMapSerializer extends JsonSerializer<EventMap>
{
- int index;
-
- IntEvent()
- {
- }
-
@Override
- public Map<String, Object> toMap()
+ public void serialize(EventMap map, JsonGenerator gen, SerializerProvider
serializers) throws IOException
{
- return null;
+ gen.writeStartObject();
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ gen.writeObjectField(entry.getKey(), entry.getValue());
+ }
+ gen.writeEndObject();
}
-
- @Override
- public String getFeed()
- {
- return null;
- }
-
}
diff --git
a/core/src/main/java/org/apache/druid/java/util/emitter/service/AlertEvent.java
b/core/src/main/java/org/apache/druid/java/util/emitter/service/AlertEvent.java
index a5d805dc14..e2a7987c15 100644
---
a/core/src/main/java/org/apache/druid/java/util/emitter/service/AlertEvent.java
+++
b/core/src/main/java/org/apache/druid/java/util/emitter/service/AlertEvent.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
import org.joda.time.DateTime;
import java.util.Collections;
@@ -148,9 +149,10 @@ public class AlertEvent implements Event
@Override
@JsonValue
- public Map<String, Object> toMap()
+ public EventMap toMap()
{
- return ImmutableMap.<String, Object>builder()
+ return EventMap
+ .builder()
.put("feed", getFeed())
.put("timestamp", createdTime.toString())
.putAll(serviceDimensions)
diff --git
a/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
b/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
index 65ddc70341..f40491ce70 100644
---
a/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
+++
b/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
@@ -27,6 +27,7 @@ import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
import org.joda.time.DateTime;
import java.util.Arrays;
@@ -34,6 +35,7 @@ import java.util.Map;
import java.util.TreeMap;
/**
+ *
*/
@PublicApi
public class ServiceMetricEvent implements Event
@@ -105,28 +107,29 @@ public class ServiceMetricEvent implements Event
@Override
@JsonValue
- public Map<String, Object> toMap()
+ public EventMap toMap()
{
- return ImmutableMap.<String, Object>builder()
- .put("feed", getFeed())
- .put("timestamp", createdTime.toString())
- .putAll(serviceDims)
- .put("metric", metric)
- .put("value", value)
- .putAll(
- Maps.filterEntries(
- userDims,
- new Predicate<Map.Entry<String, Object>>()
- {
- @Override
- public boolean apply(Map.Entry<String,
Object> input)
- {
- return input.getKey() != null;
- }
- }
- )
- )
- .build();
+ return EventMap
+ .builder()
+ .put("feed", getFeed())
+ .put("timestamp", createdTime.toString())
+ .putAll(serviceDims)
+ .put("metric", metric)
+ .put("value", value)
+ .putAll(
+ Maps.filterEntries(
+ userDims,
+ new Predicate<Map.Entry<String, Object>>()
+ {
+ @Override
+ public boolean apply(Map.Entry<String, Object> input)
+ {
+ return input.getKey() != null;
+ }
+ }
+ )
+ )
+ .build();
}
public static class Builder
diff --git
a/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java
b/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java
index 3fae083e15..143504706e 100644
--- a/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java
+++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/IntEvent.java
@@ -19,8 +19,6 @@
package org.apache.druid.java.util.emitter.core;
-import java.util.Map;
-
class IntEvent implements Event
{
int index;
@@ -30,7 +28,7 @@ class IntEvent implements Event
}
@Override
- public Map<String, Object> toMap()
+ public EventMap toMap()
{
return null;
}
diff --git
a/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
b/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
index d713a016b0..5a97f76564 100644
---
a/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
+++
b/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
@@ -290,4 +290,5 @@ public class ServiceMetricEventTest
{
ServiceMetricEvent.builder().build("foo", 0 / 0f);
}
+
}
diff --git
a/core/src/test/java/org/apache/druid/java/util/emitter/service/UnitEvent.java
b/core/src/test/java/org/apache/druid/java/util/emitter/service/UnitEvent.java
index d65769d9f4..f8e408db70 100644
---
a/core/src/test/java/org/apache/druid/java/util/emitter/service/UnitEvent.java
+++
b/core/src/test/java/org/apache/druid/java/util/emitter/service/UnitEvent.java
@@ -23,12 +23,13 @@ package org.apache.druid.java.util.emitter.service;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
/**
+ *
*/
public class UnitEvent implements Event
{
@@ -50,12 +51,14 @@ public class UnitEvent implements Event
@Override
@JsonValue
- public Map<String, Object> toMap()
+ public EventMap toMap()
{
- Map<String, Object> result = new HashMap<>(dimensions);
- result.put("feed", feed);
- result.put("metrics", ImmutableMap.of("value", value));
- return ImmutableMap.copyOf(result);
+ return EventMap
+ .builder()
+ .putAll(dimensions)
+ .put("feed", feed)
+ .put("metrics", ImmutableMap.of("value", value))
+ .build();
}
@Override
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index cdab2a7d87..129a374b58 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -22,13 +22,13 @@ package org.apache.druid.emitter.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
import
org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
@@ -39,7 +39,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -126,8 +125,12 @@ public class KafkaEmitter implements Emitter
scheduler.schedule(this::sendRequestToKafka, sendInterval,
TimeUnit.SECONDS);
}
scheduler.scheduleWithFixedDelay(() -> {
- log.info("Message lost counter: metricLost=[%d], alertLost=[%d],
requestLost=[%d], invalidLost=[%d]",
- metricLost.get(), alertLost.get(), requestLost.get(),
invalidLost.get()
+ log.info(
+ "Message lost counter: metricLost=[%d], alertLost=[%d],
requestLost=[%d], invalidLost=[%d]",
+ metricLost.get(),
+ alertLost.get(),
+ requestLost.get(),
+ invalidLost.get()
);
}, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES,
TimeUnit.MINUTES);
log.info("Starting Kafka Emitter.");
@@ -166,14 +169,16 @@ public class KafkaEmitter implements Emitter
public void emit(final Event event)
{
if (event != null) {
- ImmutableMap.Builder<String, Object> resultBuilder =
ImmutableMap.<String, Object>builder().putAll(event.toMap());
- if (config.getClusterName() != null) {
- resultBuilder.put("clusterName", config.getClusterName());
- }
- Map<String, Object> result = resultBuilder.build();
-
try {
- String resultJson = jsonMapper.writeValueAsString(result);
+ EventMap map = event.toMap();
+ if (config.getClusterName() != null) {
+ map = map.asBuilder()
+ .put("clusterName", config.getClusterName())
+ .build();
+ }
+
+ String resultJson = jsonMapper.writeValueAsString(map);
+
ObjectContainer<String> objectContainer = new ObjectContainer<>(
resultJson,
StringUtils.toUtf8(resultJson).length
diff --git
a/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java
b/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java
index 1db937498f..58ee485f9c 100644
---
a/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java
+++
b/extensions-contrib/opentelemetry-emitter/src/test/java/org/apache/druid/emitter/opentelemetry/OpenTelemetryEmitterTest.java
@@ -32,6 +32,7 @@ import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.joda.time.DateTime;
import org.junit.Assert;
@@ -39,7 +40,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -98,9 +98,9 @@ public class OpenTelemetryEmitterTest
new Event()
{
@Override
- public Map<String, Object> toMap()
+ public EventMap toMap()
{
- return Collections.emptyMap();
+ return new EventMap();
}
@Override
diff --git
a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
index 4744b2f6d0..d7f75d216e 100644
---
a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
+++
b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
@@ -23,12 +23,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.guice.annotations.PublicApi;
+import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.query.Query;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
import org.joda.time.DateTime;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -55,23 +55,25 @@ public final class DefaultRequestLogEvent implements
RequestLogEvent
*/
@JsonValue(value = false)
@Override
- public Map<String, Object> toMap()
+ public EventMap toMap()
{
- final Map<String, Object> map = new HashMap<>();
- map.put("feed", getFeed());
- map.put("timestamp", getCreatedTime());
- map.put("service", getService());
- map.put("host", getHost());
- if (getQuery() != null) {
- map.put("query", getQuery());
- }
+ final EventMap.Builder builder = EventMap
+ .builder()
+ .put("feed", getFeed())
+ .put("timestamp", getCreatedTime())
+ .put("service", getService())
+ .put("host", getHost())
+ .putNonNull("query", getQuery());
+
if (getSql() != null) {
- map.put("sql", getSql());
- map.put("sqlQueryContext", getSqlQueryContext());
+ builder.put("sql", getSql())
+ .put("sqlQueryContext", getSqlQueryContext());
}
- map.put("remoteAddr", getRemoteAddr());
- map.put("queryStats", getQueryStats());
- return map;
+
+ builder.put("remoteAddr", getRemoteAddr())
+ .put("queryStats", getQueryStats());
+
+ return builder.build();
}
@Override
diff --git
a/server/src/test/java/org/apache/druid/server/log/AlertEventSerdeTest.java
b/server/src/test/java/org/apache/druid/server/log/AlertEventSerdeTest.java
new file mode 100644
index 0000000000..7fcd0db189
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/log/AlertEventSerdeTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.log;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class AlertEventSerdeTest
+{
+ @Test
+ public void testSerializeAlertEventMap() throws JsonProcessingException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ String timestamp = "2022-08-17T18:51:00.000Z";
+ Event event = new AlertEvent(
+ DateTimes.of(timestamp),
+ "my-service",
+ "my-host",
+ AlertEvent.Severity.DEFAULT,
+ "my-description",
+ Collections.emptyMap()
+ );
+
+ String actual = mapper.writeValueAsString(event.toMap());
+ String expected = "{"
+ + "\"feed\":\"alerts\","
+ + "\"timestamp\":\""
+ + timestamp
+ + "\","
+ + "\"severity\":\""
+ + AlertEvent.Severity.DEFAULT
+ + "\","
+ + "\"service\":\"my-service\","
+ + "\"host\":\"my-host\","
+ + "\"description\":\"my-description\","
+ + "\"data\":{}"
+ + "}";
+ Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual));
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
index 126705ec8b..ebeffcf8c9 100644
---
a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
+++
b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.log;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -26,6 +27,8 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -37,6 +40,7 @@ import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -162,4 +166,177 @@ public class DefaultRequestLogEventTest
Assert.assertEquals(expected, defaultRequestLogEvent.toMap());
}
+
+ @Test
+ public void testSerializeSqlLogRequestMap() throws JsonProcessingException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ String timestamp = "2022-08-17T18:51:00.000Z";
+
+ Event event = DefaultRequestLogEventBuilderFactory.instance()
+
.createRequestLogEventBuilder(
+ "requests",
+
RequestLogLine.forSql(
+ "SELECT * FROM
dummy",
+
Collections.emptyMap(),
+
DateTimes.of(timestamp),
+ "127.0.0.1",
+ new
QueryStats(ImmutableMap.of())
+ )
+ )
+ .build("my-service",
"my-host");
+
+ String actual = mapper.writeValueAsString(event.toMap());
+ String expected = "{"
+ + "\"feed\":\"requests\","
+ + "\"timestamp\":\""
+ + timestamp
+ + "\","
+ + "\"service\":\"my-service\","
+ + "\"host\":\"my-host\","
+ + "\"sql\":\"SELECT * FROM dummy\","
+ + "\"sqlQueryContext\":{},"
+ + "\"queryStats\":{},"
+ + "\"remoteAddr\":\"127.0.0.1\""
+ + "}";
+
+ Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual));
+ }
+
+ @Test
+ public void testSerializeNativeLogRequestMap() throws JsonProcessingException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+
+ RequestLogLine nativeLine = RequestLogLine.forNative(
+ new TimeseriesQuery(
+ new TableDataSource("dummy"),
+ new MultipleIntervalSegmentSpec(
+ ImmutableList.of(Intervals.of(
+ "2015-01-01/2015-01-02"))),
+ true,
+ VirtualColumns.EMPTY,
+ null,
+ Granularities.ALL,
+ ImmutableList.of(),
+ ImmutableList.of(),
+ 5,
+ ImmutableMap.of("key", "value")
+ ),
+ DateTimes.of(2019, 12, 12, 3, 1),
+ "127.0.0.1",
+ new QueryStats(ImmutableMap.of(
+ "query/time",
+ 13L,
+ "query/bytes",
+ 10L,
+ "success",
+ true,
+ "identity",
+ "allowAll"
+ ))
+ );
+
+ Event event = DefaultRequestLogEventBuilderFactory.instance()
+
.createRequestLogEventBuilder("my-feed", nativeLine)
+ .build("my-service",
"my-host");
+
+ String actual = mapper.writeValueAsString(event.toMap());
+ String queryString = "{"
+ + "\"queryType\":\"timeseries\","
+ +
"\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"},"
+ +
"\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]},"
+ + "\"descending\":true,"
+ + "\"granularity\":{\"type\":\"all\"},"
+ + "\"limit\":5,"
+ + "\"context\":{\"key\":\"value\"}"
+ + "}";
+
+ String expected = "{"
+ + "\"feed\":\"my-feed\","
+ + "\"host\":\"my-host\","
+ + "\"service\":\"my-service\","
+ + "\"timestamp\":\"2019-12-12T03:01:00.000Z\","
+ + "\"query\":"
+ + queryString
+ + ","
+ + "\"remoteAddr\":\"127.0.0.1\","
+ +
"\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}";
+
+ Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual));
+ }
+
+ @Test
+ public void testSerializeNativeLogRequestMapWithAdditionalParameters()
throws JsonProcessingException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+
+
+ RequestLogLine nativeLine = RequestLogLine.forNative(
+ new TimeseriesQuery(
+ new TableDataSource("dummy"),
+ new MultipleIntervalSegmentSpec(
+ ImmutableList.of(Intervals.of(
+ "2015-01-01/2015-01-02"))),
+ true,
+ VirtualColumns.EMPTY,
+ null,
+ Granularities.ALL,
+ ImmutableList.of(),
+ ImmutableList.of(),
+ 5,
+ ImmutableMap.of("key", "value")
+ ),
+ DateTimes.of(2019, 12, 12, 3, 1),
+ "127.0.0.1",
+ new QueryStats(ImmutableMap.of(
+ "query/time",
+ 13L,
+ "query/bytes",
+ 10L,
+ "success",
+ true,
+ "identity",
+ "allowAll"
+ ))
+ );
+
+ Event event = DefaultRequestLogEventBuilderFactory.instance()
+
.createRequestLogEventBuilder("my-feed", nativeLine)
+ .build("my-service",
"my-host");
+
+ EventMap map = EventMap.builder()
+ .putNonNull("number", 1)
+ .putNonNull("text", "some text")
+ .putNonNull("null", null)
+ .putAll(event.toMap())
+ .build();
+
+ String actual = mapper.writeValueAsString(map);
+ String queryString = "{"
+ + "\"queryType\":\"timeseries\","
+ +
"\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"},"
+ +
"\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]},"
+ + "\"descending\":true,"
+ + "\"granularity\":{\"type\":\"all\"},"
+ + "\"limit\":5,"
+ + "\"context\":{\"key\":\"value\"}"
+ + "}";
+
+ String expected = "{"
+ + "\"feed\":\"my-feed\","
+ + "\"host\":\"my-host\","
+ + "\"service\":\"my-service\","
+ + "\"timestamp\":\"2019-12-12T03:01:00.000Z\","
+ + "\"query\":"
+ + queryString
+ + ","
+ + "\"remoteAddr\":\"127.0.0.1\","
+ + "\"number\":1,"
+ + "\"text\":\"some text\","
+ +
"\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}";
+
+ Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual));
+ }
+
}
diff --git
a/server/src/test/java/org/apache/druid/server/log/ServiceMetricEventSerdeTest.java
b/server/src/test/java/org/apache/druid/server/log/ServiceMetricEventSerdeTest.java
new file mode 100644
index 0000000000..341bab57bb
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/log/ServiceMetricEventSerdeTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.log;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ServiceMetricEventSerdeTest
+{
+
+ @Test
+ public void testSerializeServiceMetricEventMap() throws
JsonProcessingException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ String timestamp = "2022-08-17T18:51:00.000Z";
+ Event event = ServiceMetricEvent.builder()
+ .setFeed("my-feed")
+ .build(DateTimes.of(timestamp), "m1", 1)
+ .build("my-service", "my-host");
+
+ String actual = mapper.writeValueAsString(event.toMap());
+ String expected = "{"
+ + "\"feed\":\"my-feed\","
+ + "\"timestamp\":\""
+ + timestamp
+ + "\","
+ + "\"metric\":\"m1\","
+ + "\"value\":1,"
+ + "\"service\":\"my-service\","
+ + "\"host\":\"my-host\""
+ + "}";
+ Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]