This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e66736  Do not use UnmodifiableList in auto compaction (#9535)
1e66736 is described below

commit 1e667362ebabd6f4348e9f46cf613a2f18cb70ba
Author: Jihoon Son <[email protected]>
AuthorDate: Thu Mar 19 11:43:33 2020 -0700

    Do not use UnmodifiableList in auto compaction (#9535)
---
 .../client/indexing/HttpIndexingServiceClient.java |   2 +-
 .../duty/NewestSegmentFirstIterator.java           |   7 +-
 .../coordinator/duty/CompactSegmentsTest.java      | 296 ++++++++++++++-------
 3 files changed, 210 insertions(+), 95 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 9e69b71..8ae9777 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -231,7 +231,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       );
 
       if (!responseHolder.getStatus().equals(HttpResponseStatus.OK)) {
-        throw new ISE("Error while fetching the status of the last complete 
task");
+        throw new ISE("Error while fetching the status of tasks");
       }
 
       return jsonMapper.readValue(
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 78ab09c..551e445 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -41,6 +41,7 @@ import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.utils.Streams;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -240,9 +241,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       if (holders.isEmpty()) {
         throw new NoSuchElementException();
       }
-      return FluentIterable.from(holders.remove(holders.size() - 
1).getObject())
-                           .transform(PartitionChunk::getObject)
-                           .toList();
+      return Streams.sequentialStreamFrom(holders.remove(holders.size() - 
1).getObject())
+                    .map(PartitionChunk::getObject)
+                    .collect(Collectors.toList());
     }
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 3baf47f..6bac494 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -19,20 +19,32 @@
 
 package org.apache.druid.server.coordinator.duty;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.apache.druid.client.DataSourcesSnapshot;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
-import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.client.indexing.NoopIndexingServiceClient;
-import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.client.indexing.ClientTaskQuery;
+import org.apache.druid.client.indexing.HttpIndexingServiceClient;
+import org.apache.druid.client.indexing.IndexingWorker;
+import org.apache.druid.client.indexing.IndexingWorkerInfo;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.discovery.DruidNodeDiscovery;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.Request;
+import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
 import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -45,6 +57,13 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.utils.Streams;
+import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -52,102 +71,21 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 public class CompactSegmentsTest
 {
   private static final String DATA_SOURCE_PREFIX = "dataSource_";
 
-  private final IndexingServiceClient indexingServiceClient = new 
NoopIndexingServiceClient()
-  {
-    private int compactVersionSuffix = 0;
-    private int idSuffix = 0;
-
-    @Override
-    public String compactSegments(
-        List<DataSegment> segments,
-        int compactionTaskPriority,
-        ClientCompactionTaskQueryTuningConfig tuningConfig,
-        Map<String, Object> context
-    )
-    {
-      Preconditions.checkArgument(segments.size() > 1);
-      DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
-      for (DataSegment segment : segments) {
-        if (segment.getInterval().getStart().compareTo(minStart) < 0) {
-          minStart = segment.getInterval().getStart();
-        }
-        if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
-          maxEnd = segment.getInterval().getEnd();
-        }
-      }
-      Interval compactInterval = new Interval(minStart, maxEnd);
-      final VersionedIntervalTimeline<String, DataSegment> timeline = 
dataSources.get(segments.get(0).getDataSource());
-      segments.forEach(
-          segment -> timeline.remove(
-              segment.getInterval(),
-              segment.getVersion(),
-              segment.getShardSpec().createChunk(segment)
-          )
-      );
-      final String version = "newVersion_" + compactVersionSuffix++;
-      final long segmentSize = 
segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
-      for (int i = 0; i < 2; i++) {
-        DataSegment compactSegment = new DataSegment(
-            segments.get(0).getDataSource(),
-            compactInterval,
-            version,
-            null,
-            segments.get(0).getDimensions(),
-            segments.get(0).getMetrics(),
-            new NumberedShardSpec(i, 0),
-            new CompactionState(
-                new DynamicPartitionsSpec(
-                    tuningConfig.getMaxRowsPerSegment(),
-                    tuningConfig.getMaxTotalRowsOr(Long.MAX_VALUE)
-                ),
-                ImmutableMap.of(
-                    "bitmap",
-                    ImmutableMap.of("type", "concise"),
-                    "dimensionCompression",
-                    "lz4",
-                    "metricCompression",
-                    "lz4",
-                    "longEncoding",
-                    "longs"
-                )
-            ),
-            1,
-            segmentSize
-        );
-
-        timeline.add(
-            compactInterval,
-            compactSegment.getVersion(),
-            compactSegment.getShardSpec().createChunk(compactSegment)
-        );
-      }
-
-      return "task_" + idSuffix++;
-    }
-
-    @Override
-    public List<TaskStatusPlus> getActiveTasks()
-    {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public int getTotalWorkerCapacity()
-    {
-      return 10;
-    }
-  };
-
   private Map<String, VersionedIntervalTimeline<String, DataSegment>> 
dataSources;
 
   @Before
@@ -156,7 +94,7 @@ public class CompactSegmentsTest
     List<DataSegment> segments = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       final String dataSource = DATA_SOURCE_PREFIX + i;
-      for (int j : new int[] {0, 1, 2, 3, 7, 8}) {
+      for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
         for (int k = 0; k < 4; k++) {
           segments.add(createSegment(dataSource, j, true, k));
           segments.add(createSegment(dataSource, j, false, k));
@@ -202,7 +140,11 @@ public class CompactSegmentsTest
   @Test
   public void testRun()
   {
-    final CompactSegments compactSegments = new CompactSegments(new 
DefaultObjectMapper(), indexingServiceClient);
+    final ObjectMapper jsonMapper = new DefaultObjectMapper();
+    final TestDruidLeaderClient leaderClient = new 
TestDruidLeaderClient(jsonMapper);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new 
HttpIndexingServiceClient(jsonMapper, leaderClient);
+    final CompactSegments compactSegments = new CompactSegments(jsonMapper, 
indexingServiceClient);
 
     final Supplier<String> expectedVersionSupplier = new Supplier<String>()
     {
@@ -408,4 +350,176 @@ public class CompactSegmentsTest
     }
     return compactionConfigs;
   }
+
+  private class TestDruidLeaderClient extends DruidLeaderClient
+  {
+    private final ObjectMapper jsonMapper;
+
+    private int compactVersionSuffix = 0;
+    private int idSuffix = 0;
+
+    private TestDruidLeaderClient(ObjectMapper jsonMapper)
+    {
+      super(null, new TestNodeDiscoveryProvider(), null, null);
+      this.jsonMapper = jsonMapper;
+    }
+
+    @Override
+    public Request makeRequest(HttpMethod httpMethod, String urlPath) throws 
IOException
+    {
+      return new Request(httpMethod, new URL("http", "host", 8090, urlPath));
+    }
+
+    @Override
+    public StringFullResponseHolder go(Request request) throws IOException
+    {
+      final String urlString = request.getUrl().toString();
+      if (urlString.contains("/druid/indexer/v1/task")) {
+        return handleTask(request);
+      } else if (urlString.contains("/druid/indexer/v1/workers")) {
+        return handleWorkers();
+      } else if (urlString.contains("/druid/indexer/v1/waitingTasks")
+                 || urlString.contains("/druid/indexer/v1/pendingTasks")
+                 || urlString.contains("/druid/indexer/v1/runningTasks")) {
+        return 
createStringFullResponseHolder(jsonMapper.writeValueAsString(Collections.emptyList()));
+      } else {
+        throw new IAE("Cannot handle request for url[%s]", request.getUrl());
+      }
+    }
+
+    private StringFullResponseHolder createStringFullResponseHolder(String 
content)
+    {
+      final HttpResponse httpResponse = new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      final StringFullResponseHolder holder = new StringFullResponseHolder(
+          HttpResponseStatus.OK,
+          httpResponse,
+          StandardCharsets.UTF_8
+      );
+      holder.addChunk(content);
+      return holder;
+    }
+
+    private StringFullResponseHolder handleWorkers() throws 
JsonProcessingException
+    {
+      final List<IndexingWorkerInfo> workerInfos = new ArrayList<>();
+      // There are 10 workers available in this test
+      for (int i = 0; i < 10; i++) {
+        workerInfos.add(
+            new IndexingWorkerInfo(
+                new IndexingWorker("http", "host", "8091", 1, "version"),
+                0,
+                Collections.emptySet(),
+                Collections.emptyList(),
+                DateTimes.EPOCH,
+                null
+            )
+        );
+      }
+      return 
createStringFullResponseHolder(jsonMapper.writeValueAsString(workerInfos));
+    }
+
+    private StringFullResponseHolder handleTask(Request request) throws 
IOException
+    {
+      final ClientTaskQuery taskQuery = 
jsonMapper.readValue(request.getContent().array(), ClientTaskQuery.class);
+      if (!(taskQuery instanceof ClientCompactionTaskQuery)) {
+        throw new IAE("Cannot run non-compaction task");
+      }
+      final ClientCompactionTaskQuery compactionTaskQuery = 
(ClientCompactionTaskQuery) taskQuery;
+      final Interval intervalToCompact = 
compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
+      final VersionedIntervalTimeline<String, DataSegment> timeline = 
dataSources.get(
+          compactionTaskQuery.getDataSource()
+      );
+      final List<DataSegment> segments = timeline.lookup(intervalToCompact)
+                                                 .stream()
+                                                 .flatMap(holder -> 
Streams.sequentialStreamFrom(holder.getObject()))
+                                                 
.map(PartitionChunk::getObject)
+                                                 .collect(Collectors.toList());
+      final String taskId = compactSegments(
+          timeline,
+          segments,
+          compactionTaskQuery.getTuningConfig()
+      );
+      return 
createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task",
 taskId)));
+    }
+
+    private String compactSegments(
+        VersionedIntervalTimeline<String, DataSegment> timeline,
+        List<DataSegment> segments,
+        ClientCompactionTaskQueryTuningConfig tuningConfig
+    )
+    {
+      Preconditions.checkArgument(segments.size() > 1);
+      DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
+      for (DataSegment segment : segments) {
+        if (segment.getInterval().getStart().compareTo(minStart) < 0) {
+          minStart = segment.getInterval().getStart();
+        }
+        if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
+          maxEnd = segment.getInterval().getEnd();
+        }
+      }
+      Interval compactInterval = new Interval(minStart, maxEnd);
+      segments.forEach(
+          segment -> timeline.remove(
+              segment.getInterval(),
+              segment.getVersion(),
+              segment.getShardSpec().createChunk(segment)
+          )
+      );
+      final String version = "newVersion_" + compactVersionSuffix++;
+      final long segmentSize = 
segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
+      for (int i = 0; i < 2; i++) {
+        DataSegment compactSegment = new DataSegment(
+            segments.get(0).getDataSource(),
+            compactInterval,
+            version,
+            null,
+            segments.get(0).getDimensions(),
+            segments.get(0).getMetrics(),
+            new NumberedShardSpec(i, 0),
+            new CompactionState(
+                new DynamicPartitionsSpec(
+                    tuningConfig.getMaxRowsPerSegment(),
+                    tuningConfig.getMaxTotalRowsOr(Long.MAX_VALUE)
+                ),
+                ImmutableMap.of(
+                    "bitmap",
+                    ImmutableMap.of("type", "concise"),
+                    "dimensionCompression",
+                    "lz4",
+                    "metricCompression",
+                    "lz4",
+                    "longEncoding",
+                    "longs"
+                )
+            ),
+            1,
+            segmentSize
+        );
+
+        timeline.add(
+            compactInterval,
+            compactSegment.getVersion(),
+            compactSegment.getShardSpec().createChunk(compactSegment)
+        );
+      }
+
+      return "task_" + idSuffix++;
+    }
+  }
+
+  private static class TestNodeDiscoveryProvider extends 
DruidNodeDiscoveryProvider
+  {
+    @Override
+    public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
+    {
+      return EasyMock.niceMock(DruidNodeDiscovery.class);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to