abhishekagarwal87 commented on code in PR #12753:
URL: https://github.com/apache/druid/pull/12753#discussion_r916101924


##########
processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.query.expression;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedPathFinder;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.nested.StructuredDataProcessor;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class NestedDataExpressions
+{
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  public static final ExpressionType TYPE = Preconditions.checkNotNull(
+      ExpressionType.fromColumnType(NestedDataComplexTypeSerde.TYPE)
+  );
+
+  public static class StructExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "struct";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      Preconditions.checkArgument(args.size() % 2 == 0);
+      class StructExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public StructExpr(List<Expr> args)
+        {
+          super(NAME, args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          HashMap<String, Object> theMap = new HashMap<>();
+          for (int i = 0; i < args.size(); i += 2) {
+            ExprEval field = args.get(i).eval(bindings);
+            ExprEval value = args.get(i + 1).eval(bindings);
+
+            Preconditions.checkArgument(field.type().is(ExprType.STRING), 
"field name must be a STRING");

Review Comment:
   Please add the actual type of the field in the error message.



##########
processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.query.expression;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedPathFinder;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.nested.StructuredDataProcessor;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class NestedDataExpressions
+{
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  public static final ExpressionType TYPE = Preconditions.checkNotNull(
+      ExpressionType.fromColumnType(NestedDataComplexTypeSerde.TYPE)
+  );
+
+  public static class StructExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "struct";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      Preconditions.checkArgument(args.size() % 2 == 0);
+      class StructExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public StructExpr(List<Expr> args)
+        {
+          super(NAME, args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          HashMap<String, Object> theMap = new HashMap<>();
+          for (int i = 0; i < args.size(); i += 2) {
+            ExprEval field = args.get(i).eval(bindings);
+            ExprEval value = args.get(i + 1).eval(bindings);
+
+            Preconditions.checkArgument(field.type().is(ExprType.STRING), 
"field name must be a STRING");
+            theMap.put(field.asString(), value.value());
+          }
+
+          return ExprEval.ofComplex(TYPE, theMap);
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new StructExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new StructExpr(args);
+    }
+  }
+
+  public static class JsonObjectExprMacro extends StructExprMacro
+  {
+    public static final String NAME = "json_object";
+    
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+  }
+
+  public static class ToJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.ofComplex(
+              TYPE,
+              maybeUnwrapStructuredData(input)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new ToJsonExpr(args);
+    }
+  }
+
+  public static class ToJsonStringExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json_string";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonStringExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonStringExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          try {
+            final Object unwrapped = maybeUnwrapStructuredData(input);
+            final String stringify = unwrapped == null ? null : 
JSON_MAPPER.writeValueAsString(unwrapped);
+            return ExprEval.ofType(
+                ExpressionType.STRING,
+                stringify
+            );
+          }
+          catch (JsonProcessingException e) {
+            throw new IAE(e, "Unable to stringify [%s] to JSON", 
input.value());
+          }
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonStringExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING;
+        }
+      }
+      return new ToJsonStringExpr(args);
+    }
+  }
+
+  public static class ParseJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "parse_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ParseJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ParseJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval arg = args.get(0).eval(bindings);
+          Object parsed = maybeUnwrapStructuredData(arg);
+          if (arg.type().is(ExprType.STRING) && arg.value() != null && 
maybeJson(arg.asString())) {
+            try {
+              parsed = JSON_MAPPER.readValue(arg.asString(), Object.class);
+            }
+            catch (JsonProcessingException e) {
+              throw new IAE("Bad string input [%s] to [%s]", arg.asString(), 
name());
+            }
+          }
+          return ExprEval.ofComplex(
+              TYPE,
+              parsed
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ParseJsonExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new ParseJsonExpr(args);
+    }
+  }
+
+
+
+  public static class GetPathExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "get_path";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathFinder.NestedPathPart> parts = 
getArg1PathPartsFromLiteral(name(), args);
+      class GetPathExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public GetPathExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          return ExprEval.bestEffortOf(
+              NestedPathFinder.findLiteral(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new GetPathExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // we cannot infer the output type (well we could say it is 'STRING' 
right now because is all we support...
+          return null;
+        }
+      }
+      return new GetPathExpr(args);
+    }
+  }
+
+  public static class JsonQueryExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_query";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathFinder.NestedPathPart> parts = 
getArg1JsonPathPartsFromLiteral(name(), args);
+      class JsonQueryExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonQueryExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          return ExprEval.ofComplex(
+              TYPE,
+              NestedPathFinder.find(maybeUnwrapStructuredData(input), parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonQueryExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // call all the output JSON typed
+          return TYPE;
+        }
+      }
+      return new JsonQueryExpr(args);
+    }
+  }
+
+  public static class JsonValueExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_value";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathFinder.NestedPathPart> parts = 
getArg1JsonPathPartsFromLiteral(name(), args);
+      class JsonValueExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonValueExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          return ExprEval.bestEffortOf(
+              NestedPathFinder.findLiteral(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonValueExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // we cannot infer the output type (well we could say it is 'STRING' 
right now because is all we support...
+          return null;
+        }
+      }
+      return new JsonValueExpr(args);
+    }
+  }
+
+  public static class ListPathsExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "list_paths";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final StructuredDataProcessor processor = new StructuredDataProcessor()
+      {
+        @Override
+        public int processLiteralField(String fieldName, Object fieldValue)
+        {
+          // do nothing, we only want the list of fields returned by this 
processor
+          return 0;
+        }
+      };
+
+      class ListPathsExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ListPathsExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          StructuredDataProcessor.ProcessResults info = 
processor.processFields(maybeUnwrapStructuredData(input));
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              ImmutableList.copyOf(info.getLiteralFields())
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ListPathsExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new ListPathsExpr(args);
+    }
+  }
+
+  public static class JsonPathsExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_paths";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final StructuredDataProcessor processor = new StructuredDataProcessor()
+      {
+        @Override
+        public int processLiteralField(String fieldName, Object fieldValue)
+        {
+          // do nothing, we only want the list of fields returned by this 
processor
+          return 0;
+        }
+      };
+
+      class JsonPathsExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonPathsExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          // maybe in the future ProcessResults should deal in 
PathFinder.PathPart instead of strings for fields
+          StructuredDataProcessor.ProcessResults info = 
processor.processFields(maybeUnwrapStructuredData(input));
+          List<String> transformed = info.getLiteralFields()
+                                        .stream()
+                                        .map(p -> 
NestedPathFinder.toNormalizedJsonPath(NestedPathFinder.parseJqPath(p)))
+                                        .collect(Collectors.toList());
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              transformed
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonPathsExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new JsonPathsExpr(args);
+    }
+  }
+
+  public static class ListKeysExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "list_keys";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathFinder.NestedPathPart> parts = 
getArg1PathPartsFromLiteral(name(), args);
+      class ListKeysExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ListKeysExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              NestedPathFinder.findKeys(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ListKeysExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new ListKeysExpr(args);
+    }
+  }
+
+  @Nullable
+  static Object maybeUnwrapStructuredData(ExprEval input)
+  {
+    if (input.value() instanceof StructuredData) {
+      StructuredData data = (StructuredData) input.value();
+      return data.getValue();
+    }
+    return input.value();
+  }
+
+  static void checkArg0NestedType(String fnName, List<Expr> args, ExprEval 
input)

Review Comment:
   nit - you could just pass args.get(0) as an argument, since the intention to
use the 0th argument is explicit in the function name.



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.PeekingIterator;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.column.BaseColumn;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
+import org.apache.druid.segment.nested.CompressedNestedDataComplexColumn;
+import org.apache.druid.segment.nested.GlobalDictionarySortedCollector;
+import org.apache.druid.segment.nested.NestedDataColumnSerializer;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedLiteralTypeInfo;
+import org.apache.druid.segment.serde.ComplexColumnPartSerde;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.IntBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class NestedDataColumnMerger implements DimensionMergerV9
+{
+  private static final Logger log = new Logger(NestedDataColumnMerger.class);
+  public static final Comparator<Pair<Integer, PeekingIterator<Long>>> 
LONG_MERGING_COMPARATOR =
+      DictionaryMergingIterator.makePeekingComparator();
+  public static final Comparator<Pair<Integer, PeekingIterator<Double>>> 
DOUBLE_MERGING_COMPARATOR =
+      DictionaryMergingIterator.makePeekingComparator();
+
+  private final String name;
+  private final Closer closer;
+
+  private NestedDataColumnSerializer serializer;
+
+  public NestedDataColumnMerger(
+      String name,
+      IndexSpec indexSpec,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      ProgressIndicator progressIndicator,
+      Closer closer
+  )
+  {
+
+    this.name = name;
+    this.serializer = new NestedDataColumnSerializer(name, indexSpec, 
segmentWriteOutMedium, progressIndicator, closer);
+    this.closer = closer;
+  }
+
+  @Override
+  public void writeMergedValueDictionary(List<IndexableAdapter> adapters) 
throws IOException
+  {
+
+    long dimStartTime = System.currentTimeMillis();
+
+    int numMergeIndex = 0;
+    GlobalDictionarySortedCollector sortedLookup = null;
+    final Indexed[] sortedLookups = new Indexed[adapters.size()];
+    final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
+    final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+
+    final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields 
= new TreeMap<>();
+
+    for (int i = 0; i < adapters.size(); i++) {
+      final IndexableAdapter adapter = adapters.get(i);
+      final GlobalDictionarySortedCollector dimValues;
+      if (adapter instanceof IncrementalIndexAdapter) {
+        dimValues = 
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter, 
mergedFields);
+      } else if (adapter instanceof QueryableIndexIndexableAdapter) {
+        dimValues = 
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter, 
mergedFields);
+      } else {
+        throw new ISE("Unable to merge columns of unsupported adapter %s", 
adapter.getClass());
+      }
+
+      boolean allNulls = allNull(dimValues.getSortedStrings()) &&
+                         allNull(dimValues.getSortedLongs()) &&
+                         allNull(dimValues.getSortedDoubles());
+      sortedLookup = dimValues;

Review Comment:
   shouldn't this be inside the `if` block below? 



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.MutableBitmap;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.CloseableIndexed;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
+import org.apache.druid.segment.nested.GlobalDimensionDictionary;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedLiteralTypeInfo;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.nested.StructuredDataProcessor;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class NestedDataColumnIndexer implements 
DimensionIndexer<StructuredData, StructuredData, StructuredData>

Review Comment:
   can you add some javadocs here? 



##########
processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.CompressedPools;
+import org.apache.druid.segment.serde.MetaSerdeHelper;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+public class CompressedBlockSerializer implements Serializer
+{
+  private static final MetaSerdeHelper<CompressedBlockSerializer> 
META_SERDE_HELPER = MetaSerdeHelper
+      .firstWriteByte((CompressedBlockSerializer x) -> (byte) 0x01)

Review Comment:
   nit - can you directly point to the VERSION variable in 
CompressedBlockReader? 



##########
processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.query.expression;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedPathFinder;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.nested.StructuredDataProcessor;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class NestedDataExpressions
+{
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  public static final ExpressionType TYPE = Preconditions.checkNotNull(
+      ExpressionType.fromColumnType(NestedDataComplexTypeSerde.TYPE)
+  );
+
+  public static class StructExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "struct";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      Preconditions.checkArgument(args.size() % 2 == 0);

Review Comment:
   Please include the actual number of arguments in the exception's error message.



##########
processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java:
##########
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.query.expression;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedPathFinder;
+import org.apache.druid.segment.nested.NestedPathPart;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.nested.StructuredDataProcessor;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class NestedDataExpressions
+{
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  public static final ExpressionType TYPE = Preconditions.checkNotNull(
+      ExpressionType.fromColumnType(NestedDataComplexTypeSerde.TYPE)
+  );
+
+  public static class StructExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "struct";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      Preconditions.checkArgument(args.size() % 2 == 0);
+      class StructExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public StructExpr(List<Expr> args)
+        {
+          super(NAME, args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          HashMap<String, Object> theMap = new HashMap<>();
+          for (int i = 0; i < args.size(); i += 2) {
+            ExprEval field = args.get(i).eval(bindings);
+            ExprEval value = args.get(i + 1).eval(bindings);
+
+            Preconditions.checkArgument(field.type().is(ExprType.STRING), 
"field name must be a STRING");
+            theMap.put(field.asString(), value.value());
+          }
+
+          return ExprEval.ofComplex(TYPE, theMap);
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new StructExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new StructExpr(args);
+    }
+  }
+
+  public static class JsonObjectExprMacro extends StructExprMacro
+  {
+    public static final String NAME = "json_object";
+    
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+  }
+
+  public static class ToJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.ofComplex(
+              TYPE,
+              maybeUnwrapStructuredData(input)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new ToJsonExpr(args);
+    }
+  }
+
+  public static class ToJsonStringExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json_string";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonStringExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonStringExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          try {
+            final Object unwrapped = maybeUnwrapStructuredData(input);
+            final String stringify = unwrapped == null ? null : 
JSON_MAPPER.writeValueAsString(unwrapped);
+            return ExprEval.ofType(
+                ExpressionType.STRING,
+                stringify
+            );
+          }
+          catch (JsonProcessingException e) {
+            throw new IAE(e, "Unable to stringify [%s] to JSON", 
input.value());
+          }
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonStringExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING;
+        }
+      }
+      return new ToJsonStringExpr(args);
+    }
+  }
+
+  public static class ParseJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "parse_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ParseJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ParseJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval arg = args.get(0).eval(bindings);
+          Object parsed = maybeUnwrapStructuredData(arg);
+          if (arg.type().is(ExprType.STRING) && arg.value() != null && 
maybeJson(arg.asString())) {

Review Comment:
   This one seems a bit odd to me: the method doesn't fail when `maybeJson` 
returns false, but it does fail when `maybeJson` returns true and the JSON can't 
be parsed — even though in that case the value may not actually have been a JSON object. 



##########
processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java:
##########
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.query.expression;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedPathFinder;
+import org.apache.druid.segment.nested.NestedPathPart;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.nested.StructuredDataProcessor;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class NestedDataExpressions
+{
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  public static final ExpressionType TYPE = Preconditions.checkNotNull(
+      ExpressionType.fromColumnType(NestedDataComplexTypeSerde.TYPE)
+  );
+
+  public static class StructExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "struct";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      Preconditions.checkArgument(args.size() % 2 == 0);
+      class StructExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public StructExpr(List<Expr> args)
+        {
+          super(NAME, args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          HashMap<String, Object> theMap = new HashMap<>();
+          for (int i = 0; i < args.size(); i += 2) {
+            ExprEval field = args.get(i).eval(bindings);
+            ExprEval value = args.get(i + 1).eval(bindings);
+
+            Preconditions.checkArgument(field.type().is(ExprType.STRING), 
"field name must be a STRING");
+            theMap.put(field.asString(), value.value());
+          }
+
+          return ExprEval.ofComplex(TYPE, theMap);
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new StructExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new StructExpr(args);
+    }
+  }
+
+  public static class JsonObjectExprMacro extends StructExprMacro
+  {
+    public static final String NAME = "json_object";
+    
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+  }
+
+  public static class ToJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.ofComplex(
+              TYPE,
+              maybeUnwrapStructuredData(input)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new ToJsonExpr(args);
+    }
+  }
+
+  public static class ToJsonStringExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json_string";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonStringExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonStringExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          try {
+            final Object unwrapped = maybeUnwrapStructuredData(input);
+            final String stringify = unwrapped == null ? null : 
JSON_MAPPER.writeValueAsString(unwrapped);
+            return ExprEval.ofType(
+                ExpressionType.STRING,
+                stringify
+            );
+          }
+          catch (JsonProcessingException e) {
+            throw new IAE(e, "Unable to stringify [%s] to JSON", 
input.value());
+          }
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonStringExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING;
+        }
+      }
+      return new ToJsonStringExpr(args);
+    }
+  }
+
+  public static class ParseJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "parse_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ParseJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ParseJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval arg = args.get(0).eval(bindings);
+          Object parsed = maybeUnwrapStructuredData(arg);
+          if (arg.type().is(ExprType.STRING) && arg.value() != null && 
maybeJson(arg.asString())) {
+            try {
+              parsed = JSON_MAPPER.readValue(arg.asString(), Object.class);
+            }
+            catch (JsonProcessingException e) {
+              throw new IAE("Bad string input [%s] to [%s]", arg.asString(), 
name());
+            }
+          }
+          return ExprEval.ofComplex(
+              TYPE,
+              parsed
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ParseJsonExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new ParseJsonExpr(args);
+    }
+  }
+
+
+
+  public static class GetPathExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "get_path";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathPart> parts = getArg1PathPartsFromLiteral(name(), 
args);
+      class GetPathExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public GetPathExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.bestEffortOf(
+              NestedPathFinder.findLiteral(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new GetPathExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // we cannot infer the output type (well we could say it is 'STRING' 
right now because is all we support...
+          return null;
+        }
+      }
+      return new GetPathExpr(args);
+    }
+  }
+
+  public static class JsonQueryExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_query";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathPart> parts = 
getArg1JsonPathPartsFromLiteral(name(), args);
+      class JsonQueryExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonQueryExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.ofComplex(
+              TYPE,
+              NestedPathFinder.find(maybeUnwrapStructuredData(input), parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonQueryExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // call all the output JSON typed
+          return TYPE;
+        }
+      }
+      return new JsonQueryExpr(args);
+    }
+  }
+
+  public static class JsonValueExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_value";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathPart> parts = 
getArg1JsonPathPartsFromLiteral(name(), args);
+      class JsonValueExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonValueExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.bestEffortOf(
+              NestedPathFinder.findLiteral(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonValueExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // we cannot infer the output type (well we could say it is 'STRING' 
right now because is all we support...
+          return null;
+        }
+      }
+      return new JsonValueExpr(args);
+    }
+  }
+
+  public static class ListPathsExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "list_paths";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final StructuredDataProcessor processor = new StructuredDataProcessor()
+      {
+        @Override
+        public int processLiteralField(String fieldName, Object fieldValue)
+        {
+          // do nothing, we only want the list of fields returned by this 
processor
+          return 0;
+        }
+      };
+
+      class ListPathsExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ListPathsExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          StructuredDataProcessor.ProcessResults info = 
processor.processFields(maybeUnwrapStructuredData(input));
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              ImmutableList.copyOf(info.getLiteralFields())
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ListPathsExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new ListPathsExpr(args);
+    }
+  }
+
+  public static class JsonPathsExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_paths";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final StructuredDataProcessor processor = new StructuredDataProcessor()
+      {
+        @Override
+        public int processLiteralField(String fieldName, Object fieldValue)
+        {
+          // do nothing, we only want the list of fields returned by this 
processor
+          return 0;
+        }
+      };
+
+      class JsonPathsExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonPathsExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          // maybe in the future ProcessResults should deal in 
PathFinder.PathPart instead of strings for fields
+          StructuredDataProcessor.ProcessResults info = 
processor.processFields(maybeUnwrapStructuredData(input));
+          List<String> transformed = info.getLiteralFields()
+                                        .stream()
+                                        .map(p -> 
NestedPathFinder.toNormalizedJsonPath(NestedPathFinder.parseJqPath(p)))
+                                        .collect(Collectors.toList());
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              transformed
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonPathsExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new JsonPathsExpr(args);
+    }
+  }
+
+  public static class ListKeysExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "list_keys";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathPart> parts = getArg1PathPartsFromLiteral(name(), 
args);
+      class ListKeysExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ListKeysExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              NestedPathFinder.findKeys(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ListKeysExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new ListKeysExpr(args);
+    }
+  }
+
+  @Nullable
+  static Object maybeUnwrapStructuredData(ExprEval input)
+  {
+    if (input.value() instanceof StructuredData) {
+      StructuredData data = (StructuredData) input.value();
+      return data.getValue();
+    }
+    return input.value();
+  }
+
+  static List<NestedPathPart> getArg1PathPartsFromLiteral(String fnName, 
List<Expr> args)
+  {
+    if (!(args.get(1).isLiteral() && args.get(1).getLiteralValue() instanceof 
String)) {
+      throw new IAE(
+          "Function[%s] second argument [%s] must be a literal [%s] value",
+          fnName,
+          args.get(1).stringify(),
+          ExpressionType.STRING
+      );
+    }
+    final String path = (String) args.get(1).getLiteralValue();
+    List<NestedPathPart> parts;
+    try {
+      parts = NestedPathFinder.parseJsonPath(path);
+    }
+    catch (IllegalArgumentException iae) {
+      parts = NestedPathFinder.parseJqPath(path);
+    }
+    return parts;
+  }
+
+  static List<NestedPathPart> getArg1JsonPathPartsFromLiteral(String fnName, 
List<Expr> args)
+  {
+    if (!(args.get(1).isLiteral() && args.get(1).getLiteralValue() instanceof 
String)) {
+      throw new IAE(
+          "Function[%s] second argument [%s] must be a literal [%s] value",
+          fnName,
+          args.get(1).stringify(),
+          ExpressionType.STRING
+      );
+    }
+    final List<NestedPathPart> parts = NestedPathFinder.parseJsonPath(
+        (String) args.get(1).getLiteralValue()
+    );
+    return parts;
+  }
+
+  static boolean maybeJson(@Nullable String val)

Review Comment:
   nit: can be renamed to `maybeIsJson` 



##########
processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java:
##########
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.query.expression;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedPathFinder;
+import org.apache.druid.segment.nested.NestedPathPart;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.nested.StructuredDataProcessor;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class NestedDataExpressions
+{
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  public static final ExpressionType TYPE = Preconditions.checkNotNull(
+      ExpressionType.fromColumnType(NestedDataComplexTypeSerde.TYPE)
+  );
+
+  public static class StructExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "struct";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      Preconditions.checkArgument(args.size() % 2 == 0);
+      class StructExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public StructExpr(List<Expr> args)
+        {
+          super(NAME, args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          HashMap<String, Object> theMap = new HashMap<>();
+          for (int i = 0; i < args.size(); i += 2) {
+            ExprEval field = args.get(i).eval(bindings);
+            ExprEval value = args.get(i + 1).eval(bindings);
+
+            Preconditions.checkArgument(field.type().is(ExprType.STRING), 
"field name must be a STRING");
+            theMap.put(field.asString(), value.value());
+          }
+
+          return ExprEval.ofComplex(TYPE, theMap);
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new StructExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new StructExpr(args);
+    }
+  }
+
+  public static class JsonObjectExprMacro extends StructExprMacro
+  {
+    public static final String NAME = "json_object";
+    
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+  }
+
+  public static class ToJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.ofComplex(
+              TYPE,
+              maybeUnwrapStructuredData(input)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new ToJsonExpr(args);
+    }
+  }
+
+  public static class ToJsonStringExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json_string";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonStringExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonStringExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          try {
+            final Object unwrapped = maybeUnwrapStructuredData(input);
+            final String stringify = unwrapped == null ? null : 
JSON_MAPPER.writeValueAsString(unwrapped);
+            return ExprEval.ofType(
+                ExpressionType.STRING,
+                stringify
+            );
+          }
+          catch (JsonProcessingException e) {
+            throw new IAE(e, "Unable to stringify [%s] to JSON", 
input.value());
+          }
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonStringExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING;
+        }
+      }
+      return new ToJsonStringExpr(args);
+    }
+  }
+
+  public static class ParseJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "parse_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ParseJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ParseJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval arg = args.get(0).eval(bindings);
+          Object parsed = maybeUnwrapStructuredData(arg);
+          if (arg.type().is(ExprType.STRING) && arg.value() != null && 
maybeJson(arg.asString())) {
+            try {
+              parsed = JSON_MAPPER.readValue(arg.asString(), Object.class);
+            }
+            catch (JsonProcessingException e) {
+              throw new IAE("Bad string input [%s] to [%s]", arg.asString(), 
name());
+            }
+          }
+          return ExprEval.ofComplex(
+              TYPE,
+              parsed
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ParseJsonExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new ParseJsonExpr(args);
+    }
+  }
+
+
+
+  public static class GetPathExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "get_path";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathPart> parts = getArg1PathPartsFromLiteral(name(), 
args);
+      class GetPathExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public GetPathExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.bestEffortOf(
+              NestedPathFinder.findLiteral(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new GetPathExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // we cannot infer the output type (well we could say it is 'STRING' 
right now because is all we support...
+          return null;
+        }
+      }
+      return new GetPathExpr(args);
+    }
+  }
+
+  public static class JsonQueryExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_query";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathPart> parts = 
getArg1JsonPathPartsFromLiteral(name(), args);
+      class JsonQueryExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonQueryExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.ofComplex(
+              TYPE,
+              NestedPathFinder.find(maybeUnwrapStructuredData(input), parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonQueryExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // call all the output JSON typed
+          return TYPE;
+        }
+      }
+      return new JsonQueryExpr(args);
+    }
+  }
+
+  public static class JsonValueExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_value";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathPart> parts = 
getArg1JsonPathPartsFromLiteral(name(), args);
+      class JsonValueExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonValueExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.bestEffortOf(
+              NestedPathFinder.findLiteral(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonValueExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // we cannot infer the output type (well we could say it is 'STRING' 
right now because is all we support...
+          return null;
+        }
+      }
+      return new JsonValueExpr(args);
+    }
+  }
+
+  public static class ListPathsExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "list_paths";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final StructuredDataProcessor processor = new StructuredDataProcessor()
+      {
+        @Override
+        public int processLiteralField(String fieldName, Object fieldValue)
+        {
+          // do nothing, we only want the list of fields returned by this 
processor
+          return 0;
+        }
+      };
+
+      class ListPathsExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ListPathsExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          StructuredDataProcessor.ProcessResults info = 
processor.processFields(maybeUnwrapStructuredData(input));
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              ImmutableList.copyOf(info.getLiteralFields())
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ListPathsExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new ListPathsExpr(args);
+    }
+  }
+
+  public static class JsonPathsExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_paths";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final StructuredDataProcessor processor = new StructuredDataProcessor()
+      {
+        @Override
+        public int processLiteralField(String fieldName, Object fieldValue)
+        {
+          // do nothing, we only want the list of fields returned by this 
processor
+          return 0;
+        }
+      };
+
+      class JsonPathsExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonPathsExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          // maybe in the future ProcessResults should deal in 
PathFinder.PathPart instead of strings for fields
+          StructuredDataProcessor.ProcessResults info = 
processor.processFields(maybeUnwrapStructuredData(input));
+          List<String> transformed = info.getLiteralFields()
+                                        .stream()
+                                        .map(p -> 
NestedPathFinder.toNormalizedJsonPath(NestedPathFinder.parseJqPath(p)))
+                                        .collect(Collectors.toList());
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              transformed
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonPathsExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new JsonPathsExpr(args);
+    }
+  }
+
+  public static class ListKeysExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "list_keys";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathPart> parts = getArg1PathPartsFromLiteral(name(), 
args);
+      class ListKeysExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ListKeysExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              NestedPathFinder.findKeys(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ListKeysExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new ListKeysExpr(args);
+    }
+  }
+
+  @Nullable
+  static Object maybeUnwrapStructuredData(ExprEval input)
+  {
+    if (input.value() instanceof StructuredData) {
+      StructuredData data = (StructuredData) input.value();
+      return data.getValue();
+    }
+    return input.value();
+  }
+
+  static List<NestedPathPart> getArg1PathPartsFromLiteral(String fnName, 
List<Expr> args)

Review Comment:
   why not take `args.get(1)` (the path `Expr`) as the parameter instead of the whole argument list? 



##########
processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java:
##########
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.query.expression;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedPathFinder;
+import org.apache.druid.segment.nested.StructuredData;
+import org.apache.druid.segment.nested.StructuredDataProcessor;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class NestedDataExpressions
+{
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  public static final ExpressionType TYPE = Preconditions.checkNotNull(
+      ExpressionType.fromColumnType(NestedDataComplexTypeSerde.TYPE)
+  );
+
+  public static class StructExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "struct";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      Preconditions.checkArgument(args.size() % 2 == 0);
+      class StructExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public StructExpr(List<Expr> args)
+        {
+          super(NAME, args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          HashMap<String, Object> theMap = new HashMap<>();
+          for (int i = 0; i < args.size(); i += 2) {
+            ExprEval field = args.get(i).eval(bindings);
+            ExprEval value = args.get(i + 1).eval(bindings);
+
+            Preconditions.checkArgument(field.type().is(ExprType.STRING), 
"field name must be a STRING");
+            theMap.put(field.asString(), value.value());
+          }
+
+          return ExprEval.ofComplex(TYPE, theMap);
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new StructExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new StructExpr(args);
+    }
+  }
+
+  public static class JsonObjectExprMacro extends StructExprMacro
+  {
+    public static final String NAME = "json_object";
+    
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+  }
+
+  public static class ToJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          return ExprEval.ofComplex(
+              TYPE,
+              maybeUnwrapStructuredData(input)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new ToJsonExpr(args);
+    }
+  }
+
+  public static class ToJsonStringExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "to_json_string";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ToJsonStringExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ToJsonStringExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          try {
+            final Object unwrapped = maybeUnwrapStructuredData(input);
+            final String stringify = unwrapped == null ? null : 
JSON_MAPPER.writeValueAsString(unwrapped);
+            return ExprEval.ofType(
+                ExpressionType.STRING,
+                stringify
+            );
+          }
+          catch (JsonProcessingException e) {
+            throw new IAE(e, "Unable to stringify [%s] to JSON", 
input.value());
+          }
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ToJsonStringExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING;
+        }
+      }
+      return new ToJsonStringExpr(args);
+    }
+  }
+
+  public static class ParseJsonExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "parse_json";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      class ParseJsonExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ParseJsonExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval arg = args.get(0).eval(bindings);
+          Object parsed = maybeUnwrapStructuredData(arg);
+          if (arg.type().is(ExprType.STRING) && arg.value() != null && 
maybeJson(arg.asString())) {
+            try {
+              parsed = JSON_MAPPER.readValue(arg.asString(), Object.class);
+            }
+            catch (JsonProcessingException e) {
+              throw new IAE("Bad string input [%s] to [%s]", arg.asString(), 
name());
+            }
+          }
+          return ExprEval.ofComplex(
+              TYPE,
+              parsed
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ParseJsonExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return TYPE;
+        }
+      }
+      return new ParseJsonExpr(args);
+    }
+  }
+
+
+
+  public static class GetPathExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "get_path";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathFinder.NestedPathPart> parts = 
getArg1PathPartsFromLiteral(name(), args);
+      class GetPathExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public GetPathExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          return ExprEval.bestEffortOf(
+              NestedPathFinder.findLiteral(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new GetPathExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // we cannot infer the output type (well we could say it is 'STRING' 
right now because is all we support...
+          return null;
+        }
+      }
+      return new GetPathExpr(args);
+    }
+  }
+
+  public static class JsonQueryExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_query";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathFinder.NestedPathPart> parts = 
getArg1JsonPathPartsFromLiteral(name(), args);
+      class JsonQueryExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonQueryExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          return ExprEval.ofComplex(
+              TYPE,
+              NestedPathFinder.find(maybeUnwrapStructuredData(input), parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonQueryExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // call all the output JSON typed
+          return TYPE;
+        }
+      }
+      return new JsonQueryExpr(args);
+    }
+  }
+
+  public static class JsonValueExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_value";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathFinder.NestedPathPart> parts = 
getArg1JsonPathPartsFromLiteral(name(), args);
+      class JsonValueExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonValueExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          return ExprEval.bestEffortOf(
+              NestedPathFinder.findLiteral(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonValueExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          // we cannot infer the output type (well we could say it is 'STRING' 
right now because is all we support...
+          return null;
+        }
+      }
+      return new JsonValueExpr(args);
+    }
+  }
+
+  public static class ListPathsExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "list_paths";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final StructuredDataProcessor processor = new StructuredDataProcessor()
+      {
+        @Override
+        public int processLiteralField(String fieldName, Object fieldValue)
+        {
+          // do nothing, we only want the list of fields returned by this 
processor
+          return 0;
+        }
+      };
+
+      class ListPathsExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ListPathsExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          StructuredDataProcessor.ProcessResults info = 
processor.processFields(maybeUnwrapStructuredData(input));
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              ImmutableList.copyOf(info.getLiteralFields())
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ListPathsExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new ListPathsExpr(args);
+    }
+  }
+
+  public static class JsonPathsExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "json_paths";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final StructuredDataProcessor processor = new StructuredDataProcessor()
+      {
+        @Override
+        public int processLiteralField(String fieldName, Object fieldValue)
+        {
+          // do nothing, we only want the list of fields returned by this 
processor
+          return 0;
+        }
+      };
+
+      class JsonPathsExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public JsonPathsExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          // maybe in the future ProcessResults should deal in 
PathFinder.PathPart instead of strings for fields
+          StructuredDataProcessor.ProcessResults info = 
processor.processFields(maybeUnwrapStructuredData(input));
+          List<String> transformed = info.getLiteralFields()
+                                        .stream()
+                                        .map(p -> 
NestedPathFinder.toNormalizedJsonPath(NestedPathFinder.parseJqPath(p)))
+                                        .collect(Collectors.toList());
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              transformed
+          );
+        }
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new JsonPathsExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new JsonPathsExpr(args);
+    }
+  }
+
+  public static class ListKeysExprMacro implements ExprMacroTable.ExprMacro
+  {
+    public static final String NAME = "list_keys";
+
+    @Override
+    public String name()
+    {
+      return NAME;
+    }
+
+    @Override
+    public Expr apply(List<Expr> args)
+    {
+      final List<NestedPathFinder.NestedPathPart> parts = 
getArg1PathPartsFromLiteral(name(), args);
+      class ListKeysExpr extends ExprMacroTable.BaseScalarMacroFunctionExpr
+      {
+        public ListKeysExpr(List<Expr> args)
+        {
+          super(name(), args);
+        }
+
+        @Override
+        public ExprEval eval(ObjectBinding bindings)
+        {
+          ExprEval input = args.get(0).eval(bindings);
+          checkArg0NestedType(name, args, input);
+          return ExprEval.ofType(
+              ExpressionType.STRING_ARRAY,
+              NestedPathFinder.findKeys(maybeUnwrapStructuredData(input), 
parts)
+          );
+        }
+
+
+        @Override
+        public Expr visit(Shuttle shuttle)
+        {
+          List<Expr> newArgs = args.stream().map(x -> 
x.visit(shuttle)).collect(Collectors.toList());
+          return shuttle.visit(new ListKeysExpr(newArgs));
+        }
+
+        @Nullable
+        @Override
+        public ExpressionType getOutputType(InputBindingInspector inspector)
+        {
+          return ExpressionType.STRING_ARRAY;
+        }
+      }
+      return new ListKeysExpr(args);
+    }
+  }
+
+  @Nullable
+  static Object maybeUnwrapStructuredData(ExprEval input)
+  {
+    if (input.value() instanceof StructuredData) {
+      StructuredData data = (StructuredData) input.value();
+      return data.getValue();
+    }
+    return input.value();
+  }
+
+  static void checkArg0NestedType(String fnName, List<Expr> args, ExprEval 
input)
+  {
+    if (!TYPE.equals(input.type()) && input.value() != null) {
+      // give unknown complex a try, sometimes ingestion needs a little help 
because input format isn't mapped to a
+      // druid schema
+      if (ExpressionType.UNKNOWN_COMPLEX.equals(input.type())) {
+        return;
+      }
+      throw new IAE(
+          "Function[%s] first argument [%s] must be type [%s] as input, got 
[%s]",
+          fnName,
+          args.get(0).stringify(),
+          TYPE.asTypeString(),
+          input.type().asTypeString()
+      );
+    }
+  }
+
+  static List<NestedPathFinder.NestedPathPart> 
getArg1PathPartsFromLiteral(String fnName, List<Expr> args)

Review Comment:
   nit: you could just pass `args.get(1)` itself as an argument instead of the whole list. 



##########
processing/src/main/java/org/apache/druid/segment/ComparatorDimensionDictionary.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.segment;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link Comparator} based {@link DimensionDictionary}
+ *
+ * there are a lot of unused methods in here for now since the only thing this 
is used for is to build up
+ * the unsorted dictionary and then it is converted to a {@link 
ComparatorSortedDimensionDictionary}, but
+ * leaving the unused methods in place for now to be basically compatible with 
the other implementation.
+ *
+ * This version is not thread-safe since the only current user doesn't use the 
dictionary for realtime queries, if this
+ * changes we need to add a 'ConcurrentComparatorDimensionDictionary'.
+ */
+public class ComparatorDimensionDictionary<T>
+{
+  public static final int ABSENT_VALUE_ID = -1;
+
+  @Nullable
+  private T minValue = null;
+  @Nullable
+  private T maxValue = null;
+  private volatile int idForNull = ABSENT_VALUE_ID;
+
+  private final AtomicLong sizeInBytes = new AtomicLong(0L);
+  private final Object2IntMap<T> valueToId = new Object2IntOpenHashMap<>();
+
+  private final List<T> idToValue = new ArrayList<>();
+  private final Comparator<T> comparator;
+
  /**
   * Creates a dictionary ordered by the supplied {@link Comparator}, which is used to track
   * {@link #getMinValue()} / {@link #getMaxValue()} and to produce the sorted view in {@link #sort()}.
   */
  public ComparatorDimensionDictionary(Comparator<T> comparator)
  {
    this.comparator = comparator;
    // make lookups of not-yet-interned values return ABSENT_VALUE_ID (-1) instead of 0
    valueToId.defaultReturnValue(ABSENT_VALUE_ID);
  }
+
+  public int getId(@Nullable T value)
+  {
+    if (value == null) {
+      return idForNull;
+    }
+    return valueToId.getInt(value);
+  }
+
+  @SuppressWarnings("unused")
+  @Nullable
+  public T getValue(int id)
+  {
+    if (id == idForNull) {
+      return null;
+    }
+    return idToValue.get(id);
+  }
+
  /**
   * Number of distinct values added so far, counting null if it has been added.
   */
  public int size()
  {
    // using idToValue rather than valueToId because the valueToId doesn't account null value, if it is present.
    return idToValue.size();
  }
+
  /**
   * Gets the current size of this dictionary in bytes.
   *
   * Only non-null values added via {@link #add} contribute, each counted as
   * {@link #estimateSizeOfValue} plus two object references; null contributes nothing.
   */
  public long sizeInBytes()
  {
    return sizeInBytes.get();
  }
+
  /**
   * Interns {@code originalValue} and returns its id, assigning the next sequential id when the value has
   * not been seen before. Also maintains {@link #sizeInBytes}, {@link #getMinValue()} and
   * {@link #getMaxValue()}. Not thread-safe (see class javadoc).
   */
  public int add(@Nullable T originalValue)
  {
    if (originalValue == null) {
      // null gets a dedicated id outside the map, assigned lazily the first time it is seen
      if (idForNull == ABSENT_VALUE_ID) {
        idForNull = idToValue.size();
        idToValue.add(null);
      }
      return idForNull;
    }
    int prev = valueToId.getInt(originalValue);
    if (prev >= 0) {
      // already interned
      return prev;
    }
    // new value: its id is the next slot in idToValue
    final int index = idToValue.size();
    valueToId.put(originalValue, index);
    idToValue.add(originalValue);

    // Add size of new dim value and 2 references (valueToId and idToValue)
    sizeInBytes.addAndGet(estimateSizeOfValue(originalValue) + (2L * Long.BYTES));

    // keep running min/max per the configured comparator
    minValue = minValue == null || comparator.compare(minValue, originalValue) > 0 ? originalValue : minValue;
    maxValue = maxValue == null || comparator.compare(maxValue, originalValue) < 0 ? originalValue : maxValue;
    return index;
  }
+
  /**
   * Smallest non-null value added so far per the comparator, or null if nothing non-null was added.
   */
  @SuppressWarnings("unused")
  public T getMinValue()
  {
    return minValue;
  }

  /**
   * Largest non-null value added so far per the comparator, or null if nothing non-null was added.
   */
  @SuppressWarnings("unused")
  public T getMaxValue()
  {
    return maxValue;
  }

  /**
   * Id assigned to the null value, or {@link #ABSENT_VALUE_ID} if null has not been added.
   */
  @SuppressWarnings("unused")
  public int getIdForNull()
  {
    return idForNull;
  }
+
+  public ComparatorSortedDimensionDictionary<T> sort()
+  {
+    return new ComparatorSortedDimensionDictionary<T>(idToValue, comparator, 
idToValue.size());
+  }
+
+  /**
+   * Estimates the size of the dimension value in bytes. This method is called
+   * only when a new dimension value is being added to the lookup.
+   *
+   * @throws UnsupportedOperationException Implementations that want to 
estimate

Review Comment:
   What is the fallback if implementations don't want to estimate memory? 



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.PeekingIterator;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.column.BaseColumn;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
+import org.apache.druid.segment.nested.CompressedNestedDataComplexColumn;
+import org.apache.druid.segment.nested.GlobalDictionarySortedCollector;
+import org.apache.druid.segment.nested.NestedDataColumnSerializer;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedLiteralTypeInfo;
+import org.apache.druid.segment.serde.ComplexColumnPartSerde;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.IntBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class NestedDataColumnMerger implements DimensionMergerV9
+{
+  private static final Logger log = new Logger(NestedDataColumnMerger.class);
+  public static final Comparator<Pair<Integer, PeekingIterator<Long>>> 
LONG_MERGING_COMPARATOR =
+      DictionaryMergingIterator.makePeekingComparator();
+  public static final Comparator<Pair<Integer, PeekingIterator<Double>>> 
DOUBLE_MERGING_COMPARATOR =
+      DictionaryMergingIterator.makePeekingComparator();
+
+  private final String name;
+  private final Closer closer;
+
+  private NestedDataColumnSerializer serializer;
+
  /**
   * Creates a merger for the nested data column {@code name}. The {@link NestedDataColumnSerializer} is
   * constructed eagerly here and driven later by {@link #writeMergedValueDictionary}.
   */
  public NestedDataColumnMerger(
      String name,
      IndexSpec indexSpec,
      SegmentWriteOutMedium segmentWriteOutMedium,
      ProgressIndicator progressIndicator,
      Closer closer
  )
  {
    this.name = name;
    this.serializer = new NestedDataColumnSerializer(name, indexSpec, segmentWriteOutMedium, progressIndicator, closer);
    this.closer = closer;
  }
+
  /**
   * Collects the per-adapter sorted value dictionaries (strings, longs, doubles) and the union of literal
   * field paths, then serializes the merged fields and dictionaries through the
   * {@link NestedDataColumnSerializer}.
   */
  @Override
  public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
  {

    long dimStartTime = System.currentTimeMillis();

    // number of adapters that actually contained values for this column
    int numMergeIndex = 0;
    // most recently visited collector; used directly when only one adapter has values
    GlobalDictionarySortedCollector sortedLookup = null;
    final Indexed[] sortedLookups = new Indexed[adapters.size()];
    final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
    final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];

    // union of literal field path -> type set across all adapters, ordered by path
    final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields = new TreeMap<>();

    for (int i = 0; i < adapters.size(); i++) {
      final IndexableAdapter adapter = adapters.get(i);
      final GlobalDictionarySortedCollector dimValues;
      if (adapter instanceof IncrementalIndexAdapter) {
        dimValues = getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter, mergedFields);
      } else if (adapter instanceof QueryableIndexIndexableAdapter) {
        dimValues = getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter, mergedFields);
      } else {
        throw new ISE("Unable to merge columns of unsupported adapter %s", adapter.getClass());
      }

      // NOTE(review): both helpers above are @Nullable but dimValues is dereferenced unconditionally here —
      // confirm a null collector (column missing from an adapter) cannot reach this point
      boolean allNulls = allNull(dimValues.getSortedStrings()) &&
                         allNull(dimValues.getSortedLongs()) &&
                         allNull(dimValues.getSortedDoubles());
      // NOTE(review): sortedLookup is overwritten even for all-null adapters; if the LAST adapter is all
      // nulls, the numMergeIndex == 1 branch below would serialize that collector rather than the one that
      // actually has values — confirm intended
      sortedLookup = dimValues;
      if (!allNulls) {
        sortedLookups[i] = dimValues.getSortedStrings();
        sortedLongLookups[i] = dimValues.getSortedLongs();
        sortedDoubleLookups[i] = dimValues.getSortedDoubles();
        numMergeIndex++;
      }
    }

    serializer.open();
    serializer.serializeFields(mergedFields);

    int cardinality = 0;
    if (numMergeIndex > 1) {
      // multiple adapters have values: merge the per-adapter sorted dictionaries for each value type
      DictionaryMergingIterator<String> dictionaryMergeIterator = new DictionaryMergingIterator<>(
          sortedLookups,
          StringDimensionMergerV9.DICTIONARY_MERGING_COMPARATOR,
          true
      );
      DictionaryMergingIterator<Long> longDictionaryMergeIterator = new DictionaryMergingIterator<>(
          sortedLongLookups,
          LONG_MERGING_COMPARATOR,
          true
      );
      DictionaryMergingIterator<Double> doubleDictionaryMergeIterator = new DictionaryMergingIterator<>(
          sortedDoubleLookups,
          DOUBLE_MERGING_COMPARATOR,
          true
      );
      serializer.serializeStringDictionary(() -> dictionaryMergeIterator);
      serializer.serializeLongDictionary(() -> longDictionaryMergeIterator);
      serializer.serializeDoubleDictionary(() -> doubleDictionaryMergeIterator);
      cardinality = dictionaryMergeIterator.getCardinality();
    } else if (numMergeIndex == 1) {
      // exactly one adapter has values: serialize its dictionaries directly, no merging needed
      serializer.serializeStringDictionary(sortedLookup.getSortedStrings());
      serializer.serializeLongDictionary(sortedLookup.getSortedLongs());
      serializer.serializeDoubleDictionary(sortedLookup.getSortedDoubles());
      cardinality = sortedLookup.size();
    }

    log.debug(
        "Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",
        name,
        cardinality,
        System.currentTimeMillis() - dimStartTime
    );
  }
+
+  @Nullable
+  private GlobalDictionarySortedCollector getSortedIndexFromIncrementalAdapter(
+      IncrementalIndexAdapter adapter,
+      SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields
+  )
+  {
+    final IncrementalIndex index = adapter.getIncrementalIndex();
+    final IncrementalIndex.DimensionDesc dim = index.getDimension(name);
+    if (dim == null || !(dim.getIndexer() instanceof NestedDataColumnIndexer)) 
{
+      return null;
+    }
+    final NestedDataColumnIndexer indexer = (NestedDataColumnIndexer) 
dim.getIndexer();
+    for (Map.Entry<String, NestedDataColumnIndexer.LiteralFieldIndexer> entry 
: indexer.fieldIndexers.entrySet()) {
+      // skip adding the field if no types are in the set, meaning only null 
values have been processed
+      if (!entry.getValue().getTypes().isEmpty()) {
+        mergedFields.put(entry.getKey(), entry.getValue().getTypes());
+      }
+    }
+    return indexer.globalDictionary.getSortedCollector();
+  }
+
+  @Nullable
+  private GlobalDictionarySortedCollector getSortedIndexesFromQueryableAdapter(
+      QueryableIndexIndexableAdapter adapter,
+      SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields
+  )
+  {
+    final ColumnHolder columnHolder = 
adapter.getQueryableIndex().getColumnHolder(name);
+
+    if (columnHolder == null) {
+      return null;
+    }
+
+    final BaseColumn col = columnHolder.getColumn();
+
+    closer.register(col);

Review Comment:
   nit: we are doing this twice — there is another occurrence of this registration inside the called function as well. 



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.PeekingIterator;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.column.BaseColumn;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
+import org.apache.druid.segment.nested.CompressedNestedDataComplexColumn;
+import org.apache.druid.segment.nested.GlobalDictionarySortedCollector;
+import org.apache.druid.segment.nested.NestedDataColumnSerializer;
+import org.apache.druid.segment.nested.NestedDataComplexTypeSerde;
+import org.apache.druid.segment.nested.NestedLiteralTypeInfo;
+import org.apache.druid.segment.serde.ComplexColumnPartSerde;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.IntBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class NestedDataColumnMerger implements DimensionMergerV9

Review Comment:
   some high level javadocs would have been nice. 



##########
processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.CompressedPools;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.function.Supplier;
+
+/**
+ * Reader for a virtual contiguous address range backed by compressed blocks 
of data.
+ *
+ * Format:
+ * | version (byte) | compression (byte) | block size (int) | num blocks (int) | end offsets | compressed data |
+ *
+ * This mechanism supports two modes of use, the first where callers may ask 
for a range of data from the underlying
+ * blocks, provided by {@link #getRange(long, int)}. The {@link ByteBuffer} 
provided by this method may or may not
+ * be valid after additional calls to {@link #getRange(long, int)} or calls to 
{@link #seekBlock(int)}.
+ *
+ * For fixed width values which are aligned with the block size, callers may 
also use the method
+ * {@link #getDecompressedDataBuffer()} to have direct access to the current 
uncompressed block, and use the methods
+ * {@link #loadBlock(long)} to load the correct block and translate a virtual 
offset into the relative offset, or
+ * {@link #seekBlock(int)} to change which block is currently loaded.
+ *
+ * {@link #getRange(long, int)} uses these same mechanisms internally to 
supply data.
+ */
+public final class CompressedBlockReader implements Closeable
+{
+  private static final ByteBuffer NULL_VALUE = ByteBuffer.wrap(new byte[0]);
+  public static final byte VERSION = 0x01;
+
  /**
   * Creates a {@link Supplier} of readers over a serialized block-compressed payload. After the version
   * byte this reads, in order: compression strategy id (byte), block size (int), number of blocks (int),
   * per-block cumulative end offsets (numBlocks ints), then the compressed block data. The caller's
   * buffer position is advanced past the whole payload; each invocation of the supplier creates a new
   * reader over read-only views of the shared offset and data buffers.
   *
   * @throws IAE if the leading version byte does not match {@link #VERSION}
   */
  public static Supplier<CompressedBlockReader> fromByteBuffer(ByteBuffer buffer, ByteOrder byteOrder)
  {
    byte versionFromBuffer = buffer.get();

    if (versionFromBuffer == VERSION) {
      final CompressionStrategy compression = CompressionStrategy.forId(buffer.get());
      final int blockSize = buffer.getInt();
      // NOTE(review): the assert requires exact equality while the checkState below only requires <=,
      // and asserts are disabled in production — confirm which constraint is actually intended
      assert CompressedPools.BUFFER_SIZE == blockSize;
      Preconditions.checkState(
          blockSize <= CompressedPools.BUFFER_SIZE,
          "Maximum block size must be less than " + CompressedPools.BUFFER_SIZE
      );
      final int numBlocks = buffer.getInt();
      final int offsetsSize = numBlocks * Integer.BYTES;
      // buffer is at start of ending offsets
      final ByteBuffer offsets = buffer.asReadOnlyBuffer().order(byteOrder);
      offsets.limit(offsets.position() + offsetsSize);
      final IntBuffer offsetView = offsets.slice().order(byteOrder).asIntBuffer();
      // the last end offset equals the total compressed data size
      final int compressedSize = offsetView.get(numBlocks - 1);

      // move to start of compressed data
      buffer.position(buffer.position() + offsetsSize);
      final ByteBuffer compressedData = buffer.asReadOnlyBuffer().order(byteOrder);
      compressedData.limit(compressedData.position() + compressedSize);
      // advance the caller's buffer past the compressed data so it points at whatever follows
      buffer.position(buffer.position() + compressedSize);

      final ByteBuffer compressedDataView = compressedData.slice().order(byteOrder);
      return () -> new CompressedBlockReader(
          compression,
          numBlocks,
          blockSize,
          offsetView.asReadOnlyBuffer(),
          compressedDataView.asReadOnlyBuffer().order(byteOrder),
          byteOrder
      );
    }
    throw new IAE("Unknown version[%s]", versionFromBuffer);
  }
+
+  private final CompressionStrategy.Decompressor decompressor;
+
+  private final int numBlocks;
+  private final int div;
+  private final int rem;
+  private final IntBuffer endOffsetsBuffer;
+  private final ByteBuffer compressedDataBuffer;
+
+  private final ResourceHolder<ByteBuffer> decompressedDataBufferHolder;
+  private final ByteBuffer decompressedDataBuffer;
+
+  private final ByteOrder byteOrder;
+  private final Closer closer;
+  private int currentBlockNumber = -1;
+
  /**
   * @param compressionStrategy  strategy whose decompressor inflates individual blocks
   * @param numBlocks            number of compressed blocks backing the virtual address range
   * @param blockSize            uncompressed size of a block; assumed to be a power of two (see div/rem)
   * @param endOffsetsBuffer     cumulative end offsets of each compressed block
   * @param compressedDataBuffer buffer positioned over the concatenated compressed blocks
   * @param byteOrder            byte order of the decompressed data
   */
  public CompressedBlockReader(
      CompressionStrategy compressionStrategy,
      int numBlocks,
      int blockSize,
      IntBuffer endOffsetsBuffer,
      ByteBuffer compressedDataBuffer,
      ByteOrder byteOrder
  )
  {
    this.decompressor = compressionStrategy.getDecompressor();
    this.numBlocks = numBlocks;
    // shift/mask pair for translating a virtual offset into (block number, offset within block);
    // only valid if blockSize is a power of two — TODO confirm callers guarantee this
    this.div = Integer.numberOfTrailingZeros(blockSize);
    this.rem = blockSize - 1;
    this.endOffsetsBuffer = endOffsetsBuffer;
    this.compressedDataBuffer = compressedDataBuffer;
    this.closer = Closer.create();
    // pooled scratch buffer that holds the currently decompressed block; registered with the closer
    // so it is returned to the pool when this reader is closed
    this.decompressedDataBufferHolder = CompressedPools.getByteBuf(byteOrder);
    closer.register(decompressedDataBufferHolder);
    this.decompressedDataBuffer = decompressedDataBufferHolder.get();
    this.decompressedDataBuffer.clear();
    this.byteOrder = byteOrder;
  }
+
+  /**
+   * Get size in bytes of virtual contiguous buffer
+   */
+  public long getSize()

Review Comment:
   this is compressedSize, correct? 



##########
processing/src/main/java/org/apache/druid/segment/data/CompressedBlockReader.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.CompressedPools;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.function.Supplier;
+
+/**
+ * Reader for a virtual contiguous address range backed by compressed blocks 
of data.
+ *
+ * Format:
+ * | version (byte) | compression (byte) | block size (int) | num blocks (int) | end offsets | compressed data |

Review Comment:
   block size comes before number of blocks. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to