This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1e66736 Do not use UnmodifiableList in auto compaction (#9535)
1e66736 is described below
commit 1e667362ebabd6f4348e9f46cf613a2f18cb70ba
Author: Jihoon Son <[email protected]>
AuthorDate: Thu Mar 19 11:43:33 2020 -0700
Do not use UnmodifiableList in auto compaction (#9535)
---
.../client/indexing/HttpIndexingServiceClient.java | 2 +-
.../duty/NewestSegmentFirstIterator.java | 7 +-
.../coordinator/duty/CompactSegmentsTest.java | 296 ++++++++++++++-------
3 files changed, 210 insertions(+), 95 deletions(-)
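
Background on the change: Guava's FluentIterable.toList() returns an ImmutableList (an unmodifiable list), so downstream code that later tries to add to or remove from the returned segment list fails with UnsupportedOperationException. Collecting through the JDK's Collectors.toList() instead yields a mutable list (an ArrayList in practice, though the Javadoc only guarantees a List). A minimal standalone sketch of the difference, illustrative only and not part of this commit:

    import com.google.common.collect.FluentIterable;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class MutabilityDemo
    {
      public static void main(String[] args)
      {
        List<Integer> source = Arrays.asList(1, 2, 3);

        // Guava: toList() always returns an ImmutableList.
        List<Integer> guavaList = FluentIterable.from(source).toList();
        // JDK streams: Collectors.toList() returns a mutable list in practice.
        List<Integer> streamList = source.stream().collect(Collectors.toList());

        streamList.add(4); // succeeds
        guavaList.add(4);  // throws UnsupportedOperationException
      }
    }
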
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 9e69b71..8ae9777 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -231,7 +231,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
);
if (!responseHolder.getStatus().equals(HttpResponseStatus.OK)) {
- throw new ISE("Error while fetching the status of the last complete
task");
+ throw new ISE("Error while fetching the status of tasks");
}
return jsonMapper.readValue(
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 78ab09c..551e445 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -41,6 +41,7 @@ import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.utils.Streams;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -240,9 +241,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (holders.isEmpty()) {
throw new NoSuchElementException();
}
-    return FluentIterable.from(holders.remove(holders.size() - 1).getObject())
-                         .transform(PartitionChunk::getObject)
-                         .toList();
+    return Streams.sequentialStreamFrom(holders.remove(holders.size() - 1).getObject())
+                  .map(PartitionChunk::getObject)
+                  .collect(Collectors.toList());
}
}
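
For context on the replacement above: org.apache.druid.utils.Streams.sequentialStreamFrom is Druid's Iterable-to-Stream helper. A minimal sketch of the standard idiom such a helper wraps, with the signature assumed from the call site rather than copied from Druid's source:

    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    public final class Streams
    {
      private Streams() {}

      // Adapt an Iterable into a sequential Stream; the 'false' flag
      // selects sequential (non-parallel) execution.
      public static <T> Stream<T> sequentialStreamFrom(Iterable<T> iterable)
      {
        return StreamSupport.stream(iterable.spliterator(), false);
      }
    }
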
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 3baf47f..6bac494 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -19,20 +19,32 @@
package org.apache.druid.server.coordinator.duty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.client.DataSourcesSnapshot;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
-import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.client.indexing.NoopIndexingServiceClient;
-import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.client.indexing.ClientTaskQuery;
+import org.apache.druid.client.indexing.HttpIndexingServiceClient;
+import org.apache.druid.client.indexing.IndexingWorker;
+import org.apache.druid.client.indexing.IndexingWorkerInfo;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.discovery.DruidNodeDiscovery;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -45,6 +57,13 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.utils.Streams;
+import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -52,102 +71,21 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
public class CompactSegmentsTest
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";
-  private final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
- {
- private int compactVersionSuffix = 0;
- private int idSuffix = 0;
-
- @Override
- public String compactSegments(
- List<DataSegment> segments,
- int compactionTaskPriority,
- ClientCompactionTaskQueryTuningConfig tuningConfig,
- Map<String, Object> context
- )
- {
- Preconditions.checkArgument(segments.size() > 1);
- DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
- for (DataSegment segment : segments) {
- if (segment.getInterval().getStart().compareTo(minStart) < 0) {
- minStart = segment.getInterval().getStart();
- }
- if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
- maxEnd = segment.getInterval().getEnd();
- }
- }
- Interval compactInterval = new Interval(minStart, maxEnd);
-      final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(segments.get(0).getDataSource());
- segments.forEach(
- segment -> timeline.remove(
- segment.getInterval(),
- segment.getVersion(),
- segment.getShardSpec().createChunk(segment)
- )
- );
- final String version = "newVersion_" + compactVersionSuffix++;
-      final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
- for (int i = 0; i < 2; i++) {
- DataSegment compactSegment = new DataSegment(
- segments.get(0).getDataSource(),
- compactInterval,
- version,
- null,
- segments.get(0).getDimensions(),
- segments.get(0).getMetrics(),
- new NumberedShardSpec(i, 0),
- new CompactionState(
- new DynamicPartitionsSpec(
- tuningConfig.getMaxRowsPerSegment(),
- tuningConfig.getMaxTotalRowsOr(Long.MAX_VALUE)
- ),
- ImmutableMap.of(
- "bitmap",
- ImmutableMap.of("type", "concise"),
- "dimensionCompression",
- "lz4",
- "metricCompression",
- "lz4",
- "longEncoding",
- "longs"
- )
- ),
- 1,
- segmentSize
- );
-
- timeline.add(
- compactInterval,
- compactSegment.getVersion(),
- compactSegment.getShardSpec().createChunk(compactSegment)
- );
- }
-
- return "task_" + idSuffix++;
- }
-
- @Override
- public List<TaskStatusPlus> getActiveTasks()
- {
- return Collections.emptyList();
- }
-
- @Override
- public int getTotalWorkerCapacity()
- {
- return 10;
- }
- };
-
private Map<String, VersionedIntervalTimeline<String, DataSegment>>
dataSources;
@Before
@@ -156,7 +94,7 @@ public class CompactSegmentsTest
List<DataSegment> segments = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
- for (int j : new int[] {0, 1, 2, 3, 7, 8}) {
+ for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < 4; k++) {
segments.add(createSegment(dataSource, j, true, k));
segments.add(createSegment(dataSource, j, false, k));
@@ -202,7 +140,11 @@ public class CompactSegmentsTest
@Test
public void testRun()
{
-    final CompactSegments compactSegments = new CompactSegments(new DefaultObjectMapper(), indexingServiceClient);
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(jsonMapper);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(jsonMapper, leaderClient);
+    final CompactSegments compactSegments = new CompactSegments(jsonMapper, indexingServiceClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
@@ -408,4 +350,176 @@ public class CompactSegmentsTest
}
return compactionConfigs;
}
+
+ private class TestDruidLeaderClient extends DruidLeaderClient
+ {
+ private final ObjectMapper jsonMapper;
+
+ private int compactVersionSuffix = 0;
+ private int idSuffix = 0;
+
+ private TestDruidLeaderClient(ObjectMapper jsonMapper)
+ {
+ super(null, new TestNodeDiscoveryProvider(), null, null);
+ this.jsonMapper = jsonMapper;
+ }
+
+ @Override
+    public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
+ {
+ return new Request(httpMethod, new URL("http", "host", 8090, urlPath));
+ }
+
+ @Override
+ public StringFullResponseHolder go(Request request) throws IOException
+ {
+ final String urlString = request.getUrl().toString();
+ if (urlString.contains("/druid/indexer/v1/task")) {
+ return handleTask(request);
+ } else if (urlString.contains("/druid/indexer/v1/workers")) {
+ return handleWorkers();
+ } else if (urlString.contains("/druid/indexer/v1/waitingTasks")
+ || urlString.contains("/druid/indexer/v1/pendingTasks")
+ || urlString.contains("/druid/indexer/v1/runningTasks")) {
+        return createStringFullResponseHolder(jsonMapper.writeValueAsString(Collections.emptyList()));
+ } else {
+ throw new IAE("Cannot handle request for url[%s]", request.getUrl());
+ }
+ }
+
+    private StringFullResponseHolder createStringFullResponseHolder(String content)
+ {
+      final HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ final StringFullResponseHolder holder = new StringFullResponseHolder(
+ HttpResponseStatus.OK,
+ httpResponse,
+ StandardCharsets.UTF_8
+ );
+ holder.addChunk(content);
+ return holder;
+ }
+
+    private StringFullResponseHolder handleWorkers() throws JsonProcessingException
+ {
+ final List<IndexingWorkerInfo> workerInfos = new ArrayList<>();
+ // There are 10 workers available in this test
+ for (int i = 0; i < 10; i++) {
+ workerInfos.add(
+ new IndexingWorkerInfo(
+ new IndexingWorker("http", "host", "8091", 1, "version"),
+ 0,
+ Collections.emptySet(),
+ Collections.emptyList(),
+ DateTimes.EPOCH,
+ null
+ )
+ );
+ }
+      return createStringFullResponseHolder(jsonMapper.writeValueAsString(workerInfos));
+ }
+
+    private StringFullResponseHolder handleTask(Request request) throws IOException
+ {
+      final ClientTaskQuery taskQuery = jsonMapper.readValue(request.getContent().array(), ClientTaskQuery.class);
+ if (!(taskQuery instanceof ClientCompactionTaskQuery)) {
+ throw new IAE("Cannot run non-compaction task");
+ }
+      final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) taskQuery;
+      final Interval intervalToCompact = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
+      final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(
+ compactionTaskQuery.getDataSource()
+ );
+ final List<DataSegment> segments = timeline.lookup(intervalToCompact)
+ .stream()
+ .flatMap(holder ->
Streams.sequentialStreamFrom(holder.getObject()))
+
.map(PartitionChunk::getObject)
+ .collect(Collectors.toList());
+ final String taskId = compactSegments(
+ timeline,
+ segments,
+ compactionTaskQuery.getTuningConfig()
+ );
+      return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskId)));
+ }
+
+ private String compactSegments(
+ VersionedIntervalTimeline<String, DataSegment> timeline,
+ List<DataSegment> segments,
+ ClientCompactionTaskQueryTuningConfig tuningConfig
+ )
+ {
+ Preconditions.checkArgument(segments.size() > 1);
+ DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
+ for (DataSegment segment : segments) {
+ if (segment.getInterval().getStart().compareTo(minStart) < 0) {
+ minStart = segment.getInterval().getStart();
+ }
+ if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
+ maxEnd = segment.getInterval().getEnd();
+ }
+ }
+ Interval compactInterval = new Interval(minStart, maxEnd);
+ segments.forEach(
+ segment -> timeline.remove(
+ segment.getInterval(),
+ segment.getVersion(),
+ segment.getShardSpec().createChunk(segment)
+ )
+ );
+ final String version = "newVersion_" + compactVersionSuffix++;
+      final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
+ for (int i = 0; i < 2; i++) {
+ DataSegment compactSegment = new DataSegment(
+ segments.get(0).getDataSource(),
+ compactInterval,
+ version,
+ null,
+ segments.get(0).getDimensions(),
+ segments.get(0).getMetrics(),
+ new NumberedShardSpec(i, 0),
+ new CompactionState(
+ new DynamicPartitionsSpec(
+ tuningConfig.getMaxRowsPerSegment(),
+ tuningConfig.getMaxTotalRowsOr(Long.MAX_VALUE)
+ ),
+ ImmutableMap.of(
+ "bitmap",
+ ImmutableMap.of("type", "concise"),
+ "dimensionCompression",
+ "lz4",
+ "metricCompression",
+ "lz4",
+ "longEncoding",
+ "longs"
+ )
+ ),
+ 1,
+ segmentSize
+ );
+
+ timeline.add(
+ compactInterval,
+ compactSegment.getVersion(),
+ compactSegment.getShardSpec().createChunk(compactSegment)
+ );
+ }
+
+ return "task_" + idSuffix++;
+ }
+ }
+
+  private static class TestNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
+ {
+ @Override
+ public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
+ {
+ return EasyMock.niceMock(DruidNodeDiscovery.class);
+ }
+ }
}
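
Note on the test rewiring: the hand-rolled NoopIndexingServiceClient override is replaced by a real HttpIndexingServiceClient wired to the stubbed TestDruidLeaderClient above, presumably so that compaction requests round-trip through the actual JSON serialization and HTTP-client code paths (/druid/indexer/v1/task, /druid/indexer/v1/workers, and the task-listing endpoints) rather than bypassing them, letting a regression like the unmodifiable-list one surface in CompactSegmentsTest.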
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]