This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a2bad0b3a2 Reduce allocations due to Jackson serialization. (#12468)
a2bad0b3a2 is described below
commit a2bad0b3a2c80a5ffb704ec8c4a0e18734455d38
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Apr 27 14:17:26 2022 -0700
Reduce allocations due to Jackson serialization. (#12468)
* Reduce allocations due to Jackson serialization.
This patch attacks two sources of allocations during Jackson
serialization:
1) ObjectMapper.writeValue and JsonGenerator.writeObject create a new
DefaultSerializerProvider instance for each call. It has lots of
fields and creates pressure on the garbage collector. So, this patch
adds helper functions in JacksonUtils that enable reuse of
SerializerProvider objects and updates various call sites to make
use of this.
2) GroupByQueryToolChest copies the ObjectMapper for every query to
install a special module that supports backwards compatibility with
map-based rows. This isn't needed if resultAsArray is set and
all servers are running Druid 0.16.0 or later; that release was a
while ago. So, this patch disables backwards compatibility by default,
which eliminates the need to copy the heavyweight ObjectMapper. The
patch also introduces a configuration option that allows admins to
explicitly enable backwards compatibility.
* Add test.
* Update additional call sites and add to forbidden APIs.
---
codestyle/druid-forbidden-apis.txt | 2 +
.../java/util/common/jackson/JacksonUtils.java | 40 +++++-
.../druid/common/jackson/JacksonUtilsTest.java | 137 +++++++++++++++++++
.../data/input/impl/prefetch/JsonIteratorTest.java | 5 +-
docs/querying/groupbyquery.md | 1 +
.../jackson/DruidDefaultSerializersModule.java | 38 +++++-
.../druid/query/groupby/GroupByQueryConfig.java | 8 ++
.../query/groupby/GroupByQueryQueryToolChest.java | 17 ++-
.../groupby/epinephelinae/SpillingGrouper.java | 7 +-
.../druid/jackson/DefaultObjectMapperTest.java | 28 ++++
.../groupby/GroupByQueryQueryToolChestTest.java | 150 +++++++++++++++++----
.../client/cache/BackgroundCachePopulator.java | 5 +-
.../client/cache/ForegroundCachePopulator.java | 5 +-
.../org/apache/druid/metadata/input/SqlEntity.java | 7 +-
.../druid/query/ResultLevelCachingQueryRunner.java | 6 +-
.../druid/client/CachingQueryRunnerTest.java | 5 +-
.../segment/realtime/firehose/SqlFirehoseTest.java | 5 +-
.../apache/druid/sql/http/ArrayLinesWriter.java | 8 +-
.../org/apache/druid/sql/http/ArrayWriter.java | 6 +-
.../apache/druid/sql/http/ObjectLinesWriter.java | 8 +-
.../org/apache/druid/sql/http/ObjectWriter.java | 6 +-
21 files changed, 447 insertions(+), 47 deletions(-)
diff --git a/codestyle/druid-forbidden-apis.txt
b/codestyle/druid-forbidden-apis.txt
index a8d66c2ba8..6d47377a4c 100644
--- a/codestyle/druid-forbidden-apis.txt
+++ b/codestyle/druid-forbidden-apis.txt
@@ -1,6 +1,8 @@
com.fasterxml.jackson.databind.ObjectMapper#reader(com.fasterxml.jackson.core.type.TypeReference)
@ Use ObjectMapper#readerFor instead
com.fasterxml.jackson.databind.ObjectMapper#reader(com.fasterxml.jackson.databind.JavaType)
@ Use ObjectMapper#readerFor instead
com.fasterxml.jackson.databind.ObjectMapper#reader(java.lang.Class) @ Use
ObjectMapper#readerFor instead
+com.fasterxml.jackson.databind.ObjectMapper#writeValue(com.fasterxml.jackson.core.JsonGenerator,
java.lang.Object) @ Use JacksonUtils#writeObjectUsingSerializerProvider to
allow SerializerProvider reuse
+com.fasterxml.jackson.core.JsonGenerator#writeObject(java.lang.Object) @ Use
JacksonUtils#writeObjectUsingSerializerProvider to allow SerializerProvider
reuse
com.google.common.base.Charsets @ Use java.nio.charset.StandardCharsets instead
com.google.common.collect.Iterators#emptyIterator() @ Use
java.util.Collections#emptyIterator()
com.google.common.collect.Lists#newArrayList() @ Create java.util.ArrayList
directly
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
b/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
index 6dbdc620be..b269544482 100644
---
a/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
+++
b/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
@@ -19,9 +19,14 @@
package org.apache.druid.java.util.common.jackson;
+import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
@@ -40,7 +45,9 @@ public final class JacksonUtils
{
};
- /** Silences Jackson's {@link IOException}. */
+ /**
+ * Silences Jackson's {@link IOException}.
+ */
public static <T> T readValue(ObjectMapper mapper, byte[] bytes, Class<T>
valueClass)
{
try {
@@ -51,6 +58,37 @@ public final class JacksonUtils
}
}
+ /**
+ * Returns a serializer for a particular class. If you have a {@link
SerializerProvider}, this is better than calling
+ * {@link JsonGenerator#writeObject(Object)} or {@link
ObjectMapper#writeValue(JsonGenerator, Object)}, because it
+ * avoids re-creating the {@link SerializerProvider} for each serialized
object.
+ */
+ public static JsonSerializer<Object> getSerializer(final SerializerProvider
serializerProvider, final Class<?> clazz)
+ throws JsonMappingException
+ {
+ // cache = true, property = null because this is what
DefaultSerializerProvider.serializeValue would do.
+ return serializerProvider.findTypedValueSerializer(clazz, true, null);
+ }
+
+ /**
+ * Serializes an object using a {@link JsonGenerator}. If you have a {@link
SerializerProvider}, this is better than
+ * calling {@link JsonGenerator#writeObject(Object)}, because it avoids
re-creating the {@link SerializerProvider}
+ * for each serialized object.
+ */
+ public static void writeObjectUsingSerializerProvider(
+ final JsonGenerator jsonGenerator,
+ final SerializerProvider serializers,
+ @Nullable final Object o
+ ) throws IOException
+ {
+ if (o == null) {
+ jsonGenerator.writeNull();
+ } else {
+ final JsonSerializer<Object> serializer = getSerializer(serializers,
o.getClass());
+ serializer.serialize(o, jsonGenerator, serializers);
+ }
+ }
+
private JacksonUtils()
{
}
diff --git
a/core/src/test/java/org/apache/druid/common/jackson/JacksonUtilsTest.java
b/core/src/test/java/org/apache/druid/common/jackson/JacksonUtilsTest.java
new file mode 100644
index 0000000000..b08ac2c1ef
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/common/jackson/JacksonUtilsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.common.jackson;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+public class JacksonUtilsTest
+{
+ @Test
+ public void testWriteObjectUsingSerializerProvider() throws IOException
+ {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ final ObjectMapper objectMapper = new ObjectMapper();
+ final SerializerProvider serializers =
objectMapper.getSerializerProviderInstance();
+
+ final JsonGenerator jg = objectMapper.getFactory().createGenerator(baos);
+ jg.writeStartArray();
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, new
SerializableClass(2));
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, null);
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, new
SerializableClass(3));
+ jg.writeEndArray();
+ jg.close();
+
+ final List<SerializableClass> deserializedValues = objectMapper.readValue(
+ baos.toByteArray(),
+ new TypeReference<List<SerializableClass>>() {}
+ );
+
+ Assert.assertEquals(
+ Arrays.asList(new SerializableClass(2), null, new
SerializableClass(3)),
+ deserializedValues
+ );
+ }
+
+ @Test
+ public void testWritePrimitivesUsingSerializerProvider() throws IOException
+ {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ final ObjectMapper objectMapper = new ObjectMapper();
+ final SerializerProvider serializers =
objectMapper.getSerializerProviderInstance();
+
+ final JsonGenerator jg = objectMapper.getFactory().createGenerator(baos);
+ jg.writeStartArray();
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, "foo");
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, null);
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, 1.23);
+ jg.writeEndArray();
+ jg.close();
+
+ final List<Object> deserializedValues = objectMapper.readValue(
+ baos.toByteArray(),
+ new TypeReference<List<Object>>() {}
+ );
+
+ Assert.assertEquals(
+ Arrays.asList("foo", null, 1.23),
+ deserializedValues
+ );
+ }
+
+ public static class SerializableClass
+ {
+ private final int value;
+
+ @JsonCreator
+ public SerializableClass(@JsonProperty("value") final int value)
+ {
+ this.value = value;
+ }
+
+ @JsonProperty
+ public int getValue()
+ {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SerializableClass that = (SerializableClass) o;
+ return value == that.value;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(value);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SerializableClass{" +
+ "value=" + value +
+ '}';
+ }
+ }
+}
diff --git
a/core/src/test/java/org/apache/druid/data/input/impl/prefetch/JsonIteratorTest.java
b/core/src/test/java/org/apache/druid/data/input/impl/prefetch/JsonIteratorTest.java
index bff12cc469..8a6241e25b 100644
---
a/core/src/test/java/org/apache/druid/data/input/impl/prefetch/JsonIteratorTest.java
+++
b/core/src/test/java/org/apache/druid/data/input/impl/prefetch/JsonIteratorTest.java
@@ -23,9 +23,11 @@ package org.apache.druid.data.input.impl.prefetch;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -50,9 +52,10 @@ public class JsonIteratorTest
};
try (FileOutputStream fos = new FileOutputStream(testFile)) {
final JsonGenerator jg = mapper.getFactory().createGenerator(fos);
+ final SerializerProvider serializers =
mapper.getSerializerProviderInstance();
jg.writeStartArray();
for (Map<String, Object> mapFromList : expectedList) {
- jg.writeObject(mapFromList);
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers,
mapFromList);
}
jg.writeEndArray();
jg.close();
diff --git a/docs/querying/groupbyquery.md b/docs/querying/groupbyquery.md
index 4defd0001d..9554f1e86e 100644
--- a/docs/querying/groupbyquery.md
+++ b/docs/querying/groupbyquery.md
@@ -409,6 +409,7 @@ Supported runtime properties:
|--------|-----------|-------|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2|
|`druid.query.groupBy.singleThreaded`|Merge results using a single
thread.|false|
+|`druid.query.groupBy.intermediateResultAsMapCompat`|Whether Brokers are able
to understand map-based result rows. Setting this to `true` adds some overhead
to all groupBy queries. It is required for compatibility with data servers
running versions older than 0.16.0, which introduced [array-based result
rows](#array-based-result-rows).|false|
Supported query contexts:
diff --git
a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java
b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java
index a8ec404ad8..b5bc6f5fe5 100644
---
a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java
+++
b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContextDeserializer;
import org.joda.time.DateTimeZone;
@@ -93,11 +94,26 @@ public class DruidDefaultSerializersModule extends
SimpleModule
null,
new Accumulator<Object, Object>()
{
+ // Save allocations in jgen.writeObject by caching
serializer.
+ JsonSerializer<Object> serializer = null;
+ Class<?> serializerClass = null;
+
@Override
- public Object accumulate(Object o, Object o1)
+ public Object accumulate(Object ignored, Object object)
{
try {
- jgen.writeObject(o1);
+ if (object == null) {
+ jgen.writeNull();
+ } else {
+ final Class<?> clazz = object.getClass();
+
+ if (serializerClass != clazz) {
+ serializer = JacksonUtils.getSerializer(provider,
clazz);
+ serializerClass = clazz;
+ }
+
+ serializer.serialize(object, jgen, provider);
+ }
}
catch (IOException e) {
throw new RuntimeException(e);
@@ -119,11 +135,27 @@ public class DruidDefaultSerializersModule extends
SimpleModule
public void serialize(Yielder yielder, final JsonGenerator jgen,
SerializerProvider provider)
throws IOException
{
+ // Save allocations in jgen.writeObject by caching serializer.
+ JsonSerializer<Object> serializer = null;
+ Class<?> serializerClass = null;
+
try {
jgen.writeStartArray();
while (!yielder.isDone()) {
final Object o = yielder.get();
- jgen.writeObject(o);
+ if (o == null) {
+ jgen.writeNull();
+ } else {
+ final Class<?> clazz = o.getClass();
+
+ if (serializerClass != clazz) {
+ serializer = JacksonUtils.getSerializer(provider, clazz);
+ serializerClass = clazz;
+ }
+
+ serializer.serialize(o, jgen, provider);
+ }
+
yielder = yielder.next(null);
}
jgen.writeEndArray();
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
index 986f4d2920..bba6433391 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryConfig.java
@@ -105,6 +105,9 @@ public class GroupByQueryConfig
@JsonProperty
private boolean vectorize = true;
+ @JsonProperty
+ private boolean intermediateResultAsMapCompat = false;
+
@JsonProperty
private boolean enableMultiValueUnnesting = true;
@@ -203,6 +206,11 @@ public class GroupByQueryConfig
return vectorize;
}
+ public boolean isIntermediateResultAsMapCompat()
+ {
+ return intermediateResultAsMapCompat;
+ }
+
public boolean isForcePushDownNestedQuery()
{
return forcePushDownNestedQuery;
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index 28c2658547..0ae0a67d45 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
@@ -40,6 +41,7 @@ import
org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.MappedSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
@@ -91,21 +93,24 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<ResultRow, GroupB
public static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private final GroupByStrategySelector strategySelector;
+ private final GroupByQueryConfig queryConfig;
private final GroupByQueryMetricsFactory queryMetricsFactory;
@VisibleForTesting
public GroupByQueryQueryToolChest(GroupByStrategySelector strategySelector)
{
- this(strategySelector, DefaultGroupByQueryMetricsFactory.instance());
+ this(strategySelector, GroupByQueryConfig::new,
DefaultGroupByQueryMetricsFactory.instance());
}
@Inject
public GroupByQueryQueryToolChest(
GroupByStrategySelector strategySelector,
+ Supplier<GroupByQueryConfig> queryConfigSupplier,
GroupByQueryMetricsFactory queryMetricsFactory
)
{
this.strategySelector = strategySelector;
+ this.queryConfig = queryConfigSupplier.get();
this.queryMetricsFactory = queryMetricsFactory;
}
@@ -415,6 +420,12 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<ResultRow, GroupB
{
final boolean resultAsArray =
query.getContextBoolean(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, false);
+ if (resultAsArray && !queryConfig.isIntermediateResultAsMapCompat()) {
+ // We can assume ResultRow are serialized and deserialized as arrays. No
need for special decoration,
+ // and we can save the overhead of making a copy of the ObjectMapper.
+ return objectMapper;
+ }
+
// Serializer that writes array- or map-based rows as appropriate, based
on the "resultAsArray" setting.
final JsonSerializer<ResultRow> serializer = new
JsonSerializer<ResultRow>()
{
@@ -426,9 +437,9 @@ public class GroupByQueryQueryToolChest extends
QueryToolChest<ResultRow, GroupB
) throws IOException
{
if (resultAsArray) {
- jg.writeObject(resultRow.getArray());
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers,
resultRow.getArray());
} else {
- jg.writeObject(resultRow.toMapBasedRow(query));
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers,
resultRow.toMapBasedRow(query));
}
}
};
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
index e54a1877aa..f4dcd73988 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -22,6 +22,7 @@ package org.apache.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
@@ -30,6 +31,7 @@ import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.BaseQuery;
@@ -309,10 +311,11 @@ public class SpillingGrouper<KeyType> implements
Grouper<KeyType>
final LZ4BlockOutputStream compressedOut = new
LZ4BlockOutputStream(out);
final JsonGenerator jsonGenerator =
spillMapper.getFactory().createGenerator(compressedOut)
) {
+ final SerializerProvider serializers =
spillMapper.getSerializerProviderInstance();
+
while (iterator.hasNext()) {
BaseQuery.checkInterrupted();
-
- jsonGenerator.writeObject(iterator.next());
+ JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator,
serializers, iterator.next());
}
return out.getFile();
diff --git
a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java
b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java
index 1c5560be7c..ac20e86867 100644
---
a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java
+++
b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java
@@ -22,11 +22,18 @@ package org.apache.druid.jackson;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielders;
import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
+
/**
+ *
*/
public class DefaultObjectMapperTest
{
@@ -39,4 +46,25 @@ public class DefaultObjectMapperTest
Assert.assertEquals(StringUtils.format("\"%s\"", time),
mapper.writeValueAsString(time));
}
+
+ @Test
+ public void testYielder() throws Exception
+ {
+ final Sequence<Object> sequence = Sequences.simple(
+ Arrays.asList(
+ "a",
+ "b",
+ null,
+ DateTimes.utc(2L),
+ 5,
+ DateTimeZone.UTC,
+ "c"
+ )
+ );
+
+ Assert.assertEquals(
+ "[\"a\",\"b\",null,\"1970-01-01T00:00:00.002Z\",5,\"UTC\",\"c\"]",
+ mapper.writeValueAsString(Yielders.each(sequence))
+ );
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
index 6fbfa083ac..2a700c08bd 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
@@ -538,7 +538,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.build();
CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(
+ new GroupByQueryQueryToolChest(null).getCacheStrategy(
query1
);
@@ -592,8 +592,35 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
final Object[] rowObjects = {DateTimes.of("2000").getMillis(), "foo", 100,
10.0};
final ResultRow resultRow = ResultRow.of(rowObjects);
+ Assert.assertArrayEquals(
+ "standard mapper reads ResultRows",
+ rowObjects,
+ objectMapper.readValue(
+ arraysObjectMapper.writeValueAsBytes(resultRow),
+ Object[].class
+ )
+ );
Assert.assertEquals(
+ "standard mapper reads MapBasedRows",
+ resultRow.toMapBasedRow(query),
+ objectMapper.readValue(
+ mapsObjectMapper.writeValueAsBytes(resultRow),
+ Row.class
+ )
+ );
+
+ Assert.assertEquals(
+ "array mapper reads arrays",
+ resultRow,
+ arraysObjectMapper.readValue(
+ arraysObjectMapper.writeValueAsBytes(resultRow),
+ ResultRow.class
+ )
+ );
+
+ Assert.assertEquals(
+ "array mapper reads arrays (2)",
resultRow,
arraysObjectMapper.readValue(
StringUtils.format("[%s, \"foo\", 100, 10.0]",
DateTimes.of("2000").getMillis()),
@@ -601,22 +628,75 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
)
);
- TestHelper.assertRow("",
+ Assert.assertEquals(
+ "map mapper reads arrays",
resultRow,
- arraysObjectMapper.readValue(
- StringUtils.format(
- "{\"version\":\"v1\","
- + "\"timestamp\":\"%s\","
- + "\"event\":"
- + " {\"test\":\"foo\", \"rows\":100, \"post\":10.0}"
- + "}",
- DateTimes.of("2000")
- ),
+ mapsObjectMapper.readValue(
+ arraysObjectMapper.writeValueAsBytes(resultRow),
+ ResultRow.class
+ )
+ );
+
+ Assert.assertEquals(
+ "map mapper reads maps",
+ resultRow,
+ mapsObjectMapper.readValue(
+ mapsObjectMapper.writeValueAsBytes(resultRow),
+ ResultRow.class
+ )
+ );
+ }
+
+ @Test
+ public void testResultSerdeIntermediateResultAsMapCompat() throws Exception
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+
.setDimensions(Collections.singletonList(DefaultDimensionSpec.of("test")))
+
.setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
+ .setPostAggregatorSpecs(Collections.singletonList(new
ConstantPostAggregator("post", 10)))
+ .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+ .build();
+
+ final GroupByQueryQueryToolChest toolChest = new
GroupByQueryQueryToolChest(
+ null,
+ () -> new GroupByQueryConfig()
+ {
+ @Override
+ public boolean isIntermediateResultAsMapCompat()
+ {
+ return true;
+ }
+ },
+ null
+ );
+
+ final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+ final ObjectMapper arraysObjectMapper = toolChest.decorateObjectMapper(
+ objectMapper,
+
query.withOverriddenContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS,
true))
+ );
+ final ObjectMapper mapsObjectMapper = toolChest.decorateObjectMapper(
+ objectMapper,
+
query.withOverriddenContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS,
false))
+ );
+
+ final Object[] rowObjects = {DateTimes.of("2000").getMillis(), "foo", 100,
10.0};
+ final ResultRow resultRow = ResultRow.of(rowObjects);
+
+
+ Assert.assertEquals(
+ resultRow,
+ arraysObjectMapper.readValue(
+ StringUtils.format("[%s, \"foo\", 100, 10.0]",
DateTimes.of("2000").getMillis()),
ResultRow.class
)
);
Assert.assertArrayEquals(
+ "standard mapper reads ResultRows",
rowObjects,
objectMapper.readValue(
arraysObjectMapper.writeValueAsBytes(resultRow),
@@ -625,6 +705,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
);
Assert.assertEquals(
+ "standard mapper reads MapBasedRows",
resultRow.toMapBasedRow(query),
objectMapper.readValue(
mapsObjectMapper.writeValueAsBytes(resultRow),
@@ -633,7 +714,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
);
Assert.assertEquals(
- "arrays read arrays",
+ "array mapper reads arrays",
resultRow,
arraysObjectMapper.readValue(
arraysObjectMapper.writeValueAsBytes(resultRow),
@@ -642,7 +723,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
);
Assert.assertEquals(
- "arrays read maps",
+ "array mapper reads maps",
resultRow,
arraysObjectMapper.readValue(
mapsObjectMapper.writeValueAsBytes(resultRow),
@@ -650,8 +731,24 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
)
);
+ TestHelper.assertRow(
+ "array mapper reads maps (2)",
+ resultRow,
+ arraysObjectMapper.readValue(
+ StringUtils.format(
+ "{\"version\":\"v1\","
+ + "\"timestamp\":\"%s\","
+ + "\"event\":"
+ + " {\"test\":\"foo\", \"rows\":100, \"post\":10.0}"
+ + "}",
+ DateTimes.of("2000")
+ ),
+ ResultRow.class
+ )
+ );
+
Assert.assertEquals(
- "maps read arrays",
+ "map mapper reads arrays",
resultRow,
mapsObjectMapper.readValue(
arraysObjectMapper.writeValueAsBytes(resultRow),
@@ -660,7 +757,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
);
Assert.assertEquals(
- "maps read maps",
+ "map mapper reads maps",
resultRow,
mapsObjectMapper.readValue(
mapsObjectMapper.writeValueAsBytes(resultRow),
@@ -689,7 +786,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.add("uniques", null)
.add("const", ColumnType.LONG)
.build(),
- new GroupByQueryQueryToolChest(null, null).resultArraySignature(query)
+ new GroupByQueryQueryToolChest(null).resultArraySignature(query)
);
}
@@ -714,7 +811,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.add("uniques", null)
.add("const", ColumnType.LONG)
.build(),
- new GroupByQueryQueryToolChest(null, null).resultArraySignature(query)
+ new GroupByQueryQueryToolChest(null).resultArraySignature(query)
);
}
@@ -735,7 +832,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
new Object[]{"foo", 1L, 2L, 3L, 1L},
new Object[]{"bar", 4L, 5L, 6L, 1L}
),
- new GroupByQueryQueryToolChest(null, null).resultsAsArrays(
+ new GroupByQueryQueryToolChest(null).resultsAsArrays(
query,
Sequences.simple(
ImmutableList.of(
@@ -764,7 +861,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
new Object[]{DateTimes.of("2000-01-01").getMillis(), "foo", 1L,
2L, 3L, 1L},
new Object[]{DateTimes.of("2000-01-02").getMillis(), "bar", 4L,
5L, 6L, 1L}
),
- new GroupByQueryQueryToolChest(null, null).resultsAsArrays(
+ new GroupByQueryQueryToolChest(null).resultsAsArrays(
query,
Sequences.simple(
ImmutableList.of(
@@ -780,7 +877,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
public void testCanPerformSubqueryOnGroupBys()
{
Assert.assertTrue(
- new GroupByQueryQueryToolChest(null, null).canPerformSubquery(
+ new GroupByQueryQueryToolChest(null).canPerformSubquery(
new GroupByQuery.Builder()
.setDataSource(
new QueryDataSource(
@@ -802,7 +899,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
public void testCanPerformSubqueryOnTimeseries()
{
Assert.assertFalse(
- new GroupByQueryQueryToolChest(null, null).canPerformSubquery(
+ new GroupByQueryQueryToolChest(null).canPerformSubquery(
Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
@@ -816,7 +913,7 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
public void testCanPerformSubqueryOnGroupByOfTimeseries()
{
Assert.assertFalse(
- new GroupByQueryQueryToolChest(null, null).canPerformSubquery(
+ new GroupByQueryQueryToolChest(null).canPerformSubquery(
new GroupByQuery.Builder()
.setDataSource(
new QueryDataSource(
@@ -886,12 +983,17 @@ public class GroupByQueryQueryToolChestTest extends
InitializedNullHandlingTest
.build();
CacheStrategy<ResultRow, Object, GroupByQuery> strategy =
- new GroupByQueryQueryToolChest(null, null).getCacheStrategy(
+ new GroupByQueryQueryToolChest(null).getCacheStrategy(
query1
);
// test timestamps that result in integer size millis
- final ResultRow result1 = ResultRow.of(123L, dimValue, 1,
getIntermediateComplexValue(valueType.getType(), dimValue));
+ final ResultRow result1 = ResultRow.of(
+ 123L,
+ dimValue,
+ 1,
+ getIntermediateComplexValue(valueType.getType(), dimValue)
+ );
Object preparedValue =
strategy.prepareForSegmentLevelCache().apply(result1);
diff --git
a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
index 3c67cb016d..57209edea2 100644
---
a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
+++
b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
@@ -21,6 +21,7 @@ package org.apache.druid.client.cache;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -31,6 +32,7 @@ import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.ByteArrayOutputStream;
@@ -120,10 +122,11 @@ public class BackgroundCachePopulator implements
CachePopulator
{
try {
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ final SerializerProvider serializers =
objectMapper.getSerializerProviderInstance();
try (JsonGenerator gen =
objectMapper.getFactory().createGenerator(bytes)) {
for (CacheType result : results) {
- gen.writeObject(result);
+ JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers,
result);
if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
cachePopulatorStats.incrementOversized();
diff --git
a/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
b/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
index 0bdeb3a491..8c3643a45f 100644
---
a/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
+++
b/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
@@ -21,11 +21,13 @@ package org.apache.druid.client.cache;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.ByteArrayOutputStream;
@@ -65,6 +67,7 @@ public class ForegroundCachePopulator implements
CachePopulator
{
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
final MutableBoolean tooBig = new MutableBoolean(false);
+ final SerializerProvider serializers =
objectMapper.getSerializerProviderInstance();
final JsonGenerator jsonGenerator;
try {
@@ -80,7 +83,7 @@ public class ForegroundCachePopulator implements
CachePopulator
input -> {
if (!tooBig.isTrue()) {
try {
- jsonGenerator.writeObject(cacheFn.apply(input));
+
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializers,
cacheFn.apply(input));
// Not flushing jsonGenerator before checking this, but
should be ok since Jackson buffers are
// typically just a few KB, and we don't want to waste
cycles flushing.
diff --git
a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
index 724077a4c0..abc64baae5 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
@@ -21,9 +21,11 @@ package org.apache.druid.metadata.input;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.metadata.SQLMetadataStorageActionHandler;
@@ -117,7 +119,8 @@ public class SqlEntity implements InputEntity
throws IOException
{
try (FileOutputStream fos = new FileOutputStream(tempFile);
- final JsonGenerator jg =
objectMapper.getFactory().createGenerator(fos);) {
+ final JsonGenerator jg =
objectMapper.getFactory().createGenerator(fos)) {
+ final SerializerProvider serializers =
objectMapper.getSerializerProviderInstance();
// Execute the sql query and lazily retrieve the results into the file
in json format.
// foldCase is useful to handle differences in case sensitivity behavior
across databases.
@@ -152,7 +155,7 @@ public class SqlEntity implements InputEntity
).iterator();
jg.writeStartArray();
while (resultIterator.hasNext()) {
- jg.writeObject(resultIterator.next());
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers,
resultIterator.next());
}
jg.writeEndArray();
jg.close();
diff --git
a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
index 8a2d383b28..2b8efbc330 100644
---
a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
+++
b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
@@ -22,6 +22,7 @@ package org.apache.druid.query;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@@ -33,6 +34,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.QueryResource;
@@ -247,6 +249,7 @@ public class ResultLevelCachingQueryRunner<T> implements
QueryRunner<T>
{
private final Cache cache;
private final ObjectMapper mapper;
+ private final SerializerProvider serializers;
private final Cache.NamedKey key;
private final CacheConfig cacheConfig;
@Nullable
@@ -262,6 +265,7 @@ public class ResultLevelCachingQueryRunner<T> implements
QueryRunner<T>
{
this.cache = cache;
this.mapper = mapper;
+ this.serializers = mapper.getSerializerProviderInstance();
this.key = key;
this.cacheConfig = cacheConfig;
this.cacheObjectStream = shouldPopulate ? new ByteArrayOutputStream() :
null;
@@ -285,7 +289,7 @@ public class ResultLevelCachingQueryRunner<T> implements
QueryRunner<T>
Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream");
int cacheLimit = cacheConfig.getResultLevelCacheLimit();
try (JsonGenerator gen =
mapper.getFactory().createGenerator(cacheObjectStream)) {
- gen.writeObject(cacheFn.apply(resultEntry));
+ JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers,
cacheFn.apply(resultEntry));
if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
stopPopulating();
}
diff --git
a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
index abbf316537..31ce462c1b 100644
--- a/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingQueryRunnerTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.client;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -45,6 +46,7 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Druids;
@@ -495,8 +497,9 @@ public class CachingQueryRunnerTest
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (JsonGenerator gen = objectMapper.getFactory().createGenerator(bytes))
{
+ final SerializerProvider serializers =
objectMapper.getSerializerProviderInstance();
for (T result : results) {
- gen.writeObject(result);
+ JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers,
result);
}
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseTest.java
index 53b25e4c13..026b442881 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/firehose/SqlFirehoseTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -34,6 +35,7 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.impl.prefetch.JsonIterator;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
@@ -84,8 +86,9 @@ public class SqlFirehoseTest
File file = new File(TEST_DIR, "test_" + i++);
try (FileOutputStream fos = new FileOutputStream(file)) {
final JsonGenerator jg =
objectMapper.getFactory().createGenerator(fos);
+ final SerializerProvider serializers =
objectMapper.getSerializerProviderInstance();
jg.writeStartArray();
- jg.writeObject(m);
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, m);
jg.writeEndArray();
jg.close();
testFile.add(new FileInputStream(file));
diff --git a/sql/src/main/java/org/apache/druid/sql/http/ArrayLinesWriter.java
b/sql/src/main/java/org/apache/druid/sql/http/ArrayLinesWriter.java
index dc5849d8e1..beda6decea 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/ArrayLinesWriter.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/ArrayLinesWriter.java
@@ -22,7 +22,9 @@ package org.apache.druid.sql.http;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -30,11 +32,13 @@ import java.io.OutputStream;
public class ArrayLinesWriter implements ResultFormat.Writer
{
- private final OutputStream outputStream;
+ private final SerializerProvider serializers;
private final JsonGenerator jsonGenerator;
+ private final OutputStream outputStream;
public ArrayLinesWriter(final OutputStream outputStream, final ObjectMapper
jsonMapper) throws IOException
{
+ this.serializers = jsonMapper.getSerializerProviderInstance();
this.outputStream = outputStream;
this.jsonGenerator =
jsonMapper.writer().getFactory().createGenerator(outputStream);
jsonGenerator.setRootValueSeparator(new SerializedString("\n"));
@@ -75,7 +79,7 @@ public class ArrayLinesWriter implements ResultFormat.Writer
@Override
public void writeRowField(final String name, @Nullable final Object value)
throws IOException
{
- jsonGenerator.writeObject(value);
+ JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator,
serializers, value);
}
@Override
diff --git a/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java
b/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java
index b8893de98e..cd863d5bf1 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java
@@ -21,7 +21,9 @@ package org.apache.druid.sql.http;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.sql.calcite.table.RowSignatures;
@@ -32,11 +34,13 @@ import java.io.OutputStream;
public class ArrayWriter implements ResultFormat.Writer
{
+ private final SerializerProvider serializers;
private final JsonGenerator jsonGenerator;
private final OutputStream outputStream;
public ArrayWriter(final OutputStream outputStream, final ObjectMapper
jsonMapper) throws IOException
{
+ this.serializers = jsonMapper.getSerializerProviderInstance();
this.jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream);
this.outputStream = outputStream;
@@ -79,7 +83,7 @@ public class ArrayWriter implements ResultFormat.Writer
@Override
public void writeRowField(final String name, @Nullable final Object value)
throws IOException
{
- jsonGenerator.writeObject(value);
+ JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator,
serializers, value);
}
@Override
diff --git a/sql/src/main/java/org/apache/druid/sql/http/ObjectLinesWriter.java
b/sql/src/main/java/org/apache/druid/sql/http/ObjectLinesWriter.java
index 0d91514e9a..a593b9b21b 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/ObjectLinesWriter.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/ObjectLinesWriter.java
@@ -22,7 +22,9 @@ package org.apache.druid.sql.http;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -30,11 +32,13 @@ import java.io.OutputStream;
public class ObjectLinesWriter implements ResultFormat.Writer
{
- private final OutputStream outputStream;
+ private final SerializerProvider serializers;
private final JsonGenerator jsonGenerator;
+ private final OutputStream outputStream;
public ObjectLinesWriter(final OutputStream outputStream, final ObjectMapper
jsonMapper) throws IOException
{
+ this.serializers = jsonMapper.getSerializerProviderInstance();
this.outputStream = outputStream;
this.jsonGenerator =
jsonMapper.writer().getFactory().createGenerator(outputStream);
jsonGenerator.setRootValueSeparator(new SerializedString("\n"));
@@ -76,7 +80,7 @@ public class ObjectLinesWriter implements ResultFormat.Writer
public void writeRowField(final String name, @Nullable final Object value)
throws IOException
{
jsonGenerator.writeFieldName(name);
- jsonGenerator.writeObject(value);
+ JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator,
serializers, value);
}
@Override
diff --git a/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java
b/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java
index 464cd4b554..bdab65a1f7 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java
@@ -21,7 +21,9 @@ package org.apache.druid.sql.http;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.sql.calcite.table.RowSignatures;
@@ -35,11 +37,13 @@ public class ObjectWriter implements ResultFormat.Writer
static final String TYPE_HEADER_NAME = "type";
static final String SQL_TYPE_HEADER_NAME = "sqlType";
+ private final SerializerProvider serializers;
private final JsonGenerator jsonGenerator;
private final OutputStream outputStream;
public ObjectWriter(final OutputStream outputStream, final ObjectMapper
jsonMapper) throws IOException
{
+ this.serializers = jsonMapper.getSerializerProviderInstance();
this.jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream);
this.outputStream = outputStream;
@@ -83,7 +87,7 @@ public class ObjectWriter implements ResultFormat.Writer
public void writeRowField(final String name, @Nullable final Object value)
throws IOException
{
jsonGenerator.writeFieldName(name);
- jsonGenerator.writeObject(value);
+ JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator,
serializers, value);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]