This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fb26a1093d discover nested columns when using nested column indexer 
for schemaless ingestion (#13672)
fb26a1093d is described below

commit fb26a1093d11f0574dbb905ac0d39c8eb3777312
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jan 18 12:57:28 2023 -0800

    discover nested columns when using nested column indexer for schemaless 
ingestion (#13672)
    
    * discover nested columns when using nested column indexer for schemaless
    * move useNestedColumnIndexerForSchemaDiscovery from AppendableIndexSpec to 
DimensionsSpec
---
 .../apache/druid/data/input/InputRowSchema.java    |   1 +
 .../druid/data/input/impl/DimensionsSpec.java      |  61 ++++++++++--
 .../druid/data/input/impl/JsonLineReader.java      |   8 +-
 .../druid/data/input/impl/JsonNodeReader.java      |   8 +-
 .../apache/druid/data/input/impl/JsonReader.java   |   8 +-
 .../util/common/parsers/JSONFlattenerMaker.java    |  10 +-
 .../java/util/common/parsers/JSONPathParser.java   |   5 +-
 .../java/util/common/parsers/ObjectFlatteners.java |   4 +-
 .../common/parsers/JSONFlattenerMakerTest.java     |  32 ++++++-
 .../util/common/parsers/ObjectFlattenersTest.java  |   2 +-
 .../druid/data/input/avro/AvroFlattenerMaker.java  |  24 ++++-
 .../druid/data/input/avro/AvroOCFReader.java       |  10 +-
 .../apache/druid/data/input/avro/AvroParsers.java  |  13 ++-
 .../druid/data/input/avro/AvroStreamReader.java    |  10 +-
 .../data/input/avro/AvroFlattenerMakerTest.java    |  60 +++++++++++-
 .../data/input/kafkainput/KafkaInputFormat.java    |   7 +-
 .../data/input/orc/OrcHadoopInputRowParser.java    |  10 +-
 .../org/apache/druid/data/input/orc/OrcReader.java |   8 +-
 .../data/input/orc/OrcStructFlattenerMaker.java    |  10 +-
 .../apache/druid/data/input/orc/OrcReaderTest.java | 104 +++++++++++++++++++++
 .../druid/data/input/parquet/ParquetReader.java    |   8 +-
 .../avro/ParquetAvroHadoopInputRowParser.java      |  10 +-
 .../parquet/simple/ParquetGroupFlattenerMaker.java |  10 +-
 .../simple/ParquetHadoopInputRowParser.java        |  10 +-
 .../parquet/NestedColumnParquetReaderTest.java     |  73 +++++++++++++++
 .../input/protobuf/ProtobufFlattenerMaker.java     |  13 ++-
 .../input/protobuf/ProtobufInputRowSchema.java     |  13 ++-
 .../druid/data/input/protobuf/ProtobufReader.java  |   5 +-
 .../input/protobuf/ProtobufInputFormatTest.java    |  71 ++++++++++++++
 .../indexing/overlord/sampler/SamplerConfig.java   |  21 ++---
 .../task/ClientCompactionTaskQuerySerdeTest.java   |   4 +-
 .../cases/cluster/Common/dependencies.yaml         |   3 +-
 .../druid/testsEx/cluster/MetastoreClient.java     |   1 -
 .../testsEx/config/IntegrationTestingConfigEx.java |   1 -
 .../testsEx/indexer/AbstractITBatchIndexTest.java  |  41 +++++++-
 .../AbstractLocalInputSourceParallelIndexTest.java |  35 ++++++-
 ...ITLocalInputSourceAllFormatSchemalessTest.java} |  79 +++++++++++-----
 .../ITLocalInputSourceAllInputFormatTest.java      |   1 -
 .../testsEx/indexer/ITOverwriteBatchIndexTest.java |   1 +
 .../testsEx/leadership/ITHighAvailabilityTest.java |   2 +-
 .../resources/indexer/wikipedia_index_queries.json |   2 +-
 .../wikipedia_index_schemaless_queries.json        |  32 +++++++
 ...a_local_input_source_index_task_schemaless.json |  42 +++++++++
 .../incremental/AppendableIndexBuilder.java        |  10 --
 .../segment/incremental/AppendableIndexSpec.java   |   3 -
 .../segment/incremental/IncrementalIndex.java      |   6 +-
 .../incremental/OnheapIncrementalIndex.java        |  39 +++-----
 .../druid/segment/NestedDataColumnIndexerTest.java |   4 +-
 .../OnheapIncrementalIndexBenchmark.java           |   6 +-
 .../incremental/OnheapIncrementalIndexTest.java}   |  34 ++++---
 .../ClientCompactionTaskQueryTuningConfig.java     |   4 +-
 .../DataSourceCompactionConfigTest.java            |   2 +-
 .../UserCompactionTaskQueryTuningConfigTest.java   |   2 +-
 .../duty/NewestSegmentFirstPolicyTest.java         |   4 +-
 54 files changed, 829 insertions(+), 158 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java 
b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
index 3c4263ba99..dc7d50afd4 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
@@ -34,6 +34,7 @@ public class InputRowSchema
   private final TimestampSpec timestampSpec;
   private final DimensionsSpec dimensionsSpec;
   private final ColumnsFilter columnsFilter;
+
   /**
    * Set of metric names for further downstream processing by {@link 
InputSource}.
    * Empty set if no metric given.
diff --git 
a/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java 
b/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
index 6e412c30ad..6dfaa9ad3d 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
@@ -49,7 +49,9 @@ public class DimensionsSpec
   private final Map<String, DimensionSchema> dimensionSchemaMap;
   private final boolean includeAllDimensions;
 
-  public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, 
null, false);
+  private final boolean useNestedColumnIndexerForSchemaDiscovery;
+
+  public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, 
null, false, null);
 
   public static List<DimensionSchema> getDefaultSchemas(List<String> dimNames)
   {
@@ -78,7 +80,7 @@ public class DimensionsSpec
 
   public DimensionsSpec(List<DimensionSchema> dimensions)
   {
-    this(dimensions, null, null, false);
+    this(dimensions, null, null, false, null);
   }
 
   @JsonCreator
@@ -86,7 +88,8 @@ public class DimensionsSpec
       @JsonProperty("dimensions") List<DimensionSchema> dimensions,
       @JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
       @Deprecated @JsonProperty("spatialDimensions") 
List<SpatialDimensionSchema> spatialDimensions,
-      @JsonProperty("includeAllDimensions") boolean includeAllDimensions
+      @JsonProperty("includeAllDimensions") boolean includeAllDimensions,
+      @JsonProperty("useNestedColumnIndexerForSchemaDiscovery") Boolean 
useNestedColumnIndexerForSchemaDiscovery
   )
   {
     this.dimensions = dimensions == null
@@ -115,6 +118,8 @@ public class DimensionsSpec
       dimensionSchemaMap.put(newSchema.getName(), newSchema);
     }
     this.includeAllDimensions = includeAllDimensions;
+    this.useNestedColumnIndexerForSchemaDiscovery =
+        useNestedColumnIndexerForSchemaDiscovery != null && 
useNestedColumnIndexerForSchemaDiscovery;
   }
 
   @JsonProperty
@@ -135,6 +140,12 @@ public class DimensionsSpec
     return includeAllDimensions;
   }
 
+  @JsonProperty
+  public boolean useNestedColumnIndexerForSchemaDiscovery()
+  {
+    return useNestedColumnIndexerForSchemaDiscovery;
+  }
+
   @Deprecated
   @JsonIgnore
   public List<SpatialDimensionSchema> getSpatialDimensions()
@@ -188,7 +199,13 @@ public class DimensionsSpec
   @PublicApi
   public DimensionsSpec withDimensions(List<DimensionSchema> dims)
   {
-    return new DimensionsSpec(dims, ImmutableList.copyOf(dimensionExclusions), 
null, includeAllDimensions);
+    return new DimensionsSpec(
+        dims,
+        ImmutableList.copyOf(dimensionExclusions),
+        null,
+        includeAllDimensions,
+        useNestedColumnIndexerForSchemaDiscovery
+    );
   }
 
   public DimensionsSpec withDimensionExclusions(Set<String> dimExs)
@@ -197,14 +214,21 @@ public class DimensionsSpec
         dimensions,
         ImmutableList.copyOf(Sets.union(dimensionExclusions, dimExs)),
         null,
-        includeAllDimensions
+        includeAllDimensions,
+        useNestedColumnIndexerForSchemaDiscovery
     );
   }
 
   @Deprecated
   public DimensionsSpec withSpatialDimensions(List<SpatialDimensionSchema> 
spatials)
   {
-    return new DimensionsSpec(dimensions, 
ImmutableList.copyOf(dimensionExclusions), spatials, includeAllDimensions);
+    return new DimensionsSpec(
+        dimensions,
+        ImmutableList.copyOf(dimensionExclusions),
+        spatials,
+        includeAllDimensions,
+        useNestedColumnIndexerForSchemaDiscovery
+    );
   }
 
   private void verify(List<SpatialDimensionSchema> spatialDimensions)
@@ -243,6 +267,7 @@ public class DimensionsSpec
     }
     DimensionsSpec that = (DimensionsSpec) o;
     return includeAllDimensions == that.includeAllDimensions
+           && useNestedColumnIndexerForSchemaDiscovery == 
that.useNestedColumnIndexerForSchemaDiscovery
            && Objects.equals(dimensions, that.dimensions)
            && Objects.equals(dimensionExclusions, that.dimensionExclusions);
   }
@@ -250,7 +275,12 @@ public class DimensionsSpec
   @Override
   public int hashCode()
   {
-    return Objects.hash(dimensions, dimensionExclusions, includeAllDimensions);
+    return Objects.hash(
+        dimensions,
+        dimensionExclusions,
+        includeAllDimensions,
+        useNestedColumnIndexerForSchemaDiscovery
+    );
   }
 
   @Override
@@ -260,6 +290,7 @@ public class DimensionsSpec
            "dimensions=" + dimensions +
            ", dimensionExclusions=" + dimensionExclusions +
            ", includeAllDimensions=" + includeAllDimensions +
+           ", useNestedColumnIndexerForSchemaDiscovery=" + 
useNestedColumnIndexerForSchemaDiscovery +
            '}';
   }
 
@@ -270,6 +301,8 @@ public class DimensionsSpec
     private List<SpatialDimensionSchema> spatialDimensions;
     private boolean includeAllDimensions;
 
+    private boolean useNestedColumnIndexerForSchemaDiscovery;
+
     public Builder setDimensions(List<DimensionSchema> dimensions)
     {
       this.dimensions = dimensions;
@@ -301,9 +334,21 @@ public class DimensionsSpec
       return this;
     }
 
+    public Builder setUseNestedColumnIndexerForSchemaDiscovery(boolean 
useNestedColumnIndexerForSchemaDiscovery)
+    {
+      this.useNestedColumnIndexerForSchemaDiscovery = 
useNestedColumnIndexerForSchemaDiscovery;
+      return this;
+    }
+
     public DimensionsSpec build()
     {
-      return new DimensionsSpec(dimensions, dimensionExclusions, 
spatialDimensions, includeAllDimensions);
+      return new DimensionsSpec(
+          dimensions,
+          dimensionExclusions,
+          spatialDimensions,
+          includeAllDimensions,
+          useNestedColumnIndexerForSchemaDiscovery
+      );
     }
   }
 }
diff --git 
a/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java 
b/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java
index 19c0d685a9..aa5ecbb086 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java
@@ -59,7 +59,13 @@ public class JsonLineReader extends TextReader
   )
   {
     super(inputRowSchema, source);
-    this.flattener = ObjectFlatteners.create(flattenSpec, new 
JSONFlattenerMaker(keepNullColumns));
+    this.flattener = ObjectFlatteners.create(
+        flattenSpec,
+        new JSONFlattenerMaker(
+            keepNullColumns,
+            
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
     this.mapper = mapper;
   }
 
diff --git 
a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java 
b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
index 7a7b98c452..eaf42244cb 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java
@@ -80,7 +80,13 @@ public class JsonNodeReader extends 
IntermediateRowParsingReader<JsonNode>
   {
     this.inputRowSchema = inputRowSchema;
     this.source = source;
-    this.flattener = ObjectFlatteners.create(flattenSpec, new 
JSONFlattenerMaker(keepNullColumns));
+    this.flattener = ObjectFlatteners.create(
+        flattenSpec,
+        new JSONFlattenerMaker(
+            keepNullColumns,
+            
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
     this.mapper = mapper;
     this.jsonFactory = new JsonFactory();
   }
diff --git 
a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java 
b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
index 8dee12dc30..1b1402f22a 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
@@ -77,7 +77,13 @@ public class JsonReader extends 
IntermediateRowParsingReader<String>
   {
     this.inputRowSchema = inputRowSchema;
     this.source = source;
-    this.flattener = ObjectFlatteners.create(flattenSpec, new 
JSONFlattenerMaker(keepNullColumns));
+    this.flattener = ObjectFlatteners.create(
+        flattenSpec,
+        new JSONFlattenerMaker(
+            keepNullColumns,
+            
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
     this.mapper = mapper;
     this.jsonFactory = new JsonFactory();
   }
diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
index b54b51f863..0b8244e29b 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java
@@ -57,15 +57,23 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<JsonN
   private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
   private final boolean keepNullValues;
 
+  private final boolean discoverNestedFields;
 
-  public JSONFlattenerMaker(boolean keepNullValues)
+
+  public JSONFlattenerMaker(boolean keepNullValues, boolean 
discoverNestedFields)
   {
     this.keepNullValues = keepNullValues;
+    this.discoverNestedFields = discoverNestedFields;
   }
 
   @Override
   public Iterable<String> discoverRootFields(final JsonNode obj)
   {
+    // if discovering nested fields, just return all root fields since we want 
everything
+    // else, we filter for literals and arrays of literals
+    if (discoverNestedFields) {
+      return obj::fieldNames;
+    }
     return FluentIterable.from(obj::fields)
                          .filter(
                              entry -> {
diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java
 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java
index 48777a8311..281d8442cc 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java
@@ -42,7 +42,10 @@ public class JSONPathParser implements Parser<String, Object>
   public JSONPathParser(JSONPathSpec flattenSpec, ObjectMapper mapper, boolean 
keepNullColumns)
   {
     this.mapper = mapper == null ? new ObjectMapper() : mapper;
-    this.flattener = ObjectFlatteners.create(flattenSpec, new 
JSONFlattenerMaker(keepNullColumns));
+    this.flattener = ObjectFlatteners.create(
+        flattenSpec,
+        new JSONFlattenerMaker(keepNullColumns, false)
+    );
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
index 98f087333a..b2040d73db 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
@@ -218,7 +218,9 @@ public class ObjectFlatteners
   {
     JsonProvider getJsonProvider();
     /**
-     * List all "root" primitive properties and primitive lists (no nested 
objects, no lists of objects)
+     * List all "root" fields. If
+     * {@link 
org.apache.druid.data.input.impl.DimensionsSpec#useNestedColumnIndexerForSchemaDiscovery}
 is false, this
+     * method should filter fields to include only fields that contain 
primitive values and lists of primitive values
      */
     Iterable<String> discoverRootFields(T obj);
 
diff --git 
a/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java
 
b/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java
index 0a1b09578b..5583081db1 100644
--- 
a/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java
+++ 
b/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.BinaryNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -37,7 +38,8 @@ public class JSONFlattenerMakerTest
 {
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-  private static final JSONFlattenerMaker FLATTENER_MAKER = new 
JSONFlattenerMaker(true);
+  private static final JSONFlattenerMaker FLATTENER_MAKER = new 
JSONFlattenerMaker(true, false);
+  private static final JSONFlattenerMaker FLATTENER_MAKER_NESTED = new 
JSONFlattenerMaker(true, true);
 
   @Test
   public void testStrings() throws JsonProcessingException
@@ -169,4 +171,32 @@ public class JSONFlattenerMakerTest
     result = FLATTENER_MAKER.finalizeConversionForMap(node);
     Assert.assertEquals(expectedList, result);
   }
+
+  @Test
+  public void testDiscovery() throws JsonProcessingException
+  {
+    Map<String, Object> theMap =
+        ImmutableMap.<String, Object>builder()
+                    .put("bool", true)
+                    .put("int", 1)
+                    .put("long", 1L)
+                    .put("float", 0.11f)
+                    .put("double", 0.33)
+                    .put("binary", new byte[]{0x01, 0x02, 0x03})
+                    .put("list", ImmutableList.of("foo", "bar", "baz"))
+                    .put("anotherList", ImmutableList.of(1, 2, 3))
+                    .put("nested", ImmutableMap.of("x", 1L, "y", 2L, "z", 3L))
+                    .build();
+
+    JsonNode node = 
OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(theMap));
+    Assert.assertTrue(node.isObject());
+    Assert.assertEquals(
+        ImmutableSet.of("bool", "int", "long", "float", "double", "binary", 
"list", "anotherList"),
+        ImmutableSet.copyOf(FLATTENER_MAKER.discoverRootFields(node))
+    );
+    Assert.assertEquals(
+        ImmutableSet.of("bool", "int", "long", "float", "double", "binary", 
"list", "anotherList", "nested"),
+        ImmutableSet.copyOf(FLATTENER_MAKER_NESTED.discoverRootFields(node))
+    );
+  }
 }
diff --git 
a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
 
b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
index e0b0fbcc51..ab6c50ef18 100644
--- 
a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
+++ 
b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
@@ -34,7 +34,7 @@ public class ObjectFlattenersTest
 {
   private static final String SOME_JSON = "{\"foo\": null, \"bar\": 1}";
 
-  private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new 
JSONFlattenerMaker(true);
+  private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new 
JSONFlattenerMaker(true, false);
   private static final ObjectFlattener FLATTENER = ObjectFlatteners.create(
       new JSONPathSpec(
           true,
diff --git 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
index 4e254c18ee..ba9d895b1f 100644
--- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
+++ 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
@@ -92,14 +92,27 @@ public class AvroFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<Gener
   private final boolean fromPigAvroStorage;
   private final boolean binaryAsString;
 
+  private final boolean discoverNestedFields;
+
   /**
-   * @param fromPigAvroStorage boolean to specify the data file is stored 
using AvroStorage
-   * @param binaryAsString boolean to encode the byte[] as a string.
+   * @param fromPigAvroStorage    boolean to specify the data file is stored 
using AvroStorage
+   * @param binaryAsString        if true, treat byte[] as utf8 encoded values 
and coerce to strings, else leave as byte[]
+   * @param extractUnionsByType   if true, unions will be extracted to 
separate nested fields for each type. See
+   *                              {@link 
GenericAvroJsonProvider#extractUnionTypes(Object)} for more details
+   * @param discoverNestedFields  if true, {@link 
#discoverRootFields(GenericRecord)} will return the full set of
+   *                              fields, else this list will be filtered to 
contain only simple literals and arrays
+   *                              of simple literals
    */
-  public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean 
binaryAsString, final boolean extractUnionsByType)
+  public AvroFlattenerMaker(
+      final boolean fromPigAvroStorage,
+      final boolean binaryAsString,
+      final boolean extractUnionsByType,
+      final boolean discoverNestedFields
+  )
   {
     this.fromPigAvroStorage = fromPigAvroStorage;
     this.binaryAsString = binaryAsString;
+    this.discoverNestedFields = discoverNestedFields;
 
     this.avroJsonProvider = new GenericAvroJsonProvider(extractUnionsByType);
     this.jsonPathConfiguration =
@@ -113,6 +126,11 @@ public class AvroFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<Gener
   @Override
   public Set<String> discoverRootFields(final GenericRecord obj)
   {
+    // if discovering nested fields, just return all root fields since we want 
everything
+    // else, we filter for literals and arrays of literals
+    if (discoverNestedFields) {
+      return 
obj.getSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toSet());
+    }
     return obj.getSchema()
               .getFields()
               .stream()
diff --git 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index 66552f9eda..3039fbb7c8 100644
--- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++ 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -64,7 +64,15 @@ public class AvroOCFReader extends 
IntermediateRowParsingReader<GenericRecord>
     this.source = source;
     this.temporaryDirectory = temporaryDirectory;
     this.readerSchema = readerSchema;
-    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new 
AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
+    this.recordFlattener = ObjectFlatteners.create(
+        flattenSpec,
+        new AvroFlattenerMaker(
+            false,
+            binaryAsString,
+            extractUnionsByType,
+            
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
   }
 
   @Override
diff --git 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
index 6d399a8dc6..12f8a20bab 100644
--- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
+++ 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.avro;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -50,7 +51,17 @@ public class AvroParsers
       flattenSpec = JSONPathSpec.DEFAULT;
     }
 
-    return ObjectFlatteners.create(flattenSpec, new 
AvroFlattenerMaker(fromPigAvroStorage, binaryAsString, extractUnionsByType));
+    final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec();
+
+    return ObjectFlatteners.create(
+        flattenSpec,
+        new AvroFlattenerMaker(
+            fromPigAvroStorage,
+            binaryAsString,
+            extractUnionsByType,
+            dimensionsSpec != null && 
dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
   }
 
   public static List<InputRow> parseGenericRecord(
diff --git 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
index 4ed6a8a2ab..51bc733d43 100644
--- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
+++ 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -60,7 +60,15 @@ public class AvroStreamReader extends 
IntermediateRowParsingReader<GenericRecord
     this.inputRowSchema = inputRowSchema;
     this.source = source;
     this.avroBytesDecoder = avroBytesDecoder;
-    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new 
AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
+    this.recordFlattener = ObjectFlatteners.create(
+        flattenSpec,
+        new AvroFlattenerMaker(
+            false,
+            binaryAsString,
+            extractUnionsByType,
+            
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
   }
 
   @Override
diff --git 
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
 
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
index 1174d2684f..28caea3049 100644
--- 
a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
+++ 
b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.data.input.avro;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.druid.data.input.AvroStreamInputRowParserTest;
@@ -40,9 +41,9 @@ import java.util.Map;
 public class AvroFlattenerMakerTest
 {
   private static final AvroFlattenerMaker 
FLATTENER_WITHOUT_EXTRACT_UNION_BY_TYPE =
-      new AvroFlattenerMaker(false, false, false);
+      new AvroFlattenerMaker(false, false, false, false);
   private static final AvroFlattenerMaker FLATTENER_WITH_EXTRACT_UNION_BY_TYPE 
=
-      new AvroFlattenerMaker(false, false, true);
+      new AvroFlattenerMaker(false, false, true, false);
 
   private static final SomeAvroDatum RECORD = 
AvroStreamInputRowParserTest.buildSomeAvroDatum();
 
@@ -77,7 +78,7 @@ public class AvroFlattenerMakerTest
   @Test
   public void jsonPathExtractorExtractUnionsByType()
   {
-    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, 
true);
+    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, 
true, false);
 
    // Unnamed types are accessed by type
 
@@ -156,6 +157,59 @@ public class AvroFlattenerMakerTest
     );
   }
 
+  @Test
+  public void testDiscovery()
+  {
+    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false, false, 
true, false);
+    final AvroFlattenerMaker flattenerNested = new AvroFlattenerMaker(false, 
false, true, true);
+
+    SomeAvroDatum input = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+
+    Assert.assertEquals(
+        ImmutableSet.of(
+            "someOtherId",
+            "someStringArray",
+            "someIntArray",
+            "someFloat",
+            "eventType",
+            "someFixed",
+            "someBytes",
+            "someUnion",
+            "id",
+            "someEnum",
+            "someLong",
+            "someInt",
+            "timestamp"
+        ),
+        ImmutableSet.copyOf(flattener.discoverRootFields(input))
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(
+            "someStringValueMap",
+            "someOtherId",
+            "someStringArray",
+            "someIntArray",
+            "someFloat",
+            "isValid",
+            "someIntValueMap",
+            "eventType",
+            "someFixed",
+            "someBytes",
+            "someRecord",
+            "someMultiMemberUnion",
+            "someNull",
+            "someRecordArray",
+            "someUnion",
+            "id",
+            "someEnum",
+            "someLong",
+            "someInt",
+            "timestamp"
+        ),
+        ImmutableSet.copyOf(flattenerNested.discoverRootFields(input))
+    );
+  }
+
   private void getRootField_common(final SomeAvroDatum record, final 
AvroFlattenerMaker flattener)
   {
     Assert.assertEquals(
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
index 73ff6a47a8..aad06bac7b 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
@@ -82,7 +82,12 @@ public class KafkaInputFormat implements InputFormat
   public InputEntityReader createReader(InputRowSchema inputRowSchema, 
InputEntity source, File temporaryDirectory)
   {
     SettableByteEntity<KafkaRecordEntity> settableByteEntitySource = 
(SettableByteEntity<KafkaRecordEntity>) source;
-    InputRowSchema newInputRowSchema = new InputRowSchema(dummyTimestampSpec, 
inputRowSchema.getDimensionsSpec(), inputRowSchema.getColumnsFilter());
+    InputRowSchema newInputRowSchema = new InputRowSchema(
+        dummyTimestampSpec,
+        inputRowSchema.getDimensionsSpec(),
+        inputRowSchema.getColumnsFilter(),
+        inputRowSchema.getMetricNames()
+    );
     return new KafkaInputReader(
         inputRowSchema,
         settableByteEntitySource,
diff --git 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
index 9fcefcba2a..5ab8cda968 100644
--- 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
+++ 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input.orc;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
@@ -55,7 +56,14 @@ public class OrcHadoopInputRowParser implements 
InputRowParser<OrcStruct>
     } else {
       flattenSpec = JSONPathSpec.DEFAULT;
     }
-    this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new 
OrcStructFlattenerMaker(this.binaryAsString));
+    final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec();
+    this.orcStructFlattener = ObjectFlatteners.create(
+        flattenSpec,
+        new OrcStructFlattenerMaker(
+            this.binaryAsString,
+            dimensionsSpec != null && 
dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
     this.parser = new MapInputRowParser(parseSpec);
   }
 
diff --git 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
index 5bcec8c80e..89ba536562 100644
--- 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
+++ 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -70,7 +70,13 @@ public class OrcReader extends 
IntermediateRowParsingReader<OrcStruct>
     this.inputRowSchema = inputRowSchema;
     this.source = source;
     this.temporaryDirectory = temporaryDirectory;
-    this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new 
OrcStructFlattenerMaker(binaryAsString));
+    this.orcStructFlattener = ObjectFlatteners.create(
+        flattenSpec,
+        new OrcStructFlattenerMaker(
+            binaryAsString,
+            
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
   }
 
   @Override
diff --git 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java
 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java
index ee770d3e8e..016ddb3f90 100644
--- 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java
+++ 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java
@@ -43,7 +43,9 @@ public class OrcStructFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<
   private final JsonProvider orcJsonProvider;
   private final OrcStructConverter converter;
 
-  OrcStructFlattenerMaker(boolean binaryAsString)
+  private final boolean discoverNestedFields;
+
+  OrcStructFlattenerMaker(boolean binaryAsString, boolean discoverNestedFields)
  {
    this.converter = new OrcStructConverter(binaryAsString);
    this.orcJsonProvider = new OrcStructJsonProvider(converter);
@@ -52,11 +54,17 @@ public class OrcStructFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<
                                              .mappingProvider(new 
NotImplementedMappingProvider())
                                              
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
                                              .build();
+    this.discoverNestedFields = discoverNestedFields;
   }
 
   @Override
   public Iterable<String> discoverRootFields(OrcStruct obj)
   {
+    // if discovering nested fields, just return all root fields since we want 
everything
+    // else, we filter for literals and arrays of literals
+    if (discoverNestedFields) {
+      return obj.getSchema().getFieldNames();
+    }
     List<String> fields = obj.getSchema().getFieldNames();
     List<TypeDescription> children = obj.getSchema().getChildren();
     List<String> primitiveFields = new ArrayList<>();
diff --git 
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
 
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
index 03083f61eb..cf8ba19d9b 100644
--- 
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
+++ 
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
@@ -52,6 +52,7 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 
 public class OrcReaderTest extends InitializedNullHandlingTest
 {
@@ -421,6 +422,109 @@ public class OrcReaderTest extends 
InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void testNestedColumnSchemaless() throws IOException
+  {
+    final OrcInputFormat inputFormat = new OrcInputFormat(
+        new JSONPathSpec(true, ImmutableList.of()),
+        null,
+        new Configuration()
+    );
+    final InputRowSchema schema = new InputRowSchema(
+        new TimestampSpec("ts", "millis", null),
+        
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
+        ColumnsFilter.all(),
+        null
+    );
+    final FileEntity entity = new FileEntity(new 
File("example/orc-file-11-format.orc"));
+
+    final InputEntityReader reader = inputFormat.createReader(schema, entity, 
temporaryFolder.newFolder());
+
+    List<String> dims = ImmutableList.of(
+        "boolean1",
+        "byte1",
+        "short1",
+        "int1",
+        "long1",
+        "float1",
+        "double1",
+        "bytes1",
+        "string1",
+        "middle",
+        "list",
+        "map",
+        "ts",
+        "decimal1"
+    );
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int actualRowCount = 0;
+
+      // Check the first row
+      Assert.assertTrue(iterator.hasNext());
+      InputRow row = iterator.next();
+
+      Assert.assertEquals(dims, row.getDimensions());
+      actualRowCount++;
+      Assert.assertEquals(
+          ImmutableMap.of(
+              "list",
+              ImmutableList.of(
+                  ImmutableMap.of("int1", 1, "string1", "bye"),
+                  ImmutableMap.of("int1", 2, "string1", "sigh")
+              )
+          ),
+          row.getRaw("middle")
+      );
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("int1", 3, "string1", "good"),
+              ImmutableMap.of("int1", 4, "string1", "bad")
+          ),
+          row.getRaw("list")
+      );
+      Assert.assertEquals(
+          ImmutableMap.of(),
+          row.getRaw("map")
+      );
+      Assert.assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"), 
row.getTimestamp());
+
+      while (iterator.hasNext()) {
+        actualRowCount++;
+        row = iterator.next();
+        Assert.assertEquals(dims, row.getDimensions());
+      }
+
+      // Check the last row
+      Assert.assertEquals(
+          ImmutableMap.of(
+              "list",
+              ImmutableList.of(
+                  ImmutableMap.of("int1", 1, "string1", "bye"),
+                  ImmutableMap.of("int1", 2, "string1", "sigh")
+              )
+          ),
+          row.getRaw("middle")
+      );
+      Assert.assertEquals(
+          ImmutableList.of(
+              ImmutableMap.of("int1", 100000000, "string1", "cat"),
+              ImmutableMap.of("int1", -100000, "string1", "in"),
+              ImmutableMap.of("int1", 1234, "string1", "hat")
+          ),
+          row.getRaw("list")
+      );
+      Assert.assertEquals(
+          ImmutableMap.of(
+              "chani", ImmutableMap.of("int1", 5, "string1", "chani"),
+              "mauddib", ImmutableMap.of("int1", 1, "string1", "mauddib")
+          ),
+          row.getRaw("map")
+      );
+
+      Assert.assertEquals(7500, actualRowCount);
+    }
+  }
+
   @Test
   public void testListMap() throws IOException
   {
diff --git 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index aced1106ee..494f662516 100644
--- 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++ 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -66,7 +66,13 @@ public class ParquetReader extends 
IntermediateRowParsingReader<Group>
     this.inputRowSchema = inputRowSchema;
     this.source = source;
     this.temporaryDirectory = temporaryDirectory;
-    this.flattener = ObjectFlatteners.create(flattenSpec, new 
ParquetGroupFlattenerMaker(binaryAsString));
+    this.flattener = ObjectFlatteners.create(
+        flattenSpec,
+        new ParquetGroupFlattenerMaker(
+            binaryAsString,
+            
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
   }
 
   @Override
diff --git 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
index 796891ea8e..5c2dd2844f 100755
--- 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
+++ 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
@@ -32,6 +32,7 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.avro.AvroFlattenerMaker;
 import org.apache.druid.data.input.avro.AvroParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -76,9 +77,16 @@ public class ParquetAvroHadoopInputRowParser implements 
InputRowParser<GenericRe
       flattenSpec = JSONPathSpec.DEFAULT;
     }
 
+    final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec();
+
     this.recordFlattener = ObjectFlatteners.create(
         flattenSpec,
-        new AvroFlattenerMaker(false, this.binaryAsString, 
this.extractUnionsByType)
+        new AvroFlattenerMaker(
+            false,
+            this.binaryAsString,
+            this.extractUnionsByType,
+            dimensionsSpec != null && 
dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
+        )
     );
   }
 
diff --git 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
index 7725055462..a243107cc2 100644
--- 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
+++ 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java
@@ -41,7 +41,9 @@ public class ParquetGroupFlattenerMaker implements 
ObjectFlatteners.FlattenerMak
   private final ParquetGroupConverter converter;
   private final JsonProvider parquetJsonProvider;
 
-  public ParquetGroupFlattenerMaker(boolean binaryAsString)
+  private final boolean discoverNestedFields;
+
+  public ParquetGroupFlattenerMaker(boolean binaryAsString, boolean 
discoverNestedFields)
   {
     this.converter = new ParquetGroupConverter(binaryAsString);
     this.parquetJsonProvider = new ParquetGroupJsonProvider(converter);
@@ -50,11 +52,17 @@ public class ParquetGroupFlattenerMaker implements 
ObjectFlatteners.FlattenerMak
                                               .mappingProvider(new 
NotImplementedMappingProvider())
                                               
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
                                               .build();
+    this.discoverNestedFields = discoverNestedFields;
   }
 
   @Override
   public Set<String> discoverRootFields(Group obj)
   {
+    // if discovering nested fields, just return all root fields since we want 
everything
+    // else, we filter for literals and arrays of literals
+    if (discoverNestedFields) {
+      return 
obj.getType().getFields().stream().map(Type::getName).collect(Collectors.toSet());
+    }
     return obj.getType()
               .getFields()
               .stream()
diff --git 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
index 762bb709d1..f3e8539685 100644
--- 
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
+++ 
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input.parquet.simple;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
@@ -54,7 +55,14 @@ public class ParquetHadoopInputRowParser implements 
InputRowParser<Group>
     } else {
       flattenSpec = JSONPathSpec.DEFAULT;
     }
-    this.groupFlattener = ObjectFlatteners.create(flattenSpec, new 
ParquetGroupFlattenerMaker(this.binaryAsString));
+    final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec();
+    this.groupFlattener = ObjectFlatteners.create(
+        flattenSpec,
+        new ParquetGroupFlattenerMaker(
+            this.binaryAsString,
+            dimensionsSpec != null && 
dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
+        )
+    );
     this.parser = new MapInputRowParser(parseSpec);
   }
 
diff --git 
a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
 
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
index 387ff2cf91..950039d069 100644
--- 
a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
+++ 
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java
@@ -162,4 +162,77 @@ public class NestedColumnParquetReaderTest extends 
BaseParquetReaderTest
     Assert.assertEquals(1L, rows.get(0).getRaw("t_a2_1_b1"));
     Assert.assertEquals(1L, rows.get(0).getRaw("tt_a2_0_b1"));
   }
+
+  @Test
+  public void testNestedColumnSchemalessNestedTestFileNoNested() throws 
IOException
+  {
+    final String file = "example/flattening/test_nested_1.parquet";
+    InputRowSchema schema = new InputRowSchema(
+        new TimestampSpec("timestamp", "auto", null),
+        
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(false).build(),
+        ColumnsFilter.all(),
+        null
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(true, ImmutableList.of());
+    InputEntityReader reader = createReader(
+        file,
+        schema,
+        flattenSpec
+    );
+
+    List<InputRow> rows = readAllRows(reader);
+    Assert.assertEquals(ImmutableList.of("dim1", "metric1", "timestamp"), 
rows.get(0).getDimensions());
+    Assert.assertEquals(FlattenSpecParquetInputTest.TS1, 
rows.get(0).getTimestamp().toString());
+    Assert.assertEquals(ImmutableList.of("d1v1"), 
rows.get(0).getDimension("dim1"));
+    Assert.assertEquals("d1v1", rows.get(0).getRaw("dim1"));
+    Assert.assertEquals(ImmutableList.of("1"), 
rows.get(0).getDimension("metric1"));
+    Assert.assertEquals(1, rows.get(0).getRaw("metric1"));
+    Assert.assertEquals(1, rows.get(0).getMetric("metric1"));
+    // can still read even if it doesn't get reported as a dimension
+    Assert.assertEquals(
+        ImmutableMap.of(
+            "listDim", ImmutableList.of("listDim1v1", "listDim1v2"),
+            "dim3", 1,
+            "dim2", "d2v1",
+            "metric2", 2
+        ),
+        rows.get(0).getRaw("nestedData")
+    );
+  }
+
+  @Test
+  public void testNestedColumnSchemalessNestedTestFile() throws IOException
+  {
+    final String file = "example/flattening/test_nested_1.parquet";
+    InputRowSchema schema = new InputRowSchema(
+        new TimestampSpec("timestamp", "auto", null),
+        
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
+        ColumnsFilter.all(),
+        null
+    );
+    JSONPathSpec flattenSpec = new JSONPathSpec(true, ImmutableList.of());
+    InputEntityReader reader = createReader(
+        file,
+        schema,
+        flattenSpec
+    );
+
+    List<InputRow> rows = readAllRows(reader);
+    Assert.assertEquals(ImmutableList.of("nestedData", "dim1", "metric1", 
"timestamp"), rows.get(0).getDimensions());
+    Assert.assertEquals(FlattenSpecParquetInputTest.TS1, 
rows.get(0).getTimestamp().toString());
+    Assert.assertEquals(ImmutableList.of("d1v1"), 
rows.get(0).getDimension("dim1"));
+    Assert.assertEquals("d1v1", rows.get(0).getRaw("dim1"));
+    Assert.assertEquals(ImmutableList.of("1"), 
rows.get(0).getDimension("metric1"));
+    Assert.assertEquals(1, rows.get(0).getRaw("metric1"));
+    Assert.assertEquals(1, rows.get(0).getMetric("metric1"));
+    Assert.assertEquals(
+        ImmutableMap.of(
+            "listDim", ImmutableList.of("listDim1v1", "listDim1v2"),
+            "dim3", 1,
+            "dim2", "d2v1",
+            "metric2", 2
+        ),
+        rows.get(0).getRaw("nestedData")
+    );
+  }
 }
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
index a3314e4273..a3d69226fd 100644
--- 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
@@ -56,6 +56,13 @@ public class ProtobufFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<M
 
   private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
 
+  private final boolean discoverNestedFields;
+
+  public ProtobufFlattenerMaker(boolean discoverNestedFields)
+  {
+    this.discoverNestedFields = discoverNestedFields;
+  }
+
   @Override
   public JsonProvider getJsonProvider()
   {
@@ -65,7 +72,11 @@ public class ProtobufFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker<M
   @Override
   public Iterable<String> discoverRootFields(Map<String, Object> obj)
   {
-    // in the future we can just return obj.keySet(), but for now this doesnt 
expect nested fields...
+    // if discovering nested fields, just return all root fields since we want 
everything
+    // else, we filter for literals and arrays of literals
+    if (discoverNestedFields) {
+      return obj.keySet();
+    }
     Set<String> rootFields = 
Sets.newHashSetWithExpectedSize(obj.keySet().size());
     for (Map.Entry<String, Object> entry : obj.entrySet()) {
       if (entry.getValue() instanceof List || entry.getValue() instanceof Map) 
{
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java
index 9bb50a247b..7c0881b69c 100644
--- 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java
@@ -40,7 +40,12 @@ public class ProtobufInputRowSchema extends InputRowSchema
 {
   public ProtobufInputRowSchema(InputRowSchema inputRowSchema)
   {
-    super(new ProtobufTimestampSpec(inputRowSchema.getTimestampSpec()), 
inputRowSchema.getDimensionsSpec(), inputRowSchema.getColumnsFilter());
+    super(
+        new ProtobufTimestampSpec(inputRowSchema.getTimestampSpec()),
+        inputRowSchema.getDimensionsSpec(),
+        inputRowSchema.getColumnsFilter(),
+        inputRowSchema.getMetricNames()
+    );
   }
 
   static class ProtobufTimestampSpec extends TimestampSpec
@@ -51,9 +56,9 @@ public class ProtobufInputRowSchema extends InputRowSchema
     }
 
     /**
-     * Extracts the timestamp from the record. If the timestamp column is of 
complex type such as {@link Timestamp}, then the timestamp
-     * is first serialized to string via {@link JsonFormat}. Directly calling 
{@code toString()} on {@code Timestamp}
-     * returns an unparseable string.
+     * Extracts the timestamp from the record. If the timestamp column is of 
complex type such as {@link Timestamp},
+     * then the timestamp is first serialized to string via {@link 
JsonFormat}. Directly calling {@code toString()}
+     * on {@code Timestamp} returns an unparseable string.
      */
     @Override
     @Nullable
diff --git 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
index 2dc6aa1f8c..ac77a545dd 100644
--- 
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
+++ 
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
@@ -57,7 +57,10 @@ public class ProtobufReader extends 
IntermediateRowParsingReader<DynamicMessage>
   )
   {
     this.inputRowSchema = inputRowSchema;
-    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new 
ProtobufFlattenerMaker());
+    this.recordFlattener = ObjectFlatteners.create(
+        flattenSpec,
+        new 
ProtobufFlattenerMaker(inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery())
+    );
     this.source = source;
     this.protobufBytesDecoder = protobufBytesDecoder;
   }
diff --git 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
index 0b5d096449..9255e1e7eb 100644
--- 
a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
+++ 
b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java
@@ -328,6 +328,77 @@ public class ProtobufInputFormatTest
 
   }
 
+  @Test
+  public void testParseNestedDataSchemaless() throws Exception
+  {
+    ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
+        JSONPathSpec.DEFAULT,
+        decoder
+    );
+
+    //create binary of proto test event
+    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, 
ISOChronology.getInstanceUTC());
+    ProtoTestEventWrapper.ProtoTestEvent event = 
ProtobufInputRowParserTest.buildNestedData(dateTime);
+
+    final ByteEntity entity = new 
ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
+
+    InputEntityReader reader = protobufInputFormat.createReader(
+        new InputRowSchema(
+            timestampSpec,
+            
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
+            null,
+            null
+        ),
+        entity,
+        null
+    );
+
+    TransformSpec transformSpec = new TransformSpec(
+        null,
+        Lists.newArrayList(
+            new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", 
TestExprMacroTable.INSTANCE),
+            new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", 
TestExprMacroTable.INSTANCE)
+        )
+    );
+    TransformingInputEntityReader transformingReader = new 
TransformingInputEntityReader(
+        reader,
+        transformSpec.toTransformer()
+    );
+
+
+    InputRow row = transformingReader.read().next();
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            "someOtherId",
+            "bar",
+            "someIntColumn",
+            "isValid",
+            "foo",
+            "description",
+            "someLongColumn",
+            "someFloatColumn",
+            "eventType",
+            "id",
+            "someBytesColumn",
+            "timestamp"
+        ),
+        row.getDimensions()
+    );
+
+    Assert.assertEquals(ImmutableMap.of("bar", "baz"), row.getRaw("foo"));
+    Assert.assertEquals(
+        ImmutableList.of(ImmutableMap.of("bar", "bar0"), 
ImmutableMap.of("bar", "bar1")),
+        row.getRaw("bar")
+    );
+    Assert.assertArrayEquals(
+        new byte[]{0x01, 0x02, 0x03, 0x04},
+        (byte[]) row.getRaw("someBytesColumn")
+    );
+    ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
+
+  }
+
   @Test
   public void testParseNestedDataTransformsOnly() throws Exception
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
index aaa2cd0789..8824decc05 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
@@ -22,10 +22,7 @@ package org.apache.druid.indexing.overlord.sampler;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.InputSource;
 import org.apache.druid.java.util.common.HumanReadableBytes;
-import org.apache.druid.segment.indexing.DataSchema;
 
 import javax.annotation.Nullable;
 
@@ -35,13 +32,9 @@ public class SamplerConfig
   private static final int MAX_NUM_ROWS = 5000;
   private static final int DEFAULT_TIMEOUT_MS = 10000;
 
-
-
   private final int numRows;
   private final int timeoutMs;
-
   private final long maxBytesInMemory;
-
   private final long maxClientResponseBytes;
 
   @JsonCreator
@@ -92,13 +85,13 @@ public class SamplerConfig
 
   /**
    * Maximum number of bytes in memory that the {@link 
org.apache.druid.segment.incremental.IncrementalIndex} used by
-   * {@link InputSourceSampler#sample(InputSource, InputFormat, DataSchema, 
SamplerConfig)} will be allowed to
-   * accumulate before aborting sampling. Particularly useful for limiting 
footprint of sample operations as well as
-   * overall response size from sample requests. However, it is not directly 
correlated to response size since it
-   * also contains the "raw" input data, so actual responses will likely be at 
least twice the size of this value,
-   * depending on factors such as number of transforms, aggregations in the 
case of rollup, whether all columns
-   * of the input are present in the dimension spec, and so on. If it is 
preferred to control client response size,
-   * use {@link SamplerConfig#getMaxClientResponseBytes()} instead.
+   * {@link InputSourceSampler#sample(org.apache.druid.data.input.InputSource, 
org.apache.druid.data.input.InputFormat, 
org.apache.druid.segment.indexing.DataSchema, SamplerConfig)}
+   * will be allowed to accumulate before aborting sampling. Particularly 
useful for limiting footprint of sample
+   * operations as well as overall response size from sample requests. 
However, it is not directly correlated to
+   * response size since it also contains the "raw" input data, so actual 
responses will likely be at least twice the
+   * size of this value, depending on factors such as number of transforms, 
aggregations in the case of rollup, whether
+   * all columns of the input are present in the dimension spec, and so on. If 
it is preferred to control client
+   * response size, use {@link SamplerConfig#getMaxClientResponseBytes()} 
instead.
    */
   public long getMaxBytesInMemory()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index b3db2b71d1..c6b6d6fdf0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -252,7 +252,7 @@ public class ClientCompactionTaskQuerySerdeTest
             new ParallelIndexTuningConfig(
                 null,
                 null,
-                new OnheapIncrementalIndex.Spec(true, false),
+                new OnheapIncrementalIndex.Spec(true),
                 40000,
                 2000L,
                 null,
@@ -316,7 +316,7 @@ public class ClientCompactionTaskQuerySerdeTest
         ),
         new ClientCompactionTaskQueryTuningConfig(
             100,
-            new OnheapIncrementalIndex.Spec(true, false),
+            new OnheapIncrementalIndex.Spec(true),
             40000,
             2000L,
             30000L,
diff --git a/integration-tests-ex/cases/cluster/Common/dependencies.yaml 
b/integration-tests-ex/cases/cluster/Common/dependencies.yaml
index ccfce630a2..4eeb85a49c 100644
--- a/integration-tests-ex/cases/cluster/Common/dependencies.yaml
+++ b/integration-tests-ex/cases/cluster/Common/dependencies.yaml
@@ -67,7 +67,8 @@ services:
   # See https://hub.docker.com/_/mysql
   # The image will intialize the user and DB upon first start.
   metadata:
-    #    platform: linux/x86_64  - Add when running on M1 Macs
+    # Uncomment the following when running on M1 Macs:
+    # platform: linux/x86_64
     image: mysql:$MYSQL_IMAGE_VERSION
     container_name: metadata
     command:
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java
index 86e0ece8d5..58e47bbe7e 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java
@@ -26,7 +26,6 @@ import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 
 import javax.inject.Inject;
-
 import java.sql.Connection;
 import java.sql.SQLException;
 
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java
index c14ea745aa..e23c996c88 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java
@@ -25,7 +25,6 @@ import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.IntegrationTestingConfigProvider;
 
 import javax.inject.Inject;
-
 import java.util.Map;
 import java.util.Properties;
 
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
index 144722eea9..7b983b9cf1 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
@@ -155,14 +155,23 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
     return fileString;
   }
 
+  protected void doTestQuery(String dataSource, String queryFilePath)
+  {
+    doTestQuery(dataSource, queryFilePath, false);
+  }
+
   /**
    * Reads native queries from a file and runs against the provided datasource.
    */
-  protected void doTestQuery(String dataSource, String queryFilePath)
+  protected void doTestQuery(String dataSource, String queryFilePath, boolean 
isSql)
   {
     try {
       String query = getStringFromFileAndReplaceDatasource(queryFilePath, 
dataSource);
-      queryHelper.testQueriesFromString(query);
+      if (isSql) {
+        sqlQueryHelper.testQueriesFromString(query);
+      } else {
+        queryHelper.testQueriesFromString(query);
+      }
     }
     catch (Exception e) {
       LOG.error(e, "Error while running test query at path " + queryFilePath);
@@ -255,6 +264,31 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
         queryFilePath,
         waitForNewVersion,
         runTestQueries,
+        false,
+        waitForSegmentsToLoad,
+        segmentAvailabilityConfirmationPair
+    );
+  }
+
+  protected void doIndexTest(
+      String dataSource,
+      String indexTaskFilePath,
+      Function<String, String> taskSpecTransform,
+      String queryFilePath,
+      boolean waitForNewVersion,
+      boolean runTestQueries,
+      boolean waitForSegmentsToLoad,
+      Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+  ) throws IOException
+  {
+    doIndexTest(
+        dataSource,
+        indexTaskFilePath,
+        taskSpecTransform,
+        queryFilePath,
+        waitForNewVersion,
+        runTestQueries,
+        false,
         waitForSegmentsToLoad,
         segmentAvailabilityConfirmationPair
     );
@@ -267,6 +301,7 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
       String queryFilePath,
       boolean waitForNewVersion,
       boolean runTestQueries,
+      boolean isSqlQueries,
       boolean waitForSegmentsToLoad,
       Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
   ) throws IOException
@@ -288,7 +323,7 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
         segmentAvailabilityConfirmationPair
     );
     if (runTestQueries) {
-      doTestQuery(dataSource, queryFilePath);
+      doTestQuery(dataSource, queryFilePath, isSqlQueries);
     }
   }
 
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java
index ce8a9f5c13..868ffdb8e5 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.StringUtils;
 
 import javax.annotation.Nonnull;
 import java.io.Closeable;
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.Function;
@@ -48,6 +49,28 @@ public abstract class 
AbstractLocalInputSourceParallelIndexTest extends Abstract
       @Nonnull Map<String, Object> extraInputFormatMap,
       Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
   ) throws Exception
+  {
+    doIndexTest(
+        inputFormatDetails,
+        INDEX_TASK,
+        INDEX_QUERIES_RESOURCE,
+        false,
+        Collections.emptyMap(),
+        extraInputFormatMap,
+        segmentAvailabilityConfirmationPair
+    );
+  }
+
+
+  public void doIndexTest(
+      InputFormatDetails inputFormatDetails,
+      String ingestSpecTemplate,
+      String queries,
+      boolean useSqlQueries,
+      @Nonnull Map<String, Object> templateValues,
+      @Nonnull Map<String, Object> extraInputFormatMap,
+      Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+  ) throws Exception
   {
     final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
     Map<String, Object> inputFormatMap = new ImmutableMap.Builder<String, 
Object>().putAll(extraInputFormatMap)
@@ -58,6 +81,13 @@ public abstract class 
AbstractLocalInputSourceParallelIndexTest extends Abstract
     ) {
       final Function<String, String> sqlInputSourcePropsTransform = spec -> {
         try {
+          for (Map.Entry<String, Object> entry : templateValues.entrySet()) {
+            spec = StringUtils.replace(
+                spec,
+                "%%" + entry.getKey() + "%%",
+                jsonMapper.writeValueAsString(entry.getValue())
+            );
+          }
           spec = StringUtils.replace(
               spec,
               "%%PARTITIONS_SPEC%%",
@@ -102,11 +132,12 @@ public abstract class 
AbstractLocalInputSourceParallelIndexTest extends Abstract
 
       doIndexTest(
           indexDatasource,
-          INDEX_TASK,
+          ingestSpecTemplate,
           sqlInputSourcePropsTransform,
-          INDEX_QUERIES_RESOURCE,
+          queries,
           false,
           true,
+          useSqlQueries,
           true,
           segmentAvailabilityConfirmationPair
       );
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllFormatSchemalessTest.java
similarity index 55%
copy from 
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
copy to 
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllFormatSchemalessTest.java
index 8482952db2..61deca75ed 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllFormatSchemalessTest.java
@@ -22,22 +22,25 @@ package org.apache.druid.testsEx.indexer;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.Pair;
-import 
org.apache.druid.tests.indexer.AbstractLocalInputSourceParallelIndexTest;
 import org.apache.druid.testsEx.categories.InputFormat;
 import org.apache.druid.testsEx.config.DruidTestRunner;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 @RunWith(DruidTestRunner.class)
 @Category(InputFormat.class)
-public class ITLocalInputSourceAllInputFormatTest extends 
AbstractLocalInputSourceParallelIndexTest
+public class ITLocalInputSourceAllFormatSchemalessTest extends 
AbstractLocalInputSourceParallelIndexTest
 {
+  private static final String INDEX_TASK = 
"/indexer/wikipedia_local_input_source_index_task_schemaless.json";
+  private static final String INDEX_QUERIES_RESOURCE = 
"/indexer/wikipedia_index_schemaless_queries.json";
+
   @Test
-  public void testAvroInputFormatIndexDataIngestionSpecWithSchema() throws 
Exception
+  public void 
testAvroInputFormatIndexDataIngestionSpecWithFileSchemaSchemaless() throws 
Exception
   {
     List fieldList = ImmutableList.of(
         ImmutableMap.of("name", "timestamp", "type", "string"),
@@ -61,42 +64,70 @@ public class ITLocalInputSourceAllInputFormatTest extends 
AbstractLocalInputSour
                                  "type", "record",
                                  "name", "wikipedia",
                                  "fields", fieldList);
-    doIndexTest(InputFormatDetails.AVRO, ImmutableMap.of("schema", schema), 
new Pair<>(false, false));
-  }
-
-  @Test
-  public void testAvroInputFormatIndexDataIngestionSpecWithoutSchema() throws 
Exception
-  {
-    doIndexTest(InputFormatDetails.AVRO, new Pair<>(false, false));
-  }
-
-  @Test
-  public void testJsonInputFormatIndexDataIngestionSpecWithSchema() throws 
Exception
-  {
-    doIndexTest(InputFormatDetails.JSON, new Pair<>(false, false));
+    doIndexTest(
+        AbstractITBatchIndexTest.InputFormatDetails.AVRO,
+        INDEX_TASK,
+        INDEX_QUERIES_RESOURCE,
+        true,
+        ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+        ImmutableMap.of("schema", schema),
+        new Pair<>(false, false)
+    );
   }
 
   @Test
-  public void testTsvInputFormatIndexDataIngestionSpecWithSchema() throws 
Exception
+  public void 
testAvroInputFormatIndexDataIngestionSpecNoFileSchemaSchemaless() throws 
Exception
   {
-    doIndexTest(InputFormatDetails.TSV, 
ImmutableMap.of("findColumnsFromHeader", true), new Pair<>(false, false));
+    doIndexTest(
+        AbstractITBatchIndexTest.InputFormatDetails.AVRO,
+        INDEX_TASK,
+        INDEX_QUERIES_RESOURCE,
+        true,
+        ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+        Collections.emptyMap(),
+        new Pair<>(false, false)
+    );
   }
 
   @Test
-  public void testParquetInputFormatIndexDataIngestionSpecWithSchema() throws 
Exception
+  public void testJsonInputFormatIndexDataIngestionSpecSchemaless() throws 
Exception
   {
-    doIndexTest(InputFormatDetails.PARQUET, new Pair<>(false, false));
+    doIndexTest(
+        AbstractITBatchIndexTest.InputFormatDetails.JSON,
+        INDEX_TASK,
+        INDEX_QUERIES_RESOURCE,
+        true,
+        ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+        Collections.emptyMap(),
+        new Pair<>(false, false)
+    );
   }
 
   @Test
-  public void testOrcInputFormatIndexDataIngestionSpecWithSchema() throws 
Exception
+  public void testParquetInputFormatIndexDataIngestionSpecSchemaless() throws 
Exception
   {
-    doIndexTest(InputFormatDetails.ORC, new Pair<>(false, false));
+    doIndexTest(
+        AbstractITBatchIndexTest.InputFormatDetails.PARQUET,
+        INDEX_TASK,
+        INDEX_QUERIES_RESOURCE,
+        true,
+        ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+        Collections.emptyMap(),
+        new Pair<>(false, false)
+    );
   }
 
   @Test
-  public void testCsvInputFormatIndexDataIngestionSpecWithSchema() throws 
Exception
+  public void testOrcInputFormatIndexDataIngestionSpecSchemaless() throws 
Exception
   {
-    doIndexTest(InputFormatDetails.CSV, 
ImmutableMap.of("findColumnsFromHeader", true), new Pair<>(false, false));
+    doIndexTest(
+        AbstractITBatchIndexTest.InputFormatDetails.ORC,
+        INDEX_TASK,
+        INDEX_QUERIES_RESOURCE,
+        true,
+        ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true),
+        Collections.emptyMap(),
+        new Pair<>(false, false)
+    );
   }
 }
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
index 8482952db2..641d385865 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.testsEx.indexer;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.Pair;
-import 
org.apache.druid.tests.indexer.AbstractLocalInputSourceParallelIndexTest;
 import org.apache.druid.testsEx.categories.InputFormat;
 import org.apache.druid.testsEx.config.DruidTestRunner;
 import org.junit.Test;
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java
index 73fc73d42d..4f19a226d9 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java
@@ -159,6 +159,7 @@ public class ITOverwriteBatchIndexTest extends 
AbstractITBatchIndexTest
         null,
         false,
         false,
+        false,
         true,
         new Pair<>(false, false)
     );
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java
index 735bc0a50f..60d22f3f9f 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java
@@ -32,11 +32,11 @@ import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.TestClient;
 import org.apache.druid.testing.utils.SqlTestQueryHelper;
-import org.apache.druid.tests.indexer.AbstractIndexerTest;
 import org.apache.druid.testsEx.categories.HighAvailability;
 import org.apache.druid.testsEx.cluster.DruidClusterClient;
 import org.apache.druid.testsEx.config.DruidTestRunner;
 import org.apache.druid.testsEx.config.Initializer;
+import org.apache.druid.testsEx.indexer.AbstractIndexerTest;
 import org.apache.druid.testsEx.utils.DruidClusterAdminClient;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
diff --git 
a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json
 
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json
index 928effe65e..62c3ac6371 100644
--- 
a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json
+++ 
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json
@@ -1,6 +1,6 @@
 [
     {
-        "description": "timeseries, 1 agg, all",
+        "description": "timeboundary",
         "query":{
             "queryType" : "timeBoundary",
             "dataSource": "%%DATASOURCE%%"
diff --git 
a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_schemaless_queries.json
 
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_schemaless_queries.json
new file mode 100644
index 0000000000..316ec5fd8a
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_schemaless_queries.json
@@ -0,0 +1,32 @@
+[
+  {
+    "description": "select all things",
+    "query": {
+      "query": "SELECT 
\"__time\",\"continent\",\"country\",\"city\",\"added\",\"unpatrolled\",\"delta\",\"language\",\"robot\",\"deleted\",\"newPage\",\"namespace\",\"anonymous\",\"page\",\"region\",\"user\"
 FROM \"%%DATASOURCE%%\" ORDER BY __time"
+    },
+    "expectedResults": [
+      {"__time":"2013-08-31T01:02:33.000Z","continent":"North 
America","country":"United States","city":"San 
Francisco","added":57,"unpatrolled":"true","delta":-143,"language":"en","robot":"false","deleted":200,"newPage":"true","namespace":"article","anonymous":"false","page":"Gypsy
 Danger","region":"Bay Area","user":"nuclear"},
+      
{"__time":"2013-08-31T03:32:45.000Z","continent":"Australia","country":"Australia","city":"Syndey","added":459,"unpatrolled":"false","delta":330,"language":"en","robot":"true","deleted":129,"newPage":"true","namespace":"wikipedia","anonymous":"false","page":"Striker
 Eureka","region":"Cantebury","user":"speed"},
+      
{"__time":"2013-08-31T07:11:21.000Z","continent":"Asia","country":"Russia","city":"Moscow","added":123,"unpatrolled":"false","delta":111,"language":"ru","robot":"true","deleted":12,"newPage":"true","namespace":"article","anonymous":"false","page":"Cherno
 Alpha","region":"Oblast","user":"masterYi"},
+      
{"__time":"2013-08-31T11:58:39.000Z","continent":"Asia","country":"China","city":"Taiyuan","added":905,"unpatrolled":"true","delta":900,"language":"zh","robot":"true","deleted":5,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Crimson
 Typhoon","region":"Shanxi","user":"triplets"},
+      
{"__time":"2013-08-31T12:41:27.000Z","continent":"Asia","country":"Japan","city":"Tokyo","added":1,"unpatrolled":"true","delta":-9,"language":"ja","robot":"true","deleted":10,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Coyote
 Tango","region":"Kanto","user":"stringer"},
+      {"__time":"2013-09-01T01:02:33.000Z","continent":"North 
America","country":"United States","city":"San 
Francisco","added":57,"unpatrolled":"true","delta":-143,"language":"en","robot":"false","deleted":200,"newPage":"true","namespace":"article","anonymous":"false","page":"Gypsy
 Danger","region":"Bay Area","user":"nuclear"},
+      
{"__time":"2013-09-01T03:32:45.000Z","continent":"Australia","country":"Australia","city":"Syndey","added":459,"unpatrolled":"false","delta":330,"language":"en","robot":"true","deleted":129,"newPage":"true","namespace":"wikipedia","anonymous":"false","page":"Striker
 Eureka","region":"Cantebury","user":"speed"},
+      
{"__time":"2013-09-01T07:11:21.000Z","continent":"Asia","country":"Russia","city":"Moscow","added":123,"unpatrolled":"false","delta":111,"language":"ru","robot":"true","deleted":12,"newPage":"true","namespace":"article","anonymous":"false","page":"Cherno
 Alpha","region":"Oblast","user":"masterYi"},
+      
{"__time":"2013-09-01T11:58:39.000Z","continent":"Asia","country":"China","city":"Taiyuan","added":905,"unpatrolled":"true","delta":900,"language":"zh","robot":"true","deleted":5,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Crimson
 Typhoon","region":"Shanxi","user":"triplets"},
+      
{"__time":"2013-09-01T12:41:27.000Z","continent":"Asia","country":"Japan","city":"Tokyo","added":1,"unpatrolled":"true","delta":-9,"language":"ja","robot":"true","deleted":10,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Coyote
 Tango","region":"Kanto","user":"stringer"}
+    ]
+  },
+  {
+    "description": "simple group by",
+    "query": {
+      "query": "SELECT page, SUM(added) as added FROM \"%%DATASOURCE%%\" WHERE 
continent = 'Asia' GROUP BY 1 ORDER BY 2 DESC"
+    },
+    "expectedResults": [
+      {"page":"Crimson Typhoon","added":1810},
+      {"page":"Cherno Alpha","added":246},
+      {"page":"Coyote Tango","added":2}
+    ]
+  }
+
+]
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json
 
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json
new file mode 100644
index 0000000000..bc96c730ee
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json
@@ -0,0 +1,42 @@
+{
+  "type": "index_parallel",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "%%DATASOURCE%%",
+      "timestampSpec": {
+        "column": "timestamp"
+      },
+      "dimensionsSpec": {
+        "dimensions": [],
+        "useNestedColumnIndexerForSchemaDiscovery": 
%%USE_NESTED_COLUMN_INDEXER%%
+      },
+      "metricsSpec": [],
+      "granularitySpec": {
+        "segmentGranularity": "DAY",
+        "queryGranularity": "second",
+        "intervals" : [ "2013-08-31/2013-09-02" ]
+      }
+    },
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "local",
+        "filter" : "%%INPUT_SOURCE_FILTER%%",
+        "baseDir": "%%INPUT_SOURCE_BASE_DIR%%"
+      },
+      "appendToExisting": %%APPEND_TO_EXISTING%%,
+      "dropExisting": %%DROP_EXISTING%%,
+      "inputFormat": %%INPUT_FORMAT%%
+    },
+    "tuningConfig": {
+      "type": "index_parallel",
+      "maxNumConcurrentSubTasks": 4,
+      "splitHintSpec": {
+        "type": "maxSize",
+        "maxNumFiles": 1
+      },
+      "forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%,
+      "partitionsSpec": %%PARTITIONS_SPEC%%
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index 887c931204..1269fe1e6b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -42,8 +42,6 @@ public abstract class AppendableIndexBuilder
   protected boolean preserveExistingMetrics = false;
   protected boolean useMaxMemoryEstimates = true;
 
-  protected boolean useNestedColumnIndexerForSchemaDiscovery = false;
-
   protected final Logger log = new Logger(this.getClass());
 
   public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema 
incrementalIndexSchema)
@@ -132,14 +130,6 @@ public abstract class AppendableIndexBuilder
     return this;
   }
 
-  public AppendableIndexBuilder setUseNestedColumnIndexerForSchemaDiscovery(
-      boolean useNestedColumnIndexerForSchemaDiscovery
-  )
-  {
-    this.useNestedColumnIndexerForSchemaDiscovery = 
useNestedColumnIndexerForSchemaDiscovery;
-    return this;
-  }
-
   public void validate()
   {
     if (maxRowCount <= 0) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
index 3af6bfa4cb..67cdabdf56 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
@@ -32,7 +32,4 @@ public interface AppendableIndexSpec
 
   // Returns the default max bytes in memory for this index.
   long getDefaultMaxBytesInMemory();
-
-  @SuppressWarnings("unused")
-  boolean useNestedColumnIndexerForSchemaDiscovery();
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index 0085b58859..ddb52c309c 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -276,8 +276,7 @@ public abstract class IncrementalIndex extends 
AbstractIndex implements Iterable
       final boolean deserializeComplexMetrics,
       final boolean concurrentEventAdd,
       final boolean preserveExistingMetrics,
-      final boolean useMaxMemoryEstimates,
-      final boolean useNestedColumnIndexerForSchemaDiscovery
+      final boolean useMaxMemoryEstimates
   )
   {
     this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
@@ -289,7 +288,8 @@ public abstract class IncrementalIndex extends 
AbstractIndex implements Iterable
     this.deserializeComplexMetrics = deserializeComplexMetrics;
     this.preserveExistingMetrics = preserveExistingMetrics;
     this.useMaxMemoryEstimates = useMaxMemoryEstimates;
-    this.useNestedColumnIndexerForSchemaDiscovery = 
useNestedColumnIndexerForSchemaDiscovery;
+    this.useNestedColumnIndexerForSchemaDiscovery = 
incrementalIndexSchema.getDimensionsSpec()
+                                                                          
.useNestedColumnIndexerForSchemaDiscovery();
 
     this.timeAndMetricsColumnCapabilities = new HashMap<>();
     this.metricDescs = Maps.newLinkedHashMap();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 1912136a07..35ccd8961d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -126,11 +126,16 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
       // preserveExistingMetrics should only be set true for DruidInputSource 
since that is the only case where we can have existing metrics
      // This is currently only used by auto compaction and should not be used 
for anything else.
       boolean preserveExistingMetrics,
-      boolean useMaxMemoryEstimates,
-      boolean useNestedColumnIndexerSchemaDiscovery
+      boolean useMaxMemoryEstimates
   )
   {
-    super(incrementalIndexSchema, deserializeComplexMetrics, 
concurrentEventAdd, preserveExistingMetrics, useMaxMemoryEstimates, 
useNestedColumnIndexerSchemaDiscovery);
+    super(
+        incrementalIndexSchema,
+        deserializeComplexMetrics,
+        concurrentEventAdd,
+        preserveExistingMetrics,
+        useMaxMemoryEstimates
+    );
     this.maxRowCount = maxRowCount;
     this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : 
maxBytesInMemory;
     this.facts = incrementalIndexSchema.isRollup() ? new 
RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
@@ -657,8 +662,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
           maxRowCount,
           maxBytesInMemory,
           preserveExistingMetrics,
-          useMaxMemoryEstimates,
-          useNestedColumnIndexerForSchemaDiscovery
+          useMaxMemoryEstimates
       );
     }
   }
@@ -666,7 +670,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   public static class Spec implements AppendableIndexSpec
   {
     private static final boolean DEFAULT_PRESERVE_EXISTING_METRICS = false;
-    private static final boolean 
DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY = false;
     public static final String TYPE = "onheap";
 
     // When set to true, for any row that already has metric (with the same 
name defined in metricSpec),
@@ -676,26 +679,19 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
     // This is currently only used by auto compaction and should not be used for 
anything else.
     final boolean preserveExistingMetrics;
 
-    final boolean useNestedColumnIndexerForSchemaDiscovery;
-
     public Spec()
     {
       this.preserveExistingMetrics = DEFAULT_PRESERVE_EXISTING_METRICS;
-      this.useNestedColumnIndexerForSchemaDiscovery = 
DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY;
     }
 
     @JsonCreator
     public Spec(
-        final @JsonProperty("preserveExistingMetrics") @Nullable Boolean 
preserveExistingMetrics,
-        final @JsonProperty("useNestedColumnIndexerForSchemaDiscovery") 
@Nullable Boolean useNestedColumnIndexerForSchemaDiscovery
+        final @JsonProperty("preserveExistingMetrics") @Nullable Boolean 
preserveExistingMetrics
     )
     {
       this.preserveExistingMetrics = preserveExistingMetrics != null
                                      ? preserveExistingMetrics
                                      : DEFAULT_PRESERVE_EXISTING_METRICS;
-      this.useNestedColumnIndexerForSchemaDiscovery = 
useNestedColumnIndexerForSchemaDiscovery != null
-                                                      ? 
useNestedColumnIndexerForSchemaDiscovery
-                                                      : 
DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY;
     }
 
     @JsonProperty
@@ -707,8 +703,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     @Override
     public AppendableIndexBuilder builder()
     {
-      return new Builder().setPreserveExistingMetrics(preserveExistingMetrics)
-                          
.setUseNestedColumnIndexerForSchemaDiscovery(useNestedColumnIndexerForSchemaDiscovery);
+      return new Builder().setPreserveExistingMetrics(preserveExistingMetrics);
     }
 
     @Override
@@ -720,13 +715,6 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
       return JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
     }
 
-    @JsonProperty
-    @Override
-    public boolean useNestedColumnIndexerForSchemaDiscovery()
-    {
-      return useNestedColumnIndexerForSchemaDiscovery;
-    }
-
     @Override
     public boolean equals(Object o)
     {
@@ -737,14 +725,13 @@ public class OnheapIncrementalIndex extends 
IncrementalIndex
         return false;
       }
       Spec spec = (Spec) o;
-      return preserveExistingMetrics == spec.preserveExistingMetrics &&
-             useNestedColumnIndexerForSchemaDiscovery == 
spec.useNestedColumnIndexerForSchemaDiscovery;
+      return preserveExistingMetrics == spec.preserveExistingMetrics;
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(preserveExistingMetrics, 
useNestedColumnIndexerForSchemaDiscovery);
+      return Objects.hash(preserveExistingMetrics);
     }
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java
index 54bfe1dcb5..e5a1c41bc4 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java
@@ -46,7 +46,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
@@ -548,13 +547,12 @@ public class NestedDataColumnIndexerTest extends 
InitializedNullHandlingTest
                 new TimestampSpec(TIME_COL, "millis", null),
                 Granularities.NONE,
                 VirtualColumns.EMPTY,
-                new DimensionsSpec(Collections.emptyList()),
+                
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
                 new AggregatorFactory[0],
                 false
             )
         )
         .setMaxRowCount(1000)
-        .setUseNestedColumnIndexerForSchemaDiscovery(true)
         .build();
     return index;
   }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index dad2a4c213..02d43e2750 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -125,8 +125,7 @@ public class OnheapIncrementalIndexBenchmark extends 
AbstractBenchmark
           maxRowCount,
           maxBytesInMemory,
           false,
-          true,
-          false
+          true
       );
     }
 
@@ -150,8 +149,7 @@ public class OnheapIncrementalIndexBenchmark extends 
AbstractBenchmark
           maxRowCount,
           maxBytesInMemory,
           false,
-          true,
-          false
+          true
       );
     }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
similarity index 51%
copy from 
processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
copy to 
processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
index 3af6bfa4cb..07ed6f7502 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
@@ -19,20 +19,28 @@
 
 package org.apache.druid.segment.incremental;
 
-import org.apache.druid.guice.annotations.UnstableApi;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
 
-/**
- * AppendableIndexSpec describes the in-memory indexing method for data 
ingestion.
- */
-@UnstableApi
-public interface AppendableIndexSpec
+public class OnheapIncrementalIndexTest
 {
-  // Returns a builder of the appendable index.
-  AppendableIndexBuilder builder();
-
-  // Returns the default max bytes in memory for this index.
-  long getDefaultMaxBytesInMemory();
+  private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
 
-  @SuppressWarnings("unused")
-  boolean useNestedColumnIndexerForSchemaDiscovery();
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    OnheapIncrementalIndex.Spec spec = new OnheapIncrementalIndex.Spec(true);
+    Assert.assertEquals(spec, 
MAPPER.readValue(MAPPER.writeValueAsString(spec), 
OnheapIncrementalIndex.Spec.class));
+  }
+  @Test
+  public void testSpecEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(OnheapIncrementalIndex.Spec.class)
+                  .usingGetClass()
+                  .verify();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
index ad8827f2a9..7b1a7c5468 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
@@ -88,7 +88,7 @@ public class ClientCompactionTaskQueryTuningConfig
     if (userCompactionTaskQueryTuningConfig == null) {
       return new ClientCompactionTaskQueryTuningConfig(
           maxRowsPerSegment,
-          new OnheapIncrementalIndex.Spec(preserveExistingMetrics, false),
+          new OnheapIncrementalIndex.Spec(preserveExistingMetrics),
           null,
           null,
           null,
@@ -111,7 +111,7 @@ public class ClientCompactionTaskQueryTuningConfig
     } else {
       AppendableIndexSpec appendableIndexSpecToUse = 
userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null
                                                      ? 
userCompactionTaskQueryTuningConfig.getAppendableIndexSpec()
-                                                     : new 
OnheapIncrementalIndex.Spec(preserveExistingMetrics, false);
+                                                     : new 
OnheapIncrementalIndex.Spec(preserveExistingMetrics);
       return new ClientCompactionTaskQueryTuningConfig(
           maxRowsPerSegment,
           appendableIndexSpecToUse,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index 4a48f3280c..a0f4cf9a61 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -259,7 +259,7 @@ public class DataSourceCompactionConfigTest extends 
InitializedNullHandlingTest
   {
     final UserCompactionTaskQueryTuningConfig tuningConfig = new 
UserCompactionTaskQueryTuningConfig(
         40000,
-        new OnheapIncrementalIndex.Spec(true, false),
+        new OnheapIncrementalIndex.Spec(true),
         2000L,
         null,
         new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null),
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
index 9473287229..01c889ad2c 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
@@ -78,7 +78,7 @@ public class UserCompactionTaskQueryTuningConfigTest
   {
     final UserCompactionTaskQueryTuningConfig tuningConfig = new 
UserCompactionTaskQueryTuningConfig(
         40000,
-        new OnheapIncrementalIndex.Spec(true, false),
+        new OnheapIncrementalIndex.Spec(true),
         2000L,
         null,
         new SegmentsSplitHintSpec(new HumanReadableBytes(42L), null),
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index b1f3ad9952..73abcb8d5e 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -1523,7 +1523,7 @@ public class NewestSegmentFirstPolicyTest
             null,
             new UserCompactionTaskQueryTuningConfig(
                 null,
-                new OnheapIncrementalIndex.Spec(true, false),
+                new OnheapIncrementalIndex.Spec(true),
                 null,
                 1000L,
                 null,
@@ -1558,7 +1558,7 @@ public class NewestSegmentFirstPolicyTest
             null,
             new UserCompactionTaskQueryTuningConfig(
                 null,
-                new OnheapIncrementalIndex.Spec(false, false),
+                new OnheapIncrementalIndex.Spec(false),
                 null,
                 1000L,
                 null,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to