jihoonson closed pull request #6676: Handoff should ignore segments that are
dropped by drop rules
URL: https://github.com/apache/incubator-druid/pull/6676
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 4f59d87a849..ef731b31869 100644
---
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++
b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -23,16 +23,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.apache.druid.query.SegmentDescriptor;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.joda.time.Interval;
-
-import java.util.List;
public class CoordinatorClient
{
@@ -49,18 +46,20 @@ public CoordinatorClient(
this.druidLeaderClient = druidLeaderClient;
}
-
- public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource,
Interval interval, boolean incompleteOk)
+ public boolean isHandOffComplete(String dataSource, SegmentDescriptor
descriptor)
{
try {
FullResponseHolder response = druidLeaderClient.go(
- druidLeaderClient.makeRequest(HttpMethod.GET,
- StringUtils.format(
-
"/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
- dataSource,
- interval.toString().replace('/',
'_'),
- incompleteOk
- ))
+ druidLeaderClient.makeRequest(
+ HttpMethod.GET,
+ StringUtils.format(
+
"/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s",
+ dataSource,
+ descriptor.getInterval(),
+ descriptor.getPartitionNumber(),
+ descriptor.getVersion()
+ )
+ )
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -70,12 +69,9 @@ public CoordinatorClient(
response.getContent()
);
}
- return jsonMapper.readValue(
- response.getContent(), new
TypeReference<List<ImmutableSegmentLoadInfo>>()
- {
-
- }
- );
+ return jsonMapper.readValue(response.getContent(), new
TypeReference<Boolean>()
+ {
+ });
}
catch (Exception e) {
throw Throwables.propagate(e);
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
index 6d062151e83..028183f943b 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
@@ -19,18 +19,13 @@
package org.apache.druid.segment.realtime.plumber;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.server.coordination.DruidServerMetadata;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -95,13 +90,7 @@ void checkForSegmentHandoffs()
Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry =
itr.next();
SegmentDescriptor descriptor = entry.getKey();
try {
- List<ImmutableSegmentLoadInfo> loadedSegments =
coordinatorClient.fetchServerView(
- dataSource,
- descriptor.getInterval(),
- true
- );
-
- if (isHandOffComplete(loadedSegments, entry.getKey())) {
+ if (coordinatorClient.isHandOffComplete(dataSource, descriptor)) {
log.info("Segment Handoff complete for dataSource[%s]
Segment[%s]", dataSource, descriptor);
entry.getValue().lhs.execute(entry.getValue().rhs);
itr.remove();
@@ -131,30 +120,6 @@ void checkForSegmentHandoffs()
}
}
-
- static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView,
SegmentDescriptor descriptor)
- {
- for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
- if
(segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
- && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
- == descriptor.getPartitionNumber()
- &&
segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
- && Iterables.any(
- segmentLoadInfo.getServers(), new Predicate<DruidServerMetadata>()
- {
- @Override
- public boolean apply(DruidServerMetadata input)
- {
- return input.segmentReplicatable();
- }
- }
- )) {
- return true;
- }
- }
- return false;
- }
-
@Override
public void close()
{
diff --git
a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
index 450dbd4af08..03d3e6ad198 100644
--- a/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DatasourcesResource.java
@@ -37,8 +37,13 @@
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.MetadataSegmentManager;
+import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordinator.rules.LoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.http.security.DatasourceResourceFilter;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -85,6 +90,7 @@
private final CoordinatorServerView serverInventoryView;
private final MetadataSegmentManager databaseSegmentManager;
+ private final MetadataRuleManager databaseRuleManager;
private final IndexingServiceClient indexingServiceClient;
private final AuthConfig authConfig;
private final AuthorizerMapper authorizerMapper;
@@ -93,6 +99,7 @@
public DatasourcesResource(
CoordinatorServerView serverInventoryView,
MetadataSegmentManager databaseSegmentManager,
+ MetadataRuleManager databaseRuleManager,
@Nullable IndexingServiceClient indexingServiceClient,
AuthConfig authConfig,
AuthorizerMapper authorizerMapper
@@ -100,6 +107,7 @@ public DatasourcesResource(
{
this.serverInventoryView = serverInventoryView;
this.databaseSegmentManager = databaseSegmentManager;
+ this.databaseRuleManager = databaseRuleManager;
this.indexingServiceClient = indexingServiceClient;
this.authConfig = authConfig;
this.authorizerMapper = authorizerMapper;
@@ -647,4 +655,85 @@ public Response getSegmentDataSourceSpecificInterval(
);
return Response.ok(retval).build();
}
+
+ /**
+ * Used by the realtime tasks to learn whether a segment is handed off or
not.
+ * It returns true when the segment will never be handed off or is already
handed off. Otherwise, it returns false.
+ */
+ @GET
+ @Path("/{dataSourceName}/handoffComplete")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(DatasourceResourceFilter.class)
+ public Response isHandOffComplete(
+ @PathParam("dataSourceName") String dataSourceName,
+ @QueryParam("interval") final String interval,
+ @QueryParam("partitionNumber") final int partitionNumber,
+ @QueryParam("version") final String version
+ )
+ {
+ try {
+ final List<Rule> rules =
databaseRuleManager.getRulesWithDefault(dataSourceName);
+ final Interval theInterval = Intervals.of(interval);
+ final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval,
version, partitionNumber);
+ final DateTime now = DateTimes.nowUtc();
+ // dropped means a segment will never be handed off, i.e it completed
hand off
+ // init to true, reset to false only if this segment can be loaded by
rules
+ boolean dropped = true;
+ for (Rule rule : rules) {
+ if (rule.appliesTo(theInterval, now)) {
+ if (rule instanceof LoadRule) {
+ dropped = false;
+ }
+ break;
+ }
+ }
+ if (dropped) {
+ return Response.ok(true).build();
+ }
+
+ TimelineLookup<String, SegmentLoadInfo> timeline =
serverInventoryView.getTimeline(
+ new TableDataSource(dataSourceName)
+ );
+ if (timeline == null) {
+ log.debug("No timeline found for datasource[%s]", dataSourceName);
+ return Response.ok(false).build();
+ }
+
+ Iterable<TimelineObjectHolder<String, SegmentLoadInfo>> lookup =
timeline.lookupWithIncompletePartitions(
+ theInterval);
+ FunctionalIterable<ImmutableSegmentLoadInfo> loadInfoIterable =
FunctionalIterable
+ .create(lookup).transformCat(
+ (TimelineObjectHolder<String, SegmentLoadInfo> input) ->
+ Iterables.transform(
+ input.getObject(),
+ (PartitionChunk<SegmentLoadInfo> chunk) ->
+ chunk.getObject().toImmutableSegmentLoadInfo()
+ )
+ );
+ if (isSegmentLoaded(loadInfoIterable, descriptor)) {
+ return Response.ok(true).build();
+ }
+
+ return Response.ok(false).build();
+ }
+ catch (Exception e) {
+ log.error(e, "Error while handling hand off check request");
+ return Response.serverError().entity(ImmutableMap.of("error",
e.toString())).build();
+ }
+ }
+
+ static boolean isSegmentLoaded(Iterable<ImmutableSegmentLoadInfo>
serverView, SegmentDescriptor descriptor)
+ {
+ for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
+ if
(segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
+ && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() ==
descriptor.getPartitionNumber()
+ &&
segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
+ && Iterables.any(
+ segmentLoadInfo.getServers(),
DruidServerMetadata::segmentReplicatable
+ )) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
index 15964158073..228d7957843 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
@@ -19,23 +19,16 @@
package org.apache.druid.segment.realtime.plumber;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
-import junit.framework.Assert;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
+import org.junit.Assert;
import org.junit.Test;
-import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
public class CoordinatorBasedSegmentHandoffNotifierTest
@@ -55,27 +48,10 @@ public void testHandoffCallbackNotCalled()
{
Interval interval = Intervals.of("2011-04-01/2011-04-02");
SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
- DataSegment segment = new DataSegment(
- "test_ds",
- interval,
- "v1",
- null,
- null,
- null,
- new NumberedShardSpec(2, 3),
- 0, 0
- );
CoordinatorClient coordinatorClient =
EasyMock.createMock(CoordinatorClient.class);
- EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval,
true))
- .andReturn(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- segment,
- Sets.newHashSet(createRealtimeServerMetadata("a1"))
- )
- )
- )
+ EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
+ .andReturn(false)
.anyTimes();
EasyMock.replay(coordinatorClient);
CoordinatorBasedSegmentHandoffNotifier notifier = new
CoordinatorBasedSegmentHandoffNotifier(
@@ -102,27 +78,11 @@ public void testHandoffCallbackCalled()
{
Interval interval = Intervals.of("2011-04-01/2011-04-02");
SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2);
- DataSegment segment = new DataSegment(
- "test_ds",
- interval,
- "v1",
- null,
- null,
- null,
- new NumberedShardSpec(2, 3),
- 0, 0
- );
+
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
CoordinatorClient coordinatorClient =
EasyMock.createMock(CoordinatorClient.class);
- EasyMock.expect(coordinatorClient.fetchServerView("test_ds", interval,
true))
- .andReturn(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- segment,
- Sets.newHashSet(createHistoricalServerMetadata("a1"))
- )
- )
- )
+ EasyMock.expect(coordinatorClient.isHandOffComplete("test_ds", descriptor))
+ .andReturn(true)
.anyTimes();
EasyMock.replay(coordinatorClient);
CoordinatorBasedSegmentHandoffNotifier notifier = new
CoordinatorBasedSegmentHandoffNotifier(
@@ -144,177 +104,4 @@ public void testHandoffCallbackCalled()
Assert.assertTrue(callbackCalled.get());
EasyMock.verify(coordinatorClient);
}
-
- @Test
- public void testHandoffChecksForVersion()
- {
- Interval interval = Intervals.of(
- "2011-04-01/2011-04-02"
- );
- Assert.assertFalse(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 2),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v2", 2)
- )
- );
-
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v2", 2),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
-
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 2),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
-
- }
-
- @Test
- public void testHandoffChecksForAssignableServer()
- {
- Interval interval = Intervals.of(
- "2011-04-01/2011-04-02"
- );
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 2),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
-
- Assert.assertFalse(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 2),
- Sets.newHashSet(createRealtimeServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
- }
-
- @Test
- public void testHandoffChecksForPartitionNumber()
- {
- Interval interval = Intervals.of(
- "2011-04-01/2011-04-02"
- );
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 1),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 1)
- )
- );
-
- Assert.assertFalse(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(interval, "v1", 1),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(interval, "v1", 2)
- )
- );
-
- }
-
- @Test
- public void testHandoffChecksForInterval()
- {
-
- Assert.assertFalse(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1",
1),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1",
1)
- )
- );
-
- Assert.assertTrue(
- CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
- Collections.singletonList(
- new ImmutableSegmentLoadInfo(
- createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1",
1),
- Sets.newHashSet(createHistoricalServerMetadata("a"))
- )
- ),
- new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1",
1)
- )
- );
- }
-
- private DruidServerMetadata createRealtimeServerMetadata(String name)
- {
- return createServerMetadata(name, ServerType.REALTIME);
- }
-
- private DruidServerMetadata createHistoricalServerMetadata(String name)
- {
- return createServerMetadata(name, ServerType.HISTORICAL);
- }
-
- private DruidServerMetadata createServerMetadata(String name, ServerType
type)
- {
- return new DruidServerMetadata(
- name,
- name,
- null,
- 10000,
- type,
- "tier",
- 1
- );
- }
-
- private DataSegment createSegment(Interval interval, String version, int
partitionNumber)
- {
- return new DataSegment(
- "test_ds",
- interval,
- version,
- null,
- null,
- null,
- new NumberedShardSpec(partitionNumber, 100),
- 0, 0
- );
- }
}
diff --git
a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
index d42b6783f5d..3d2c78654bd 100644
---
a/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/DatasourcesResourceTest.java
@@ -21,13 +21,23 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.SegmentLoadInfo;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.rules.IntervalDropRule;
+import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
@@ -37,6 +47,11 @@
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.NumberedPartitionChunk;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionHolder;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -46,6 +61,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -156,6 +172,7 @@ public void testGetFullQueryableDataSources()
inventoryView,
null,
null,
+ null,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
);
@@ -240,6 +257,7 @@ public Access authorize(AuthenticationResult
authenticationResult1, Resource res
inventoryView,
null,
null,
+ null,
new AuthConfig(),
authMapper
);
@@ -294,6 +312,7 @@ public void testGetSimpleQueryableDataSources()
inventoryView,
null,
null,
+ null,
new AuthConfig(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER
);
@@ -323,7 +342,7 @@ public void testFullGetTheDataSource()
).atLeastOnce();
EasyMock.replay(inventoryView, server);
- DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1",
"full");
ImmutableDruidDataSource result = (ImmutableDruidDataSource)
response.getEntity();
Assert.assertEquals(200, response.getStatus());
@@ -340,7 +359,7 @@ public void testNullGetTheDataSource()
).atLeastOnce();
EasyMock.replay(inventoryView, server);
- DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Assert.assertEquals(204, datasourcesResource.getTheDataSource("none",
null).getStatus());
EasyMock.verify(inventoryView, server);
}
@@ -361,7 +380,7 @@ public void testSimpleGetTheDataSource()
).atLeastOnce();
EasyMock.replay(inventoryView, server);
- DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1",
null);
Assert.assertEquals(200, response.getStatus());
Map<String, Map<String, Object>> result = (Map<String, Map<String,
Object>>) response.getEntity();
@@ -400,7 +419,7 @@ public void testSimpleGetTheDataSourceManyTiers()
).atLeastOnce();
EasyMock.replay(inventoryView, server, server2, server3);
- DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1",
null);
Assert.assertEquals(200, response.getStatus());
Map<String, Map<String, Object>> result = (Map<String, Map<String,
Object>>) response.getEntity();
@@ -431,7 +450,7 @@ public void testGetSegmentDataSourceIntervals()
List<Interval> expectedIntervals = new ArrayList<>();
expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z"));
expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z"));
- DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response =
datasourcesResource.getSegmentDataSourceIntervals("invalidDataSource", null,
null);
Assert.assertEquals(response.getEntity(), null);
@@ -478,7 +497,7 @@ public void testGetSegmentDataSourceSpecificInterval()
).atLeastOnce();
EasyMock.replay(inventoryView);
- DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
+ DatasourcesResource datasourcesResource = new
DatasourcesResource(inventoryView, null, null, null, new AuthConfig(), null);
Response response =
datasourcesResource.getSegmentDataSourceSpecificInterval(
"invalidDataSource",
"2010-01-01/P1D",
@@ -548,6 +567,7 @@ public void testDeleteDataSourceSpecificInterval()
DatasourcesResource datasourcesResource = new DatasourcesResource(
inventoryView,
null,
+ null,
indexingServiceClient,
new AuthConfig(),
null
@@ -567,6 +587,7 @@ public void testDeleteDataSource()
DatasourcesResource datasourcesResource = new DatasourcesResource(
inventoryView,
null,
+ null,
indexingServiceClient,
new AuthConfig(),
null
@@ -579,4 +600,254 @@ public void testDeleteDataSource()
EasyMock.verify(indexingServiceClient, server);
}
+ @Test
+ public void testIsHandOffComplete()
+ {
+ MetadataRuleManager databaseRuleManager =
EasyMock.createMock(MetadataRuleManager.class);
+ Rule loadRule = new
IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"),
null);
+ Rule dropRule = new
IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"));
+ DatasourcesResource datasourcesResource = new DatasourcesResource(
+ inventoryView,
+ null,
+ databaseRuleManager,
+ null,
+ new AuthConfig(),
+ null
+ );
+
+ // test dropped
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
+ .andReturn(ImmutableList.of(loadRule, dropRule))
+ .once();
+ EasyMock.replay(databaseRuleManager);
+
+ String interval1 = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z";
+ Response response1 = datasourcesResource.isHandOffComplete("dataSource1",
interval1, 1, "v1");
+ Assert.assertTrue((boolean) response1.getEntity());
+
+ EasyMock.verify(databaseRuleManager);
+
+ // test isn't dropped and no timeline found
+ EasyMock.reset(databaseRuleManager);
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
+ .andReturn(ImmutableList.of(loadRule, dropRule))
+ .once();
+ EasyMock.expect(inventoryView.getTimeline(new
TableDataSource("dataSource1")))
+ .andReturn(null)
+ .once();
+ EasyMock.replay(inventoryView, databaseRuleManager);
+
+ String interval2 = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z";
+ Response response2 = datasourcesResource.isHandOffComplete("dataSource1",
interval2, 1, "v1");
+ Assert.assertFalse((boolean) response2.getEntity());
+
+ EasyMock.verify(inventoryView, databaseRuleManager);
+
+ // test isn't dropped and timeline exist
+ String interval3 = "2013-01-02T02:00:00Z/2013-01-02T03:00:00Z";
+ SegmentLoadInfo segmentLoadInfo = new
SegmentLoadInfo(createSegment(Intervals.of(interval3), "v1", 1));
+ segmentLoadInfo.addServer(createHistoricalServerMetadata("test"));
+ VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = new
VersionedIntervalTimeline<String, SegmentLoadInfo>(
+ null)
+ {
+ @Override
+ public List<TimelineObjectHolder<String, SegmentLoadInfo>>
lookupWithIncompletePartitions(Interval interval)
+ {
+ PartitionHolder<SegmentLoadInfo> partitionHolder = new
PartitionHolder<>(new NumberedPartitionChunk<>(
+ 1,
+ 1,
+ segmentLoadInfo
+ ));
+ List<TimelineObjectHolder<String, SegmentLoadInfo>> ret = new
ArrayList<>();
+ ret.add(new TimelineObjectHolder<>(Intervals.of(interval3), "v1",
partitionHolder));
+ return ret;
+ }
+ };
+ EasyMock.reset(inventoryView, databaseRuleManager);
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
+ .andReturn(ImmutableList.of(loadRule, dropRule))
+ .once();
+ EasyMock.expect(inventoryView.getTimeline(new
TableDataSource("dataSource1")))
+ .andReturn(timeline)
+ .once();
+ EasyMock.replay(inventoryView, databaseRuleManager);
+
+ Response response3 = datasourcesResource.isHandOffComplete("dataSource1",
interval3, 1, "v1");
+ Assert.assertTrue((boolean) response3.getEntity());
+
+ EasyMock.verify(inventoryView, databaseRuleManager);
+ }
+
+ @Test
+ public void testSegmentLoadChecksForVersion()
+ {
+ Interval interval = Intervals.of(
+ "2011-04-01/2011-04-02"
+ );
+ Assert.assertFalse(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 2),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v2", 2)
+ )
+ );
+
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v2", 2),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 2),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+
+ }
+
+ @Test
+ public void testSegmentLoadChecksForAssignableServer()
+ {
+ Interval interval = Intervals.of(
+ "2011-04-01/2011-04-02"
+ );
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 2),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+
+ Assert.assertFalse(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 2),
+ Sets.newHashSet(createRealtimeServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+ }
+
+ @Test
+ public void testSegmentLoadChecksForPartitionNumber()
+ {
+ Interval interval = Intervals.of(
+ "2011-04-01/2011-04-02"
+ );
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 1),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 1)
+ )
+ );
+
+ Assert.assertFalse(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(interval, "v1", 1),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(interval, "v1", 2)
+ )
+ );
+
+ }
+
+ @Test
+ public void testSegmentLoadChecksForInterval()
+ {
+
+ Assert.assertFalse(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1",
1),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1",
1)
+ )
+ );
+
+ Assert.assertTrue(
+ DatasourcesResource.isSegmentLoaded(
+ Collections.singletonList(
+ new ImmutableSegmentLoadInfo(
+ createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1",
1),
+ Sets.newHashSet(createHistoricalServerMetadata("a"))
+ )
+ ),
+ new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1",
1)
+ )
+ );
+ }
+
+ private DruidServerMetadata createRealtimeServerMetadata(String name)
+ {
+ return createServerMetadata(name, ServerType.REALTIME);
+ }
+
+ private DruidServerMetadata createHistoricalServerMetadata(String name)
+ {
+ return createServerMetadata(name, ServerType.HISTORICAL);
+ }
+
+ private DruidServerMetadata createServerMetadata(String name, ServerType
type)
+ {
+ return new DruidServerMetadata(
+ name,
+ name,
+ null,
+ 10000,
+ type,
+ "tier",
+ 1
+ );
+ }
+
+ private DataSegment createSegment(Interval interval, String version, int
partitionNumber)
+ {
+ return new DataSegment(
+ "test_ds",
+ interval,
+ version,
+ null,
+ null,
+ null,
+ new NumberedShardSpec(partitionNumber, 100),
+ 0, 0
+ );
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]