This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 96eb69e ignore brokers in broker views (#10017)
96eb69e is described below
commit 96eb69e4757b4f6c9c31e0262007df39109d4454
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jun 10 12:29:30 2020 -0700
ignore brokers in broker views (#10017)
---
.../org/apache/druid/client/BrokerServerView.java | 11 ++
.../apache/druid/client/BrokerServerViewTest.java | 143 +++++++++++++++++----
.../druid/sql/calcite/schema/DruidSchema.java | 10 ++
.../druid/sql/calcite/schema/DruidSchemaTest.java | 32 +++++
4 files changed, 171 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 337e9a4..3b5d408 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -42,6 +42,7 @@ import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -217,6 +218,12 @@ public class BrokerServerView implements TimelineServerView
private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
{
+ if (server.getType().equals(ServerType.BROKER)) {
+ // in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree
+ // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query
+ // loop...
+ return;
+ }
SegmentId segmentId = segment.getId();
synchronized (lock) {
log.debug("Adding segment[%s] for server[%s]", segment, server);
@@ -246,6 +253,10 @@ public class BrokerServerView implements TimelineServerView
private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
{
+ if (server.getType().equals(ServerType.BROKER)) {
+ // might as well save the trouble of grabbing a lock for something that isn't there..
+ return;
+ }
SegmentId segmentId = segment.getId();
final ServerSelector selector;
diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index dd3961e..30b93a1 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
-import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -161,22 +160,15 @@ public class BrokerServerViewTest extends CuratorTestBase
final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
- new Function<String, DruidServer>()
- {
- @Override
- public DruidServer apply(String input)
- {
- return new DruidServer(
- input,
- input,
- null,
- 10000000L,
- ServerType.HISTORICAL,
- "default_tier",
- 0
- );
- }
- }
+ input -> new DruidServer(
+ input,
+ input,
+ null,
+ 10000000L,
+ ServerType.HISTORICAL,
+ "default_tier",
+ 0
+ )
);
for (DruidServer druidServer : druidServers) {
@@ -190,14 +182,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Pair.of("2011-04-01/2011-04-09", "v2"),
Pair.of("2011-04-06/2011-04-09", "v3"),
Pair.of("2011-04-01/2011-04-02", "v3")
- ), new Function<Pair<String, String>, DataSegment>()
- {
- @Override
- public DataSegment apply(Pair<String, String> input)
- {
- return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs);
- }
- }
+ ), input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
);
for (int i = 0; i < 5; ++i) {
@@ -261,6 +246,114 @@ public class BrokerServerViewTest extends CuratorTestBase
);
}
+ @Test
+ public void testMultipleServerAndBroker() throws Exception
+ {
+ segmentViewInitLatch = new CountDownLatch(1);
+ segmentAddedLatch = new CountDownLatch(6);
+
+ // temporarily set latch count to 1
+ segmentRemovedLatch = new CountDownLatch(1);
+
+ setupViews();
+
+ final DruidServer druidBroker = new DruidServer(
+ "localhost:5",
+ "localhost:5",
+ null,
+ 10000000L,
+ ServerType.BROKER,
+ "default_tier",
+ 0
+ );
+
+ final List<DruidServer> druidServers = Lists.transform(
+ ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
+ input -> new DruidServer(
+ input,
+ input,
+ null,
+ 10000000L,
+ ServerType.HISTORICAL,
+ "default_tier",
+ 0
+ )
+ );
+
+ setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);
+ for (DruidServer druidServer : druidServers) {
+ setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
+ }
+
+ final List<DataSegment> segments = Lists.transform(
+ ImmutableList.of(
+ Pair.of("2011-04-01/2011-04-03", "v1"),
+ Pair.of("2011-04-03/2011-04-06", "v1"),
+ Pair.of("2011-04-01/2011-04-09", "v2"),
+ Pair.of("2011-04-06/2011-04-09", "v3"),
+ Pair.of("2011-04-01/2011-04-02", "v3")
+ ),
+ input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
+ );
+
+ DataSegment brokerSegment = dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-11", "v4");
+ announceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig, jsonMapper);
+ for (int i = 0; i < 5; ++i) {
+ announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper);
+ }
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+ TimelineLookup timeline = brokerServerView.getTimeline(
+ DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
+ ).get();
+
+ assertValues(
+ Arrays.asList(
+ createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
+ createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
+ createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
+ ),
+ (List<TimelineObjectHolder>) timeline.lookup(
+ Intervals.of(
+ "2011-04-01/2011-04-09"
+ )
+ )
+ );
+
+ // unannounce the broker segment should do nothing to announcements
+ unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig);
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+
+ // renew segmentRemovedLatch since we still have 5 segments to unannounce
+ segmentRemovedLatch = new CountDownLatch(5);
+
+ timeline = brokerServerView.getTimeline(
+ DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
+ ).get();
+
+ // expect same set of segments as before
+ assertValues(
+ Arrays.asList(
+ createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
+ createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
+ createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
+ ),
+ (List<TimelineObjectHolder>) timeline.lookup(
+ Intervals.of(
+ "2011-04-01/2011-04-09"
+ )
+ )
+ );
+
+ // unannounce all the segments
+ for (int i = 0; i < 5; ++i) {
+ unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig);
+ }
+ Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+ }
+
+
private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
String intervalStr,
String version,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 54e7e5a..35037e2 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -350,6 +350,12 @@ public class DruidSchema extends AbstractSchema
@VisibleForTesting
void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
+ if (server.getType().equals(ServerType.BROKER)) {
+ // in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree
+ // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite metadata
+ // loop...
+ return;
+ }
synchronized (lock) {
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
@@ -428,6 +434,10 @@ public class DruidSchema extends AbstractSchema
@VisibleForTesting
void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
{
+ if (server.getType().equals(ServerType.BROKER)) {
+ // cheese it
+ return;
+ }
synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 6e2f8f6..ee3bca1 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -57,6 +57,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.After;
import org.junit.AfterClass;
@@ -418,4 +419,35 @@ public class DruidSchemaTest extends CalciteTestBase
Assert.assertEquals(0L, currentMetadata.isRealtime());
}
+ @Test
+ public void testAvailableSegmentFromBrokerIsIgnored()
+ {
+
+ Assert.assertEquals(4, schema.getTotalSegments());
+
+ DruidServerMetadata metadata = new DruidServerMetadata(
+ "broker",
+ "localhost:0",
+ null,
+ 1000L,
+ ServerType.BROKER,
+ "broken",
+ 0
+ );
+
+ DataSegment segment = new DataSegment(
+ "test",
+ Intervals.of("2011-04-01/2011-04-11"),
+ "v1",
+ ImmutableMap.of(),
+ ImmutableList.of(),
+ ImmutableList.of(),
+ NoneShardSpec.instance(),
+ 1,
+ 100L
+ );
+ schema.addSegment(metadata, segment);
+ Assert.assertEquals(4, schema.getTotalSegments());
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]