This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5e5625f3ae Fix indexMerger to respect the includeAllDimensions flag
(#12428)
5e5625f3ae is described below
commit 5e5625f3ae9fa663697214f49e66b5c296d342fb
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Apr 13 12:43:11 2022 -0700
Fix indexMerger to respect the includeAllDimensions flag (#12428)
* Fix indexMerger to respect the includeAllDimensions flag;
jsonInputFormat should set keepNullColumns if useFieldDiscovery is set
* address comments
---
.../druid/data/input/impl/JsonInputFormat.java | 12 +-
.../druid/data/input/impl/JsonInputFormatTest.java | 37 +-
...ltiPhaseParallelIndexingWithNullColumnTest.java | 201 ----------
...ltiPhaseParallelIndexingWithNullColumnTest.java | 435 +++++++++++++++++++++
...ngePartitionMultiPhaseParallelIndexingTest.java | 122 ------
.../org/apache/druid/segment/IndexMergerV9.java | 51 ++-
6 files changed, 517 insertions(+), 341 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
index 8a9a3fd529..0ce4cbfa8b 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
@@ -74,7 +74,11 @@ public class JsonInputFormat extends NestedInputFormat
super(flattenSpec);
this.featureSpec = featureSpec == null ? Collections.emptyMap() :
featureSpec;
this.objectMapper = new ObjectMapper();
- this.keepNullColumns = keepNullColumns == null ? false : keepNullColumns;
+ if (keepNullColumns != null) {
+ this.keepNullColumns = keepNullColumns;
+ } else {
+ this.keepNullColumns = flattenSpec != null &&
flattenSpec.isUseFieldDiscovery();
+ }
for (Entry<String, Boolean> entry : this.featureSpec.entrySet()) {
Feature feature = Feature.valueOf(entry.getKey());
objectMapper.configure(feature, entry.getValue());
@@ -88,6 +92,12 @@ public class JsonInputFormat extends NestedInputFormat
return featureSpec;
}
+ @JsonProperty
+ public boolean isKeepNullColumns()
+ {
+ return keepNullColumns;
+ }
+
@Override
public boolean isSplittable()
{
diff --git
a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
index aa9c95a7e4..0577dd72fb 100644
---
a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
+++
b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
@@ -41,7 +41,7 @@ public class JsonInputFormatTest
final ObjectMapper mapper = new ObjectMapper();
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
- false,
+ true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz",
"baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2",
"baz2"),
@@ -52,7 +52,7 @@ public class JsonInputFormatTest
)
),
ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true,
Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
- false
+ true
);
final byte[] bytes = mapper.writeValueAsBytes(format);
final JsonInputFormat fromJson = (JsonInputFormat) mapper.readValue(bytes,
InputFormat.class);
@@ -72,4 +72,37 @@ public class JsonInputFormatTest
.withIgnoredFields("objectMapper")
.verify();
}
+
+ @Test
+ public void test_unsetUseFieldDiscovery_unsetKeepNullColumnsByDefault()
+ {
+ final JsonInputFormat format = new JsonInputFormat(
+ new JSONPathSpec(false, null),
+ null,
+ null
+ );
+ Assert.assertFalse(format.isKeepNullColumns());
+ }
+
+ @Test
+ public void testUseFieldDiscovery_setKeepNullColumnsByDefault()
+ {
+ final JsonInputFormat format = new JsonInputFormat(
+ new JSONPathSpec(true, null),
+ null,
+ null
+ );
+ Assert.assertTrue(format.isKeepNullColumns());
+ }
+
+ @Test
+ public void testUseFieldDiscovery_doNotChangeKeepNullColumnsUserSets()
+ {
+ final JsonInputFormat format = new JsonInputFormat(
+ new JSONPathSpec(true, null),
+ null,
+ false
+ );
+ Assert.assertFalse(format.isKeepNullColumns());
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java
deleted file mode 100644
index 1c8669b909..0000000000
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingWithNullColumnTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.impl.DimensionSchema;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.InlineInputSource;
-import org.apache.druid.data.input.impl.JsonInputFormat;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
-import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.task.Tasks;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HashPartitionMultiPhaseParallelIndexingWithNullColumnTest extends
AbstractMultiPhaseParallelIndexingTest
-{
- private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts",
"auto", null);
- private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
- DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
- );
- private static final InputFormat JSON_FORMAT = new JsonInputFormat(null,
null, null);
- private static final List<Interval> INTERVAL_TO_INDEX =
Collections.singletonList(Intervals.of("2022-01/P1M"));
-
- public HashPartitionMultiPhaseParallelIndexingWithNullColumnTest()
- {
- super(LockGranularity.TIME_CHUNK, true, 0., 0.);
- }
-
- @Test
- public void testIngestNullColumn() throws JsonProcessingException
- {
- final List<DimensionSchema> dimensionSchemas =
DimensionsSpec.getDefaultSchemas(
- Arrays.asList("ts", "unknownDim")
- );
- ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
- null,
- null,
- null,
- new ParallelIndexIngestionSpec(
- new DataSchema(
- DATASOURCE,
- TIMESTAMP_SPEC,
- DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
- DEFAULT_METRICS_SPEC,
- new UniformGranularitySpec(
- Granularities.DAY,
- Granularities.MINUTE,
- INTERVAL_TO_INDEX
- ),
- null
- ),
- new ParallelIndexIOConfig(
- null,
- getInputSource(),
- JSON_FORMAT,
- false,
- null
- ),
- newTuningConfig(
- new HashedPartitionsSpec(
- 10,
- null,
- ImmutableList.of("ts", "unknownDim")
- ),
- 2,
- true
- )
- ),
- null
- );
-
- Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
-
- Set<DataSegment> segments =
getIndexingServiceClient().getPublishedSegments(task);
- Assert.assertFalse(segments.isEmpty());
- for (DataSegment segment : segments) {
- for (int i = 0; i < dimensionSchemas.size(); i++) {
- Assert.assertEquals(dimensionSchemas.get(i).getName(),
segment.getDimensions().get(i));
- }
- }
- }
-
- @Test
- public void
testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws
JsonProcessingException
- {
- ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
- null,
- null,
- null,
- new ParallelIndexIngestionSpec(
- new DataSchema(
- DATASOURCE,
- TIMESTAMP_SPEC,
- DIMENSIONS_SPEC.withDimensions(
- DimensionsSpec.getDefaultSchemas(Arrays.asList("ts",
"unknownDim"))
- ),
- DEFAULT_METRICS_SPEC,
- new UniformGranularitySpec(
- Granularities.DAY,
- Granularities.MINUTE,
- INTERVAL_TO_INDEX
- ),
- null
- ),
- new ParallelIndexIOConfig(
- null,
- getInputSource(),
- JSON_FORMAT,
- false,
- null
- ),
- newTuningConfig(
- new HashedPartitionsSpec(
- 10,
- null,
- ImmutableList.of("ts", "unknownDim")
- ),
- 2,
- true
- )
- ),
- null
- );
-
- task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
- Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
-
- Set<DataSegment> segments =
getIndexingServiceClient().getPublishedSegments(task);
- Assert.assertFalse(segments.isEmpty());
- for (DataSegment segment : segments) {
- Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
- }
- }
-
- private InputSource getInputSource() throws JsonProcessingException
- {
- final ObjectMapper mapper = getObjectMapper();
- final List<Map<String, Object>> rows = ImmutableList.of(
- ImmutableMap.of(
- "ts", "2022-01-01",
- "dim1", "val1",
- "dim2", "val11"
- ),
- ImmutableMap.of(
- "ts", "2022-01-02",
- "dim1", "val2",
- "dim2", "val12"
- ),
- ImmutableMap.of(
- "ts", "2022-01-03",
- "dim1", "val3",
- "dim2", "val13"
- )
- );
- final String data = StringUtils.format(
- "%s\n%s\n%s\n",
- mapper.writeValueAsString(rows.get(0)),
- mapper.writeValueAsString(rows.get(1)),
- mapper.writeValueAsString(rows.get(2))
- );
- return new InlineInputSource(data);
- }
-}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java
new file mode 100644
index 0000000000..2253d9ac33
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputEntityIteratingReader;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+@RunWith(Parameterized.class)
+public class MultiPhaseParallelIndexingWithNullColumnTest extends
AbstractMultiPhaseParallelIndexingTest
+{
+ private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts",
"auto", null);
+ private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
+ );
+ private static final InputFormat JSON_FORMAT = new JsonInputFormat(null,
null, null);
+ private static final List<Interval> INTERVAL_TO_INDEX =
Collections.singletonList(Intervals.of("2022-01/P1M"));
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]> constructorFeeder()
+ {
+ return ImmutableList.of(
+ new Object[]{
+ new HashedPartitionsSpec(
+ 10,
+ null,
+ ImmutableList.of("ts", "unknownDim")
+ )
+ },
+ new Object[]{
+ new DimensionRangePartitionsSpec(
+ 10,
+ null,
+ Collections.singletonList("unknownDim"),
+ false
+ )
+ }
+ );
+ }
+
+ private final PartitionsSpec partitionsSpec;
+
+ public MultiPhaseParallelIndexingWithNullColumnTest(PartitionsSpec
partitionsSpec)
+ {
+ super(LockGranularity.TIME_CHUNK, true, 0., 0.);
+ this.partitionsSpec = partitionsSpec;
+ getObjectMapper().registerSubtypes(SplittableInlineDataSource.class);
+ }
+
+ @Test
+ public void testIngestNullColumn() throws JsonProcessingException
+ {
+ final List<DimensionSchema> dimensionSchemas =
DimensionsSpec.getDefaultSchemas(
+ Arrays.asList("ts", "unknownDim")
+ );
+ ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+ null,
+ null,
+ null,
+ new ParallelIndexIngestionSpec(
+ new DataSchema(
+ DATASOURCE,
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
+ DEFAULT_METRICS_SPEC,
+ new UniformGranularitySpec(
+ Granularities.DAY,
+ Granularities.MINUTE,
+ INTERVAL_TO_INDEX
+ ),
+ null
+ ),
+ new ParallelIndexIOConfig(
+ null,
+ getInputSource(),
+ JSON_FORMAT,
+ false,
+ null
+ ),
+ newTuningConfig(
+ partitionsSpec,
+ 2,
+ true
+ )
+ ),
+ null
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
+
+ Set<DataSegment> segments =
getIndexingServiceClient().getPublishedSegments(task);
+ Assert.assertFalse(segments.isEmpty());
+ for (DataSegment segment : segments) {
+ Assert.assertEquals(dimensionSchemas.size(),
segment.getDimensions().size());
+ for (int i = 0; i < dimensionSchemas.size(); i++) {
+ Assert.assertEquals(dimensionSchemas.get(i).getName(),
segment.getDimensions().get(i));
+ }
+ }
+ }
+
+ @Test
+ public void
testIngestNullColumn_useFieldDiscovery_includeAllDimensions_shouldStoreAllColumns()
throws JsonProcessingException
+ {
+ final List<DimensionSchema> dimensionSchemas =
DimensionsSpec.getDefaultSchemas(
+ Arrays.asList("ts", "unknownDim", "dim1")
+ );
+ ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+ null,
+ null,
+ null,
+ new ParallelIndexIngestionSpec(
+ new DataSchema(
+ DATASOURCE,
+ TIMESTAMP_SPEC,
+ new
DimensionsSpec.Builder().setDimensions(dimensionSchemas).setIncludeAllDimensions(true).build(),
+ DEFAULT_METRICS_SPEC,
+ new UniformGranularitySpec(
+ Granularities.DAY,
+ Granularities.MINUTE,
+ INTERVAL_TO_INDEX
+ ),
+ null
+ ),
+ new ParallelIndexIOConfig(
+ null,
+ getInputSource(),
+ new JsonInputFormat(
+ new JSONPathSpec(true, null),
+ null,
+ null
+ ),
+ false,
+ null
+ ),
+ newTuningConfig(
+ partitionsSpec,
+ 2,
+ true
+ )
+ ),
+ null
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
+
+ Set<DataSegment> segments =
getIndexingServiceClient().getPublishedSegments(task);
+ Assert.assertFalse(segments.isEmpty());
+ final List<String> expectedExplicitDimensions = ImmutableList.of("ts",
"unknownDim", "dim1");
+ final Set<String> expectedImplicitDimensions = ImmutableSet.of("dim2",
"dim3");
+ for (DataSegment segment : segments) {
+ Assert.assertEquals(
+ expectedExplicitDimensions,
+ segment.getDimensions().subList(0, expectedExplicitDimensions.size())
+ );
+ Assert.assertEquals(
+ expectedImplicitDimensions,
+ new
HashSet<>(segment.getDimensions().subList(expectedExplicitDimensions.size(),
segment.getDimensions().size()))
+ );
+ }
+ }
+
+ @Test
+ public void
testIngestNullColumn_explicitPathSpec_useFieldDiscovery_includeAllDimensions_shouldStoreAllColumns()
+ throws JsonProcessingException
+ {
+ ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+ null,
+ null,
+ null,
+ new ParallelIndexIngestionSpec(
+ new DataSchema(
+ DATASOURCE,
+ TIMESTAMP_SPEC,
+ new
DimensionsSpec.Builder().setIncludeAllDimensions(true).build(),
+ DEFAULT_METRICS_SPEC,
+ new UniformGranularitySpec(
+ Granularities.DAY,
+ Granularities.MINUTE,
+ null
+ ),
+ null
+ ),
+ new ParallelIndexIOConfig(
+ null,
+ getInputSource(),
+ new JsonInputFormat(
+ new JSONPathSpec(
+ true,
+ ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.PATH,
"dim1", "$.dim1"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "k",
"$.dim4.k")
+ )
+ ),
+ null,
+ null
+ ),
+ false,
+ null
+ ),
+ newTuningConfig(
+ partitionsSpec,
+ 2,
+ true
+ )
+ ),
+ null
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
+
+ Set<DataSegment> segments =
getIndexingServiceClient().getPublishedSegments(task);
+ Assert.assertFalse(segments.isEmpty());
+ final List<String> expectedExplicitDimensions = ImmutableList.of("dim1",
"k");
+ final Set<String> expectedImplicitDimensions = ImmutableSet.of("dim2",
"dim3");
+ for (DataSegment segment : segments) {
+ Assert.assertEquals(
+ expectedExplicitDimensions,
+ segment.getDimensions().subList(0, expectedExplicitDimensions.size())
+ );
+ Assert.assertEquals(
+ expectedImplicitDimensions,
+ new
HashSet<>(segment.getDimensions().subList(expectedExplicitDimensions.size(),
segment.getDimensions().size()))
+ );
+ }
+
+ }
+
+ @Test
+ public void
testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() throws
JsonProcessingException
+ {
+ ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+ null,
+ null,
+ null,
+ new ParallelIndexIngestionSpec(
+ new DataSchema(
+ DATASOURCE,
+ TIMESTAMP_SPEC,
+ DIMENSIONS_SPEC.withDimensions(
+ DimensionsSpec.getDefaultSchemas(Arrays.asList("ts",
"unknownDim"))
+ ),
+ DEFAULT_METRICS_SPEC,
+ new UniformGranularitySpec(
+ Granularities.DAY,
+ Granularities.MINUTE,
+ INTERVAL_TO_INDEX
+ ),
+ null
+ ),
+ new ParallelIndexIOConfig(
+ null,
+ getInputSource(),
+ JSON_FORMAT,
+ false,
+ null
+ ),
+ newTuningConfig(
+ partitionsSpec,
+ 2,
+ true
+ )
+ ),
+ null
+ );
+
+ task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
+ Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
+
+ Set<DataSegment> segments =
getIndexingServiceClient().getPublishedSegments(task);
+ Assert.assertFalse(segments.isEmpty());
+ final List<DimensionSchema> expectedDimensions =
DimensionsSpec.getDefaultSchemas(
+ Collections.singletonList("ts")
+ );
+ for (DataSegment segment : segments) {
+ Assert.assertEquals(expectedDimensions.size(),
segment.getDimensions().size());
+ for (int i = 0; i < expectedDimensions.size(); i++) {
+ Assert.assertEquals(expectedDimensions.get(i).getName(),
segment.getDimensions().get(i));
+ }
+ }
+ }
+
+ private InputSource getInputSource() throws JsonProcessingException
+ {
+ final ObjectMapper mapper = getObjectMapper();
+ final List<Map<String, Object>> rows = new ArrayList<>();
+ Map<String, Object> row;
+ for (int i = 0; i < 3; i++) {
+ rows.add(row(StringUtils.format("2022-01-%02d", i + 1), "val1", "val2",
null));
+ }
+ rows.add(row("2022-01-04", null, null, null, ImmutableMap.of("k", "v")));
+ final String data = StringUtils.format(
+ "%s\n%s\n%s\n%s\n",
+ mapper.writeValueAsString(rows.get(0)),
+ mapper.writeValueAsString(rows.get(1)),
+ mapper.writeValueAsString(rows.get(2)),
+ mapper.writeValueAsString(rows.get(3))
+ );
+
+ return new SplittableInlineDataSource(ImmutableList.of(data));
+ }
+
+ private static Map<String, Object> row(String timestamp, Object... dims)
+ {
+ Map<String, Object> row = new HashMap<>();
+ row.put("ts", timestamp);
+ IntStream.range(0, dims.length).forEach(i -> row.put("dim" + (i + 1),
dims[i]));
+ return row;
+ }
+
+ /**
+ * Splittable inlineDataSource to run tests with range partitioning which
requires the inputSource to be splittable.
+ */
+ private static final class SplittableInlineDataSource implements
SplittableInputSource<String>
+ {
+ private final List<String> data;
+
+ @JsonCreator
+ public SplittableInlineDataSource(@JsonProperty("data") List<String> data)
+ {
+ this.data = data;
+ }
+
+ @JsonProperty
+ public List<String> getData()
+ {
+ return data;
+ }
+
+ @Override
+ public Stream<InputSplit<String>> createSplits(InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec)
+ {
+ return data.stream().map(InputSplit::new);
+ }
+
+ @Override
+ public int estimateNumSplits(InputFormat inputFormat, @Nullable
SplitHintSpec splitHintSpec)
+ {
+ return data.size();
+ }
+
+ @Override
+ public InputSource withSplit(InputSplit<String> split)
+ {
+ return new SplittableInlineDataSource(ImmutableList.of(split.get()));
+ }
+
+ @Override
+ public boolean needsFormat()
+ {
+ return true;
+ }
+
+ @Override
+ public InputSourceReader reader(
+ InputRowSchema inputRowSchema,
+ @Nullable InputFormat inputFormat,
+ File temporaryDirectory
+ )
+ {
+ return new InputEntityIteratingReader(
+ inputRowSchema,
+ inputFormat,
+ data.stream().map(str -> new
ByteEntity(StringUtils.toUtf8(str))).iterator(),
+ temporaryDirectory
+ );
+ }
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index 4314a342cb..759d70fe2f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -29,9 +29,7 @@ import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
@@ -40,13 +38,9 @@ import
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -388,122 +382,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest
extends AbstractMultiP
}
}
- @Test
- public void testIngestNullColumn()
- {
- // storeEmptyColumns flag should do nothing with using inputFormat or
multiValueDim
- if (!isUseInputFormatApi() || useMultivalueDim) {
- return;
- }
- int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY /
NUM_PARTITION;
- final List<DimensionSchema> dimensionSchemas =
DimensionsSpec.getDefaultSchemas(
- Arrays.asList("ts", "unknownDim")
- );
- ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
- null,
- null,
- null,
- new ParallelIndexIngestionSpec(
- new DataSchema(
- DATASOURCE,
- TIMESTAMP_SPEC,
- DIMENSIONS_SPEC.withDimensions(dimensionSchemas),
- DEFAULT_METRICS_SPEC,
- new UniformGranularitySpec(
- Granularities.DAY,
- Granularities.MINUTE,
- intervalToIndex == null ? null :
Collections.singletonList(intervalToIndex)
- ),
- null
- ),
- new ParallelIndexIOConfig(
- null,
- new LocalInputSource(inputDir, TEST_FILE_NAME_PREFIX + "*"),
- DEFAULT_INPUT_FORMAT,
- false,
- null
- ),
- newTuningConfig(
- new DimensionRangePartitionsSpec(
- targetRowsPerSegment,
- null,
- Collections.singletonList("unknownDim"),
- false
- ),
- maxNumConcurrentSubTasks,
- true
- )
- ),
- null
- );
-
- Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
-
- Set<DataSegment> segments =
getIndexingServiceClient().getPublishedSegments(task);
- for (DataSegment segment : segments) {
- for (int i = 0; i < dimensionSchemas.size(); i++) {
- Assert.assertEquals(dimensionSchemas.get(i).getName(),
segment.getDimensions().get(i));
- }
- }
- }
-
- @Test
- public void
testIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns()
- {
- // storeEmptyColumns flag should do nothing with using inputFormat or
multiValueDim
- if (!isUseInputFormatApi() || useMultivalueDim) {
- return;
- }
- int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY /
NUM_PARTITION;
- ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
- null,
- null,
- null,
- new ParallelIndexIngestionSpec(
- new DataSchema(
- DATASOURCE,
- TIMESTAMP_SPEC,
- DIMENSIONS_SPEC.withDimensions(
- DimensionsSpec.getDefaultSchemas(Arrays.asList("ts",
"unknownDim"))
- ),
- DEFAULT_METRICS_SPEC,
- new UniformGranularitySpec(
- Granularities.DAY,
- Granularities.MINUTE,
- intervalToIndex == null ? null :
Collections.singletonList(intervalToIndex)
- ),
- null
- ),
- new ParallelIndexIOConfig(
- null,
- new LocalInputSource(inputDir, TEST_FILE_NAME_PREFIX + "*"),
- DEFAULT_INPUT_FORMAT,
- false,
- null
- ),
- newTuningConfig(
- new DimensionRangePartitionsSpec(
- targetRowsPerSegment,
- null,
- Collections.singletonList("unknownDim"),
- false
- ),
- maxNumConcurrentSubTasks,
- true
- )
- ),
- null
- );
-
- task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false);
- Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
-
- Set<DataSegment> segments =
getIndexingServiceClient().getPublishedSegments(task);
- for (DataSegment segment : segments) {
- Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
- }
- }
-
private ParallelIndexSupervisorTask runTestTask(
PartitionsSpec partitionsSpec,
File inputDirectory,
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 03018c752d..daa4696371 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -205,8 +205,7 @@ public class IndexMergerV9 implements IndexMerger
final File outDir,
final ProgressIndicator progress,
final List<String> mergedDimensions, // should have both explicit and
implicit dimensions
- // a subset of mergedDimensions that are explicitly specified in
DimensionsSpec
- final Set<String> explicitDimensions,
+ final DimensionsSpecInspector dimensionsSpecInspector,
final List<String> mergedMetrics,
final Function<List<TransformableRowIterator>, TimeAndDimsIterator>
rowMergerFn,
final boolean fillRowNumConversions,
@@ -333,7 +332,7 @@ public class IndexMergerV9 implements IndexMerger
if (!merger.hasOnlyNulls()) {
ColumnDescriptor columnDesc = merger.makeColumnDescriptor();
makeColumn(v9Smoosher, mergedDimensions.get(i), columnDesc);
- } else if (shouldStore(explicitDimensions, mergedDimensions.get(i))) {
+ } else if
(dimensionsSpecInspector.shouldStore(mergedDimensions.get(i))) {
// shouldStore AND hasOnlyNulls
ColumnDescriptor columnDesc = ColumnDescriptor
.builder()
@@ -357,7 +356,7 @@ public class IndexMergerV9 implements IndexMerger
progress,
indexSpec,
mergers,
- explicitDimensions
+ dimensionsSpecInspector
);
makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
@@ -374,14 +373,6 @@ public class IndexMergerV9 implements IndexMerger
}
}
- private boolean shouldStore(
- Set<String> explicitDimensions,
- String dimension
- )
- {
- return storeEmptyColumns && explicitDimensions.contains(dimension);
- }
-
private void makeMetadataBinary(
final FileSmoosher v9Smoosher,
final ProgressIndicator progress,
@@ -404,7 +395,7 @@ public class IndexMergerV9 implements IndexMerger
final ProgressIndicator progress,
final IndexSpec indexSpec,
final List<DimensionMergerV9> mergers,
- final Set<String> explicitDimensions
+ final DimensionsSpecInspector dimensionsSpecInspector
) throws IOException
{
final Set<String> columnSet = new HashSet<>(mergedDimensions);
@@ -439,7 +430,7 @@ public class IndexMergerV9 implements IndexMerger
nonNullOnlyColumns.add(mergedDimensions.get(i));
allDimensions.add(null);
allColumns.add(null);
- } else if (shouldStore(explicitDimensions, mergedDimensions.get(i))) {
+ } else if (dimensionsSpecInspector.shouldStore(mergedDimensions.get(i)))
{
// shouldStore AND hasOnlyNulls
allDimensions.add(mergedDimensions.get(i));
allColumns.add(mergedDimensions.get(i));
@@ -1315,7 +1306,7 @@ public class IndexMergerV9 implements IndexMerger
outDir,
progress,
mergedDimensions,
- dimensionsSpec == null ? ImmutableSet.of() : new
HashSet<>(dimensionsSpec.getDimensionNames()),
+ new DimensionsSpecInspector(storeEmptyColumns, dimensionsSpec),
mergedMetrics,
rowMergerFn,
true,
@@ -1460,4 +1451,34 @@ public class IndexMergerV9 implements IndexMerger
);
}
}
+
+ private static class DimensionsSpecInspector
+ {
+ private final boolean storeEmptyColumns;
+ private final Set<String> explicitDimensions;
+ private final boolean includeAllDimensions;
+
+ private DimensionsSpecInspector(
+ boolean storeEmptyColumns,
+ @Nullable DimensionsSpec dimensionsSpec
+ )
+ {
+ this.storeEmptyColumns = storeEmptyColumns;
+ this.explicitDimensions = dimensionsSpec == null
+ ? ImmutableSet.of()
+ : new
HashSet<>(dimensionsSpec.getDimensionNames());
+ this.includeAllDimensions = dimensionsSpec != null &&
dimensionsSpec.isIncludeAllDimensions();
+ }
+
+ /**
+ * Returns true if the given dimension should be stored in the segment
even when the column has only nulls.
+ * If it has non-nulls, then the column must be stored.
+ *
+ * @see DimensionMerger#hasOnlyNulls()
+ */
+ private boolean shouldStore(String dimension)
+ {
+ return storeEmptyColumns && (includeAllDimensions ||
explicitDimensions.contains(dimension));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]