This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 96eb69e  ignore brokers in broker views (#10017)
96eb69e is described below

commit 96eb69e4757b4f6c9c31e0262007df39109d4454
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jun 10 12:29:30 2020 -0700

    ignore brokers in broker views (#10017)
---
 .../org/apache/druid/client/BrokerServerView.java  |  11 ++
 .../apache/druid/client/BrokerServerViewTest.java  | 143 +++++++++++++++++----
 .../druid/sql/calcite/schema/DruidSchema.java      |  10 ++
 .../druid/sql/calcite/schema/DruidSchemaTest.java  |  32 +++++
 4 files changed, 171 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java 
b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 337e9a4..3b5d408 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -42,6 +42,7 @@ import org.apache.druid.query.QueryWatcher;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -217,6 +218,12 @@ public class BrokerServerView implements TimelineServerView
 
   private void serverAddedSegment(final DruidServerMetadata server, final 
DataSegment segment)
   {
+    if (server.getType().equals(ServerType.BROKER)) {
+      // in theory we could just filter this to ensure we don't put ourselves 
in here, to make dope broker tree
+      // query topologies, but for now just skip all brokers, so we don't 
create some sort of wild infinite query
+      // loop...
+      return;
+    }
     SegmentId segmentId = segment.getId();
     synchronized (lock) {
       log.debug("Adding segment[%s] for server[%s]", segment, server);
@@ -246,6 +253,10 @@ public class BrokerServerView implements TimelineServerView
 
   private void serverRemovedSegment(DruidServerMetadata server, DataSegment 
segment)
   {
+    if (server.getType().equals(ServerType.BROKER)) {
+      // might as well save the trouble of grabbing a lock for something that 
isn't there..
+      return;
+    }
     SegmentId segmentId = segment.getId();
     final ServerSelector selector;
 
diff --git 
a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java 
b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index dd3961e..30b93a1 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.client;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
-import com.google.common.base.Function;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -161,22 +160,15 @@ public class BrokerServerViewTest extends CuratorTestBase
 
     final List<DruidServer> druidServers = Lists.transform(
         ImmutableList.of("locahost:0", "localhost:1", "localhost:2", 
"localhost:3", "localhost:4"),
-        new Function<String, DruidServer>()
-        {
-          @Override
-          public DruidServer apply(String input)
-          {
-            return new DruidServer(
-                input,
-                input,
-                null,
-                10000000L,
-                ServerType.HISTORICAL,
-                "default_tier",
-                0
-            );
-          }
-        }
+        input -> new DruidServer(
+            input,
+            input,
+            null,
+            10000000L,
+            ServerType.HISTORICAL,
+            "default_tier",
+            0
+        )
     );
 
     for (DruidServer druidServer : druidServers) {
@@ -190,14 +182,7 @@ public class BrokerServerViewTest extends CuratorTestBase
             Pair.of("2011-04-01/2011-04-09", "v2"),
             Pair.of("2011-04-06/2011-04-09", "v3"),
             Pair.of("2011-04-01/2011-04-02", "v3")
-        ), new Function<Pair<String, String>, DataSegment>()
-        {
-          @Override
-          public DataSegment apply(Pair<String, String> input)
-          {
-            return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs);
-          }
-        }
+        ), input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
     );
 
     for (int i = 0; i < 5; ++i) {
@@ -261,6 +246,114 @@ public class BrokerServerViewTest extends CuratorTestBase
     );
   }
 
+  @Test
+  public void testMultipleServerAndBroker() throws Exception
+  {
+    segmentViewInitLatch = new CountDownLatch(1);
+    segmentAddedLatch = new CountDownLatch(6);
+
+    // temporarily set latch count to 1
+    segmentRemovedLatch = new CountDownLatch(1);
+
+    setupViews();
+
+    final DruidServer druidBroker = new DruidServer(
+        "localhost:5",
+        "localhost:5",
+        null,
+        10000000L,
+        ServerType.BROKER,
+        "default_tier",
+        0
+    );
+
+    final List<DruidServer> druidServers = Lists.transform(
+        ImmutableList.of("locahost:0", "localhost:1", "localhost:2", 
"localhost:3", "localhost:4"),
+        input -> new DruidServer(
+            input,
+            input,
+            null,
+            10000000L,
+            ServerType.HISTORICAL,
+            "default_tier",
+            0
+        )
+    );
+
+    setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);
+    for (DruidServer druidServer : druidServers) {
+      setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
+    }
+
+    final List<DataSegment> segments = Lists.transform(
+        ImmutableList.of(
+            Pair.of("2011-04-01/2011-04-03", "v1"),
+            Pair.of("2011-04-03/2011-04-06", "v1"),
+            Pair.of("2011-04-01/2011-04-09", "v2"),
+            Pair.of("2011-04-06/2011-04-09", "v3"),
+            Pair.of("2011-04-01/2011-04-02", "v3")
+        ),
+        input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
+    );
+
+    DataSegment brokerSegment = 
dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-11", "v4");
+    announceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig, 
jsonMapper);
+    for (int i = 0; i < 5; ++i) {
+      announceSegmentForServer(druidServers.get(i), segments.get(i), 
zkPathsConfig, jsonMapper);
+    }
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+    TimelineLookup timeline = brokerServerView.getTimeline(
+        DataSourceAnalysis.forDataSource(new 
TableDataSource("test_broker_server_view"))
+    ).get();
+
+    assertValues(
+        Arrays.asList(
+            createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), 
segments.get(4)),
+            createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), 
segments.get(2)),
+            createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), 
segments.get(3))
+        ),
+        (List<TimelineObjectHolder>) timeline.lookup(
+            Intervals.of(
+                "2011-04-01/2011-04-09"
+            )
+        )
+    );
+
+    // unannounce the broker segment should do nothing to announcements
+    unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig);
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+
+    // renew segmentRemovedLatch since we still have 5 segments to unannounce
+    segmentRemovedLatch = new CountDownLatch(5);
+
+    timeline = brokerServerView.getTimeline(
+        DataSourceAnalysis.forDataSource(new 
TableDataSource("test_broker_server_view"))
+    ).get();
+
+    // expect same set of segments as before
+    assertValues(
+        Arrays.asList(
+            createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), 
segments.get(4)),
+            createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), 
segments.get(2)),
+            createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), 
segments.get(3))
+        ),
+        (List<TimelineObjectHolder>) timeline.lookup(
+            Intervals.of(
+                "2011-04-01/2011-04-09"
+            )
+        )
+    );
+
+    // unannounce all the segments
+    for (int i = 0; i < 5; ++i) {
+      unannounceSegmentForServer(druidServers.get(i), segments.get(i), 
zkPathsConfig);
+    }
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+  }
+
+
   private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> 
createExpected(
       String intervalStr,
       String version,
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 54e7e5a..35037e2 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -350,6 +350,12 @@ public class DruidSchema extends AbstractSchema
   @VisibleForTesting
   void addSegment(final DruidServerMetadata server, final DataSegment segment)
   {
+    if (server.getType().equals(ServerType.BROKER)) {
+      // in theory we could just filter this to ensure we don't put ourselves 
in here, to make dope broker tree
+      // query topologies, but for now just skip all brokers, so we don't 
create some sort of wild infinite metadata
+      // loop...
+      return;
+    }
     synchronized (lock) {
       final Map<SegmentId, AvailableSegmentMetadata> knownSegments = 
segmentMetadataInfo.get(segment.getDataSource());
       AvailableSegmentMetadata segmentMetadata = knownSegments != null ? 
knownSegments.get(segment.getId()) : null;
@@ -428,6 +434,10 @@ public class DruidSchema extends AbstractSchema
   @VisibleForTesting
   void removeServerSegment(final DruidServerMetadata server, final DataSegment 
segment)
   {
+    if (server.getType().equals(ServerType.BROKER)) {
+      // cheese it
+      return;
+    }
     synchronized (lock) {
       log.debug("Segment[%s] is gone from server[%s]", segment.getId(), 
server.getName());
       final Map<SegmentId, AvailableSegmentMetadata> knownSegments = 
segmentMetadataInfo.get(segment.getDataSource());
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 6e2f8f6..ee3bca1 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -57,6 +57,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -418,4 +419,35 @@ public class DruidSchemaTest extends CalciteTestBase
     Assert.assertEquals(0L, currentMetadata.isRealtime());
   }
 
+  @Test
+  public void testAvailableSegmentFromBrokerIsIgnored()
+  {
+
+    Assert.assertEquals(4, schema.getTotalSegments());
+
+    DruidServerMetadata metadata = new DruidServerMetadata(
+        "broker",
+        "localhost:0",
+        null,
+        1000L,
+        ServerType.BROKER,
+        "broken",
+        0
+    );
+
+    DataSegment segment = new DataSegment(
+        "test",
+        Intervals.of("2011-04-01/2011-04-11"),
+        "v1",
+        ImmutableMap.of(),
+        ImmutableList.of(),
+        ImmutableList.of(),
+        NoneShardSpec.instance(),
+        1,
+        100L
+    );
+    schema.addSegment(metadata, segment);
+    Assert.assertEquals(4, schema.getTotalSegments());
+
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to the mailing list ([email protected]).