This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 25.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/25.0.0 by this push:
new 93e2a7fc88 add protobuf flattener, direct to plain java conversion for
faster flattening (#13519) (#13546)
93e2a7fc88 is described below
commit 93e2a7fc88c3f63f2700d86bb7ebd45ee73c4feb
Author: Clint Wylie <[email protected]>
AuthorDate: Sun Dec 11 19:57:47 2022 -0800
add protobuf flattener, direct to plain java conversion for faster
flattening (#13519) (#13546)
* add protobuf flattener, direct to plain java conversion for faster
flattening, nested column tests
---
.../util/common/parsers/FlattenerJsonProvider.java | 65 +-----
.../util/common/parsers/JSONFlattenerMaker.java | 18 +-
.../common/parsers/FlattenerJsonProviderTest.java | 146 ++++++++++++
.../data/input/avro/GenericAvroJsonProvider.java | 93 +-------
.../data/input/orc/OrcStructJsonProvider.java | 114 +---------
.../parquet/simple/ParquetGroupJsonProvider.java | 104 +--------
extensions-core/protobuf-extensions/pom.xml | 19 +-
.../data/input/protobuf/ProtobufConverter.java | 244 +++++++++++++++++++++
.../input/protobuf/ProtobufFlattenerMaker.java | 114 ++++++++++
.../data/input/protobuf/ProtobufInputFormat.java | 9 +-
.../data/input/protobuf/ProtobufJsonProvider.java | 79 +++++++
.../druid/data/input/protobuf/ProtobufReader.java | 60 ++---
.../input/protobuf/ProtobufInputFormatTest.java | 243 +++++++++++++++++++-
.../data/input/protobuf/ProtobufReaderTest.java | 8 -
14 files changed, 879 insertions(+), 437 deletions(-)
diff --git
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
b/core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java
similarity index 63%
copy from
extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
copy to
core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java
index 15f81b6f20..2574f94b68 100644
---
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
+++
b/core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java
@@ -17,32 +17,19 @@
* under the License.
*/
-package org.apache.druid.data.input.orc;
+package org.apache.druid.java.util.common.parsers;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.spi.json.JsonProvider;
-import org.apache.hadoop.io.Text;
-import org.apache.orc.mapred.OrcMap;
-import org.apache.orc.mapred.OrcStruct;
import java.io.InputStream;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-public class OrcStructJsonProvider implements JsonProvider
+public abstract class FlattenerJsonProvider implements JsonProvider
{
- private final OrcStructConverter converter;
-
- OrcStructJsonProvider(OrcStructConverter converter)
- {
- this.converter = converter;
- }
-
@Override
public Object createArray()
{
@@ -52,19 +39,7 @@ public class OrcStructJsonProvider implements JsonProvider
@Override
public Object createMap()
{
- return new HashMap<>();
- }
-
- @Override
- public boolean isArray(final Object o)
- {
- return o instanceof List;
- }
-
- @Override
- public boolean isMap(final Object o)
- {
- return o == null || o instanceof Map || o instanceof OrcStruct;
+ return new LinkedHashMap<>();
}
@Override
@@ -86,38 +61,6 @@ public class OrcStructJsonProvider implements JsonProvider
throw new UnsupportedOperationException(o.getClass().getName());
}
- @Override
- public Collection<String> getPropertyKeys(final Object o)
- {
- if (o == null) {
- return Collections.emptySet();
- } else if (o instanceof Map) {
- return ((Map<Object, Object>)
o).keySet().stream().map(String::valueOf).collect(Collectors.toSet());
- } else if (o instanceof OrcStruct) {
- return ((OrcStruct) o).getSchema().getFieldNames();
- } else {
- throw new UnsupportedOperationException(o.getClass().getName());
- }
- }
-
- @Override
- public Object getMapValue(final Object o, final String s)
- {
- if (o == null) {
- return null;
- } else if (o instanceof OrcMap) {
- OrcMap map = (OrcMap) o;
- return map.get(new Text(s));
- } else if (o instanceof Map) {
- return ((Map) o).get(s);
- } else if (o instanceof OrcStruct) {
- OrcStruct struct = (OrcStruct) o;
- // get field by index since we have no way to know if this map is the
root or not
- return converter.convertField(struct,
struct.getSchema().getFieldNames().indexOf(s));
- }
- throw new UnsupportedOperationException(o.getClass().getName());
- }
-
@Override
public Object getArrayIndex(final Object o, final int i)
{
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
index 589ed5ffbc..b54b51f863 100644
---
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
+++
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
@@ -54,9 +54,9 @@ public class JSONFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<JsonN
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
+ private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
private final boolean keepNullValues;
- private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
public JSONFlattenerMaker(boolean keepNullValues)
{
@@ -66,7 +66,7 @@ public class JSONFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<JsonN
@Override
public Iterable<String> discoverRootFields(final JsonNode obj)
{
- return FluentIterable.from(() -> obj.fields())
+ return FluentIterable.from(obj::fields)
.filter(
entry -> {
final JsonNode val = entry.getValue();
@@ -137,13 +137,13 @@ public class JSONFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<JsonN
public Object finalizeConversionForMap(Object o)
{
if (o instanceof JsonNode) {
- return convertJsonNode((JsonNode) o);
+ return convertJsonNode((JsonNode) o, enc);
}
return o;
}
@Nullable
- private Object convertJsonNode(JsonNode val)
+ public static Object convertJsonNode(JsonNode val, CharsetEncoder enc)
{
if (val == null || val.isNull()) {
return null;
@@ -158,7 +158,7 @@ public class JSONFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<JsonN
}
if (val.isTextual()) {
- return charsetFix(val.asText());
+ return charsetFix(val.asText(), enc);
}
if (val.isBoolean()) {
@@ -175,7 +175,7 @@ public class JSONFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<JsonN
List<Object> newList = new ArrayList<>();
for (JsonNode entry : val) {
if (!entry.isNull()) {
- newList.add(finalizeConversionForMap(entry));
+ newList.add(convertJsonNode(entry, enc));
}
}
return newList;
@@ -185,7 +185,7 @@ public class JSONFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<JsonN
Map<String, Object> newMap = new LinkedHashMap<>();
for (Iterator<Map.Entry<String, JsonNode>> it = val.fields();
it.hasNext(); ) {
Map.Entry<String, JsonNode> entry = it.next();
- newMap.put(entry.getKey(), finalizeConversionForMap(entry.getValue()));
+ newMap.put(entry.getKey(), convertJsonNode(entry.getValue(), enc));
}
return newMap;
}
@@ -197,7 +197,7 @@ public class JSONFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<JsonN
}
@Nullable
- private String charsetFix(String s)
+ private static String charsetFix(String s, CharsetEncoder enc)
{
if (s != null && !enc.canEncode(s)) {
// Some whacky characters are in this string (e.g. \uD900). These are
problematic because they are decodeable
@@ -209,7 +209,7 @@ public class JSONFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<JsonN
}
}
- private boolean isFlatList(JsonNode list)
+ private static boolean isFlatList(JsonNode list)
{
for (JsonNode obj : list) {
if (obj.isObject() || obj.isArray()) {
diff --git
a/core/src/test/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProviderTest.java
b/core/src/test/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProviderTest.java
new file mode 100644
index 0000000000..24214be14e
--- /dev/null
+++
b/core/src/test/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProviderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.parsers;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+public class FlattenerJsonProviderTest
+{
+ FlattenerJsonProvider jsonProvider = new FlattenerJsonProvider()
+ {
+ @Override
+ public boolean isArray(final Object o)
+ {
+ throw new RuntimeException("not tested");
+ }
+
+ @Override
+ public boolean isMap(final Object o)
+ {
+ throw new RuntimeException("not tested");
+ }
+
+ @Override
+ public Collection<String> getPropertyKeys(final Object o)
+ {
+ throw new RuntimeException("not tested");
+ }
+
+ @Override
+ public Object getMapValue(final Object o, final String s)
+ {
+ throw new RuntimeException("not tested");
+ }
+ };
+
+ @Test
+ public void testMapStuff()
+ {
+ Object aMap = jsonProvider.createMap();
+ jsonProvider.setProperty(aMap, "key", "value");
+ Assert.assertEquals(ImmutableMap.of("key", "value"), aMap);
+ jsonProvider.removeProperty(aMap, "key");
+ Assert.assertEquals(ImmutableMap.of(), aMap);
+ Assert.assertEquals(aMap, jsonProvider.unwrap(aMap));
+
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> jsonProvider.setProperty(jsonProvider.createArray(), "key",
"value")
+ );
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> jsonProvider.removeProperty(jsonProvider.createArray(), "key")
+ );
+ }
+
+ @Test
+ public void testArrayStuff()
+ {
+ Object aList = jsonProvider.createArray();
+ jsonProvider.setArrayIndex(aList, 0, "a");
+ jsonProvider.setArrayIndex(aList, 1, "b");
+ jsonProvider.setArrayIndex(aList, 2, "c");
+ Assert.assertEquals(3, jsonProvider.length(aList));
+ Assert.assertEquals("a", jsonProvider.getArrayIndex(aList, 0));
+ Assert.assertEquals("b", jsonProvider.getArrayIndex(aList, 1));
+ Assert.assertEquals("c", jsonProvider.getArrayIndex(aList, 2));
+ List<String> expected = ImmutableList.of("a", "b", "c");
+ Assert.assertEquals(expected, aList);
+ Iterator<?> iter = jsonProvider.toIterable(aList).iterator();
+ Iterator<String> expectedIter = expected.iterator();
+ while (iter.hasNext()) {
+ Assert.assertEquals(expectedIter.next(), iter.next());
+ }
+ Assert.assertFalse(expectedIter.hasNext());
+ Assert.assertEquals(aList, jsonProvider.unwrap(aList));
+
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> jsonProvider.getArrayIndex(jsonProvider.createMap(), 0)
+ );
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> jsonProvider.setArrayIndex(jsonProvider.createMap(), 0, "a")
+ );
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> jsonProvider.toIterable(jsonProvider.createMap())
+ );
+ }
+
+ @Test
+ public void testNotImplementedOnPurpose()
+ {
+ Object aList = jsonProvider.createArray();
+ Throwable t = Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> jsonProvider.toJson(aList)
+ );
+ Assert.assertEquals("Unused", t.getMessage());
+
+ t = Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> jsonProvider.parse("{}")
+ );
+ Assert.assertEquals("Unused", t.getMessage());
+
+
+ t = Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> jsonProvider.parse(new
ByteArrayInputStream(StringUtils.toUtf8("{}")), "UTF-8")
+ );
+ Assert.assertEquals("Unused", t.getMessage());
+
+ t = Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> jsonProvider.getArrayIndex(aList, 0, false)
+ );
+ Assert.assertEquals("Deprecated", t.getMessage());
+ }
+}
diff --git
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
index 86bbb7d7e9..0705500767 100644
---
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
+++
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
@@ -20,22 +20,17 @@
package org.apache.druid.data.input.avro;
import com.google.common.collect.ImmutableMap;
-import com.jayway.jsonpath.InvalidJsonException;
-import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
+import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
import javax.annotation.Nullable;
-
-import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -43,7 +38,7 @@ import java.util.stream.Collectors;
/**
* JsonProvider for JsonPath + Avro.
*/
-public class GenericAvroJsonProvider implements JsonProvider
+public class GenericAvroJsonProvider extends FlattenerJsonProvider
{
private final boolean extractUnionsByType;
@@ -52,36 +47,6 @@ public class GenericAvroJsonProvider implements JsonProvider
this.extractUnionsByType = extractUnionsByType;
}
- @Override
- public Object parse(final String s) throws InvalidJsonException
- {
- throw new UnsupportedOperationException("Unused");
- }
-
- @Override
- public Object parse(final InputStream inputStream, final String s) throws
InvalidJsonException
- {
- throw new UnsupportedOperationException("Unused");
- }
-
- @Override
- public String toJson(final Object o)
- {
- throw new UnsupportedOperationException("Unused");
- }
-
- @Override
- public Object createArray()
- {
- return new ArrayList<>();
- }
-
- @Override
- public Object createMap()
- {
- return new HashMap<>();
- }
-
@Override
public boolean isArray(final Object o)
{
@@ -100,16 +65,6 @@ public class GenericAvroJsonProvider implements JsonProvider
}
}
- @Override
- public Iterable<?> toIterable(final Object o)
- {
- if (o instanceof List) {
- return (List) o;
- } else {
- throw new UnsupportedOperationException();
- }
- }
-
@Override
public Collection<String> getPropertyKeys(final Object o)
{
@@ -124,34 +79,6 @@ public class GenericAvroJsonProvider implements JsonProvider
}
}
- @Override
- public Object getArrayIndex(final Object o, final int i)
- {
- return ((List) o).get(i);
- }
-
- @Override
- @Deprecated
- public Object getArrayIndex(final Object o, final int i, final boolean b)
- {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- public void setArrayIndex(final Object o, final int i, final Object o1)
- {
- if (o instanceof List) {
- final List list = (List) o;
- if (list.size() == i) {
- list.add(o1);
- } else {
- list.set(i, o1);
- }
- } else {
- throw new UnsupportedOperationException();
- }
- }
-
@Nullable
@Override
public Object getMapValue(final Object o, final String s)
@@ -189,28 +116,12 @@ public class GenericAvroJsonProvider implements
JsonProvider
}
}
- @Override
- public void removeProperty(final Object o, final Object o1)
- {
- if (o instanceof Map) {
- ((Map) o).remove(o1);
- } else {
- throw new UnsupportedOperationException();
- }
- }
-
@Override
public boolean isMap(final Object o)
{
return o == null || o instanceof Map || o instanceof GenericRecord;
}
- @Override
- public Object unwrap(final Object o)
- {
- return o;
- }
-
private boolean isExtractableUnion(final Schema.Field field)
{
return field.schema().isUnion() &&
diff --git
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
index 15f81b6f20..384a8f833a 100644
---
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
+++
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
@@ -19,22 +19,18 @@
package org.apache.druid.data.input.orc;
-import com.jayway.jsonpath.InvalidJsonException;
-import com.jayway.jsonpath.spi.json.JsonProvider;
+import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
import org.apache.hadoop.io.Text;
import org.apache.orc.mapred.OrcMap;
import org.apache.orc.mapred.OrcStruct;
-import java.io.InputStream;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class OrcStructJsonProvider implements JsonProvider
+public class OrcStructJsonProvider extends FlattenerJsonProvider
{
private final OrcStructConverter converter;
@@ -43,18 +39,6 @@ public class OrcStructJsonProvider implements JsonProvider
this.converter = converter;
}
- @Override
- public Object createArray()
- {
- return new ArrayList<>();
- }
-
- @Override
- public Object createMap()
- {
- return new HashMap<>();
- }
-
@Override
public boolean isArray(final Object o)
{
@@ -67,25 +51,6 @@ public class OrcStructJsonProvider implements JsonProvider
return o == null || o instanceof Map || o instanceof OrcStruct;
}
- @Override
- public int length(final Object o)
- {
- if (o instanceof List) {
- return ((List) o).size();
- } else {
- return 0;
- }
- }
-
- @Override
- public Iterable<?> toIterable(final Object o)
- {
- if (o instanceof List) {
- return (List) o;
- }
- throw new UnsupportedOperationException(o.getClass().getName());
- }
-
@Override
public Collection<String> getPropertyKeys(final Object o)
{
@@ -117,79 +82,4 @@ public class OrcStructJsonProvider implements JsonProvider
}
throw new UnsupportedOperationException(o.getClass().getName());
}
-
- @Override
- public Object getArrayIndex(final Object o, final int i)
- {
- if (o instanceof List) {
- return ((List) o).get(i);
- }
- throw new UnsupportedOperationException(o.getClass().getName());
- }
-
- @Override
- public void setArrayIndex(final Object o, final int i, final Object o1)
- {
- if (o instanceof List) {
- final List list = (List) o;
- if (list.size() == i) {
- list.add(o1);
- } else {
- list.set(i, o1);
- }
- } else {
- throw new UnsupportedOperationException(o.getClass().getName());
- }
- }
-
- @Override
- public void setProperty(final Object o, final Object o1, final Object o2)
- {
- if (o instanceof Map) {
- ((Map) o).put(o1, o2);
- } else {
- throw new UnsupportedOperationException(o.getClass().getName());
- }
- }
-
- @Override
- public void removeProperty(final Object o, final Object o1)
- {
- if (o instanceof Map) {
- ((Map) o).remove(o1);
- } else {
- throw new UnsupportedOperationException(o.getClass().getName());
- }
- }
-
- @Override
- @Deprecated
- public Object getArrayIndex(final Object o, final int i, final boolean b)
- {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- public Object parse(final String s) throws InvalidJsonException
- {
- throw new UnsupportedOperationException("Unused");
- }
-
- @Override
- public Object parse(final InputStream inputStream, final String s) throws
InvalidJsonException
- {
- throw new UnsupportedOperationException("Unused");
- }
-
- @Override
- public String toJson(final Object o)
- {
- throw new UnsupportedOperationException("Unused");
- }
-
- @Override
- public Object unwrap(final Object o)
- {
- return o;
- }
}
diff --git
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
index 3190c7e28a..27234ea24a 100644
---
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
+++
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
@@ -19,15 +19,11 @@
package org.apache.druid.data.input.parquet.simple;
-import com.jayway.jsonpath.InvalidJsonException;
-import com.jayway.jsonpath.spi.json.JsonProvider;
+import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
import org.apache.parquet.example.data.Group;
-import java.io.InputStream;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -35,7 +31,7 @@ import java.util.stream.Collectors;
/**
* Provides json path for Parquet {@link Group} objects
*/
-public class ParquetGroupJsonProvider implements JsonProvider
+public class ParquetGroupJsonProvider extends FlattenerJsonProvider
{
private final ParquetGroupConverter converter;
@@ -44,18 +40,6 @@ public class ParquetGroupJsonProvider implements JsonProvider
this.converter = converter;
}
- @Override
- public Object createArray()
- {
- return new ArrayList<>();
- }
-
- @Override
- public Object createMap()
- {
- return new HashMap<>();
- }
-
@Override
public boolean isArray(final Object o)
{
@@ -82,15 +66,6 @@ public class ParquetGroupJsonProvider implements JsonProvider
}
}
- @Override
- public Iterable<?> toIterable(final Object o)
- {
- if (o instanceof List) {
- return (List) o;
- }
- throw new UnsupportedOperationException(o.getClass().getName());
- }
-
@Override
public Collection<String> getPropertyKeys(final Object o)
{
@@ -118,80 +93,5 @@ public class ParquetGroupJsonProvider implements
JsonProvider
}
throw new UnsupportedOperationException(o.getClass().getName());
}
-
- @Override
- public Object getArrayIndex(final Object o, final int i)
- {
- if (o instanceof List) {
- return ((List) o).get(i);
- }
- throw new UnsupportedOperationException(o.getClass().getName());
- }
-
- @Override
- public void setArrayIndex(final Object o, final int i, final Object o1)
- {
- if (o instanceof List) {
- final List list = (List) o;
- if (list.size() == i) {
- list.add(o1);
- } else {
- list.set(i, o1);
- }
- } else {
- throw new UnsupportedOperationException(o.getClass().getName());
- }
- }
-
- @Override
- public void setProperty(final Object o, final Object o1, final Object o2)
- {
- if (o instanceof Map) {
- ((Map) o).put(o1, o2);
- } else {
- throw new UnsupportedOperationException(o.getClass().getName());
- }
- }
-
- @Override
- public void removeProperty(final Object o, final Object o1)
- {
- if (o instanceof Map) {
- ((Map) o).remove(o1);
- } else {
- throw new UnsupportedOperationException(o.getClass().getName());
- }
- }
-
- @Override
- @Deprecated
- public Object getArrayIndex(final Object o, final int i, final boolean b)
- {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- public Object parse(final String s) throws InvalidJsonException
- {
- throw new UnsupportedOperationException("Unused");
- }
-
- @Override
- public Object parse(final InputStream inputStream, final String s) throws
InvalidJsonException
- {
- throw new UnsupportedOperationException("Unused");
- }
-
- @Override
- public String toJson(final Object o)
- {
- throw new UnsupportedOperationException("Unused");
- }
-
- @Override
- public Object unwrap(final Object o)
- {
- return o;
- }
}
diff --git a/extensions-core/protobuf-extensions/pom.xml
b/extensions-core/protobuf-extensions/pom.xml
index c7b7fc6e8b..4e717f5df5 100644
--- a/extensions-core/protobuf-extensions/pom.xml
+++ b/extensions-core/protobuf-extensions/pom.xml
@@ -52,6 +52,12 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
@@ -133,13 +139,22 @@
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
- <version>2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.thisptr</groupId>
+ <artifactId>jackson-jq</artifactId>
+ <scope>provided</scope>
+ </dependency>
<!-- test -->
<dependency>
@@ -161,7 +176,7 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
- <scope>test</scope>
+ <type>test-jar</type>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java
new file mode 100644
index 0000000000..a032e33ac8
--- /dev/null
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Duration;
+import com.google.protobuf.FieldMask;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ListValue;
+import com.google.protobuf.Message;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import com.google.protobuf.UInt64Value;
+import com.google.protobuf.Value;
+import com.google.protobuf.util.Durations;
+import com.google.protobuf.util.FieldMaskUtil;
+import com.google.protobuf.util.JsonFormat;
+import com.google.protobuf.util.Timestamps;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Convert {@link Message} to plain java stuffs, based roughly on the
conversions done with {@link JsonFormat}
+ */
+public class ProtobufConverter
+{
+ private static final Map<String, SpecializedConverter> SPECIAL_CONVERSIONS =
buildSpecializedConversions();
+
+ @Nullable
+ public static Map<String, Object> convertMessage(Message msg) throws
InvalidProtocolBufferException
+ {
+ if (msg == null) {
+ return null;
+ }
+ final Map<Descriptors.FieldDescriptor, Object> fields = msg.getAllFields();
+ final Map<String, Object> converted =
Maps.newHashMapWithExpectedSize(fields.size());
+ for (Map.Entry<Descriptors.FieldDescriptor, Object> field :
fields.entrySet()) {
+ converted.put(field.getKey().getJsonName(), convertField(field.getKey(),
field.getValue()));
+ }
+ return converted;
+ }
+
+ @Nullable
+ private static Object convertField(Descriptors.FieldDescriptor field, Object
value)
+ throws InvalidProtocolBufferException
+ {
+ // handle special types
+ if (value instanceof Message) {
+ Message msg = (Message) value;
+ final String typeName = msg.getDescriptorForType().getFullName();
+ SpecializedConverter converter = SPECIAL_CONVERSIONS.get(typeName);
+ if (converter != null) {
+ return converter.convert(msg);
+ }
+ }
+
+ if (field.isMapField()) {
+ return convertMap(field, value);
+ } else if (field.isRepeated()) {
+ return convertList(field, (List<?>) value);
+ } else {
+ return convertSingleValue(field, value);
+ }
+ }
+
+ @Nonnull
+ private static List<Object> convertList(Descriptors.FieldDescriptor field,
List<?> value)
+ throws InvalidProtocolBufferException
+ {
+ final List<Object> theList =
Lists.newArrayListWithExpectedSize(value.size());
+ for (Object element : value) {
+ theList.add(convertSingleValue(field, element));
+ }
+ return theList;
+ }
+
+ @Nullable
+ private static Object convertMap(Descriptors.FieldDescriptor field, Object
value)
+ throws InvalidProtocolBufferException
+ {
+ final Descriptors.Descriptor type = field.getMessageType();
+ final Descriptors.FieldDescriptor keyField = type.findFieldByName("key");
+ final Descriptors.FieldDescriptor valueField =
type.findFieldByName("value");
+ if (keyField == null || valueField == null) {
+ throw new InvalidProtocolBufferException("Invalid map field.");
+ }
+
+ @SuppressWarnings("unchecked")
+ final List<Object> elements = (List<Object>) value;
+ final HashMap<String, Object> theMap =
Maps.newHashMapWithExpectedSize(elements.size());
+ for (Object element : elements) {
+ Message entry = (Message) element;
+ theMap.put(
+ (String) convertSingleValue(keyField, entry.getField(keyField)),
+ convertSingleValue(valueField, entry.getField(valueField))
+ );
+ }
+ return theMap;
+ }
+
+ @Nullable
+ private static Object convertSingleValue(Descriptors.FieldDescriptor field,
Object value)
+ throws InvalidProtocolBufferException
+ {
+ switch (field.getType()) {
+ case BYTES:
+ return ((ByteString) value).toByteArray();
+ case ENUM:
+ if
("google.protobuf.NullValue".equals(field.getEnumType().getFullName())) {
+ return null;
+ } else {
+ return ((Descriptors.EnumValueDescriptor) value).getName();
+ }
+ case MESSAGE:
+ case GROUP:
+ return convertMessage((Message) value);
+ default:
+ // pass through everything else
+ return value;
+ }
+ }
+
+ private static Map<String, SpecializedConverter>
buildSpecializedConversions()
+ {
+ final Map<String, SpecializedConverter> converters = new HashMap<>();
+ final SpecializedConverter parappaTheWrappa = msg -> {
+ final Descriptors.Descriptor descriptor = msg.getDescriptorForType();
+ final Descriptors.FieldDescriptor valueField =
descriptor.findFieldByName("value");
+ if (valueField == null) {
+ throw new InvalidProtocolBufferException("Invalid Wrapper type.");
+ }
+ return convertSingleValue(valueField, msg.getField(valueField));
+ };
+ converters.put(BoolValue.getDescriptor().getFullName(), parappaTheWrappa);
+ converters.put(Int32Value.getDescriptor().getFullName(), parappaTheWrappa);
+ converters.put(UInt32Value.getDescriptor().getFullName(),
parappaTheWrappa);
+ converters.put(Int64Value.getDescriptor().getFullName(), parappaTheWrappa);
+ converters.put(UInt64Value.getDescriptor().getFullName(),
parappaTheWrappa);
+ converters.put(StringValue.getDescriptor().getFullName(),
parappaTheWrappa);
+ converters.put(BytesValue.getDescriptor().getFullName(), parappaTheWrappa);
+ converters.put(FloatValue.getDescriptor().getFullName(), parappaTheWrappa);
+ converters.put(DoubleValue.getDescriptor().getFullName(),
parappaTheWrappa);
+ converters.put(
+ Any.getDescriptor().getFullName(),
+ msg -> JsonFormat.printer().print(msg) // meh
+ );
+ converters.put(
+ Timestamp.getDescriptor().getFullName(),
+ msg -> {
+ final Timestamp ts = Timestamp.parseFrom(msg.toByteString());
+ return Timestamps.toString(ts);
+ }
+ );
+ converters.put(
+ Duration.getDescriptor().getFullName(),
+ msg -> {
+ final Duration duration = Duration.parseFrom(msg.toByteString());
+ return Durations.toString(duration);
+ }
+ );
+ converters.put(
+ FieldMask.getDescriptor().getFullName(),
+ msg ->
FieldMaskUtil.toJsonString(FieldMask.parseFrom(msg.toByteString()))
+ );
+ converters.put(
+ Struct.getDescriptor().getFullName(),
+ msg -> {
+ final Descriptors.Descriptor descriptor = msg.getDescriptorForType();
+ final Descriptors.FieldDescriptor field =
descriptor.findFieldByName("fields");
+ if (field == null) {
+ throw new InvalidProtocolBufferException("Invalid Struct type.");
+ }
+ // Struct is formatted as a map object.
+ return convertSingleValue(field, msg.getField(field));
+ }
+ );
+ converters.put(
+ Value.getDescriptor().getFullName(),
+ msg -> {
+ final Map<Descriptors.FieldDescriptor, Object> fields =
msg.getAllFields();
+ if (fields.isEmpty()) {
+ return null;
+ }
+ if (fields.size() != 1) {
+ throw new InvalidProtocolBufferException("Invalid Value type.");
+ }
+ final Map.Entry<Descriptors.FieldDescriptor, Object> entry =
fields.entrySet().stream().findFirst().get();
+ return convertSingleValue(entry.getKey(), entry.getValue());
+ }
+ );
+ converters.put(
+ ListValue.getDescriptor().getFullName(),
+ msg -> {
+ Descriptors.Descriptor descriptor = msg.getDescriptorForType();
+ Descriptors.FieldDescriptor field =
descriptor.findFieldByName("values");
+ if (field == null) {
+ throw new InvalidProtocolBufferException("Invalid ListValue
type.");
+ }
+ return convertList(field, (List<?>) msg.getField(field));
+ }
+ );
+ return converters;
+ }
+
+ @FunctionalInterface
+ interface SpecializedConverter
+ {
+ @Nullable
+ Object convert(Message msg) throws InvalidProtocolBufferException;
+ }
+}
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
new file mode 100644
index 0000000000..a3314e4273
--- /dev/null
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.spi.json.JsonProvider;
+import net.thisptr.jackson.jq.JsonQuery;
+import net.thisptr.jackson.jq.exception.JsonQueryException;
+import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
+import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Basically a plain java object {@link ObjectFlatteners.FlattenerMaker}, but
it lives here for now...
+ */
+public class ProtobufFlattenerMaker implements
ObjectFlatteners.FlattenerMaker<Map<String, Object>>
+{
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ProtobufJsonProvider JSON_PROVIDER = new
ProtobufJsonProvider();
+
+ private static final Configuration CONFIG = Configuration.builder()
+
.jsonProvider(JSON_PROVIDER)
+
.mappingProvider(new NotImplementedMappingProvider())
+
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
+ .build();
+
+ private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
+
+ @Override
+ public JsonProvider getJsonProvider()
+ {
+ return JSON_PROVIDER;
+ }
+
+ @Override
+ public Iterable<String> discoverRootFields(Map<String, Object> obj)
+ {
+ // in the future we can just return obj.keySet(), but for now this doesnt
expect nested fields...
+ Set<String> rootFields =
Sets.newHashSetWithExpectedSize(obj.keySet().size());
+ for (Map.Entry<String, Object> entry : obj.entrySet()) {
+ if (entry.getValue() instanceof List || entry.getValue() instanceof Map)
{
+ continue;
+ }
+ rootFields.add(entry.getKey());
+ }
+ return rootFields;
+ }
+
+ @Override
+ public Object getRootField(Map<String, Object> obj, String key)
+ {
+ return obj.get(key);
+ }
+
+ @Override
+ public Function<Map<String, Object>, Object> makeJsonPathExtractor(String
expr)
+ {
+ final JsonPath path = JsonPath.compile(expr);
+ return map -> path.read(map, CONFIG);
+ }
+
+ @Override
+ public Function<Map<String, Object>, Object> makeJsonQueryExtractor(String
expr)
+ {
+ final JsonQuery jsonQuery;
+ try {
+ jsonQuery = JsonQuery.compile(expr);
+ }
+ catch (JsonQueryException e) {
+ throw new RuntimeException(e);
+ }
+ return map -> {
+ try {
+ return JSONFlattenerMaker.convertJsonNode(
+ jsonQuery.apply((JsonNode) OBJECT_MAPPER.valueToTree(map)).get(0),
+ enc
+ );
+ }
+ catch (JsonQueryException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+}
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java
index 36ae06875e..e1ae1a0e71 100644
---
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java
@@ -28,7 +28,6 @@ import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import javax.annotation.Nullable;
-
import java.io.File;
import java.util.Objects;
@@ -61,12 +60,7 @@ public class ProtobufInputFormat extends NestedInputFormat
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema,
InputEntity source, File temporaryDirectory)
{
- return new ProtobufReader(
- inputRowSchema,
- source,
- protobufBytesDecoder,
- getFlattenSpec()
- );
+ return new ProtobufReader(inputRowSchema, source, protobufBytesDecoder,
getFlattenSpec());
}
@Override
@@ -88,5 +82,4 @@ public class ProtobufInputFormat extends NestedInputFormat
{
return Objects.hash(protobufBytesDecoder, getFlattenSpec());
}
-
}
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java
new file mode 100644
index 0000000000..c8e3ebeba7
--- /dev/null
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Basically a plain java object {@link FlattenerJsonProvider}, but it lives
here for now...
+ */
+public class ProtobufJsonProvider extends FlattenerJsonProvider
+{
+ @Override
+ public boolean isArray(final Object o)
+ {
+ return o instanceof List;
+ }
+
+ @Override
+ public boolean isMap(final Object o)
+ {
+ return o == null || o instanceof Map;
+ }
+
+ @Override
+ public int length(final Object o)
+ {
+ if (o instanceof List) {
+ return ((List<?>) o).size();
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public Collection<String> getPropertyKeys(final Object o)
+ {
+ if (o == null) {
+ return Collections.emptySet();
+ } else if (o instanceof Map) {
+ return ((Map<?, ?>)
o).keySet().stream().map(String::valueOf).collect(Collectors.toSet());
+ } else {
+ throw new UnsupportedOperationException(o.getClass().getName());
+ }
+ }
+
+ @Override
+ public Object getMapValue(final Object o, final String s)
+ {
+ if (o == null) {
+ return null;
+ } else if (o instanceof Map) {
+ return ((Map<?, ?>) o).get(s);
+ }
+ throw new UnsupportedOperationException(o.getClass().getName());
+ }
+}
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
index d512d108c3..2dc6aa1f8c 100644
---
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
@@ -19,13 +19,10 @@
package org.apache.druid.data.input.protobuf;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterators;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.util.JsonFormat;
+import com.google.protobuf.Message;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
@@ -34,12 +31,10 @@ import
org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.CollectionUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -49,11 +44,9 @@ import java.util.Map;
public class ProtobufReader extends
IntermediateRowParsingReader<DynamicMessage>
{
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final InputRowSchema inputRowSchema;
private final InputEntity source;
- private final JSONPathSpec flattenSpec;
- private final ObjectFlattener<JsonNode> recordFlattener;
+ private final ObjectFlattener<Map<String, Object>> recordFlattener;
private final ProtobufBytesDecoder protobufBytesDecoder;
ProtobufReader(
@@ -63,17 +56,10 @@ public class ProtobufReader extends
IntermediateRowParsingReader<DynamicMessage>
JSONPathSpec flattenSpec
)
{
- if (flattenSpec == null) {
- this.inputRowSchema = new ProtobufInputRowSchema(inputRowSchema);
- this.recordFlattener = null;
- } else {
- this.inputRowSchema = inputRowSchema;
- this.recordFlattener = ObjectFlatteners.create(flattenSpec, new
JSONFlattenerMaker(true));
- }
-
+ this.inputRowSchema = inputRowSchema;
+ this.recordFlattener = ObjectFlatteners.create(flattenSpec, new
ProtobufFlattenerMaker());
this.source = source;
this.protobufBytesDecoder = protobufBytesDecoder;
- this.flattenSpec = flattenSpec;
}
@Override
@@ -91,33 +77,27 @@ public class ProtobufReader extends
IntermediateRowParsingReader<DynamicMessage>
}
@Override
- protected List<InputRow> parseInputRows(DynamicMessage intermediateRow)
throws ParseException, JsonProcessingException
+ protected List<InputRow> parseInputRows(DynamicMessage intermediateRow)
throws ParseException
{
- Map<String, Object> record;
-
- if (flattenSpec == null || JSONPathSpec.DEFAULT.equals(flattenSpec)) {
- try {
- record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k ->
k.getJsonName());
- }
- catch (Exception ex) {
- throw new ParseException(null, ex, "Protobuf message could not be
parsed");
- }
- } else {
- try {
- String json = JsonFormat.printer().print(intermediateRow);
- record = recordFlattener.flatten(OBJECT_MAPPER.readValue(json,
JsonNode.class));
- }
- catch (InvalidProtocolBufferException e) {
- throw new ParseException(null, e, "Protobuf message could not be
parsed");
- }
- }
-
+ final Map<String, Object> record;
+ final Map<String, Object> plainJava = convertMessage(intermediateRow);
+ record = recordFlattener.flatten(plainJava);
return Collections.singletonList(MapInputRowParser.parse(inputRowSchema,
record));
}
@Override
- protected List<Map<String, Object>> toMap(DynamicMessage intermediateRow)
throws JsonProcessingException, InvalidProtocolBufferException
+ protected List<Map<String, Object>> toMap(DynamicMessage intermediateRow)
{
- return Collections.singletonList(new
ObjectMapper().readValue(JsonFormat.printer().print(intermediateRow),
Map.class));
+ return Collections.singletonList(convertMessage(intermediateRow));
+ }
+
+ private static Map<String, Object> convertMessage(Message msg)
+ {
+ try {
+ return ProtobufConverter.convertMessage(msg);
+ }
+ catch (InvalidProtocolBufferException e) {
+ throw new ParseException(null, e, "Protobuf message could not be
parsed");
+ }
}
}
diff --git
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
index 219fb23aac..1d0cfe9166 100644
---
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
+++
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
@@ -22,8 +22,12 @@ package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
@@ -36,6 +40,12 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.TransformingInputEntityReader;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
@@ -46,6 +56,7 @@ import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
public class ProtobufInputFormatTest
{
@@ -63,6 +74,8 @@ public class ProtobufInputFormatTest
@Before
public void setUp() throws Exception
{
+ NullHandling.initializeForTests();
+ ExpressionProcessing.initializeForTests(null);
timestampSpec = new TimestampSpec("timestamp", "iso", null);
dimensionsSpec = new DimensionsSpec(Lists.newArrayList(
new StringDimensionSchema("event"),
@@ -125,7 +138,7 @@ public class ProtobufInputFormatTest
}
@Test
- public void testParseNestedData() throws Exception
+ public void testParseFlattenData() throws Exception
{
//configure parser with desc file
ProtobufInputFormat protobufInputFormat = new
ProtobufInputFormat(flattenSpec, decoder);
@@ -136,11 +149,225 @@ public class ProtobufInputFormatTest
final ByteEntity entity = new
ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
- InputRow row = protobufInputFormat.createReader(new
InputRowSchema(timestampSpec, dimensionsSpec, null), entity,
null).read().next();
+ InputRow row = protobufInputFormat.createReader(
+ new InputRowSchema(timestampSpec, dimensionsSpec, null),
+ entity,
+ null
+ ).read().next();
+
+ Assert.assertEquals(
+ ImmutableList.builder()
+ .add("event")
+ .add("id")
+ .add("someOtherId")
+ .add("isValid")
+ .build(),
+ row.getDimensions()
+ );
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
  /**
   * Verifies that jq-style flatten expressions ({@link JSONPathFieldType#JQ}) extract nested
   * protobuf fields when the message is flattened directly from its plain Java form.
   */
  @Test
  public void testParseFlattenDataJq() throws Exception
  {
    //configure parser with desc file
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
        new JSONPathSpec(
            true,
            Lists.newArrayList(
                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
                new JSONPathFieldSpec(JSONPathFieldType.JQ, "foobar", ".foo.bar"),
                new JSONPathFieldSpec(JSONPathFieldType.JQ, "bar0", ".bar[0].bar")
            )
        ),
        decoder
    );

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);

    final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));

    InputRow row = protobufInputFormat.createReader(
        new InputRowSchema(timestampSpec, dimensionsSpec, null),
        entity,
        null
    ).read().next();

    // explicit dimensionsSpec wins: only the declared dimensions appear, in declaration order
    Assert.assertEquals(
        ImmutableList.builder()
                     .add("event")
                     .add("id")
                     .add("someOtherId")
                     .add("isValid")
                     .build(),
        row.getDimensions()
    );

    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
  }
+
  /**
   * Verifies schemaless ingestion with flattening: an empty {@link DimensionsSpec} causes
   * dimensions to be discovered from the flattened record rather than declared up front.
   */
  @Test
  public void testParseFlattenDataDiscover() throws Exception
  {
    //configure parser with desc file
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, decoder);

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);

    final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));

    InputRow row = protobufInputFormat.createReader(
        new InputRowSchema(timestampSpec, new DimensionsSpec(Collections.emptyList()), null),
        entity,
        null
    ).read().next();

    // discovered dimensions include flattened fields plus the message's root scalar fields;
    // NOTE(review): this asserts an exact discovery order — presumably stable, verify if flaky
    Assert.assertEquals(
        ImmutableList.builder()
                     .add("eventType")
                     .add("foobar")
                     .add("bar0")
                     .add("someOtherId")
                     .add("someIntColumn")
                     .add("isValid")
                     .add("description")
                     .add("someLongColumn")
                     .add("someFloatColumn")
                     .add("id")
                     .add("timestamp")
                     .build(),
        row.getDimensions()
    );

    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
  }
+
  /**
   * Verifies that with the default (no-op) flatten spec, nested protobuf fields survive as
   * structured values in nested-data dimensions, and can be queried via JSON_VALUE transforms.
   */
  @Test
  public void testParseNestedData() throws Exception
  {
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
        JSONPathSpec.DEFAULT,
        decoder
    );

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);

    final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));

    // 'foo' and 'bar' are ingested as nested-data dimensions so their structure is preserved
    InputEntityReader reader = protobufInputFormat.createReader(
        new InputRowSchema(
            timestampSpec,
            new DimensionsSpec(
                Lists.newArrayList(
                    new StringDimensionSchema("event"),
                    new StringDimensionSchema("id"),
                    new StringDimensionSchema("someOtherId"),
                    new StringDimensionSchema("isValid"),
                    new StringDimensionSchema("eventType"),
                    new NestedDataDimensionSchema("foo"),
                    new NestedDataDimensionSchema("bar")
                )
            ),
            null
        ),
        entity,
        null
    );

    // transforms pull scalar values out of the nested columns at ingest time
    TransformSpec transformSpec = new TransformSpec(
        null,
        Lists.newArrayList(
            new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", TestExprMacroTable.INSTANCE),
            new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", TestExprMacroTable.INSTANCE)
        )
    );
    TransformingInputEntityReader transformingReader = new TransformingInputEntityReader(
        reader,
        transformSpec.toTransformer()
    );


    InputRow row = transformingReader.read().next();

    Assert.assertEquals(
        ImmutableList.builder()
                     .add("event")
                     .add("id")
                     .add("someOtherId")
                     .add("isValid")
                     .add("eventType")
                     .add("foo")
                     .add("bar")
                     .build(),
        row.getDimensions()
    );

    // nested values come through as plain Java maps/lists, not JSON strings
    Assert.assertEquals(ImmutableMap.of("bar", "baz"), row.getRaw("foo"));
    Assert.assertEquals(
        ImmutableList.of(ImmutableMap.of("bar", "bar0"), ImmutableMap.of("bar", "bar1")),
        row.getRaw("bar")
    );
    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);

  }
+
  /**
   * Like {@code testParseNestedData}, but without declaring nested-data dimensions: the
   * JSON_VALUE transforms alone must still be able to read the nested protobuf fields.
   */
  @Test
  public void testParseNestedDataTransformsOnly() throws Exception
  {
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
        JSONPathSpec.DEFAULT,
        decoder
    );

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);

    final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));

    // only string dimensions declared; 'foo' and 'bar' are not ingested as dimensions
    InputEntityReader reader = protobufInputFormat.createReader(
        new InputRowSchema(
            timestampSpec,
            new DimensionsSpec(
                Lists.newArrayList(
                    new StringDimensionSchema("event"),
                    new StringDimensionSchema("id"),
                    new StringDimensionSchema("someOtherId"),
                    new StringDimensionSchema("isValid"),
                    new StringDimensionSchema("eventType")
                )
            ),
            null
        ),
        entity,
        null
    );

    TransformSpec transformSpec = new TransformSpec(
        null,
        Lists.newArrayList(
            new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", TestExprMacroTable.INSTANCE),
            new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", TestExprMacroTable.INSTANCE)
        )
    );
    TransformingInputEntityReader transformingReader = new TransformingInputEntityReader(
        reader,
        transformSpec.toTransformer()
    );


    InputRow row = transformingReader.read().next();
    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);

  }
+
@Test
public void testParseFlatData() throws Exception
{
@@ -153,7 +380,11 @@ public class ProtobufInputFormatTest
final ByteEntity entity = new
ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
- InputRow row = protobufInputFormat.createReader(new
InputRowSchema(timestampSpec, dimensionsSpec, null), entity,
null).read().next();
+ InputRow row = protobufInputFormat.createReader(
+ new InputRowSchema(timestampSpec, dimensionsSpec, null),
+ entity,
+ null
+ ).read().next();
ProtobufInputRowParserTest.verifyFlatData(row, dateTime);
}
@@ -170,7 +401,11 @@ public class ProtobufInputFormatTest
final ByteEntity entity = new
ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
- InputRow row = protobufInputFormat.createReader(new
InputRowSchema(timestampSpec, dimensionsSpec, null), entity,
null).read().next();
+ InputRow row = protobufInputFormat.createReader(
+ new InputRowSchema(timestampSpec, dimensionsSpec, null),
+ entity,
+ null
+ ).read().next();
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
diff --git
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
index eba0c8cb95..24807e6424 100644
---
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
+++
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
@@ -28,21 +28,15 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
-import org.apache.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import java.nio.ByteBuffer;
public class ProtobufReaderTest
{
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
private InputRowSchema inputRowSchema;
private InputRowSchema inputRowSchemaWithComplexTimestamp;
private JSONPathSpec flattenSpec;
@@ -129,8 +123,6 @@ public class ProtobufReaderTest
@Test
public void testParseFlatDataWithComplexTimestampWithDefaultFlattenSpec()
throws Exception
{
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("is unparseable!");
ProtobufReader reader = new ProtobufReader(
inputRowSchemaWithComplexTimestamp,
null,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]