This is an automated email from the ASF dual-hosted git repository.
soumyava pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5d39b94149c allow compaction to work with spatial dimensions (#15321)
5d39b94149c is described below
commit 5d39b94149c3c7f7662e195be37bcbb3f5e6d2e0
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Nov 3 11:27:50 2023 -0700
allow compaction to work with spatial dimensions (#15321)
---
.../common/task/CompactionTaskRunTest.java | 198 ++++++++++++++++++++-
.../druid/segment/StringDimensionHandler.java | 5 +
.../druid/segment/DimensionHandlerUtilsTest.java | 19 ++
3 files changed, 218 insertions(+), 4 deletions(-)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 275e7670661..e78068086f2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -34,7 +34,9 @@ import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -67,6 +69,8 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IndexSpec;
@@ -205,16 +209,33 @@ public class CompactionTaskRunTest extends IngestionTestBase
List<Interval> intervals
) throws JsonProcessingException
{
- ObjectMapper mapper = new DefaultObjectMapper();
- // Expected compaction state to exist after compaction as we store
compaction state by default
Map<String, String> expectedLongSumMetric = new HashMap<>();
expectedLongSumMetric.put("type", "longSum");
expectedLongSumMetric.put("name", "val");
expectedLongSumMetric.put("fieldName", "val");
+ return getDefaultCompactionState(
+ segmentGranularity,
+ queryGranularity,
+ intervals,
+ new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
+ expectedLongSumMetric
+ );
+ }
+
+ public static CompactionState getDefaultCompactionState(
+ Granularity segmentGranularity,
+ Granularity queryGranularity,
+ List<Interval> intervals,
+ DimensionsSpec expectedDims,
+ Map<String, String> expectedMetric
+ ) throws JsonProcessingException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ // Expected compaction state to exist after compaction as we store
compaction state by default
return new CompactionState(
new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
- new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
- ImmutableList.of(expectedLongSumMetric),
+ expectedDims,
+ ImmutableList.of(expectedMetric),
null,
IndexSpec.DEFAULT.asMap(mapper),
mapper.readValue(
@@ -1572,6 +1593,135 @@ public class CompactionTaskRunTest extends IngestionTestBase
Assert.assertEquals(TaskState.FAILED,
compactionResult.lhs.getStatusCode());
}
+ @Test
+ public void testRunWithSpatialDimensions() throws Exception
+ {
+ final List<String> spatialrows = ImmutableList.of(
+ "2014-01-01T00:00:10Z,a,10,100,1\n",
+ "2014-01-01T00:00:10Z,b,20,110,2\n",
+ "2014-01-01T00:00:10Z,c,30,120,3\n",
+ "2014-01-01T01:00:20Z,a,10,100,1\n",
+ "2014-01-01T01:00:20Z,b,20,110,2\n",
+ "2014-01-01T01:00:20Z,c,30,120,3\n"
+ );
+ final ParseSpec spatialSpec = new CSVParseSpec(
+ new TimestampSpec("ts", "auto", null),
+ DimensionsSpec.builder()
+ .setDimensions(Arrays.asList(
+ new StringDimensionSchema("ts"),
+ new StringDimensionSchema("dim"),
+ new NewSpatialDimensionSchema("spatial",
Arrays.asList("x", "y"))
+ ))
+ .build(),
+ "|",
+ Arrays.asList("ts", "dim", "x", "y", "val"),
+ false,
+ 0
+ );
+ runIndexTask(null, null, spatialSpec, spatialrows, false);
+
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentCacheManagerFactory,
+ RETRY_POLICY_FACTORY
+ );
+
+ final CompactionTask compactionTask = builder
+ .interval(Intervals.of("2014-01-01/2014-01-02"))
+ .build();
+
+ final Pair<TaskStatus, List<DataSegment>> resultPair =
runTask(compactionTask);
+
+ Assert.assertTrue(resultPair.lhs.isSuccess());
+
+ final List<DataSegment> segments = resultPair.rhs;
+ Assert.assertEquals(2, segments.size());
+
+ for (int i = 0; i < 2; i++) {
+ Assert.assertEquals(
+ Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
+ segments.get(i).getInterval()
+ );
+ Map<String, String> expectedLongSumMetric = new HashMap<>();
+ expectedLongSumMetric.put("name", "val");
+ expectedLongSumMetric.put("type", "longSum");
+ expectedLongSumMetric.put("fieldName", "val");
+ Assert.assertEquals(
+ getDefaultCompactionState(
+ Granularities.HOUR,
+ Granularities.MINUTE,
+
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i
+ 1)),
+ DimensionsSpec.builder()
+ .setDimensions(Arrays.asList(
+ new StringDimensionSchema("ts"),
+ new StringDimensionSchema("dim"),
+ new NewSpatialDimensionSchema("spatial",
Collections.singletonList("spatial"))
+ ))
+ .build(),
+ expectedLongSumMetric
+ ),
+ segments.get(i).getLastCompactionState()
+ );
+ if (lockGranularity == LockGranularity.SEGMENT) {
+ Assert.assertEquals(
+ new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
+ segments.get(i).getShardSpec()
+ );
+ } else {
+ Assert.assertEquals(new NumberedShardSpec(0, 1),
segments.get(i).getShardSpec());
+ }
+ }
+
+ final File cacheDir = temporaryFolder.newFolder();
+ final SegmentCacheManager segmentCacheManager =
segmentCacheManagerFactory.manufacturate(cacheDir);
+
+ List<String> rowsFromSegment = new ArrayList<>();
+ for (DataSegment segment : segments) {
+ final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
+
+ final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
+ new
QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)),
+ segment.getInterval()
+ );
+ final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
+ null,
+ segment.getInterval(),
+ VirtualColumns.EMPTY,
+ Granularities.ALL,
+ false,
+ null
+ );
+
+ cursorSequence.accumulate(rowsFromSegment, (accumulated, cursor) -> {
+ cursor.reset();
+ final ColumnSelectorFactory factory =
cursor.getColumnSelectorFactory();
+
Assert.assertTrue(factory.getColumnCapabilities("spatial").hasSpatialIndexes());
+ while (!cursor.isDone()) {
+ final ColumnValueSelector<?> selector1 =
factory.makeColumnValueSelector("ts");
+ final DimensionSelector selector2 =
factory.makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
+ final DimensionSelector selector3 =
factory.makeDimensionSelector(new DefaultDimensionSpec("spatial", "spatial"));
+ final DimensionSelector selector4 =
factory.makeDimensionSelector(new DefaultDimensionSpec("val", "val"));
+
+
+ rowsFromSegment.add(
+ StringUtils.format(
+ "%s,%s,%s,%s\n",
+ selector1.getObject(),
+ selector2.getObject(),
+ selector3.getObject(),
+ selector4.getObject()
+ )
+ );
+
+ cursor.advance();
+ }
+
+ return accumulated;
+ });
+ }
+ Assert.assertEquals(spatialrows, rowsFromSegment);
+ }
+
private Pair<TaskStatus, List<DataSegment>> runIndexTask() throws Exception
{
return runIndexTask(null, null, false);
@@ -1620,6 +1770,46 @@ public class CompactionTaskRunTest extends IngestionTestBase
return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun);
}
+ private Pair<TaskStatus, List<DataSegment>> runIndexTask(
+ @Nullable CountDownLatch readyLatchToCountDown,
+ @Nullable CountDownLatch latchToAwaitBeforeRun,
+ ParseSpec parseSpec,
+ List<String> rows,
+ boolean appendToExisting
+ ) throws Exception
+ {
+ File tmpDir = temporaryFolder.newFolder();
+ File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+ try (BufferedWriter writer = Files.newWriter(tmpFile,
StandardCharsets.UTF_8)) {
+ for (String testRow : rows) {
+ writer.write(testRow);
+ }
+ }
+
+ IndexTask indexTask = new IndexTask(
+ null,
+ null,
+ IndexTaskTest.createIngestionSpec(
+ getObjectMapper(),
+ tmpDir,
+ parseSpec,
+ null,
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.MINUTE,
+ null
+ ),
+ IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false,
true),
+ appendToExisting,
+ false
+ ),
+ null
+ );
+
+ return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun);
+ }
+
private Pair<TaskStatus, List<DataSegment>> runTask(Task task) throws
Exception
{
return runTask(task, null, null);
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
index b3ff6639403..49366023231 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
+import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.Closer;
@@ -31,6 +32,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import
org.apache.druid.segment.selector.settable.SettableDimensionValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import java.util.Collections;
import java.util.Comparator;
public class StringDimensionHandler implements DimensionHandler<Integer,
int[], String>
@@ -124,6 +126,9 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
@Override
public DimensionSchema getDimensionSchema(ColumnCapabilities capabilities)
{
+ if (hasSpatialIndexes) {
+ return new NewSpatialDimensionSchema(dimensionName,
Collections.singletonList(dimensionName));
+ }
return new StringDimensionSchema(dimensionName, multiValueHandling,
hasBitmapIndexes);
}
diff --git a/processing/src/test/java/org/apache/druid/segment/DimensionHandlerUtilsTest.java b/processing/src/test/java/org/apache/druid/segment/DimensionHandlerUtilsTest.java
index 53b598a0cd4..3de4f276f62 100644
--- a/processing/src/test/java/org/apache/druid/segment/DimensionHandlerUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/DimensionHandlerUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -115,6 +116,24 @@ public class DimensionHandlerUtilsTest extends InitializedNullHandlingTest
Assert.assertTrue(stringHandler.getDimensionSchema(stringCapabilities)
instanceof StringDimensionSchema);
}
+ @Test
+ public void testGetHandlerFromStringCapabilitiesSpatialIndexes()
+ {
+ ColumnCapabilities stringCapabilities =
ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+
.setHasBitmapIndexes(true)
+
.setDictionaryEncoded(true)
+
.setDictionaryValuesUnique(true)
+
.setDictionaryValuesUnique(true)
+
.setHasSpatialIndexes(true);
+ DimensionHandler spatialHandler =
DimensionHandlerUtils.getHandlerFromCapabilities(
+ DIM_NAME,
+ stringCapabilities,
+ DimensionSchema.MultiValueHandling.SORTED_SET
+ );
+ Assert.assertTrue(spatialHandler instanceof StringDimensionHandler);
+ Assert.assertTrue(spatialHandler.getDimensionSchema(stringCapabilities)
instanceof NewSpatialDimensionSchema);
+ }
+
@Test
public void testGetHandlerFromFloatCapabilities()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]