This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e5625f3ae Fix indexMerger to respect the includeAllDimensions flag (#12428)
5e5625f3ae is described below

commit 5e5625f3ae9fa663697214f49e66b5c296d342fb
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Apr 13 12:43:11 2022 -0700

    Fix indexMerger to respect the includeAllDimensions flag (#12428)
    
    * Fix indexMerger to respect the includeAllDimensions flag; jsonInputFormat should set keepNullColumns if useFieldDiscovery is set
    
    * address comments
---
 .../druid/data/input/impl/JsonInputFormat.java     |  12 +-
 .../druid/data/input/impl/JsonInputFormatTest.java |  37 +-
 ...ltiPhaseParallelIndexingWithNullColumnTest.java | 201 ----------
 ...ltiPhaseParallelIndexingWithNullColumnTest.java | 435 +++++++++++++++++++++
 ...ngePartitionMultiPhaseParallelIndexingTest.java | 122 ------
 .../org/apache/druid/segment/IndexMergerV9.java    |  51 ++-
 6 files changed, 517 insertions(+), 341 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
index 8a9a3fd529..0ce4cbfa8b 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
@@ -74,7 +74,11 @@ public class JsonInputFormat extends NestedInputFormat
     super(flattenSpec);
     this.featureSpec = featureSpec == null ? Collections.emptyMap() : featureSpec;
     this.objectMapper = new ObjectMapper();
-    this.keepNullColumns = keepNullColumns == null ? false : keepNullColumns;
+    if (keepNullColumns != null) {
+      this.keepNullColumns = keepNullColumns;
+    } else {
+      this.keepNullColumns = flattenSpec != null && flattenSpec.isUseFieldDiscovery();
+    }
     for (Entry<String, Boolean> entry : this.featureSpec.entrySet()) {
       Feature feature = Feature.valueOf(entry.getKey());
       objectMapper.configure(feature, entry.getValue());
@@ -88,6 +92,12 @@ public class JsonInputFormat extends NestedInputFormat
     return featureSpec;
   }
 
+  @JsonProperty
+  public boolean isKeepNullColumns()
+  {
+    return keepNullColumns;
+  }
+
   @Override
   public boolean isSplittable()
   {
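
A note on the JsonInputFormat change above: when keepNullColumns is not set explicitly, it now defaults to the flattenSpec's useFieldDiscovery value instead of always defaulting to false. A minimal sketch of the resolution order, mirroring the constructor above (class and variable names here are illustrative, not part of the patch; assumes druid-core on the classpath):

    import org.apache.druid.java.util.common.parsers.JSONPathSpec;

    class KeepNullColumnsDefaultSketch
    {
      public static void main(String[] args)
      {
        final Boolean userKeepNullColumns = null;                      // user left it unset
        final JSONPathSpec flattenSpec = new JSONPathSpec(true, null); // useFieldDiscovery = true
        // An explicit user setting always wins; otherwise follow useFieldDiscovery.
        final boolean keepNullColumns = userKeepNullColumns != null
                                        ? userKeepNullColumns
                                        : flattenSpec != null && flattenSpec.isUseFieldDiscovery();
        System.out.println(keepNullColumns); // true; with useFieldDiscovery = false it would be false
      }
    }
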
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
index aa9c95a7e4..0577dd72fb 100644
--- a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
+++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
@@ -41,7 +41,7 @@ public class JsonInputFormatTest
     final ObjectMapper mapper = new ObjectMapper();
     final JsonInputFormat format = new JsonInputFormat(
         new JSONPathSpec(
-            false,
+            true,
             ImmutableList.of(
                 new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
                 new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
@@ -52,7 +52,7 @@ public class JsonInputFormatTest
             )
         ),
         ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
-        false
+        true
     );
     final byte[] bytes = mapper.writeValueAsBytes(format);
     final JsonInputFormat fromJson = (JsonInputFormat) mapper.readValue(bytes, InputFormat.class);
@@ -72,4 +72,37 @@ public class JsonInputFormatTest
               .withIgnoredFields("objectMapper")
               .verify();
   }
+
+  @Test
+  public void test_unsetUseFieldDiscovery_unsetKeepNullColumnsByDefault()
+  {
+    final JsonInputFormat format = new JsonInputFormat(
+        new JSONPathSpec(false, null),
+        null,
+        null
+    );
+    Assert.assertFalse(format.isKeepNullColumns());
+  }
+
+  @Test
+  public void testUseFieldDiscovery_setKeepNullColumnsByDefault()
+  {
+    final JsonInputFormat format = new JsonInputFormat(
+        new JSONPathSpec(true, null),
+        null,
+        null
+    );
+    Assert.assertTrue(format.isKeepNullColumns());
+  }
+
+  @Test
+  public void testUseFieldDiscovery_doNotChangeKeepNullColumnsUserSets()
+  {
+    final JsonInputFormat format = new JsonInputFormat(
+        new JSONPathSpec(true, null),
+        null,
+        false
+    );
+    Assert.assertFalse(format.isKeepNullColumns());
+  }
 }
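
The three tests above pin down the default: discovery off keeps the old false default, discovery on flips it to true, and an explicit setting is never overridden. As a usage sketch (assumes druid-core on the classpath; the class name is illustrative):

    import org.apache.druid.data.input.impl.JsonInputFormat;
    import org.apache.druid.java.util.common.parsers.JSONPathSpec;

    class KeepNullColumnsUsageSketch
    {
      public static void main(String[] args)
      {
        // keepNullColumns left null: it inherits useFieldDiscovery (true here).
        JsonInputFormat discovered = new JsonInputFormat(new JSONPathSpec(true, null), null, null);
        System.out.println(discovered.isKeepNullColumns()); // true

        // An explicit setting overrides the discovery-based default.
        JsonInputFormat pinned = new JsonInputFormat(new JSONPathSpec(true, null), null, false);
        System.out.println(pinned.isKeepNullColumns()); // false
      }
    }
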
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java
deleted file mode 100644
index 1c8669b909..0000000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.impl.DimensionSchema;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.InlineInputSource;
-import org.apache.druid.data.input.impl.JsonInputFormat;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
-import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.task.Tasks;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HashPartitionMultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiPhaseParallelIndexingTest
-{
-  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
-  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
-      DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
-  );
-  private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null);
-  private static final List<Interval> INTERVAL_TO_INDEX = Collections.singletonList(Intervals.of("2022-01/P1M"));
-
-  public HashPartitionMultiPhaseParallelIndexingWithNullColumnTest()
-  {
-    super(LockGranularity.TIME_CHUNK, true, 0., 0.);
-  }
-
-  @Test
-  public void testIngestNullColumn() throws JsonProcessingException
-  {
-    final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
-        Arrays.asList("ts", "unknownDim")
-    );
-    ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
-        null,
-        null,
-        null,
-        new ParallelIndexIngestionSpec(
-            new DataSchema(
-                DATASOURCE,
-                TIMESTAMP_SPEC,
-                DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
-                DEFAULT_METRICS_SPEC,
-                new UniformGranularitySpec(
-                    Granularities.DAY,
-                    Granularities.MINUTE,
-                    INTERVAL_TO_INDEX
-                ),
-                null
-            ),
-            new ParallelIndexIOConfig(
-                null,
-                getInputSource(),
-                JSON_FORMAT,
-                false,
-                null
-            ),
-            newTuningConfig(
-                new HashedPartitionsSpec(
-                    10,
-                    null,
-                    ImmutableList.of("ts", "unknownDim")
-                ),
-                2,
-                true
-            )
-        ),
-        null
-    );
-
-    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
-
-    Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
-    Assert.assertFalse(segments.isEmpty());
-    for (DataSegment segment : segments) {
-      for (int i = 0; i < dimensionSchemas.size(); i++) {
-        Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i));
-      }
-    }
-  }
-
-  @Test
-  public void testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws JsonProcessingException
-  {
-    ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
-        null,
-        null,
-        null,
-        new ParallelIndexIngestionSpec(
-            new DataSchema(
-                DATASOURCE,
-                TIMESTAMP_SPEC,
-                DIMENSIONS_SPEC.withDimensions(
-                    DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim"))
-                ),
-                DEFAULT_METRICS_SPEC,
-                new UniformGranularitySpec(
-                    Granularities.DAY,
-                    Granularities.MINUTE,
-                    INTERVAL_TO_INDEX
-                ),
-                null
-            ),
-            new ParallelIndexIOConfig(
-                null,
-                getInputSource(),
-                JSON_FORMAT,
-                false,
-                null
-            ),
-            newTuningConfig(
-                new HashedPartitionsSpec(
-                    10,
-                    null,
-                    ImmutableList.of("ts", "unknownDim")
-                ),
-                2,
-                true
-            )
-        ),
-        null
-    );
-
-    task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
-    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
-
-    Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
-    Assert.assertFalse(segments.isEmpty());
-    for (DataSegment segment : segments) {
-      Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
-    }
-  }
-
-  private InputSource getInputSource() throws JsonProcessingException
-  {
-    final ObjectMapper mapper = getObjectMapper();
-    final List<Map<String, Object>> rows = ImmutableList.of(
-        ImmutableMap.of(
-            "ts", "2022-01-01",
-            "dim1", "val1",
-            "dim2", "val11"
-        ),
-        ImmutableMap.of(
-            "ts", "2022-01-02",
-            "dim1", "val2",
-            "dim2", "val12"
-        ),
-        ImmutableMap.of(
-            "ts", "2022-01-03",
-            "dim1", "val3",
-            "dim2", "val13"
-        )
-    );
-    final String data = StringUtils.format(
-        "%s\n%s\n%s\n",
-        mapper.writeValueAsString(rows.get(0)),
-        mapper.writeValueAsString(rows.get(1)),
-        mapper.writeValueAsString(rows.get(2))
-    );
-    return new InlineInputSource(data);
-  }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java
new file mode 100644
index 0000000000..2253d9ac33
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputEntityIteratingReader;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+@RunWith(Parameterized.class)
+public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiPhaseParallelIndexingTest
+{
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+      DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
+  );
+  private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null);
+  private static final List<Interval> INTERVAL_TO_INDEX = Collections.singletonList(Intervals.of("2022-01/P1M"));
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(
+        new Object[]{
+            new HashedPartitionsSpec(
+                10,
+                null,
+                ImmutableList.of("ts", "unknownDim")
+            )
+        },
+        new Object[]{
+            new DimensionRangePartitionsSpec(
+                10,
+                null,
+                Collections.singletonList("unknownDim"),
+                false
+            )
+        }
+    );
+  }
+
+  private final PartitionsSpec partitionsSpec;
+
+  public MultiPhaseParallelIndexingWithNullColumnTest(PartitionsSpec partitionsSpec)
+  {
+    super(LockGranularity.TIME_CHUNK, true, 0., 0.);
+    this.partitionsSpec = partitionsSpec;
+    getObjectMapper().registerSubtypes(SplittableInlineDataSource.class);
+  }
+
+  @Test
+  public void testIngestNullColumn() throws JsonProcessingException
+  {
+    final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
+        Arrays.asList("ts", "unknownDim")
+    );
+    ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+        null,
+        null,
+        null,
+        new ParallelIndexIngestionSpec(
+            new DataSchema(
+                DATASOURCE,
+                TIMESTAMP_SPEC,
+                DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
+                DEFAULT_METRICS_SPEC,
+                new UniformGranularitySpec(
+                    Granularities.DAY,
+                    Granularities.MINUTE,
+                    INTERVAL_TO_INDEX
+                ),
+                null
+            ),
+            new ParallelIndexIOConfig(
+                null,
+                getInputSource(),
+                JSON_FORMAT,
+                false,
+                null
+            ),
+            newTuningConfig(
+                partitionsSpec,
+                2,
+                true
+            )
+        ),
+        null
+    );
+
+    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+
+    Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
+    Assert.assertFalse(segments.isEmpty());
+    for (DataSegment segment : segments) {
+      Assert.assertEquals(dimensionSchemas.size(), segment.getDimensions().size());
+      for (int i = 0; i < dimensionSchemas.size(); i++) {
+        Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i));
+      }
+    }
+  }
+
+  @Test
+  public void testIngestNullColumn_useFieldDiscovery_includeAllDimensions_shouldStoreAllColumns() throws JsonProcessingException
+  {
+    final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
+        Arrays.asList("ts", "unknownDim", "dim1")
+    );
+    ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+        null,
+        null,
+        null,
+        new ParallelIndexIngestionSpec(
+            new DataSchema(
+                DATASOURCE,
+                TIMESTAMP_SPEC,
+                new DimensionsSpec.Builder().setDimensions(dimensionSchemas).setIncludeAllDimensions(true).build(),
+                DEFAULT_METRICS_SPEC,
+                new UniformGranularitySpec(
+                    Granularities.DAY,
+                    Granularities.MINUTE,
+                    INTERVAL_TO_INDEX
+                ),
+                null
+            ),
+            new ParallelIndexIOConfig(
+                null,
+                getInputSource(),
+                new JsonInputFormat(
+                    new JSONPathSpec(true, null),
+                    null,
+                    null
+                ),
+                false,
+                null
+            ),
+            newTuningConfig(
+                partitionsSpec,
+                2,
+                true
+            )
+        ),
+        null
+    );
+
+    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+
+    Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
+    Assert.assertFalse(segments.isEmpty());
+    final List<String> expectedExplicitDimensions = ImmutableList.of("ts", "unknownDim", "dim1");
+    final Set<String> expectedImplicitDimensions = ImmutableSet.of("dim2", "dim3");
+    for (DataSegment segment : segments) {
+      Assert.assertEquals(
+          expectedExplicitDimensions,
+          segment.getDimensions().subList(0, expectedExplicitDimensions.size())
+      );
+      Assert.assertEquals(
+          expectedImplicitDimensions,
+          new HashSet<>(segment.getDimensions().subList(expectedExplicitDimensions.size(), segment.getDimensions().size()))
+      );
+    }
+  }
+
+  @Test
+  public void testIngestNullColumn_explicitPathSpec_useFieldDiscovery_includeAllDimensions_shouldStoreAllColumns()
+      throws JsonProcessingException
+  {
+    ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+        null,
+        null,
+        null,
+        new ParallelIndexIngestionSpec(
+            new DataSchema(
+                DATASOURCE,
+                TIMESTAMP_SPEC,
+                new DimensionsSpec.Builder().setIncludeAllDimensions(true).build(),
+                DEFAULT_METRICS_SPEC,
+                new UniformGranularitySpec(
+                    Granularities.DAY,
+                    Granularities.MINUTE,
+                    null
+                ),
+                null
+            ),
+            new ParallelIndexIOConfig(
+                null,
+                getInputSource(),
+                new JsonInputFormat(
+                    new JSONPathSpec(
+                        true,
+                        ImmutableList.of(
+                            new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim1", "$.dim1"),
+                            new JSONPathFieldSpec(JSONPathFieldType.PATH, "k", "$.dim4.k")
+                        )
+                    ),
+                    null,
+                    null
+                ),
+                false,
+                null
+            ),
+            newTuningConfig(
+                partitionsSpec,
+                2,
+                true
+            )
+        ),
+        null
+    );
+
+    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+
+    Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
+    Assert.assertFalse(segments.isEmpty());
+    final List<String> expectedExplicitDimensions = ImmutableList.of("dim1", "k");
+    final Set<String> expectedImplicitDimensions = ImmutableSet.of("dim2", "dim3");
+    for (DataSegment segment : segments) {
+      Assert.assertEquals(
+          expectedExplicitDimensions,
+          segment.getDimensions().subList(0, expectedExplicitDimensions.size())
+      );
+      Assert.assertEquals(
+          expectedImplicitDimensions,
+          new HashSet<>(segment.getDimensions().subList(expectedExplicitDimensions.size(), segment.getDimensions().size()))
+      );
+    }
+
+  }
+
+  @Test
+  public void testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws JsonProcessingException
+  {
+    ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+        null,
+        null,
+        null,
+        new ParallelIndexIngestionSpec(
+            new DataSchema(
+                DATASOURCE,
+                TIMESTAMP_SPEC,
+                DIMENSIONS_SPEC.withDimensions(
+                    DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim"))
+                ),
+                DEFAULT_METRICS_SPEC,
+                new UniformGranularitySpec(
+                    Granularities.DAY,
+                    Granularities.MINUTE,
+                    INTERVAL_TO_INDEX
+                ),
+                null
+            ),
+            new ParallelIndexIOConfig(
+                null,
+                getInputSource(),
+                JSON_FORMAT,
+                false,
+                null
+            ),
+            newTuningConfig(
+                partitionsSpec,
+                2,
+                true
+            )
+        ),
+        null
+    );
+
+    task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
+    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+
+    Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
+    Assert.assertFalse(segments.isEmpty());
+    final List<DimensionSchema> expectedDimensions = DimensionsSpec.getDefaultSchemas(
+        Collections.singletonList("ts")
+    );
+    for (DataSegment segment : segments) {
+      Assert.assertEquals(expectedDimensions.size(), segment.getDimensions().size());
+      for (int i = 0; i < expectedDimensions.size(); i++) {
+        Assert.assertEquals(expectedDimensions.get(i).getName(), segment.getDimensions().get(i));
+      }
+    }
+  }
+
+  private InputSource getInputSource() throws JsonProcessingException
+  {
+    final ObjectMapper mapper = getObjectMapper();
+    final List<Map<String, Object>> rows = new ArrayList<>();
+    Map<String, Object> row;
+    for (int i = 0; i < 3; i++) {
+      rows.add(row(StringUtils.format("2022-01-%02d", i + 1), "val1", "val2", null));
+    }
+    rows.add(row("2022-01-04", null, null, null, ImmutableMap.of("k", "v")));
+    final String data = StringUtils.format(
+        "%s\n%s\n%s\n%s\n",
+        mapper.writeValueAsString(rows.get(0)),
+        mapper.writeValueAsString(rows.get(1)),
+        mapper.writeValueAsString(rows.get(2)),
+        mapper.writeValueAsString(rows.get(3))
+    );
+
+    return new SplittableInlineDataSource(ImmutableList.of(data));
+  }
+
+  private static Map<String, Object> row(String timestamp, Object... dims)
+  {
+    Map<String, Object> row = new HashMap<>();
+    row.put("ts", timestamp);
+    IntStream.range(0, dims.length).forEach(i -> row.put("dim" + (i + 1), dims[i]));
+    return row;
+  }
+
+  /**
+   * Splittable inlineDataSource to run tests with range partitioning which requires the inputSource to be splittable.
+   */
+  private static final class SplittableInlineDataSource implements SplittableInputSource<String>
+  {
+    private final List<String> data;
+
+    @JsonCreator
+    public SplittableInlineDataSource(@JsonProperty("data") List<String> data)
+    {
+      this.data = data;
+    }
+
+    @JsonProperty
+    public List<String> getData()
+    {
+      return data;
+    }
+
+    @Override
+    public Stream<InputSplit<String>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return data.stream().map(InputSplit::new);
+    }
+
+    @Override
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+    {
+      return data.size();
+    }
+
+    @Override
+    public InputSource withSplit(InputSplit<String> split)
+    {
+      return new SplittableInlineDataSource(ImmutableList.of(split.get()));
+    }
+
+    @Override
+    public boolean needsFormat()
+    {
+      return true;
+    }
+
+    @Override
+    public InputSourceReader reader(
+        InputRowSchema inputRowSchema,
+        @Nullable InputFormat inputFormat,
+        File temporaryDirectory
+    )
+    {
+      return new InputEntityIteratingReader(
+          inputRowSchema,
+          inputFormat,
+          data.stream().map(str -> new ByteEntity(StringUtils.toUtf8(str))).iterator(),
+          temporaryDirectory
+      );
+    }
+  }
+}
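
The SplittableInlineDataSource above exists because range partitioning requires a splittable input source; each string in data becomes one split. A hedged sketch of the split round trip (the class is private to the test, so this only illustrates the contract it implements):

    // Two data strings -> two splits; withSplit() narrows the source to one split each.
    SplittableInlineDataSource source = new SplittableInlineDataSource(
        ImmutableList.of("{\"ts\": \"2022-01-01\"}", "{\"ts\": \"2022-01-02\"}")
    );
    source.createSplits(null, null)           // Stream of InputSplit<String>; args are ignored here
          .map(source::withSplit)             // each result is a single-string InputSource
          .forEach(s -> System.out.println(s.needsFormat())); // true, true
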
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index 4314a342cb..759d70fe2f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -29,9 +29,7 @@ import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
@@ -40,13 +38,9 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.scan.ScanResultValue;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -388,122 +382,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
     }
   }
 
-  @Test
-  public void testIngestNullColumn()
-  {
-    // storeEmptyColumns flag should do nothing with using inputFormat or multiValueDim
-    if (!isUseInputFormatApi() || useMultivalueDim) {
-      return;
-    }
-    int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
-    final List<DimensionSchema> dimensionSchemas = DimensionsSpec.getDefaultSchemas(
-        Arrays.asList("ts", "unknownDim")
-    );
-    ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
-        null,
-        null,
-        null,
-        new ParallelIndexIngestionSpec(
-            new DataSchema(
-                DATASOURCE,
-                TIMESTAMP_SPEC,
-                DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
-                DEFAULT_METRICS_SPEC,
-                new UniformGranularitySpec(
-                    Granularities.DAY,
-                    Granularities.MINUTE,
-                    intervalToIndex == null ? null : Collections.singletonList(intervalToIndex)
-                ),
-                null
-            ),
-            new ParallelIndexIOConfig(
-                null,
-                new LocalInputSource(inputDir, TEST_FILE_NAME_PREFIX + "*"),
-                DEFAULT_INPUT_FORMAT,
-                false,
-                null
-            ),
-            newTuningConfig(
-                new DimensionRangePartitionsSpec(
-                    targetRowsPerSegment,
-                    null,
-                    Collections.singletonList("unknownDim"),
-                    false
-                ),
-                maxNumConcurrentSubTasks,
-                true
-            )
-        ),
-        null
-    );
-
-    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
-
-    Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
-    for (DataSegment segment : segments) {
-      for (int i = 0; i < dimensionSchemas.size(); i++) {
-        Assert.assertEquals(dimensionSchemas.get(i).getName(), segment.getDimensions().get(i));
-      }
-    }
-  }
-
-  @Test
-  public void testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns()
-  {
-    // storeEmptyColumns flag should do nothing with using inputFormat or multiValueDim
-    if (!isUseInputFormatApi() || useMultivalueDim) {
-      return;
-    }
-    int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
-    ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
-        null,
-        null,
-        null,
-        new ParallelIndexIngestionSpec(
-            new DataSchema(
-                DATASOURCE,
-                TIMESTAMP_SPEC,
-                DIMENSIONS_SPEC.withDimensions(
-                    DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim"))
-                ),
-                DEFAULT_METRICS_SPEC,
-                new UniformGranularitySpec(
-                    Granularities.DAY,
-                    Granularities.MINUTE,
-                    intervalToIndex == null ? null : Collections.singletonList(intervalToIndex)
-                ),
-                null
-            ),
-            new ParallelIndexIOConfig(
-                null,
-                new LocalInputSource(inputDir, TEST_FILE_NAME_PREFIX + "*"),
-                DEFAULT_INPUT_FORMAT,
-                false,
-                null
-            ),
-            newTuningConfig(
-                new DimensionRangePartitionsSpec(
-                    targetRowsPerSegment,
-                    null,
-                    Collections.singletonList("unknownDim"),
-                    false
-                ),
-                maxNumConcurrentSubTasks,
-                true
-            )
-        ),
-        null
-    );
-
-    task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
-    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
-
-    Set<DataSegment> segments = getIndexingServiceClient().getPublishedSegments(task);
-    for (DataSegment segment : segments) {
-      Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
-    }
-  }
-
   private ParallelIndexSupervisorTask runTestTask(
       PartitionsSpec partitionsSpec,
       File inputDirectory,
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 03018c752d..daa4696371 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -205,8 +205,7 @@ public class IndexMergerV9 implements IndexMerger
       final File outDir,
       final ProgressIndicator progress,
      final List<String> mergedDimensions, // should have both explicit and implicit dimensions
-      // a subset of mergedDimensions that are explicitly specified in DimensionsSpec
-      final Set<String> explicitDimensions,
+      final DimensionsSpecInspector dimensionsSpecInspector,
       final List<String> mergedMetrics,
      final Function<List<TransformableRowIterator>, TimeAndDimsIterator> rowMergerFn,
       final boolean fillRowNumConversions,
@@ -333,7 +332,7 @@ public class IndexMergerV9 implements IndexMerger
         if (!merger.hasOnlyNulls()) {
           ColumnDescriptor columnDesc = merger.makeColumnDescriptor();
           makeColumn(v9Smoosher, mergedDimensions.get(i), columnDesc);
-        } else if (shouldStore(explicitDimensions, mergedDimensions.get(i))) {
+        } else if (dimensionsSpecInspector.shouldStore(mergedDimensions.get(i))) {
           // shouldStore AND hasOnlyNulls
           ColumnDescriptor columnDesc = ColumnDescriptor
               .builder()
@@ -357,7 +356,7 @@ public class IndexMergerV9 implements IndexMerger
           progress,
           indexSpec,
           mergers,
-          explicitDimensions
+          dimensionsSpecInspector
       );
       makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
 
@@ -374,14 +373,6 @@ public class IndexMergerV9 implements IndexMerger
     }
   }
 
-  private boolean shouldStore(
-      Set<String> explicitDimensions,
-      String dimension
-  )
-  {
-    return storeEmptyColumns && explicitDimensions.contains(dimension);
-  }
-
   private void makeMetadataBinary(
       final FileSmoosher v9Smoosher,
       final ProgressIndicator progress,
@@ -404,7 +395,7 @@ public class IndexMergerV9 implements IndexMerger
       final ProgressIndicator progress,
       final IndexSpec indexSpec,
       final List<DimensionMergerV9> mergers,
-      final Set<String> explicitDimensions
+      final DimensionsSpecInspector dimensionsSpecInspector
   ) throws IOException
   {
     final Set<String> columnSet = new HashSet<>(mergedDimensions);
@@ -439,7 +430,7 @@ public class IndexMergerV9 implements IndexMerger
         nonNullOnlyColumns.add(mergedDimensions.get(i));
         allDimensions.add(null);
         allColumns.add(null);
-      } else if (shouldStore(explicitDimensions, mergedDimensions.get(i))) {
+      } else if (dimensionsSpecInspector.shouldStore(mergedDimensions.get(i))) {
         // shouldStore AND hasOnlyNulls
         allDimensions.add(mergedDimensions.get(i));
         allColumns.add(mergedDimensions.get(i));
@@ -1315,7 +1306,7 @@ public class IndexMergerV9 implements IndexMerger
         outDir,
         progress,
         mergedDimensions,
-        dimensionsSpec == null ? ImmutableSet.of() : new HashSet<>(dimensionsSpec.getDimensionNames()),
+        new DimensionsSpecInspector(storeEmptyColumns, dimensionsSpec),
         mergedMetrics,
         rowMergerFn,
         true,
@@ -1460,4 +1451,34 @@ public class IndexMergerV9 implements IndexMerger
       );
     }
   }
+
+  private static class DimensionsSpecInspector
+  {
+    private final boolean storeEmptyColumns;
+    private final Set<String> explicitDimensions;
+    private final boolean includeAllDimensions;
+
+    private DimensionsSpecInspector(
+        boolean storeEmptyColumns,
+        @Nullable DimensionsSpec dimensionsSpec
+    )
+    {
+      this.storeEmptyColumns = storeEmptyColumns;
+      this.explicitDimensions = dimensionsSpec == null
+                                ? ImmutableSet.of()
+                                : new HashSet<>(dimensionsSpec.getDimensionNames());
+      this.includeAllDimensions = dimensionsSpec != null && dimensionsSpec.isIncludeAllDimensions();
+    }
+
+    /**
+     * Returns true if the given dimension should be stored in the segment even when the column has only nulls.
+     * If it has non-nulls, then the column must be stored.
+     *
+     * @see DimensionMerger#hasOnlyNulls()
+     */
+    private boolean shouldStore(String dimension)
+    {
+      return storeEmptyColumns && (includeAllDimensions || explicitDimensions.contains(dimension));
+    }
+  }
 }
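
Net effect of the IndexMergerV9 change: with storeEmptyColumns on, a null-only column is now written when it is either explicitly declared or includeAllDimensions is set; previously only explicitly declared columns qualified. A small worked example of the new predicate (values and the class name are illustrative; this mirrors DimensionsSpecInspector.shouldStore):

    import com.google.common.collect.ImmutableSet;
    import java.util.Set;

    class ShouldStoreSketch
    {
      public static void main(String[] args)
      {
        final boolean storeEmptyColumns = true;                               // task-level flag
        final boolean includeAllDimensions = true;                            // from DimensionsSpec
        final Set<String> explicitDimensions = ImmutableSet.of("ts", "dim1"); // declared dimensions
        final String dimension = "discoveredDim"; // hypothetical discovered column with only nulls
        final boolean shouldStore = storeEmptyColumns
                                    && (includeAllDimensions || explicitDimensions.contains(dimension));
        System.out.println(shouldStore); // true; before this patch it would have been false
      }
    }
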

