This is an automated email from the ASF dual-hosted git repository.

soumyava pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d39b94149c allow compaction to work with spatial dimensions (#15321)
5d39b94149c is described below

commit 5d39b94149c3c7f7662e195be37bcbb3f5e6d2e0
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Nov 3 11:27:50 2023 -0700

    allow compaction to work with spatial dimensions (#15321)
---
 .../common/task/CompactionTaskRunTest.java         | 198 ++++++++++++++++++++-
 .../druid/segment/StringDimensionHandler.java      |   5 +
 .../druid/segment/DimensionHandlerUtilsTest.java   |  19 ++
 3 files changed, 218 insertions(+), 4 deletions(-)

diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 275e7670661..e78068086f2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -34,7 +34,9 @@ import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
 import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -67,6 +69,8 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.IndexSpec;
@@ -205,16 +209,33 @@ public class CompactionTaskRunTest extends IngestionTestBase
       List<Interval> intervals
   ) throws JsonProcessingException
   {
-    ObjectMapper mapper = new DefaultObjectMapper();
-    // Expected compaction state to exist after compaction as we store compaction state by default
     Map<String, String> expectedLongSumMetric = new HashMap<>();
     expectedLongSumMetric.put("type", "longSum");
     expectedLongSumMetric.put("name", "val");
     expectedLongSumMetric.put("fieldName", "val");
+    return getDefaultCompactionState(
+        segmentGranularity,
+        queryGranularity,
+        intervals,
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
+        expectedLongSumMetric
+    );
+  }
+
+  public static CompactionState getDefaultCompactionState(
+      Granularity segmentGranularity,
+      Granularity queryGranularity,
+      List<Interval> intervals,
+      DimensionsSpec expectedDims,
+      Map<String, String> expectedMetric
+  ) throws JsonProcessingException
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    // Expected compaction state to exist after compaction as we store compaction state by default
     return new CompactionState(
         new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
-        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
-        ImmutableList.of(expectedLongSumMetric),
+        expectedDims,
+        ImmutableList.of(expectedMetric),
         null,
         IndexSpec.DEFAULT.asMap(mapper),
         mapper.readValue(
@@ -1572,6 +1593,135 @@ public class CompactionTaskRunTest extends IngestionTestBase
     Assert.assertEquals(TaskState.FAILED, compactionResult.lhs.getStatusCode());
   }
 
+  @Test
+  public void testRunWithSpatialDimensions() throws Exception
+  {
+    final List<String> spatialrows = ImmutableList.of(
+        "2014-01-01T00:00:10Z,a,10,100,1\n",
+        "2014-01-01T00:00:10Z,b,20,110,2\n",
+        "2014-01-01T00:00:10Z,c,30,120,3\n",
+        "2014-01-01T01:00:20Z,a,10,100,1\n",
+        "2014-01-01T01:00:20Z,b,20,110,2\n",
+        "2014-01-01T01:00:20Z,c,30,120,3\n"
+    );
+    final ParseSpec spatialSpec = new CSVParseSpec(
+        new TimestampSpec("ts", "auto", null),
+        DimensionsSpec.builder()
+                      .setDimensions(Arrays.asList(
+                          new StringDimensionSchema("ts"),
+                          new StringDimensionSchema("dim"),
+                          new NewSpatialDimensionSchema("spatial", Arrays.asList("x", "y"))
+                      ))
+                      .build(),
+        "|",
+        Arrays.asList("ts", "dim", "x", "y", "val"),
+        false,
+        0
+    );
+    runIndexTask(null, null, spatialSpec, spatialrows, false);
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+
+    final CompactionTask compactionTask = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    final List<DataSegment> segments = resultPair.rhs;
+    Assert.assertEquals(2, segments.size());
+
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(
+          Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
+          segments.get(i).getInterval()
+      );
+      Map<String, String> expectedLongSumMetric = new HashMap<>();
+      expectedLongSumMetric.put("name", "val");
+      expectedLongSumMetric.put("type", "longSum");
+      expectedLongSumMetric.put("fieldName", "val");
+      Assert.assertEquals(
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1)),
+              DimensionsSpec.builder()
+                            .setDimensions(Arrays.asList(
+                                new StringDimensionSchema("ts"),
+                                new StringDimensionSchema("dim"),
+                                new NewSpatialDimensionSchema("spatial", Collections.singletonList("spatial"))
+                            ))
+                            .build(),
+              expectedLongSumMetric
+          ),
+          segments.get(i).getLastCompactionState()
+      );
+      if (lockGranularity == LockGranularity.SEGMENT) {
+        Assert.assertEquals(
+            new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
+            segments.get(i).getShardSpec()
+        );
+      } else {
+        Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
+      }
+    }
+
+    final File cacheDir = temporaryFolder.newFolder();
+    final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir);
+
+    List<String> rowsFromSegment = new ArrayList<>();
+    for (DataSegment segment : segments) {
+      final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
+
+      final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
+          new QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)),
+          segment.getInterval()
+      );
+      final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
+          null,
+          segment.getInterval(),
+          VirtualColumns.EMPTY,
+          Granularities.ALL,
+          false,
+          null
+      );
+
+      cursorSequence.accumulate(rowsFromSegment, (accumulated, cursor) -> {
+        cursor.reset();
+        final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+        Assert.assertTrue(factory.getColumnCapabilities("spatial").hasSpatialIndexes());
+        while (!cursor.isDone()) {
+          final ColumnValueSelector<?> selector1 = factory.makeColumnValueSelector("ts");
+          final DimensionSelector selector2 = factory.makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
+          final DimensionSelector selector3 = factory.makeDimensionSelector(new DefaultDimensionSpec("spatial", "spatial"));
+          final DimensionSelector selector4 = factory.makeDimensionSelector(new DefaultDimensionSpec("val", "val"));
+
+
+          rowsFromSegment.add(
+              StringUtils.format(
+                  "%s,%s,%s,%s\n",
+                  selector1.getObject(),
+                  selector2.getObject(),
+                  selector3.getObject(),
+                  selector4.getObject()
+              )
+          );
+
+          cursor.advance();
+        }
+
+        return accumulated;
+      });
+    }
+    Assert.assertEquals(spatialrows, rowsFromSegment);
+  }
+
   private Pair<TaskStatus, List<DataSegment>> runIndexTask() throws Exception
   {
     return runIndexTask(null, null, false);
@@ -1620,6 +1770,46 @@ public class CompactionTaskRunTest extends IngestionTestBase
     return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun);
   }
 
+  private Pair<TaskStatus, List<DataSegment>> runIndexTask(
+      @Nullable CountDownLatch readyLatchToCountDown,
+      @Nullable CountDownLatch latchToAwaitBeforeRun,
+      ParseSpec parseSpec,
+      List<String> rows,
+      boolean appendToExisting
+  ) throws Exception
+  {
+    File tmpDir = temporaryFolder.newFolder();
+    File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      for (String testRow : rows) {
+        writer.write(testRow);
+      }
+    }
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        IndexTaskTest.createIngestionSpec(
+            getObjectMapper(),
+            tmpDir,
+            parseSpec,
+            null,
+            new UniformGranularitySpec(
+                Granularities.HOUR,
+                Granularities.MINUTE,
+                null
+            ),
+            IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true),
+            appendToExisting,
+            false
+        ),
+        null
+    );
+
+    return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun);
+  }
+
   private Pair<TaskStatus, List<DataSegment>> runTask(Task task) throws Exception
   {
     return runTask(task, null, null);
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
index b3ff6639403..49366023231 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment;
 
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
+import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.io.Closer;
@@ -31,6 +32,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import org.apache.druid.segment.selector.settable.SettableDimensionValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import java.util.Collections;
 import java.util.Comparator;
 
 public class StringDimensionHandler implements DimensionHandler<Integer, int[], String>
@@ -124,6 +126,9 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
   @Override
   public DimensionSchema getDimensionSchema(ColumnCapabilities capabilities)
   {
+    if (hasSpatialIndexes) {
+      return new NewSpatialDimensionSchema(dimensionName, Collections.singletonList(dimensionName));
+    }
     return new StringDimensionSchema(dimensionName, multiValueHandling, hasBitmapIndexes);
   }
 
diff --git a/processing/src/test/java/org/apache/druid/segment/DimensionHandlerUtilsTest.java b/processing/src/test/java/org/apache/druid/segment/DimensionHandlerUtilsTest.java
index 53b598a0cd4..3de4f276f62 100644
--- a/processing/src/test/java/org/apache/druid/segment/DimensionHandlerUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/DimensionHandlerUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DoubleDimensionSchema;
 import org.apache.druid.data.input.impl.FloatDimensionSchema;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.segment.column.ColumnCapabilities;
@@ -115,6 +116,24 @@ public class DimensionHandlerUtilsTest extends InitializedNullHandlingTest
     Assert.assertTrue(stringHandler.getDimensionSchema(stringCapabilities) instanceof StringDimensionSchema);
   }
 
+  @Test
+  public void testGetHandlerFromStringCapabilitiesSpatialIndexes()
+  {
+    ColumnCapabilities stringCapabilities = ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+                                                                  .setHasBitmapIndexes(true)
+                                                                  .setDictionaryEncoded(true)
+                                                                  .setDictionaryValuesUnique(true)
+                                                                  .setDictionaryValuesUnique(true)
+                                                                  .setHasSpatialIndexes(true);
+    DimensionHandler spatialHandler = DimensionHandlerUtils.getHandlerFromCapabilities(
+        DIM_NAME,
+        stringCapabilities,
+        DimensionSchema.MultiValueHandling.SORTED_SET
+    );
+    Assert.assertTrue(spatialHandler instanceof StringDimensionHandler);
+    Assert.assertTrue(spatialHandler.getDimensionSchema(stringCapabilities) instanceof NewSpatialDimensionSchema);
+  }
+
   @Test
   public void testGetHandlerFromFloatCapabilities()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to