This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 45aa51a Add support for hash partitioning by a subset of dimensions to
indexTask (#6326)
45aa51a is described below
commit 45aa51a00c642a501834e2dfe54d68cbab8e0464
Author: Jihoon Son <[email protected]>
AuthorDate: Sat Oct 6 16:45:07 2018 -0700
Add support for hash partitioning by a subset of dimensions to indexTask (#6326)
* Add support for hash partitioning by a subset of dimensions to indexTask
* add doc
* fix style
* fix test
* fix doc
* fix build
---
docs/content/ingestion/native_tasks.md | 1 +
.../druid/indexing/common/task/IndexTask.java | 26 ++-
.../parallel/ParallelIndexSupervisorTask.java | 1 +
.../batch/parallel/ParallelIndexTuningConfig.java | 1 +
.../apache/druid/indexing/common/TestUtils.java | 15 ++
.../indexing/common/task/CompactionTaskTest.java | 6 +
.../druid/indexing/common/task/IndexTaskTest.java | 260 ++++++++++++++++-----
.../druid/indexing/common/task/TaskSerdeTest.java | 2 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 3 +
.../partition/HashBasedNumberedShardSpec.java | 9 +-
10 files changed, 260 insertions(+), 64 deletions(-)
diff --git a/docs/content/ingestion/native_tasks.md
b/docs/content/ingestion/native_tasks.md
index 3d27823..11497de 100644
--- a/docs/content/ingestion/native_tasks.md
+++ b/docs/content/ingestion/native_tasks.md
@@ -475,6 +475,7 @@ The tuningConfig is optional and default parameters will be
used if no tuningCon
|maxBytesInMemory|Used in determining when intermediate persists to disk
should occur. Normally this is computed internally and the user does not need to
set it. This value represents the number of bytes to aggregate in heap memory
before persisting. This is based on a rough estimate of memory usage and not
actual usage. The maximum heap memory usage for indexing is maxBytesInMemory *
(2 + maxPendingPersists)|1/6 of max JVM memory|no|
|maxTotalRows|Total number of rows in segments waiting to be pushed. Used
in determining when intermediate pushing should occur.|20000000|no|
|numShards|Directly specify the number of shards to create. If this is
specified and 'intervals' is specified in the granularitySpec, the index task
can skip the determine intervals/partitions pass through the data. numShards
cannot be specified if targetPartitionSize is set.|null|no|
+|partitionDimensions|The dimensions to partition on. Leave blank to select all
dimensions. Only used when `forceGuaranteedRollup` is true; ignored
otherwise.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time,
see [IndexSpec](#indexspec)|null|no|
|maxPendingPersists|Maximum number of persists that can be pending but not
started. If this limit would be exceeded by a new intermediate persist,
ingestion will block until the currently-running persist finishes. Maximum heap
memory usage for indexing scales with maxRowsInMemory * (2 +
maxPendingPersists).|0 (meaning one persist can be running concurrently with
ingestion, and none can be queued up)|no|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental
feature intended for use with the [Kafka indexing service
extension](../development/extensions-core/kafka-ingestion.html).|false|no|
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index f00cd0a..4ec4128 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -112,6 +112,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -654,6 +655,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
final int numShards = tuningConfig.getNumShards() == null ? 1 :
tuningConfig.getNumShards();
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn =
getShardSpecCreateFunction(
numShards,
+ tuningConfig.getPartitionDimensions(),
jsonMapper
);
@@ -721,6 +723,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
// Overwrite mode, guaranteed rollup: shardSpecs must be known in
advance.
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn =
getShardSpecCreateFunction(
numShards,
+ tuningConfig.getPartitionDimensions(),
jsonMapper
);
@@ -839,6 +842,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
private static BiFunction<Integer, Integer, ShardSpec>
getShardSpecCreateFunction(
Integer numShards,
+ List<String> partitionDimensions,
ObjectMapper jsonMapper
)
{
@@ -847,7 +851,12 @@ public class IndexTask extends AbstractTask implements
ChatHandler
if (numShards == 1) {
return (shardId, totalNumShards) -> NoneShardSpec.instance();
} else {
- return (shardId, totalNumShards) -> new
HashBasedNumberedShardSpec(shardId, totalNumShards, null, jsonMapper);
+ return (shardId, totalNumShards) -> new HashBasedNumberedShardSpec(
+ shardId,
+ totalNumShards,
+ partitionDimensions,
+ jsonMapper
+ );
}
}
@@ -1347,6 +1356,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
private final Long maxTotalRows;
@Nullable
private final Integer numShards;
+ private final List<String> partitionDimensions;
private final IndexSpec indexSpec;
private final File basePersistDirectory;
private final int maxPendingPersists;
@@ -1386,6 +1396,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("rowFlushBoundary") @Nullable Integer
rowFlushBoundary_forBackCompatibility, // DEPRECATED
@JsonProperty("numShards") @Nullable Integer numShards,
+ @JsonProperty("partitionDimensions") @Nullable List<String>
partitionDimensions,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("maxPendingPersists") @Nullable Integer
maxPendingPersists,
// This parameter is left for compatibility when reading existing
JSONs, to be removed in Druid 0.12.
@@ -1408,6 +1419,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
maxBytesInMemory != null ? maxBytesInMemory : 0,
maxTotalRows,
numShards,
+ partitionDimensions,
indexSpec,
maxPendingPersists,
forceExtendableShardSpecs,
@@ -1424,7 +1436,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
private IndexTuningConfig()
{
- this(null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null);
+ this(null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null);
}
private IndexTuningConfig(
@@ -1433,6 +1445,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
@Nullable Long maxBytesInMemory,
@Nullable Long maxTotalRows,
@Nullable Integer numShards,
+ @Nullable List<String> partitionDimensions,
@Nullable IndexSpec indexSpec,
@Nullable Integer maxPendingPersists,
@Nullable Boolean forceExtendableShardSpecs,
@@ -1460,6 +1473,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.maxTotalRows = maxTotalRows;
this.numShards = numShards == null || numShards.equals(-1) ? null :
numShards;
+ this.partitionDimensions = partitionDimensions == null ?
Collections.emptyList() : partitionDimensions;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.maxPendingPersists = maxPendingPersists == null ?
DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
this.forceExtendableShardSpecs = forceExtendableShardSpecs == null
@@ -1498,6 +1512,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
maxBytesInMemory,
maxTotalRows,
numShards,
+ partitionDimensions,
indexSpec,
maxPendingPersists,
forceExtendableShardSpecs,
@@ -1520,6 +1535,7 @@ public class IndexTask extends AbstractTask implements
ChatHandler
maxBytesInMemory,
maxTotalRows,
numShards,
+ partitionDimensions,
indexSpec,
maxPendingPersists,
forceExtendableShardSpecs,
@@ -1578,6 +1594,12 @@ public class IndexTask extends AbstractTask implements
ChatHandler
}
@JsonProperty
+ public List<String> getPartitionDimensions()
+ {
+ return partitionDimensions;
+ }
+
+ @JsonProperty
@Override
public IndexSpec getIndexSpec()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index b640c56..aecbbc0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -296,6 +296,7 @@ public class ParallelIndexSupervisorTask extends
AbstractTask implements ChatHan
tuningConfig.getMaxTotalRows(),
null,
tuningConfig.getNumShards(),
+ null,
tuningConfig.getIndexSpec(),
tuningConfig.getMaxPendingPersists(),
true,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index e090559..8f6239d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -105,6 +105,7 @@ public class ParallelIndexTuningConfig extends
IndexTuningConfig
maxTotalRows,
null,
numShards,
+ null,
indexSpec,
maxPendingPersists,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
index a71545e..b9f36b2 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -37,6 +38,8 @@ import
org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.loading.LocalDataSegmentPuller;
+import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -93,6 +96,18 @@ public class TestUtils
.addValue(DataSegment.PruneLoadSpecHolder.class,
DataSegment.PruneLoadSpecHolder.DEFAULT)
.addValue(IndexingServiceClient.class, new
NoopIndexingServiceClient())
.addValue(AuthorizerMapper.class, new
AuthorizerMapper(ImmutableMap.of()))
+ .addValue(LocalDataSegmentPuller.class, new
LocalDataSegmentPuller())
+ );
+
+ jsonMapper.registerModule(
+ new SimpleModule()
+ {
+ @Override
+ public void setupModule(SetupContext context)
+ {
+ context.registerSubtypes(LocalLoadSpec.class);
+ }
+ }
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 36eb56a..837b33a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -273,6 +273,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@@ -423,6 +424,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@@ -483,6 +485,7 @@ public class CompactionTaskTest
5L,
null,
null,
+ null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@@ -543,6 +546,7 @@ public class CompactionTaskTest
null,
null,
3,
+ null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@@ -758,6 +762,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
@@ -850,6 +855,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 3e288c5..9d4f6e7 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@@ -61,21 +62,35 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
+import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthTestUtils;
@@ -93,10 +108,10 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
+import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
-import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -131,6 +146,10 @@ public class IndexTaskTest
0
);
+ private DataSegmentPusher pusher;
+ private SegmentLoader segmentLoader;
+ private List<DataSegment> segments;
+
private static final IndexSpec indexSpec = new IndexSpec();
private final ObjectMapper jsonMapper;
private IndexMergerV9 indexMergerV9;
@@ -143,6 +162,7 @@ public class IndexTaskTest
{
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
+
indexMergerV9 = testUtils.getTestIndexMergerV9();
indexIO = testUtils.getTestIndexIO();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
@@ -151,7 +171,50 @@ public class IndexTaskTest
@Before
public void setup() throws IOException
{
- reportsFile = File.createTempFile("IndexTaskTestReports-" +
System.currentTimeMillis(), "json");
+ reportsFile = temporaryFolder.newFile(
+ StringUtils.format("IndexTaskTestReports-%s.json",
System.currentTimeMillis())
+ );
+
+ final File deepStorageDir = temporaryFolder.newFolder();
+ final File cacheDir = temporaryFolder.newFolder();
+
+ pusher = new LocalDataSegmentPusher(
+ new LocalDataSegmentPusherConfig()
+ {
+ @Override
+ public File getStorageDirectory()
+ {
+ return deepStorageDir;
+ }
+ },
+ jsonMapper
+ )
+ {
+ @Override
+ public DataSegment push(final File dataSegmentFile, final DataSegment
segment, final boolean useUniquePath)
+ throws IOException
+ {
+ final DataSegment returnSegment = super.push(dataSegmentFile, segment,
useUniquePath);
+ segments.add(returnSegment);
+ return returnSegment;
+ }
+ };
+ segmentLoader = new SegmentLoaderLocalCacheManager(
+ indexIO,
+ new SegmentLoaderConfig()
+ {
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return Collections.singletonList(
+ new StorageLocationConfig().setPath(cacheDir)
+ );
+ }
+ },
+ jsonMapper
+ );
+ segments = new ArrayList<>();
+
}
@After
@@ -180,7 +243,7 @@ public class IndexTaskTest
tmpDir,
null,
null,
- createTuningConfig(2, null, false, true),
+ createTuningConfigWithTargetPartitionSize(2, false, true),
false
),
null,
@@ -226,7 +289,7 @@ public class IndexTaskTest
tmpDir,
null,
null,
- createTuningConfig(2, null, true, true),
+ createTuningConfigWithTargetPartitionSize(2, true, true),
false
),
null,
@@ -278,7 +341,7 @@ public class IndexTaskTest
)
),
null,
- createTuningConfig(2, null, true, false),
+ createTuningConfigWithTargetPartitionSize(2, true, false),
false
),
null,
@@ -322,7 +385,7 @@ public class IndexTaskTest
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
),
- createTuningConfig(10, null, false, true),
+ createTuningConfigWithTargetPartitionSize(10, false, true),
false
),
null,
@@ -331,7 +394,7 @@ public class IndexTaskTest
rowIngestionMetersFactory
);
- List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runTask(indexTask).rhs;
Assert.assertEquals(1, segments.size());
}
@@ -359,7 +422,7 @@ public class IndexTaskTest
Granularities.HOUR,
Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
),
- createTuningConfig(50, null, false, true),
+ createTuningConfigWithTargetPartitionSize(50, false, true),
false
),
null,
@@ -392,7 +455,7 @@ public class IndexTaskTest
tmpDir,
null,
null,
- createTuningConfig(null, 1, false, true),
+ createTuningConfigWithNumShards(1, null, false, true),
false
),
null,
@@ -407,11 +470,85 @@ public class IndexTaskTest
Assert.assertEquals("test", segments.get(0).getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"),
segments.get(0).getInterval());
-
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class));
+ Assert.assertEquals(NoneShardSpec.class,
segments.get(0).getShardSpec().getClass());
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
}
@Test
+ public void testNumShardsAndPartitionDimensionsProvided() throws Exception
+ {
+ final File tmpDir = temporaryFolder.newFolder();
+ final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+ try (BufferedWriter writer = Files.newWriter(tmpFile,
StandardCharsets.UTF_8)) {
+ writer.write("2014-01-01T00:00:10Z,a,1\n");
+ writer.write("2014-01-01T01:00:20Z,b,1\n");
+ writer.write("2014-01-01T02:00:30Z,c,1\n");
+ }
+
+ final IndexTask indexTask = new IndexTask(
+ null,
+ null,
+ createIngestionSpec(
+ tmpDir,
+ null,
+ null,
+ createTuningConfigWithNumShards(2, ImmutableList.of("dim"), false,
true),
+ false
+ ),
+ null,
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+ null,
+ rowIngestionMetersFactory
+ );
+
+ runTask(indexTask);
+
+ Assert.assertEquals(2, segments.size());
+
+ for (DataSegment segment : segments) {
+ Assert.assertEquals("test", segment.getDataSource());
+ Assert.assertEquals(Intervals.of("2014/P1D"), segment.getInterval());
+ Assert.assertEquals(HashBasedNumberedShardSpec.class,
segment.getShardSpec().getClass());
+
+ final File segmentFile = segmentLoader.getSegmentFiles(segment);
+
+ final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
+ new QueryableIndexStorageAdapter(indexIO.loadIndex(segmentFile)),
+ segment.getInterval()
+ );
+
+ final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
+ null,
+ segment.getInterval(),
+ VirtualColumns.EMPTY,
+ Granularities.ALL,
+ false,
+ null
+ );
+ final List<Integer> hashes = cursorSequence
+ .map(cursor -> {
+ final DimensionSelector selector =
cursor.getColumnSelectorFactory()
+
.makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
+ try {
+ final int hash = HashBasedNumberedShardSpec.hash(
+ jsonMapper,
+ Collections.singletonList(selector.getObject())
+ );
+ cursor.advance();
+ return hash;
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .toList();
+
+ Assert.assertTrue(hashes.stream().allMatch(h -> h.intValue() ==
hashes.get(0)));
+ }
+ }
+
+ @Test
public void testAppendToExisting() throws Exception
{
segmentAllocatePartitionCounter = 0;
@@ -431,7 +568,7 @@ public class IndexTaskTest
tmpDir,
null,
null,
- createTuningConfig(2, null, false, false),
+ createTuningConfigWithTargetPartitionSize(2, false, false),
true
),
null,
@@ -481,7 +618,7 @@ public class IndexTaskTest
Granularities.MINUTE,
null
),
- createTuningConfig(2, null, false, true),
+ createTuningConfigWithTargetPartitionSize(2, false, true),
false
),
null,
@@ -544,7 +681,7 @@ public class IndexTaskTest
0
),
null,
- createTuningConfig(2, null, false, true),
+ createTuningConfigWithTargetPartitionSize(2, false, true),
false
),
null,
@@ -596,7 +733,7 @@ public class IndexTaskTest
0
),
null,
- createTuningConfig(2, null, false, true),
+ createTuningConfigWithTargetPartitionSize(2, false, true),
false
),
null,
@@ -643,7 +780,7 @@ public class IndexTaskTest
Granularities.MINUTE,
null
),
- createTuningConfig(2, 2, null, 2L, null, false, false, true),
+ createTuningConfig(2, 2, null, 2L, null, null, false, false, true),
false
),
null,
@@ -688,7 +825,7 @@ public class IndexTaskTest
true,
null
),
- createTuningConfig(3, 2, null, 2L, null, false, true, true),
+ createTuningConfig(3, 2, null, 2L, null, null, false, true, true),
false
),
null,
@@ -732,7 +869,7 @@ public class IndexTaskTest
true,
null
),
- createTuningConfig(3, 2, null, 2L, null, false, false, true),
+ createTuningConfig(3, 2, null, 2L, null, null, false, false, true),
false
),
null,
@@ -805,7 +942,7 @@ public class IndexTaskTest
0
),
null,
- createTuningConfig(2, null, null, null, null, false, false, false), //
ignore parse exception,
+ createTuningConfig(2, null, null, null, null, null, false, false,
false), // ignore parse exception,
false
);
@@ -858,7 +995,7 @@ public class IndexTaskTest
0
),
null,
- createTuningConfig(2, null, null, null, null, false, false, true), //
report parse exception
+ createTuningConfig(2, null, null, null, null, null, false, false,
true), // report parse exception
false
);
@@ -912,6 +1049,7 @@ public class IndexTaskTest
null,
null,
null,
+ null,
indexSpec,
null,
true,
@@ -1033,6 +1171,7 @@ public class IndexTaskTest
null,
null,
null,
+ null,
indexSpec,
null,
true,
@@ -1147,6 +1286,7 @@ public class IndexTaskTest
null,
null,
null,
+ null,
indexSpec,
null,
true,
@@ -1283,7 +1423,7 @@ public class IndexTaskTest
0
),
null,
- createTuningConfig(2, 1, null, null, null, false, true, true), //
report parse exception
+ createTuningConfig(2, 1, null, null, null, null, false, true, true),
// report parse exception
false
);
@@ -1353,7 +1493,7 @@ public class IndexTaskTest
0
),
null,
- createTuningConfig(2, null, null, null, null, false, false, true), //
report parse exception
+ createTuningConfig(2, null, null, null, null, null, false, false,
true), // report parse exception
false
);
@@ -1392,8 +1532,6 @@ public class IndexTaskTest
private Pair<TaskStatus, List<DataSegment>> runTask(IndexTask indexTask)
throws Exception
{
- final List<DataSegment> segments = Lists.newArrayList();
-
final TaskActionClient actionClient = new TaskActionClient()
{
@Override
@@ -1450,35 +1588,6 @@ public class IndexTaskTest
}
};
- final DataSegmentPusher pusher = new DataSegmentPusher()
- {
- @Deprecated
- @Override
- public String getPathForHadoop(String dataSource)
- {
- return getPathForHadoop();
- }
-
- @Override
- public String getPathForHadoop()
- {
- return null;
- }
-
- @Override
- public DataSegment push(File file, DataSegment segment, boolean
useUniquePath)
- {
- segments.add(segment);
- return segment;
- }
-
- @Override
- public Map<String, Object> makeLoadSpec(URI uri)
- {
- throw new UnsupportedOperationException();
- }
- };
-
final DataSegmentKiller killer = new DataSegmentKiller()
{
@Override
@@ -1526,7 +1635,14 @@ public class IndexTaskTest
indexTask.isReady(box.getTaskActionClient());
TaskStatus status = indexTask.run(box);
- Collections.sort(segments);
+ segments.sort((s1, s2) -> {
+ final int comp =
Comparators.intervalsByStartThenEnd().compare(s1.getInterval(),
s2.getInterval());
+ if (comp != 0) {
+ return comp;
+ }
+ //noinspection SubtractionInCompareTo
+ return s1.getShardSpec().getPartitionNum() -
s2.getShardSpec().getPartitionNum();
+ });
return Pair.of(status, segments);
}
@@ -1584,9 +1700,8 @@ public class IndexTaskTest
);
}
- private static IndexTuningConfig createTuningConfig(
- Integer targetPartitionSize,
- Integer numShards,
+ private static IndexTuningConfig createTuningConfigWithTargetPartitionSize(
+ int targetPartitionSize,
boolean forceExtendableShardSpecs,
boolean forceGuaranteedRollup
)
@@ -1596,7 +1711,28 @@ public class IndexTaskTest
1,
null,
null,
+ null,
+ null,
+ forceExtendableShardSpecs,
+ forceGuaranteedRollup,
+ true
+ );
+ }
+
+ private static IndexTuningConfig createTuningConfigWithNumShards(
+ int numShards,
+ @Nullable List<String> partitionDimensions,
+ boolean forceExtendableShardSpecs,
+ boolean forceGuaranteedRollup
+ )
+ {
+ return createTuningConfig(
+ null,
+ 1,
+ null,
+ null,
numShards,
+ partitionDimensions,
forceExtendableShardSpecs,
forceGuaranteedRollup,
true
@@ -1604,11 +1740,12 @@ public class IndexTaskTest
}
private static IndexTuningConfig createTuningConfig(
- Integer targetPartitionSize,
- Integer maxRowsInMemory,
- Long maxBytesInMemory,
- Long maxTotalRows,
- Integer numShards,
+ @Nullable Integer targetPartitionSize,
+ @Nullable Integer maxRowsInMemory,
+ @Nullable Long maxBytesInMemory,
+ @Nullable Long maxTotalRows,
+ @Nullable Integer numShards,
+ @Nullable List<String> partitionDimensions,
boolean forceExtendableShardSpecs,
boolean forceGuaranteedRollup,
boolean reportParseException
@@ -1621,6 +1758,7 @@ public class IndexTaskTest
maxTotalRows,
null,
numShards,
+ partitionDimensions,
indexSpec,
null,
true,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index 1fe36fe..98c42b7 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -193,6 +193,7 @@ public class TaskSerdeTest
null,
9999,
null,
+ null,
indexSpec,
3,
true,
@@ -278,6 +279,7 @@ public class TaskSerdeTest
null,
null,
null,
+ null,
indexSpec,
3,
true,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index eb03933..af814d9 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -692,6 +692,7 @@ public class TaskLifecycleTest
null,
null,
null,
+ null,
indexSpec,
3,
true,
@@ -772,6 +773,7 @@ public class TaskLifecycleTest
null,
null,
null,
+ null,
indexSpec,
3,
true,
@@ -1159,6 +1161,7 @@ public class TaskLifecycleTest
null,
null,
null,
+ null,
indexSpec,
null,
false,
diff --git
a/server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
b/server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
index 456da4b..3554326 100644
---
a/server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++
b/server/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -74,7 +75,7 @@ public class HashBasedNumberedShardSpec extends
NumberedShardSpec
{
final List<Object> groupKey = getGroupKey(timestamp, inputRow);
try {
- return
hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asInt();
+ return hash(jsonMapper, groupKey);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
@@ -93,6 +94,12 @@ public class HashBasedNumberedShardSpec extends
NumberedShardSpec
}
}
+ @VisibleForTesting
+ public static int hash(ObjectMapper jsonMapper, List<Object> objects) throws
JsonProcessingException
+ {
+ return
hashFunction.hashBytes(jsonMapper.writeValueAsBytes(objects)).asInt();
+ }
+
@Override
public String toString()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]