fjy closed pull request #6326: Add support for hash partitioning by a subset of 
dimensions to indexTask
URL: https://github.com/apache/incubator-druid/pull/6326
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one opened from a fork) once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/docs/content/ingestion/native_tasks.md 
b/docs/content/ingestion/native_tasks.md
index 3d27823c233..11497de02ef 100644
--- a/docs/content/ingestion/native_tasks.md
+++ b/docs/content/ingestion/native_tasks.md
@@ -475,6 +475,7 @@ The tuningConfig is optional and default parameters will be 
used if no tuningCon
 |maxBytesInMemory|Used in determining when intermediate persists to disk 
should occur. Normally this is computed internally and user does not need to 
set it. This value represents number of bytes to aggregate in heap memory 
before persisting. This is based on a rough estimate of memory usage and not 
actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * 
(2 + maxPendingPersists)|1/6 of max JVM memory|no|
 |maxTotalRows|Total number of rows in segments waiting for being pushed. Used 
in determining when intermediate pushing should occur.|20000000|no|
 |numShards|Directly specify the number of shards to create. If this is 
specified and 'intervals' is specified in the granularitySpec, the index task 
can skip the determine intervals/partitions pass through the data. numShards 
cannot be specified if targetPartitionSize is set.|null|no|
+|partitionDimensions|The dimensions to partition on. Leave blank to select all 
dimensions. Only used when `forceGuaranteedRollup` = true; ignored 
otherwise.|null|no|
 |indexSpec|defines segment storage format options to be used at indexing time, 
see [IndexSpec](#indexspec)|null|no|
 |maxPendingPersists|Maximum number of persists that can be pending but not 
started. If this limit would be exceeded by a new intermediate persist, 
ingestion will block until the currently-running persist finishes. Maximum heap 
memory usage for indexing scales with maxRowsInMemory * (2 + 
maxPendingPersists).|0 (meaning one persist can be running concurrently with 
ingestion, and none can be queued up)|no|
 |forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental 
feature intended for use with the [Kafka indexing service 
extension](../development/extensions-core/kafka-ingestion.html).|false|no|
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index f00cd0abc84..4ec41283c57 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -112,6 +112,7 @@
 import javax.ws.rs.core.Response;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -654,6 +655,7 @@ private static ShardSpecs createShardSpecWithoutInputScan(
       final int numShards = tuningConfig.getNumShards() == null ? 1 : 
tuningConfig.getNumShards();
       final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = 
getShardSpecCreateFunction(
           numShards,
+          tuningConfig.getPartitionDimensions(),
           jsonMapper
       );
 
@@ -721,6 +723,7 @@ private ShardSpecs createShardSpecsFromInput(
         // Overwrite mode, guaranteed rollup: shardSpecs must be known in 
advance.
         final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = 
getShardSpecCreateFunction(
             numShards,
+            tuningConfig.getPartitionDimensions(),
             jsonMapper
         );
 
@@ -839,6 +842,7 @@ private ShardSpecs createShardSpecsFromInput(
 
   private static BiFunction<Integer, Integer, ShardSpec> 
getShardSpecCreateFunction(
       Integer numShards,
+      List<String> partitionDimensions,
       ObjectMapper jsonMapper
   )
   {
@@ -847,7 +851,12 @@ private ShardSpecs createShardSpecsFromInput(
     if (numShards == 1) {
       return (shardId, totalNumShards) -> NoneShardSpec.instance();
     } else {
-      return (shardId, totalNumShards) -> new 
HashBasedNumberedShardSpec(shardId, totalNumShards, null, jsonMapper);
+      return (shardId, totalNumShards) -> new HashBasedNumberedShardSpec(
+          shardId,
+          totalNumShards,
+          partitionDimensions,
+          jsonMapper
+      );
     }
   }
 
@@ -1347,6 +1356,7 @@ public boolean isAppendToExisting()
     private final Long maxTotalRows;
     @Nullable
     private final Integer numShards;
+    private final List<String> partitionDimensions;
     private final IndexSpec indexSpec;
     private final File basePersistDirectory;
     private final int maxPendingPersists;
@@ -1386,6 +1396,7 @@ public IndexTuningConfig(
         @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
         @JsonProperty("rowFlushBoundary") @Nullable Integer 
rowFlushBoundary_forBackCompatibility, // DEPRECATED
         @JsonProperty("numShards") @Nullable Integer numShards,
+        @JsonProperty("partitionDimensions") @Nullable List<String> 
partitionDimensions,
         @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
         @JsonProperty("maxPendingPersists") @Nullable Integer 
maxPendingPersists,
         // This parameter is left for compatibility when reading existing 
JSONs, to be removed in Druid 0.12.
@@ -1408,6 +1419,7 @@ public IndexTuningConfig(
           maxBytesInMemory != null ? maxBytesInMemory : 0,
           maxTotalRows,
           numShards,
+          partitionDimensions,
           indexSpec,
           maxPendingPersists,
           forceExtendableShardSpecs,
@@ -1424,7 +1436,7 @@ public IndexTuningConfig(
 
     private IndexTuningConfig()
     {
-      this(null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, null, null);
+      this(null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, null, null, null);
     }
 
     private IndexTuningConfig(
@@ -1433,6 +1445,7 @@ private IndexTuningConfig(
         @Nullable Long maxBytesInMemory,
         @Nullable Long maxTotalRows,
         @Nullable Integer numShards,
+        @Nullable List<String> partitionDimensions,
         @Nullable IndexSpec indexSpec,
         @Nullable Integer maxPendingPersists,
         @Nullable Boolean forceExtendableShardSpecs,
@@ -1460,6 +1473,7 @@ private IndexTuningConfig(
       this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
       this.maxTotalRows = maxTotalRows;
       this.numShards = numShards == null || numShards.equals(-1) ? null : 
numShards;
+      this.partitionDimensions = partitionDimensions == null ? 
Collections.emptyList() : partitionDimensions;
       this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
       this.maxPendingPersists = maxPendingPersists == null ? 
DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
       this.forceExtendableShardSpecs = forceExtendableShardSpecs == null
@@ -1498,6 +1512,7 @@ public IndexTuningConfig withBasePersistDirectory(File 
dir)
           maxBytesInMemory,
           maxTotalRows,
           numShards,
+          partitionDimensions,
           indexSpec,
           maxPendingPersists,
           forceExtendableShardSpecs,
@@ -1520,6 +1535,7 @@ public IndexTuningConfig withTargetPartitionSize(int 
targetPartitionSize)
           maxBytesInMemory,
           maxTotalRows,
           numShards,
+          partitionDimensions,
           indexSpec,
           maxPendingPersists,
           forceExtendableShardSpecs,
@@ -1577,6 +1593,12 @@ public Integer getNumShards()
       return numShards;
     }
 
+    @JsonProperty
+    public List<String> getPartitionDimensions()
+    {
+      return partitionDimensions;
+    }
+
     @JsonProperty
     @Override
     public IndexSpec getIndexSpec()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index b640c56ceb1..aecbbc00689 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -296,6 +296,7 @@ private static IndexTuningConfig 
convertToIndexTuningConfig(ParallelIndexTuningC
         tuningConfig.getMaxTotalRows(),
         null,
         tuningConfig.getNumShards(),
+        null,
         tuningConfig.getIndexSpec(),
         tuningConfig.getMaxPendingPersists(),
         true,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index e09055948a7..8f6239dbca2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -105,6 +105,7 @@ public ParallelIndexTuningConfig(
         maxTotalRows,
         null,
         numShards,
+        null,
         indexSpec,
         maxPendingPersists,
         null,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
index a71545ebd7d..b9f36b2b54e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -37,6 +38,8 @@
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.loading.LocalDataSegmentPuller;
+import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -93,6 +96,18 @@ public int columnCacheSizeBytes()
             .addValue(DataSegment.PruneLoadSpecHolder.class, 
DataSegment.PruneLoadSpecHolder.DEFAULT)
             .addValue(IndexingServiceClient.class, new 
NoopIndexingServiceClient())
             .addValue(AuthorizerMapper.class, new 
AuthorizerMapper(ImmutableMap.of()))
+            .addValue(LocalDataSegmentPuller.class, new 
LocalDataSegmentPuller())
+    );
+
+    jsonMapper.registerModule(
+        new SimpleModule()
+        {
+          @Override
+          public void setupModule(SetupContext context)
+          {
+            context.registerSubtypes(LocalLoadSpec.class);
+          }
+        }
     );
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 36eb56ac48c..837b33a50a0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -273,6 +273,7 @@ private static IndexTuningConfig createTuningConfig()
         null,
         null,
         null,
+        null,
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -423,6 +424,7 @@ public void 
testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio
         null,
         null,
         null,
+        null,
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -483,6 +485,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() 
throws IOException, Segm
         5L,
         null,
         null,
+        null,
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -543,6 +546,7 @@ public void testCreateIngestionSchemaWithNumShards() throws 
IOException, Segment
         null,
         null,
         3,
+        null,
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -758,6 +762,7 @@ public void testTargetPartitionSizeWithPartitionConfig() 
throws IOException, Seg
         null,
         null,
         null,
+        null,
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -850,6 +855,7 @@ private static void assertIngestionSchema(
             null,
             null,
             null,
+            null,
             new IndexSpec(
                 new RoaringBitmapSerdeFactory(true),
                 CompressionStrategy.LZ4,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 3e288c5d2f7..9d4f6e7fb75 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
@@ -61,21 +62,35 @@
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
+import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.security.AuthTestUtils;
@@ -93,10 +108,10 @@
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -131,6 +146,10 @@
       0
   );
 
+  private DataSegmentPusher pusher;
+  private SegmentLoader segmentLoader;
+  private List<DataSegment> segments;
+
   private static final IndexSpec indexSpec = new IndexSpec();
   private final ObjectMapper jsonMapper;
   private IndexMergerV9 indexMergerV9;
@@ -143,6 +162,7 @@ public IndexTaskTest()
   {
     TestUtils testUtils = new TestUtils();
     jsonMapper = testUtils.getTestObjectMapper();
+
     indexMergerV9 = testUtils.getTestIndexMergerV9();
     indexIO = testUtils.getTestIndexIO();
     rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
@@ -151,7 +171,50 @@ public IndexTaskTest()
   @Before
   public void setup() throws IOException
   {
-    reportsFile = File.createTempFile("IndexTaskTestReports-" + 
System.currentTimeMillis(), "json");
+    reportsFile = temporaryFolder.newFile(
+        StringUtils.format("IndexTaskTestReports-%s.json", 
System.currentTimeMillis())
+    );
+
+    final File deepStorageDir = temporaryFolder.newFolder();
+    final File cacheDir = temporaryFolder.newFolder();
+
+    pusher = new LocalDataSegmentPusher(
+        new LocalDataSegmentPusherConfig()
+        {
+          @Override
+          public File getStorageDirectory()
+          {
+            return deepStorageDir;
+          }
+        },
+        jsonMapper
+    )
+    {
+      @Override
+      public DataSegment push(final File dataSegmentFile, final DataSegment 
segment, final boolean useUniquePath)
+          throws IOException
+      {
+        final DataSegment returnSegment = super.push(dataSegmentFile, segment, 
useUniquePath);
+        segments.add(returnSegment);
+        return returnSegment;
+      }
+    };
+    segmentLoader = new SegmentLoaderLocalCacheManager(
+        indexIO,
+        new SegmentLoaderConfig()
+        {
+          @Override
+          public List<StorageLocationConfig> getLocations()
+          {
+            return Collections.singletonList(
+                new StorageLocationConfig().setPath(cacheDir)
+            );
+          }
+        },
+        jsonMapper
+    );
+    segments = new ArrayList<>();
+
   }
 
   @After
@@ -180,7 +243,7 @@ public void testDeterminePartitions() throws Exception
             tmpDir,
             null,
             null,
-            createTuningConfig(2, null, false, true),
+            createTuningConfigWithTargetPartitionSize(2, false, true),
             false
         ),
         null,
@@ -226,7 +289,7 @@ public void testForceExtendableShardSpecs() throws Exception
             tmpDir,
             null,
             null,
-            createTuningConfig(2, null, true, true),
+            createTuningConfigWithTargetPartitionSize(2, true, true),
             false
         ),
         null,
@@ -278,7 +341,7 @@ public void testTransformSpec() throws Exception
                 )
             ),
             null,
-            createTuningConfig(2, null, true, false),
+            createTuningConfigWithTargetPartitionSize(2, true, false),
             false
         ),
         null,
@@ -322,7 +385,7 @@ public void testWithArbitraryGranularity() throws Exception
                 Granularities.MINUTE,
                 
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
-            createTuningConfig(10, null, false, true),
+            createTuningConfigWithTargetPartitionSize(10, false, true),
             false
         ),
         null,
@@ -331,7 +394,7 @@ public void testWithArbitraryGranularity() throws Exception
         rowIngestionMetersFactory
     );
 
-    List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runTask(indexTask).rhs;
 
     Assert.assertEquals(1, segments.size());
   }
@@ -359,7 +422,7 @@ public void testIntervalBucketing() throws Exception
                 Granularities.HOUR,
                 
Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
             ),
-            createTuningConfig(50, null, false, true),
+            createTuningConfigWithTargetPartitionSize(50, false, true),
             false
         ),
         null,
@@ -392,7 +455,7 @@ public void testNumShardsProvided() throws Exception
             tmpDir,
             null,
             null,
-            createTuningConfig(null, 1, false, true),
+            createTuningConfigWithNumShards(1, null, false, true),
             false
         ),
         null,
@@ -407,10 +470,84 @@ public void testNumShardsProvided() throws Exception
 
     Assert.assertEquals("test", segments.get(0).getDataSource());
     Assert.assertEquals(Intervals.of("2014/P1D"), 
segments.get(0).getInterval());
-    
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class));
+    Assert.assertEquals(NoneShardSpec.class, 
segments.get(0).getShardSpec().getClass());
     Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
   }
 
+  @Test
+  public void testNumShardsAndPartitionDimensionsProvided() throws Exception
+  {
+    final File tmpDir = temporaryFolder.newFolder();
+    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+      writer.write("2014-01-01T01:00:20Z,b,1\n");
+      writer.write("2014-01-01T02:00:30Z,c,1\n");
+    }
+
+    final IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        createIngestionSpec(
+            tmpDir,
+            null,
+            null,
+            createTuningConfigWithNumShards(2, ImmutableList.of("dim"), false, 
true),
+            false
+        ),
+        null,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
+
+    runTask(indexTask);
+
+    Assert.assertEquals(2, segments.size());
+
+    for (DataSegment segment : segments) {
+      Assert.assertEquals("test", segment.getDataSource());
+      Assert.assertEquals(Intervals.of("2014/P1D"), segment.getInterval());
+      Assert.assertEquals(HashBasedNumberedShardSpec.class, 
segment.getShardSpec().getClass());
+
+      final File segmentFile = segmentLoader.getSegmentFiles(segment);
+
+      final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
+          new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)),
+          segment.getInterval()
+      );
+
+      final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
+          null,
+          segment.getInterval(),
+          VirtualColumns.EMPTY,
+          Granularities.ALL,
+          false,
+          null
+      );
+      final List<Integer> hashes = cursorSequence
+          .map(cursor -> {
+            final DimensionSelector selector = 
cursor.getColumnSelectorFactory()
+                                                     
.makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
+            try {
+              final int hash = HashBasedNumberedShardSpec.hash(
+                  jsonMapper,
+                  Collections.singletonList(selector.getObject())
+              );
+              cursor.advance();
+              return hash;
+            }
+            catch (JsonProcessingException e) {
+              throw new RuntimeException(e);
+            }
+          })
+          .toList();
+
+      Assert.assertTrue(hashes.stream().allMatch(h -> h.intValue() == 
hashes.get(0)));
+    }
+  }
+
   @Test
   public void testAppendToExisting() throws Exception
   {
@@ -431,7 +568,7 @@ public void testAppendToExisting() throws Exception
             tmpDir,
             null,
             null,
-            createTuningConfig(2, null, false, false),
+            createTuningConfigWithTargetPartitionSize(2, false, false),
             true
         ),
         null,
@@ -481,7 +618,7 @@ public void testIntervalNotSpecified() throws Exception
                 Granularities.MINUTE,
                 null
             ),
-            createTuningConfig(2, null, false, true),
+            createTuningConfigWithTargetPartitionSize(2, false, true),
             false
         ),
         null,
@@ -544,7 +681,7 @@ public void testCSVFileWithHeader() throws Exception
                 0
             ),
             null,
-            createTuningConfig(2, null, false, true),
+            createTuningConfigWithTargetPartitionSize(2, false, true),
             false
         ),
         null,
@@ -596,7 +733,7 @@ public void testCSVFileWithHeaderColumnOverride() throws 
Exception
                 0
             ),
             null,
-            createTuningConfig(2, null, false, true),
+            createTuningConfigWithTargetPartitionSize(2, false, true),
             false
         ),
         null,
@@ -643,7 +780,7 @@ public void testWithSmallMaxTotalRows() throws Exception
                 Granularities.MINUTE,
                 null
             ),
-            createTuningConfig(2, 2, null, 2L, null, false, false, true),
+            createTuningConfig(2, 2, null, 2L, null, null, false, false, true),
             false
         ),
         null,
@@ -688,7 +825,7 @@ public void testPerfectRollup() throws Exception
                 true,
                 null
             ),
-            createTuningConfig(3, 2, null, 2L, null, false, true, true),
+            createTuningConfig(3, 2, null, 2L, null, null, false, true, true),
             false
         ),
         null,
@@ -732,7 +869,7 @@ public void testBestEffortRollup() throws Exception
                 true,
                 null
             ),
-            createTuningConfig(3, 2, null, 2L, null, false, false, true),
+            createTuningConfig(3, 2, null, 2L, null, null, false, false, true),
             false
         ),
         null,
@@ -805,7 +942,7 @@ public void testIgnoreParseException() throws Exception
             0
         ),
         null,
-        createTuningConfig(2, null, null, null, null, false, false, false), // 
ignore parse exception,
+        createTuningConfig(2, null, null, null, null, null, false, false, 
false), // ignore parse exception,
         false
     );
 
@@ -858,7 +995,7 @@ public void testReportParseException() throws Exception
             0
         ),
         null,
-        createTuningConfig(2, null, null, null, null, false, false, true), // 
report parse exception
+        createTuningConfig(2, null, null, null, null, null, false, false, 
true), // report parse exception
         false
     );
 
@@ -912,6 +1049,7 @@ public void testMultipleParseExceptionsSuccess() throws 
Exception
         null,
         null,
         null,
+        null,
         indexSpec,
         null,
         true,
@@ -1033,6 +1171,7 @@ public void testMultipleParseExceptionsFailure() throws 
Exception
         null,
         null,
         null,
+        null,
         indexSpec,
         null,
         true,
@@ -1147,6 +1286,7 @@ public void 
testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc
         null,
         null,
         null,
+        null,
         indexSpec,
         null,
         true,
@@ -1283,7 +1423,7 @@ public void testCsvWithHeaderOfEmptyColumns() throws 
Exception
             0
         ),
         null,
-        createTuningConfig(2, 1, null, null, null, false, true, true), // 
report parse exception
+        createTuningConfig(2, 1, null, null, null, null, false, true, true), 
// report parse exception
         false
     );
 
@@ -1353,7 +1493,7 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws 
Exception
             0
         ),
         null,
-        createTuningConfig(2, null, null, null, null, false, false, true), // 
report parse exception
+        createTuningConfig(2, null, null, null, null, null, false, false, 
true), // report parse exception
         false
     );
 
@@ -1392,8 +1532,6 @@ public static void 
checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus
 
   private Pair<TaskStatus, List<DataSegment>> runTask(IndexTask indexTask) 
throws Exception
   {
-    final List<DataSegment> segments = Lists.newArrayList();
-
     final TaskActionClient actionClient = new TaskActionClient()
     {
       @Override
@@ -1450,35 +1588,6 @@ public static void 
checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus
       }
     };
 
-    final DataSegmentPusher pusher = new DataSegmentPusher()
-    {
-      @Deprecated
-      @Override
-      public String getPathForHadoop(String dataSource)
-      {
-        return getPathForHadoop();
-      }
-
-      @Override
-      public String getPathForHadoop()
-      {
-        return null;
-      }
-
-      @Override
-      public DataSegment push(File file, DataSegment segment, boolean 
useUniquePath)
-      {
-        segments.add(segment);
-        return segment;
-      }
-
-      @Override
-      public Map<String, Object> makeLoadSpec(URI uri)
-      {
-        throw new UnsupportedOperationException();
-      }
-    };
-
     final DataSegmentKiller killer = new DataSegmentKiller()
     {
       @Override
@@ -1526,7 +1635,14 @@ public void killAll()
     indexTask.isReady(box.getTaskActionClient());
     TaskStatus status = indexTask.run(box);
 
-    Collections.sort(segments);
+    segments.sort((s1, s2) -> {
+      final int comp = 
Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), 
s2.getInterval());
+      if (comp != 0) {
+        return comp;
+      }
+      //noinspection SubtractionInCompareTo
+      return s1.getShardSpec().getPartitionNum() - 
s2.getShardSpec().getPartitionNum();
+    });
 
     return Pair.of(status, segments);
   }
@@ -1584,9 +1700,8 @@ public void killAll()
     );
   }
 
-  private static IndexTuningConfig createTuningConfig(
-      Integer targetPartitionSize,
-      Integer numShards,
+  private static IndexTuningConfig createTuningConfigWithTargetPartitionSize(
+      int targetPartitionSize,
       boolean forceExtendableShardSpecs,
       boolean forceGuaranteedRollup
   )
@@ -1596,7 +1711,28 @@ private static IndexTuningConfig createTuningConfig(
         1,
         null,
         null,
+        null,
+        null,
+        forceExtendableShardSpecs,
+        forceGuaranteedRollup,
+        true
+    );
+  }
+
+  private static IndexTuningConfig createTuningConfigWithNumShards(
+      int numShards,
+      @Nullable List<String> partitionDimensions,
+      boolean forceExtendableShardSpecs,
+      boolean forceGuaranteedRollup
+  )
+  {
+    return createTuningConfig(
+        null,
+        1,
+        null,
+        null,
         numShards,
+        partitionDimensions,
         forceExtendableShardSpecs,
         forceGuaranteedRollup,
         true
@@ -1604,11 +1740,12 @@ private static IndexTuningConfig createTuningConfig(
   }
 
   private static IndexTuningConfig createTuningConfig(
-      Integer targetPartitionSize,
-      Integer maxRowsInMemory,
-      Long maxBytesInMemory,
-      Long maxTotalRows,
-      Integer numShards,
+      @Nullable Integer targetPartitionSize,
+      @Nullable Integer maxRowsInMemory,
+      @Nullable Long maxBytesInMemory,
+      @Nullable Long maxTotalRows,
+      @Nullable Integer numShards,
+      @Nullable List<String> partitionDimensions,
       boolean forceExtendableShardSpecs,
       boolean forceGuaranteedRollup,
       boolean reportParseException
@@ -1621,6 +1758,7 @@ private static IndexTuningConfig createTuningConfig(
         maxTotalRows,
         null,
         numShards,
+        partitionDimensions,
         indexSpec,
         null,
         true,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index 1fe36fe0144..98c42b7117d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -193,6 +193,7 @@ public void testIndexTaskSerde() throws Exception
                 null,
                 9999,
                 null,
+                null,
                 indexSpec,
                 3,
                 true,
@@ -278,6 +279,7 @@ public void testIndexTaskwithResourceSerde() throws Exception
                 null,
                 null,
                 null,
+                null,
                 indexSpec,
                 3,
                 true,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index eb0393332e5..af814d9f849 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -692,6 +692,7 @@ public void testIndexTask() throws Exception
                 null,
                 null,
                 null,
+                null,
                 indexSpec,
                 3,
                 true,
@@ -772,6 +773,7 @@ public void testIndexTaskFailure() throws Exception
                 null,
                 null,
                 null,
+                null,
                 indexSpec,
                 3,
                 true,
@@ -1159,6 +1161,7 @@ public void testResumeTasks() throws Exception
                 null,
                 null,
                 null,
+                null,
                 indexSpec,
                 null,
                 false,
diff --git a/server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java b/server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
index 456da4b8c94..3554326606e 100644
--- a/server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++ b/server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -25,6 +25,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -74,7 +75,7 @@ protected int hash(long timestamp, InputRow inputRow)
   {
     final List<Object> groupKey = getGroupKey(timestamp, inputRow);
     try {
-      return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
+      return hash(jsonMapper, groupKey);
     }
     catch (JsonProcessingException e) {
       throw Throwables.propagate(e);
@@ -93,6 +94,12 @@ protected int hash(long timestamp, InputRow inputRow)
     }
   }
 
+  @VisibleForTesting
+  public static int hash(ObjectMapper jsonMapper, List<Object> objects) throws JsonProcessingException
+  {
+    return hashFunction.hashBytes(jsonMapper.writeValueAsBytes(objects)).asInt();
+  }
+
   @Override
   public String toString()
   {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to