This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 25.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/25.0.0 by this push:
     new 93e2a7fc88 add protobuf flattener, direct to plain java conversion for 
faster flattening (#13519) (#13546)
93e2a7fc88 is described below

commit 93e2a7fc88c3f63f2700d86bb7ebd45ee73c4feb
Author: Clint Wylie <[email protected]>
AuthorDate: Sun Dec 11 19:57:47 2022 -0800

    add protobuf flattener, direct to plain java conversion for faster 
flattening (#13519) (#13546)
    
    * add protobuf flattener, direct to plain java conversion for faster 
flattening, nested column tests
---
 .../util/common/parsers/FlattenerJsonProvider.java |  65 +-----
 .../util/common/parsers/JSONFlattenerMaker.java    |  18 +-
 .../common/parsers/FlattenerJsonProviderTest.java  | 146 ++++++++++++
 .../data/input/avro/GenericAvroJsonProvider.java   |  93 +-------
 .../data/input/orc/OrcStructJsonProvider.java      | 114 +---------
 .../parquet/simple/ParquetGroupJsonProvider.java   | 104 +--------
 extensions-core/protobuf-extensions/pom.xml        |  19 +-
 .../data/input/protobuf/ProtobufConverter.java     | 244 +++++++++++++++++++++
 .../input/protobuf/ProtobufFlattenerMaker.java     | 114 ++++++++++
 .../data/input/protobuf/ProtobufInputFormat.java   |   9 +-
 .../data/input/protobuf/ProtobufJsonProvider.java  |  79 +++++++
 .../druid/data/input/protobuf/ProtobufReader.java  |  60 ++---
 .../input/protobuf/ProtobufInputFormatTest.java    | 243 +++++++++++++++++++-
 .../data/input/protobuf/ProtobufReaderTest.java    |   8 -
 14 files changed, 879 insertions(+), 437 deletions(-)

diff --git 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java
similarity index 63%
copy from 
extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
copy to 
core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java
index 15f81b6f20..2574f94b68 100644
--- 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java
@@ -17,32 +17,19 @@
  * under the License.
  */
 
-package org.apache.druid.data.input.orc;
+package org.apache.druid.java.util.common.parsers;
 
 import com.jayway.jsonpath.InvalidJsonException;
 import com.jayway.jsonpath.spi.json.JsonProvider;
-import org.apache.hadoop.io.Text;
-import org.apache.orc.mapred.OrcMap;
-import org.apache.orc.mapred.OrcStruct;
 
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
-public class OrcStructJsonProvider implements JsonProvider
+public abstract class FlattenerJsonProvider implements JsonProvider
 {
-  private final OrcStructConverter converter;
-
-  OrcStructJsonProvider(OrcStructConverter converter)
-  {
-    this.converter = converter;
-  }
-
   @Override
   public Object createArray()
   {
@@ -52,19 +39,7 @@ public class OrcStructJsonProvider implements JsonProvider
   @Override
   public Object createMap()
   {
-    return new HashMap<>();
-  }
-
-  @Override
-  public boolean isArray(final Object o)
-  {
-    return o instanceof List;
-  }
-
-  @Override
-  public boolean isMap(final Object o)
-  {
-    return o == null || o instanceof Map || o instanceof OrcStruct;
+    return new LinkedHashMap<>();
   }
 
   @Override
@@ -86,38 +61,6 @@ public class OrcStructJsonProvider implements JsonProvider
     throw new UnsupportedOperationException(o.getClass().getName());
   }
 
-  @Override
-  public Collection<String> getPropertyKeys(final Object o)
-  {
-    if (o == null) {
-      return Collections.emptySet();
-    } else if (o instanceof Map) {
-      return ((Map<Object, Object>) 
o).keySet().stream().map(String::valueOf).collect(Collectors.toSet());
-    } else if (o instanceof OrcStruct) {
-      return ((OrcStruct) o).getSchema().getFieldNames();
-    } else {
-      throw new UnsupportedOperationException(o.getClass().getName());
-    }
-  }
-
-  @Override
-  public Object getMapValue(final Object o, final String s)
-  {
-    if (o == null) {
-      return null;
-    } else if (o instanceof OrcMap) {
-      OrcMap map = (OrcMap) o;
-      return map.get(new Text(s));
-    } else if (o instanceof Map) {
-      return ((Map) o).get(s);
-    } else if (o instanceof OrcStruct) {
-      OrcStruct struct = (OrcStruct) o;
-      // get field by index since we have no way to know if this map is the 
root or not
-      return converter.convertField(struct, 
struct.getSchema().getFieldNames().indexOf(s));
-    }
-    throw new UnsupportedOperationException(o.getClass().getName());
-  }
-
   @Override
   public Object getArrayIndex(final Object o, final int i)
   {
diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
index 589ed5ffbc..b54b51f863 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
@@ -54,9 +54,9 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<JsonN
                    .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
                    .build();
 
+  private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
   private final boolean keepNullValues;
 
-  private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
 
   public JSONFlattenerMaker(boolean keepNullValues)
   {
@@ -66,7 +66,7 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<JsonN
   @Override
   public Iterable<String> discoverRootFields(final JsonNode obj)
   {
-    return FluentIterable.from(() -> obj.fields())
+    return FluentIterable.from(obj::fields)
                          .filter(
                              entry -> {
                                final JsonNode val = entry.getValue();
@@ -137,13 +137,13 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<JsonN
   public Object finalizeConversionForMap(Object o)
   {
     if (o instanceof JsonNode) {
-      return convertJsonNode((JsonNode) o);
+      return convertJsonNode((JsonNode) o, enc);
     }
     return o;
   }
 
   @Nullable
-  private Object convertJsonNode(JsonNode val)
+  public static Object convertJsonNode(JsonNode val, CharsetEncoder enc)
   {
     if (val == null || val.isNull()) {
       return null;
@@ -158,7 +158,7 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<JsonN
     }
 
     if (val.isTextual()) {
-      return charsetFix(val.asText());
+      return charsetFix(val.asText(), enc);
     }
 
     if (val.isBoolean()) {
@@ -175,7 +175,7 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<JsonN
       List<Object> newList = new ArrayList<>();
       for (JsonNode entry : val) {
         if (!entry.isNull()) {
-          newList.add(finalizeConversionForMap(entry));
+          newList.add(convertJsonNode(entry, enc));
         }
       }
       return newList;
@@ -185,7 +185,7 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<JsonN
       Map<String, Object> newMap = new LinkedHashMap<>();
       for (Iterator<Map.Entry<String, JsonNode>> it = val.fields(); 
it.hasNext(); ) {
         Map.Entry<String, JsonNode> entry = it.next();
-        newMap.put(entry.getKey(), finalizeConversionForMap(entry.getValue()));
+        newMap.put(entry.getKey(), convertJsonNode(entry.getValue(), enc));
       }
       return newMap;
     }
@@ -197,7 +197,7 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<JsonN
   }
 
   @Nullable
-  private String charsetFix(String s)
+  private static String charsetFix(String s, CharsetEncoder enc)
   {
     if (s != null && !enc.canEncode(s)) {
       // Some whacky characters are in this string (e.g. \uD900). These are 
problematic because they are decodeable
@@ -209,7 +209,7 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<JsonN
     }
   }
 
-  private boolean isFlatList(JsonNode list)
+  private static boolean isFlatList(JsonNode list)
   {
     for (JsonNode obj : list) {
       if (obj.isObject() || obj.isArray()) {
diff --git 
a/core/src/test/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProviderTest.java
 
b/core/src/test/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProviderTest.java
new file mode 100644
index 0000000000..24214be14e
--- /dev/null
+++ 
b/core/src/test/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProviderTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.parsers;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+public class FlattenerJsonProviderTest
+{
+  FlattenerJsonProvider jsonProvider = new FlattenerJsonProvider()
+  {
+    @Override
+    public boolean isArray(final Object o)
+    {
+      throw new RuntimeException("not tested");
+    }
+
+    @Override
+    public boolean isMap(final Object o)
+    {
+      throw new RuntimeException("not tested");
+    }
+
+    @Override
+    public Collection<String> getPropertyKeys(final Object o)
+    {
+      throw new RuntimeException("not tested");
+    }
+
+    @Override
+    public Object getMapValue(final Object o, final String s)
+    {
+      throw new RuntimeException("not tested");
+    }
+  };
+
+  @Test
+  public void testMapStuff()
+  {
+    Object aMap = jsonProvider.createMap();
+    jsonProvider.setProperty(aMap, "key", "value");
+    Assert.assertEquals(ImmutableMap.of("key", "value"), aMap);
+    jsonProvider.removeProperty(aMap, "key");
+    Assert.assertEquals(ImmutableMap.of(), aMap);
+    Assert.assertEquals(aMap, jsonProvider.unwrap(aMap));
+
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> jsonProvider.setProperty(jsonProvider.createArray(), "key", 
"value")
+    );
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> jsonProvider.removeProperty(jsonProvider.createArray(), "key")
+    );
+  }
+
+  @Test
+  public void testArrayStuff()
+  {
+    Object aList = jsonProvider.createArray();
+    jsonProvider.setArrayIndex(aList, 0, "a");
+    jsonProvider.setArrayIndex(aList, 1, "b");
+    jsonProvider.setArrayIndex(aList, 2, "c");
+    Assert.assertEquals(3, jsonProvider.length(aList));
+    Assert.assertEquals("a", jsonProvider.getArrayIndex(aList, 0));
+    Assert.assertEquals("b", jsonProvider.getArrayIndex(aList, 1));
+    Assert.assertEquals("c", jsonProvider.getArrayIndex(aList, 2));
+    List<String> expected = ImmutableList.of("a", "b", "c");
+    Assert.assertEquals(expected, aList);
+    Iterator<?> iter = jsonProvider.toIterable(aList).iterator();
+    Iterator<String> expectedIter = expected.iterator();
+    while (iter.hasNext()) {
+      Assert.assertEquals(expectedIter.next(), iter.next());
+    }
+    Assert.assertFalse(expectedIter.hasNext());
+    Assert.assertEquals(aList, jsonProvider.unwrap(aList));
+
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> jsonProvider.getArrayIndex(jsonProvider.createMap(), 0)
+    );
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> jsonProvider.setArrayIndex(jsonProvider.createMap(), 0, "a")
+    );
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> jsonProvider.toIterable(jsonProvider.createMap())
+    );
+  }
+
+  @Test
+  public void testNotImplementedOnPurpose()
+  {
+    Object aList = jsonProvider.createArray();
+    Throwable t = Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> jsonProvider.toJson(aList)
+    );
+    Assert.assertEquals("Unused", t.getMessage());
+
+    t = Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> jsonProvider.parse("{}")
+    );
+    Assert.assertEquals("Unused", t.getMessage());
+
+
+    t = Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> jsonProvider.parse(new 
ByteArrayInputStream(StringUtils.toUtf8("{}")), "UTF-8")
+    );
+    Assert.assertEquals("Unused", t.getMessage());
+
+    t = Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> jsonProvider.getArrayIndex(aList, 0, false)
+    );
+    Assert.assertEquals("Deprecated", t.getMessage());
+  }
+}
diff --git 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
index 86bbb7d7e9..0705500767 100644
--- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
+++ 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
@@ -20,22 +20,17 @@
 package org.apache.druid.data.input.avro;
 
 import com.google.common.collect.ImmutableMap;
-import com.jayway.jsonpath.InvalidJsonException;
-import com.jayway.jsonpath.spi.json.JsonProvider;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
+import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
 
 import javax.annotation.Nullable;
-
-import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -43,7 +38,7 @@ import java.util.stream.Collectors;
 /**
  * JsonProvider for JsonPath + Avro.
  */
-public class GenericAvroJsonProvider implements JsonProvider
+public class GenericAvroJsonProvider extends FlattenerJsonProvider
 {
   private final boolean extractUnionsByType;
 
@@ -52,36 +47,6 @@ public class GenericAvroJsonProvider implements JsonProvider
     this.extractUnionsByType = extractUnionsByType;
   }
 
-  @Override
-  public Object parse(final String s) throws InvalidJsonException
-  {
-    throw new UnsupportedOperationException("Unused");
-  }
-
-  @Override
-  public Object parse(final InputStream inputStream, final String s) throws 
InvalidJsonException
-  {
-    throw new UnsupportedOperationException("Unused");
-  }
-
-  @Override
-  public String toJson(final Object o)
-  {
-    throw new UnsupportedOperationException("Unused");
-  }
-
-  @Override
-  public Object createArray()
-  {
-    return new ArrayList<>();
-  }
-
-  @Override
-  public Object createMap()
-  {
-    return new HashMap<>();
-  }
-
   @Override
   public boolean isArray(final Object o)
   {
@@ -100,16 +65,6 @@ public class GenericAvroJsonProvider implements JsonProvider
     }
   }
 
-  @Override
-  public Iterable<?> toIterable(final Object o)
-  {
-    if (o instanceof List) {
-      return (List) o;
-    } else {
-      throw new UnsupportedOperationException();
-    }
-  }
-
   @Override
   public Collection<String> getPropertyKeys(final Object o)
   {
@@ -124,34 +79,6 @@ public class GenericAvroJsonProvider implements JsonProvider
     }
   }
 
-  @Override
-  public Object getArrayIndex(final Object o, final int i)
-  {
-    return ((List) o).get(i);
-  }
-
-  @Override
-  @Deprecated
-  public Object getArrayIndex(final Object o, final int i, final boolean b)
-  {
-    throw new UnsupportedOperationException("Deprecated");
-  }
-
-  @Override
-  public void setArrayIndex(final Object o, final int i, final Object o1)
-  {
-    if (o instanceof List) {
-      final List list = (List) o;
-      if (list.size() == i) {
-        list.add(o1);
-      } else {
-        list.set(i, o1);
-      }
-    } else {
-      throw new UnsupportedOperationException();
-    }
-  }
-
   @Nullable
   @Override
   public Object getMapValue(final Object o, final String s)
@@ -189,28 +116,12 @@ public class GenericAvroJsonProvider implements 
JsonProvider
     }
   }
 
-  @Override
-  public void removeProperty(final Object o, final Object o1)
-  {
-    if (o instanceof Map) {
-      ((Map) o).remove(o1);
-    } else {
-      throw new UnsupportedOperationException();
-    }
-  }
-
   @Override
   public boolean isMap(final Object o)
   {
     return o == null || o instanceof Map || o instanceof GenericRecord;
   }
 
-  @Override
-  public Object unwrap(final Object o)
-  {
-    return o;
-  }
-
   private boolean isExtractableUnion(final Schema.Field field)
   {
     return field.schema().isUnion() &&
diff --git 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
index 15f81b6f20..384a8f833a 100644
--- 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
+++ 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java
@@ -19,22 +19,18 @@
 
 package org.apache.druid.data.input.orc;
 
-import com.jayway.jsonpath.InvalidJsonException;
-import com.jayway.jsonpath.spi.json.JsonProvider;
+import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
 import org.apache.hadoop.io.Text;
 import org.apache.orc.mapred.OrcMap;
 import org.apache.orc.mapred.OrcStruct;
 
-import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class OrcStructJsonProvider implements JsonProvider
+public class OrcStructJsonProvider extends FlattenerJsonProvider
 {
   private final OrcStructConverter converter;
 
@@ -43,18 +39,6 @@ public class OrcStructJsonProvider implements JsonProvider
     this.converter = converter;
   }
 
-  @Override
-  public Object createArray()
-  {
-    return new ArrayList<>();
-  }
-
-  @Override
-  public Object createMap()
-  {
-    return new HashMap<>();
-  }
-
   @Override
   public boolean isArray(final Object o)
   {
@@ -67,25 +51,6 @@ public class OrcStructJsonProvider implements JsonProvider
     return o == null || o instanceof Map || o instanceof OrcStruct;
   }
 
-  @Override
-  public int length(final Object o)
-  {
-    if (o instanceof List) {
-      return ((List) o).size();
-    } else {
-      return 0;
-    }
-  }
-
-  @Override
-  public Iterable<?> toIterable(final Object o)
-  {
-    if (o instanceof List) {
-      return (List) o;
-    }
-    throw new UnsupportedOperationException(o.getClass().getName());
-  }
-
   @Override
   public Collection<String> getPropertyKeys(final Object o)
   {
@@ -117,79 +82,4 @@ public class OrcStructJsonProvider implements JsonProvider
     }
     throw new UnsupportedOperationException(o.getClass().getName());
   }
-
-  @Override
-  public Object getArrayIndex(final Object o, final int i)
-  {
-    if (o instanceof List) {
-      return ((List) o).get(i);
-    }
-    throw new UnsupportedOperationException(o.getClass().getName());
-  }
-
-  @Override
-  public void setArrayIndex(final Object o, final int i, final Object o1)
-  {
-    if (o instanceof List) {
-      final List list = (List) o;
-      if (list.size() == i) {
-        list.add(o1);
-      } else {
-        list.set(i, o1);
-      }
-    } else {
-      throw new UnsupportedOperationException(o.getClass().getName());
-    }
-  }
-
-  @Override
-  public void setProperty(final Object o, final Object o1, final Object o2)
-  {
-    if (o instanceof Map) {
-      ((Map) o).put(o1, o2);
-    } else {
-      throw new UnsupportedOperationException(o.getClass().getName());
-    }
-  }
-
-  @Override
-  public void removeProperty(final Object o, final Object o1)
-  {
-    if (o instanceof Map) {
-      ((Map) o).remove(o1);
-    } else {
-      throw new UnsupportedOperationException(o.getClass().getName());
-    }
-  }
-
-  @Override
-  @Deprecated
-  public Object getArrayIndex(final Object o, final int i, final boolean b)
-  {
-    throw new UnsupportedOperationException("Deprecated");
-  }
-
-  @Override
-  public Object parse(final String s) throws InvalidJsonException
-  {
-    throw new UnsupportedOperationException("Unused");
-  }
-
-  @Override
-  public Object parse(final InputStream inputStream, final String s) throws 
InvalidJsonException
-  {
-    throw new UnsupportedOperationException("Unused");
-  }
-
-  @Override
-  public String toJson(final Object o)
-  {
-    throw new UnsupportedOperationException("Unused");
-  }
-
-  @Override
-  public Object unwrap(final Object o)
-  {
-    return o;
-  }
 }
diff --git 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
index 3190c7e28a..27234ea24a 100644
--- 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
+++ 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java
@@ -19,15 +19,11 @@
 
 package org.apache.druid.data.input.parquet.simple;
 
-import com.jayway.jsonpath.InvalidJsonException;
-import com.jayway.jsonpath.spi.json.JsonProvider;
+import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
 import org.apache.parquet.example.data.Group;
 
-import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -35,7 +31,7 @@ import java.util.stream.Collectors;
 /**
  * Provides json path for Parquet {@link Group} objects
  */
-public class ParquetGroupJsonProvider implements JsonProvider
+public class ParquetGroupJsonProvider extends FlattenerJsonProvider
 {
   private final ParquetGroupConverter converter;
 
@@ -44,18 +40,6 @@ public class ParquetGroupJsonProvider implements JsonProvider
     this.converter = converter;
   }
 
-  @Override
-  public Object createArray()
-  {
-    return new ArrayList<>();
-  }
-
-  @Override
-  public Object createMap()
-  {
-    return new HashMap<>();
-  }
-
   @Override
   public boolean isArray(final Object o)
   {
@@ -82,15 +66,6 @@ public class ParquetGroupJsonProvider implements JsonProvider
     }
   }
 
-  @Override
-  public Iterable<?> toIterable(final Object o)
-  {
-    if (o instanceof List) {
-      return (List) o;
-    }
-    throw new UnsupportedOperationException(o.getClass().getName());
-  }
-
   @Override
   public Collection<String> getPropertyKeys(final Object o)
   {
@@ -118,80 +93,5 @@ public class ParquetGroupJsonProvider implements 
JsonProvider
     }
     throw new UnsupportedOperationException(o.getClass().getName());
   }
-
-  @Override
-  public Object getArrayIndex(final Object o, final int i)
-  {
-    if (o instanceof List) {
-      return ((List) o).get(i);
-    }
-    throw new UnsupportedOperationException(o.getClass().getName());
-  }
-
-  @Override
-  public void setArrayIndex(final Object o, final int i, final Object o1)
-  {
-    if (o instanceof List) {
-      final List list = (List) o;
-      if (list.size() == i) {
-        list.add(o1);
-      } else {
-        list.set(i, o1);
-      }
-    } else {
-      throw new UnsupportedOperationException(o.getClass().getName());
-    }
-  }
-
-  @Override
-  public void setProperty(final Object o, final Object o1, final Object o2)
-  {
-    if (o instanceof Map) {
-      ((Map) o).put(o1, o2);
-    } else {
-      throw new UnsupportedOperationException(o.getClass().getName());
-    }
-  }
-
-  @Override
-  public void removeProperty(final Object o, final Object o1)
-  {
-    if (o instanceof Map) {
-      ((Map) o).remove(o1);
-    } else {
-      throw new UnsupportedOperationException(o.getClass().getName());
-    }
-  }
-
-  @Override
-  @Deprecated
-  public Object getArrayIndex(final Object o, final int i, final boolean b)
-  {
-    throw new UnsupportedOperationException("Deprecated");
-  }
-
-  @Override
-  public Object parse(final String s) throws InvalidJsonException
-  {
-    throw new UnsupportedOperationException("Unused");
-  }
-
-  @Override
-  public Object parse(final InputStream inputStream, final String s) throws 
InvalidJsonException
-  {
-    throw new UnsupportedOperationException("Unused");
-  }
-
-  @Override
-  public String toJson(final Object o)
-  {
-    throw new UnsupportedOperationException("Unused");
-  }
-
-  @Override
-  public Object unwrap(final Object o)
-  {
-    return o;
-  }
 }
 
diff --git a/extensions-core/protobuf-extensions/pom.xml 
b/extensions-core/protobuf-extensions/pom.xml
index c7b7fc6e8b..4e717f5df5 100644
--- a/extensions-core/protobuf-extensions/pom.xml
+++ b/extensions-core/protobuf-extensions/pom.xml
@@ -52,6 +52,12 @@
       <version>${project.parent.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
@@ -133,13 +139,22 @@
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
-      <version>2.0.1</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.jayway.jsonpath</groupId>
+      <artifactId>json-path</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.thisptr</groupId>
+      <artifactId>jackson-jq</artifactId>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- test -->
     <dependency>
@@ -161,7 +176,7 @@
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-processing</artifactId>
       <version>${project.parent.version}</version>
-      <scope>test</scope>
+      <type>test-jar</type>
     </dependency>
     <dependency>
       <groupId>nl.jqno.equalsverifier</groupId>
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java
new file mode 100644
index 0000000000..a032e33ac8
--- /dev/null
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Duration;
+import com.google.protobuf.FieldMask;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ListValue;
+import com.google.protobuf.Message;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import com.google.protobuf.UInt64Value;
+import com.google.protobuf.Value;
+import com.google.protobuf.util.Durations;
+import com.google.protobuf.util.FieldMaskUtil;
+import com.google.protobuf.util.JsonFormat;
+import com.google.protobuf.util.Timestamps;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
 * Convert {@link Message} to plain java stuffs, based roughly on the conversions done with {@link JsonFormat}
 */
public class ProtobufConverter
{
  // converters for protobuf "well known types" (wrappers, Timestamp, Duration, Struct, Value, ...),
  // keyed by fully qualified protobuf type name
  private static final Map<String, SpecializedConverter> SPECIAL_CONVERSIONS = buildSpecializedConversions();

  /**
   * Recursively converts a protobuf {@link Message} into a plain java {@link Map} keyed by each field's
   * JSON name.
   *
   * @return null if {@code msg} is null, otherwise a map containing only the fields set on the message
   * @throws InvalidProtocolBufferException if a well-known-type payload cannot be converted
   */
  @Nullable
  public static Map<String, Object> convertMessage(Message msg) throws InvalidProtocolBufferException
  {
    if (msg == null) {
      return null;
    }
    // getAllFields() returns only the fields which are actually set on the message
    final Map<Descriptors.FieldDescriptor, Object> fields = msg.getAllFields();
    final Map<String, Object> converted = Maps.newHashMapWithExpectedSize(fields.size());
    for (Map.Entry<Descriptors.FieldDescriptor, Object> field : fields.entrySet()) {
      converted.put(field.getKey().getJsonName(), convertField(field.getKey(), field.getValue()));
    }
    return converted;
  }

  /**
   * Converts a single field value: message values of a well-known type go through the specialized
   * converters, map fields become java maps, repeated fields become java lists, and everything else is
   * converted as a single value.
   */
  @Nullable
  private static Object convertField(Descriptors.FieldDescriptor field, Object value)
      throws InvalidProtocolBufferException
  {
    // handle special types
    if (value instanceof Message) {
      Message msg = (Message) value;
      final String typeName = msg.getDescriptorForType().getFullName();
      SpecializedConverter converter = SPECIAL_CONVERSIONS.get(typeName);
      if (converter != null) {
        return converter.convert(msg);
      }
    }

    if (field.isMapField()) {
      return convertMap(field, value);
    } else if (field.isRepeated()) {
      return convertList(field, (List<?>) value);
    } else {
      return convertSingleValue(field, value);
    }
  }

  /**
   * Converts a repeated field into a {@link List}, converting each element as a single value.
   */
  @Nonnull
  private static List<Object> convertList(Descriptors.FieldDescriptor field, List<?> value)
      throws InvalidProtocolBufferException
  {
    final List<Object> theList = Lists.newArrayListWithExpectedSize(value.size());
    for (Object element : value) {
      theList.add(convertSingleValue(field, element));
    }
    return theList;
  }

  /**
   * Converts a protobuf map field (represented as a repeated entry message with "key" and "value"
   * fields) into a {@link HashMap} with string keys.
   *
   * @throws InvalidProtocolBufferException if the entry message does not have "key" and "value" fields
   */
  @Nullable
  private static Object convertMap(Descriptors.FieldDescriptor field, Object value)
      throws InvalidProtocolBufferException
  {
    final Descriptors.Descriptor type = field.getMessageType();
    final Descriptors.FieldDescriptor keyField = type.findFieldByName("key");
    final Descriptors.FieldDescriptor valueField = type.findFieldByName("value");
    if (keyField == null || valueField == null) {
      throw new InvalidProtocolBufferException("Invalid map field.");
    }

    // the wire representation of a map field is a repeated list of entry messages
    @SuppressWarnings("unchecked")
    final List<Object> elements = (List<Object>) value;
    final HashMap<String, Object> theMap = Maps.newHashMapWithExpectedSize(elements.size());
    for (Object element : elements) {
      Message entry = (Message) element;
      theMap.put(
          (String) convertSingleValue(keyField, entry.getField(keyField)),
          convertSingleValue(valueField, entry.getField(valueField))
      );
    }
    return theMap;
  }

  /**
   * Converts a scalar value: BYTES become byte arrays, enum values become their names (except
   * {@code google.protobuf.NullValue} which becomes null), nested messages and groups recurse through
   * {@link #convertMessage}, and everything else is passed through unchanged.
   */
  @Nullable
  private static Object convertSingleValue(Descriptors.FieldDescriptor field, Object value)
      throws InvalidProtocolBufferException
  {
    switch (field.getType()) {
      case BYTES:
        return ((ByteString) value).toByteArray();
      case ENUM:
        if ("google.protobuf.NullValue".equals(field.getEnumType().getFullName())) {
          return null;
        } else {
          return ((Descriptors.EnumValueDescriptor) value).getName();
        }
      case MESSAGE:
      case GROUP:
        return convertMessage((Message) value);
      default:
        // pass through everything else
        return value;
    }
  }

  /**
   * Builds the table of converters for protobuf well-known types, mirroring how {@link JsonFormat}
   * renders them: wrapper types unwrap their single "value" field, Timestamp/Duration/FieldMask become
   * strings, Struct/Value/ListValue become maps/plain values/lists, and Any falls back to JSON text.
   */
  private static Map<String, SpecializedConverter> buildSpecializedConversions()
  {
    final Map<String, SpecializedConverter> converters = new HashMap<>();
    // shared converter for all of the wrapper types: unwrap to the inner "value" field
    final SpecializedConverter parappaTheWrappa = msg -> {
      final Descriptors.Descriptor descriptor = msg.getDescriptorForType();
      final Descriptors.FieldDescriptor valueField = descriptor.findFieldByName("value");
      if (valueField == null) {
        throw new InvalidProtocolBufferException("Invalid Wrapper type.");
      }
      return convertSingleValue(valueField, msg.getField(valueField));
    };
    converters.put(BoolValue.getDescriptor().getFullName(), parappaTheWrappa);
    converters.put(Int32Value.getDescriptor().getFullName(), parappaTheWrappa);
    converters.put(UInt32Value.getDescriptor().getFullName(), parappaTheWrappa);
    converters.put(Int64Value.getDescriptor().getFullName(), parappaTheWrappa);
    converters.put(UInt64Value.getDescriptor().getFullName(), parappaTheWrappa);
    converters.put(StringValue.getDescriptor().getFullName(), parappaTheWrappa);
    converters.put(BytesValue.getDescriptor().getFullName(), parappaTheWrappa);
    converters.put(FloatValue.getDescriptor().getFullName(), parappaTheWrappa);
    converters.put(DoubleValue.getDescriptor().getFullName(), parappaTheWrappa);
    converters.put(
        Any.getDescriptor().getFullName(),
        msg -> JsonFormat.printer().print(msg) // meh
    );
    converters.put(
        Timestamp.getDescriptor().getFullName(),
        msg -> {
          // re-parse into a concrete Timestamp so it can be rendered as a string
          final Timestamp ts = Timestamp.parseFrom(msg.toByteString());
          return Timestamps.toString(ts);
        }
    );
    converters.put(
        Duration.getDescriptor().getFullName(),
        msg -> {
          final Duration duration = Duration.parseFrom(msg.toByteString());
          return Durations.toString(duration);
        }
    );
    converters.put(
        FieldMask.getDescriptor().getFullName(),
        msg -> FieldMaskUtil.toJsonString(FieldMask.parseFrom(msg.toByteString()))
    );
    converters.put(
        Struct.getDescriptor().getFullName(),
        msg -> {
          final Descriptors.Descriptor descriptor = msg.getDescriptorForType();
          final Descriptors.FieldDescriptor field = descriptor.findFieldByName("fields");
          if (field == null) {
            throw new InvalidProtocolBufferException("Invalid Struct type.");
          }
          // Struct is formatted as a map object.
          return convertSingleValue(field, msg.getField(field));
        }
    );
    converters.put(
        Value.getDescriptor().getFullName(),
        msg -> {
          // Value is a oneof; exactly one field should be set (none set means null)
          final Map<Descriptors.FieldDescriptor, Object> fields = msg.getAllFields();
          if (fields.isEmpty()) {
            return null;
          }
          if (fields.size() != 1) {
            throw new InvalidProtocolBufferException("Invalid Value type.");
          }
          final Map.Entry<Descriptors.FieldDescriptor, Object> entry = fields.entrySet().stream().findFirst().get();
          return convertSingleValue(entry.getKey(), entry.getValue());
        }
    );
    converters.put(
        ListValue.getDescriptor().getFullName(),
        msg -> {
          Descriptors.Descriptor descriptor = msg.getDescriptorForType();
          Descriptors.FieldDescriptor field = descriptor.findFieldByName("values");
          if (field == null) {
            throw new InvalidProtocolBufferException("Invalid ListValue type.");
          }
          return convertList(field, (List<?>) msg.getField(field));
        }
    );
    return converters;
  }

  /**
   * Converter for a single protobuf well-known type.
   */
  @FunctionalInterface
  interface SpecializedConverter
  {
    @Nullable
    Object convert(Message msg) throws InvalidProtocolBufferException;
  }
}
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
new file mode 100644
index 0000000000..a3314e4273
--- /dev/null
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.spi.json.JsonProvider;
+import net.thisptr.jackson.jq.JsonQuery;
+import net.thisptr.jackson.jq.exception.JsonQueryException;
+import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
+import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Basically a plain java object {@link ObjectFlatteners.FlattenerMaker}, but 
it lives here for now...
+ */
+public class ProtobufFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<Map<String, Object>>
+{
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ProtobufJsonProvider JSON_PROVIDER = new 
ProtobufJsonProvider();
+
+  private static final Configuration CONFIG = Configuration.builder()
+                                                           
.jsonProvider(JSON_PROVIDER)
+                                                           
.mappingProvider(new NotImplementedMappingProvider())
+                                                           
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
+                                                           .build();
+
+  private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
+
+  @Override
+  public JsonProvider getJsonProvider()
+  {
+    return JSON_PROVIDER;
+  }
+
+  @Override
+  public Iterable<String> discoverRootFields(Map<String, Object> obj)
+  {
+    // in the future we can just return obj.keySet(), but for now this doesnt 
expect nested fields...
+    Set<String> rootFields = 
Sets.newHashSetWithExpectedSize(obj.keySet().size());
+    for (Map.Entry<String, Object> entry : obj.entrySet()) {
+      if (entry.getValue() instanceof List || entry.getValue() instanceof Map) 
{
+        continue;
+      }
+      rootFields.add(entry.getKey());
+    }
+    return rootFields;
+  }
+
+  @Override
+  public Object getRootField(Map<String, Object> obj, String key)
+  {
+    return obj.get(key);
+  }
+
+  @Override
+  public Function<Map<String, Object>, Object> makeJsonPathExtractor(String 
expr)
+  {
+    final JsonPath path = JsonPath.compile(expr);
+    return map -> path.read(map, CONFIG);
+  }
+
+  @Override
+  public Function<Map<String, Object>, Object> makeJsonQueryExtractor(String 
expr)
+  {
+    final JsonQuery jsonQuery;
+    try {
+      jsonQuery = JsonQuery.compile(expr);
+    }
+    catch (JsonQueryException e) {
+      throw new RuntimeException(e);
+    }
+    return map -> {
+      try {
+        return JSONFlattenerMaker.convertJsonNode(
+            jsonQuery.apply((JsonNode) OBJECT_MAPPER.valueToTree(map)).get(0),
+            enc
+        );
+      }
+      catch (JsonQueryException e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+}
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java
index 36ae06875e..e1ae1a0e71 100644
--- 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java
@@ -28,7 +28,6 @@ import org.apache.druid.data.input.impl.NestedInputFormat;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 
 import javax.annotation.Nullable;
-
 import java.io.File;
 import java.util.Objects;
 
@@ -61,12 +60,7 @@ public class ProtobufInputFormat extends NestedInputFormat
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, 
InputEntity source, File temporaryDirectory)
   {
-    return new ProtobufReader(
-        inputRowSchema,
-        source,
-        protobufBytesDecoder,
-        getFlattenSpec()
-    );
+    return new ProtobufReader(inputRowSchema, source, protobufBytesDecoder, 
getFlattenSpec());
   }
 
   @Override
@@ -88,5 +82,4 @@ public class ProtobufInputFormat extends NestedInputFormat
   {
     return Objects.hash(protobufBytesDecoder, getFlattenSpec());
   }
-
 }
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java
new file mode 100644
index 0000000000..c8e3ebeba7
--- /dev/null
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.protobuf;
+
+import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Basically a plain java object {@link FlattenerJsonProvider}, but it lives 
here for now...
+ */
+public class ProtobufJsonProvider extends FlattenerJsonProvider
+{
+  @Override
+  public boolean isArray(final Object o)
+  {
+    return o instanceof List;
+  }
+
+  @Override
+  public boolean isMap(final Object o)
+  {
+    return o == null || o instanceof Map;
+  }
+
+  @Override
+  public int length(final Object o)
+  {
+    if (o instanceof List) {
+      return ((List<?>) o).size();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public Collection<String> getPropertyKeys(final Object o)
+  {
+    if (o == null) {
+      return Collections.emptySet();
+    } else if (o instanceof Map) {
+      return ((Map<?, ?>) 
o).keySet().stream().map(String::valueOf).collect(Collectors.toSet());
+    } else {
+      throw new UnsupportedOperationException(o.getClass().getName());
+    }
+  }
+
+  @Override
+  public Object getMapValue(final Object o, final String s)
+  {
+    if (o == null) {
+      return null;
+    } else if (o instanceof Map) {
+      return ((Map<?, ?>) o).get(s);
+    }
+    throw new UnsupportedOperationException(o.getClass().getName());
+  }
+}
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
index d512d108c3..2dc6aa1f8c 100644
--- 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
@@ -19,13 +19,10 @@
 
 package org.apache.druid.data.input.protobuf;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Iterators;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.util.JsonFormat;
+import com.google.protobuf.Message;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputRow;
@@ -34,12 +31,10 @@ import 
org.apache.druid.data.input.IntermediateRowParsingReader;
 import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.CollectionUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -49,11 +44,9 @@ import java.util.Map;
 
 public class ProtobufReader extends 
IntermediateRowParsingReader<DynamicMessage>
 {
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private final InputRowSchema inputRowSchema;
   private final InputEntity source;
-  private final JSONPathSpec flattenSpec;
-  private final ObjectFlattener<JsonNode> recordFlattener;
+  private final ObjectFlattener<Map<String, Object>> recordFlattener;
   private final ProtobufBytesDecoder protobufBytesDecoder;
 
   ProtobufReader(
@@ -63,17 +56,10 @@ public class ProtobufReader extends 
IntermediateRowParsingReader<DynamicMessage>
       JSONPathSpec flattenSpec
   )
   {
-    if (flattenSpec == null) {
-      this.inputRowSchema = new ProtobufInputRowSchema(inputRowSchema);
-      this.recordFlattener = null;
-    } else {
-      this.inputRowSchema = inputRowSchema;
-      this.recordFlattener = ObjectFlatteners.create(flattenSpec, new 
JSONFlattenerMaker(true));
-    }
-
+    this.inputRowSchema = inputRowSchema;
+    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new 
ProtobufFlattenerMaker());
     this.source = source;
     this.protobufBytesDecoder = protobufBytesDecoder;
-    this.flattenSpec = flattenSpec;
   }
 
   @Override
@@ -91,33 +77,27 @@ public class ProtobufReader extends 
IntermediateRowParsingReader<DynamicMessage>
   }
 
   @Override
-  protected List<InputRow> parseInputRows(DynamicMessage intermediateRow) 
throws ParseException, JsonProcessingException
+  protected List<InputRow> parseInputRows(DynamicMessage intermediateRow) 
throws ParseException
   {
-    Map<String, Object> record;
-
-    if (flattenSpec == null || JSONPathSpec.DEFAULT.equals(flattenSpec)) {
-      try {
-        record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> 
k.getJsonName());
-      }
-      catch (Exception ex) {
-        throw new ParseException(null, ex, "Protobuf message could not be 
parsed");
-      }
-    } else {
-      try {
-        String json = JsonFormat.printer().print(intermediateRow);
-        record = recordFlattener.flatten(OBJECT_MAPPER.readValue(json, 
JsonNode.class));
-      }
-      catch (InvalidProtocolBufferException e) {
-        throw new ParseException(null, e, "Protobuf message could not be 
parsed");
-      }
-    }
-
+    final Map<String, Object> record;
+    final Map<String, Object> plainJava = convertMessage(intermediateRow);
+    record = recordFlattener.flatten(plainJava);
     return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, 
record));
   }
 
   @Override
-  protected List<Map<String, Object>> toMap(DynamicMessage intermediateRow) 
throws JsonProcessingException, InvalidProtocolBufferException
+  protected List<Map<String, Object>> toMap(DynamicMessage intermediateRow)
   {
-    return Collections.singletonList(new 
ObjectMapper().readValue(JsonFormat.printer().print(intermediateRow), 
Map.class));
+    return Collections.singletonList(convertMessage(intermediateRow));
+  }
+
  /**
   * Converts the protobuf message to a plain java map via {@link ProtobufConverter}, translating
   * decode failures into Druid {@link ParseException}s.
   */
  private static Map<String, Object> convertMessage(Message msg)
  {
    try {
      return ProtobufConverter.convertMessage(msg);
    }
    catch (InvalidProtocolBufferException e) {
      throw new ParseException(null, e, "Protobuf message could not be parsed");
    }
  }
 }
diff --git 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
index 219fb23aac..1d0cfe9166 100644
--- 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
+++ 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
@@ -22,8 +22,12 @@ package org.apache.druid.data.input.protobuf;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.ByteEntity;
@@ -36,6 +40,12 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.math.expr.ExpressionProcessing;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.NestedDataDimensionSchema;
+import org.apache.druid.segment.transform.ExpressionTransform;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.TransformingInputEntityReader;
 import org.joda.time.DateTime;
 import org.joda.time.chrono.ISOChronology;
 import org.junit.Assert;
@@ -46,6 +56,7 @@ import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 
 public class ProtobufInputFormatTest
 {
@@ -63,6 +74,8 @@ public class ProtobufInputFormatTest
   @Before
   public void setUp() throws Exception
   {
+    NullHandling.initializeForTests();
+    ExpressionProcessing.initializeForTests(null);
     timestampSpec = new TimestampSpec("timestamp", "iso", null);
     dimensionsSpec = new DimensionsSpec(Lists.newArrayList(
         new StringDimensionSchema("event"),
@@ -125,7 +138,7 @@ public class ProtobufInputFormatTest
   }
 
   @Test
-  public void testParseNestedData() throws Exception
+  public void testParseFlattenData() throws Exception
   {
     //configure parser with desc file
     ProtobufInputFormat protobufInputFormat = new 
ProtobufInputFormat(flattenSpec, decoder);
@@ -136,11 +149,225 @@ public class ProtobufInputFormatTest
 
     final ByteEntity entity = new 
ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
 
-    InputRow row = protobufInputFormat.createReader(new 
InputRowSchema(timestampSpec, dimensionsSpec, null), entity, 
null).read().next();
+    InputRow row = protobufInputFormat.createReader(
+        new InputRowSchema(timestampSpec, dimensionsSpec, null),
+        entity,
+        null
+    ).read().next();
+
+    Assert.assertEquals(
+        ImmutableList.builder()
+                     .add("event")
+                     .add("id")
+                     .add("someOtherId")
+                     .add("isValid")
+                     .build(),
+        row.getDimensions()
+    );
 
     ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
   }
 
  // verifies flattening with jq-style path expressions resolves nested fields
  @Test
  public void testParseFlattenDataJq() throws Exception
  {
    //configure parser with desc file
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
        new JSONPathSpec(
            true,
            Lists.newArrayList(
                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
                new JSONPathFieldSpec(JSONPathFieldType.JQ, "foobar", ".foo.bar"),
                new JSONPathFieldSpec(JSONPathFieldType.JQ, "bar0", ".bar[0].bar")
            )
        ),
        decoder
    );

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);

    final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));

    InputRow row = protobufInputFormat.createReader(
        new InputRowSchema(timestampSpec, dimensionsSpec, null),
        entity,
        null
    ).read().next();

    // dimensions come from the explicit dimensionsSpec, not from the flatten spec fields
    Assert.assertEquals(
        ImmutableList.builder()
                     .add("event")
                     .add("id")
                     .add("someOtherId")
                     .add("isValid")
                     .build(),
        row.getDimensions()
    );

    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
  }
+
  // verifies schemaless ingestion: with an empty dimensionsSpec, dimensions are discovered from the
  // flattened record
  @Test
  public void testParseFlattenDataDiscover() throws Exception
  {
    //configure parser with desc file
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, decoder);

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);

    final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));

    // empty dimensions list triggers discovery of all flattened root fields
    InputRow row = protobufInputFormat.createReader(
        new InputRowSchema(timestampSpec, new DimensionsSpec(Collections.emptyList()), null),
        entity,
        null
    ).read().next();

    Assert.assertEquals(
        ImmutableList.builder()
                     .add("eventType")
                     .add("foobar")
                     .add("bar0")
                     .add("someOtherId")
                     .add("someIntColumn")
                     .add("isValid")
                     .add("description")
                     .add("someLongColumn")
                     .add("someFloatColumn")
                     .add("id")
                     .add("timestamp")
                     .build(),
        row.getDimensions()
    );

    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
  }
+
  // verifies nested data survives as-is (maps/lists) through the default flatten spec and can be
  // addressed by JSON_VALUE expression transforms over nested-data dimensions
  @Test
  public void testParseNestedData() throws Exception
  {
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
        JSONPathSpec.DEFAULT,
        decoder
    );

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);

    final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));

    InputEntityReader reader = protobufInputFormat.createReader(
        new InputRowSchema(
            timestampSpec,
            new DimensionsSpec(
                Lists.newArrayList(
                    new StringDimensionSchema("event"),
                    new StringDimensionSchema("id"),
                    new StringDimensionSchema("someOtherId"),
                    new StringDimensionSchema("isValid"),
                    new StringDimensionSchema("eventType"),
                    new NestedDataDimensionSchema("foo"),
                    new NestedDataDimensionSchema("bar")
                )
            ),
            null
        ),
        entity,
        null
    );

    // transforms extract scalar values out of the nested columns
    TransformSpec transformSpec = new TransformSpec(
        null,
        Lists.newArrayList(
            new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", TestExprMacroTable.INSTANCE),
            new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", TestExprMacroTable.INSTANCE)
        )
    );
    TransformingInputEntityReader transformingReader = new TransformingInputEntityReader(
        reader,
        transformSpec.toTransformer()
    );


    InputRow row = transformingReader.read().next();

    Assert.assertEquals(
        ImmutableList.builder()
                     .add("event")
                     .add("id")
                     .add("someOtherId")
                     .add("isValid")
                     .add("eventType")
                     .add("foo")
                     .add("bar")
                     .build(),
        row.getDimensions()
    );

    // nested values are plain java maps/lists rather than flattened strings
    Assert.assertEquals(ImmutableMap.of("bar", "baz"), row.getRaw("foo"));
    Assert.assertEquals(
        ImmutableList.of(ImmutableMap.of("bar", "bar0"), ImmutableMap.of("bar", "bar1")),
        row.getRaw("bar")
    );
    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);

  }
+
  // verifies that JSON_VALUE transforms over nested fields work even when the nested columns are not
  // declared as dimensions themselves
  @Test
  public void testParseNestedDataTransformsOnly() throws Exception
  {
    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
        JSONPathSpec.DEFAULT,
        decoder
    );

    //create binary of proto test event
    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
    ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);

    final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));

    InputEntityReader reader = protobufInputFormat.createReader(
        new InputRowSchema(
            timestampSpec,
            new DimensionsSpec(
                Lists.newArrayList(
                    new StringDimensionSchema("event"),
                    new StringDimensionSchema("id"),
                    new StringDimensionSchema("someOtherId"),
                    new StringDimensionSchema("isValid"),
                    new StringDimensionSchema("eventType")
                )
            ),
            null
        ),
        entity,
        null
    );

    TransformSpec transformSpec = new TransformSpec(
        null,
        Lists.newArrayList(
            new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", TestExprMacroTable.INSTANCE),
            new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", TestExprMacroTable.INSTANCE)
        )
    );
    TransformingInputEntityReader transformingReader = new TransformingInputEntityReader(
        reader,
        transformSpec.toTransformer()
    );


    InputRow row = transformingReader.read().next();
    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);

  }
+
   @Test
   public void testParseFlatData() throws Exception
   {
@@ -153,7 +380,11 @@ public class ProtobufInputFormatTest
 
     final ByteEntity entity = new 
ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
 
-    InputRow row = protobufInputFormat.createReader(new 
InputRowSchema(timestampSpec, dimensionsSpec, null), entity, 
null).read().next();
+    InputRow row = protobufInputFormat.createReader(
+        new InputRowSchema(timestampSpec, dimensionsSpec, null),
+        entity,
+        null
+    ).read().next();
 
     ProtobufInputRowParserTest.verifyFlatData(row, dateTime);
   }
@@ -170,7 +401,11 @@ public class ProtobufInputFormatTest
 
     final ByteEntity entity = new 
ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
 
-    InputRow row = protobufInputFormat.createReader(new 
InputRowSchema(timestampSpec, dimensionsSpec, null), entity, 
null).read().next();
+    InputRow row = protobufInputFormat.createReader(
+        new InputRowSchema(timestampSpec, dimensionsSpec, null),
+        entity,
+        null
+    ).read().next();
 
     ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
   }
diff --git 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
index eba0c8cb95..24807e6424 100644
--- 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
+++ 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
@@ -28,21 +28,15 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
-import org.apache.druid.java.util.common.parsers.ParseException;
 import org.joda.time.DateTime;
 import org.joda.time.chrono.ISOChronology;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.nio.ByteBuffer;
 
 public class ProtobufReaderTest
 {
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
   private InputRowSchema inputRowSchema;
   private InputRowSchema inputRowSchemaWithComplexTimestamp;
   private JSONPathSpec flattenSpec;
@@ -129,8 +123,6 @@ public class ProtobufReaderTest
   @Test
   public void testParseFlatDataWithComplexTimestampWithDefaultFlattenSpec() 
throws Exception
   {
-    expectedException.expect(ParseException.class);
-    expectedException.expectMessage("is unparseable!");
     ProtobufReader reader = new ProtobufReader(
         inputRowSchemaWithComplexTimestamp,
         null,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to