This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 191572a  Add safeguard to make sure new Rules added are aware of Rule 
usage in loadstatus API (#10054)
191572a is described below

commit 191572ad5e250628ce7ab27259d1fe175a10b7a6
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Jun 19 17:18:56 2020 -1000

    Add safeguard to make sure new Rules added are aware of Rule usage in 
loadstatus API (#10054)
    
    * Add safeguard to make sure new Rules added are aware of Rule usage in 
loadstatus API
    
    * address comments
    
    * address comments
    
    * add tests
---
 .../druid/server/coordinator/DruidCoordinator.java | 42 +++++-----------------
 .../server/coordinator/SegmentReplicantLookup.java | 28 +++++++++++++--
 .../druid/server/coordinator/ServerHolder.java     |  8 ++++-
 .../rules/BroadcastDistributionRule.java           | 35 ++++++++++++++++++
 .../druid/server/coordinator/rules/DropRule.java   |  6 ++++
 .../druid/server/coordinator/rules/LoadRule.java   | 29 +++++++++++++++
 .../druid/server/coordinator/rules/Rule.java       | 26 ++++++++++++++
 .../druid/server/coordinator/ServerHolderTest.java | 22 ++++++++++--
 8 files changed, 156 insertions(+), 40 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 0dcf636..64168e1 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -29,7 +29,6 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntMaps;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.client.DataSourcesSnapshot;
@@ -69,7 +68,6 @@ import 
org.apache.druid.server.coordinator.duty.LogUsedSegments;
 import 
org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments;
 import org.apache.druid.server.coordinator.duty.RunRules;
 import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
-import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -275,13 +273,6 @@ public class DruidCoordinator
   )
   {
     final Map<String, Object2LongMap<String>> 
underReplicationCountsPerDataSourcePerTier = new HashMap<>();
-    final Set<String> decommissioningServers = 
getDynamicConfigs().getDecommissioningNodes();
-    final List<ImmutableDruidServer> broadcastTargetServers = 
serverInventoryView
-        .getInventory()
-        .stream()
-        .filter(druidServer -> druidServer.isSegmentBroadcastTarget() && 
!decommissioningServers.contains(druidServer.getHost()))
-        .map(DruidServer::toImmutableDruidServer)
-        .collect(Collectors.toList());
 
     if (segmentReplicantLookup == null) {
       return underReplicationCountsPerDataSourcePerTier;
@@ -294,36 +285,19 @@ public class DruidCoordinator
 
       for (final Rule rule : rules) {
         if (!rule.appliesTo(segment, now)) {
+          // Rule did not match. Continue to the next Rule.
           continue;
         }
-
-        if (rule instanceof LoadRule) {
-          ((LoadRule) rule)
-              .getTieredReplicants()
-              .forEach((final String tier, final Integer ruleReplicants) -> {
-                int currentReplicants = 
segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
-                Object2LongMap<String> underReplicationPerDataSource = 
underReplicationCountsPerDataSourcePerTier
-                    .computeIfAbsent(tier, ignored -> new 
Object2LongOpenHashMap<>());
-                ((Object2LongOpenHashMap<String>) 
underReplicationPerDataSource)
-                    .addTo(segment.getDataSource(), Math.max(ruleReplicants - 
currentReplicants, 0));
-              });
+        if (!rule.canLoadSegments()) {
+          // Rule matched but rule does not and cannot load segments.
+          // Hence, there is no need to update 
underReplicationCountsPerDataSourcePerTier map
+          break;
         }
 
-        if (rule instanceof BroadcastDistributionRule) {
-          for (ImmutableDruidServer server : broadcastTargetServers) {
-            Object2LongMap<String> underReplicationPerDataSource = 
underReplicationCountsPerDataSourcePerTier
-                .computeIfAbsent(server.getTier(), ignored -> new 
Object2LongOpenHashMap<>());
-            if (server.getSegment(segment.getId()) == null) {
-              ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
-                  .addTo(segment.getDataSource(), 1);
-            } else {
-              // This make sure that every datasource has a entry even if the 
all segments are loaded
-              
underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0);
-            }
-          }
-        }
+        rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, 
segmentReplicantLookup, segment);
 
-        // only the first matching rule applies
+        // Only the first matching rule applies. This is because the 
Coordinator cycle through all used segments
+        // and match each segment with the first rule that applies. Each 
segment may only match a single rule.
         break;
       }
     }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
index cd04bfd..b86ca01 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
@@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator;
 
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
@@ -62,19 +64,22 @@ public class SegmentReplicantLookup
       }
     }
 
-    return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
+    return new SegmentReplicantLookup(segmentsInCluster, loadingSegments, 
cluster);
   }
 
   private final Table<SegmentId, String, Integer> segmentsInCluster;
   private final Table<SegmentId, String, Integer> loadingSegments;
+  private final DruidCluster cluster;
 
   private SegmentReplicantLookup(
       Table<SegmentId, String, Integer> segmentsInCluster,
-      Table<SegmentId, String, Integer> loadingSegments
+      Table<SegmentId, String, Integer> loadingSegments,
+      DruidCluster cluster
   )
   {
     this.segmentsInCluster = segmentsInCluster;
     this.loadingSegments = loadingSegments;
+    this.cluster = cluster;
   }
 
   public Map<String, Integer> getClusterTiers(SegmentId segmentId)
@@ -93,7 +98,7 @@ public class SegmentReplicantLookup
     return retVal;
   }
 
-  int getLoadedReplicants(SegmentId segmentId, String tier)
+  public int getLoadedReplicants(SegmentId segmentId, String tier)
   {
     Integer retVal = segmentsInCluster.get(segmentId, tier);
     return (retVal == null) ? 0 : retVal;
@@ -124,4 +129,21 @@ public class SegmentReplicantLookup
   {
     return getLoadedReplicants(segmentId, tier) + 
getLoadingReplicants(segmentId, tier);
   }
+
+  public Object2LongMap<String> getBroadcastUnderReplication(SegmentId 
segmentId)
+  {
+    Object2LongOpenHashMap<String> perTier = new Object2LongOpenHashMap<>();
+    for (ServerHolder holder : cluster.getAllServers()) {
+      // Only record tier entry for server that is segment broadcast target
+      if (holder.getServer().getType().isSegmentBroadcastTarget()) {
+        // Every broadcast target server should be serving 1 replica of the 
segment
+        if (!holder.isServingSegment(segmentId)) {
+          perTier.addTo(holder.getServer().getTier(), 1L);
+        } else {
+          perTier.putIfAbsent(holder.getServer().getTier(), 0);
+        }
+      }
+    }
+    return perTier;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java 
b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index 26fa9a5..43fdaae 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 
 import java.util.Objects;
 
@@ -114,7 +115,7 @@ public class ServerHolder implements 
Comparable<ServerHolder>
 
   public boolean isServingSegment(DataSegment segment)
   {
-    return server.getSegment(segment.getId()) != null;
+    return isServingSegment(segment.getId());
   }
 
   public boolean isLoadingSegment(DataSegment segment)
@@ -132,6 +133,11 @@ public class ServerHolder implements 
Comparable<ServerHolder>
     return peon.getNumberOfSegmentsInQueue();
   }
 
+  public boolean isServingSegment(SegmentId segmentId)
+  {
+    return server.getSegment(segmentId) != null;
+  }
+
   @Override
   public int compareTo(ServerHolder serverHolder)
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
index 35ff39e..0b8c37b 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java
@@ -19,15 +19,19 @@
 
 package org.apache.druid.server.coordinator.rules;
 
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -71,6 +75,37 @@ public abstract class BroadcastDistributionRule implements 
Rule
                 .accumulate(drop(dropServerHolders, segment));
   }
 
+  @Override
+  public boolean canLoadSegments()
+  {
+    return true;
+  }
+
+  @Override
+  public void updateUnderReplicated(
+      Map<String, Object2LongMap<String>> underReplicatedPerTier,
+      SegmentReplicantLookup segmentReplicantLookup,
+      DataSegment segment
+  )
+  {
+    Object2LongMap<String> underReplicatedBroadcastTiers = 
segmentReplicantLookup.getBroadcastUnderReplication(segment.getId());
+    for (final Object2LongMap.Entry<String> entry : 
underReplicatedBroadcastTiers.object2LongEntrySet()) {
+      final String tier = entry.getKey();
+      final long underReplicatedCount = entry.getLongValue();
+      underReplicatedPerTier.compute(tier, (_tier, existing) -> {
+        Object2LongMap<String> underReplicationPerDataSource = existing;
+        if (existing == null) {
+          underReplicationPerDataSource = new Object2LongOpenHashMap<>();
+        }
+        underReplicationPerDataSource.compute(
+            segment.getDataSource(),
+            (_datasource, count) -> count != null ? count + 
underReplicatedCount : underReplicatedCount
+        );
+        return underReplicationPerDataSource;
+      });
+    }
+  }
+
   private CoordinatorStats assign(
       final Set<ServerHolder> serverHolders,
       final DataSegment segment
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java
index c565df9..7ffc7a2 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java
@@ -37,4 +37,10 @@ public abstract class DropRule implements Rule
     stats.addToGlobalStat("deletedCount", 1);
     return stats;
   }
+
+  @Override
+  public boolean canLoadSegments()
+  {
+    return false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index 8091254..d0bf5b8 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -21,6 +21,8 @@ package org.apache.druid.server.coordinator.rules;
 
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -30,6 +32,7 @@ import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ReplicationThrottler;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
@@ -90,6 +93,32 @@ public abstract class LoadRule implements Rule
     }
   }
 
+  @Override
+  public boolean canLoadSegments()
+  {
+    return true;
+  }
+
+  @Override
+  public void updateUnderReplicated(
+      Map<String, Object2LongMap<String>> underReplicatedPerTier,
+      SegmentReplicantLookup segmentReplicantLookup,
+      DataSegment segment
+  )
+  {
+    getTieredReplicants().forEach((final String tier, final Integer 
ruleReplicants) -> {
+      int currentReplicants = 
segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
+      Object2LongMap<String> underReplicationPerDataSource = 
underReplicatedPerTier.computeIfAbsent(
+          tier,
+          ignored -> new Object2LongOpenHashMap<>()
+      );
+      ((Object2LongOpenHashMap<String>) underReplicationPerDataSource).addTo(
+          segment.getDataSource(),
+          Math.max(ruleReplicants - currentReplicants, 0)
+      );
+    });
+  }
+
   /**
    * @param stats {@link CoordinatorStats} to accumulate assignment statistics.
    */
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
index d475f67..02c552f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
@@ -21,13 +21,18 @@ package org.apache.druid.server.coordinator.rules;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import java.util.Map;
+
 /**
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@@ -52,6 +57,27 @@ public interface Rule
   boolean appliesTo(Interval interval, DateTime referenceTimestamp);
 
   /**
+   * Return true if this Rule can load segment onto one or more type of Druid 
node, otherwise return false.
+   * Any Rule that returns true for this method should implement logic for 
calculating segment under replicated
+   * in {@link Rule#updateUnderReplicated}
+   */
+  boolean canLoadSegments();
+
+  /**
+   * This method should update the {@param underReplicatedPerTier} with the 
replication count of the
+   * {@param segment}. Rule that returns true for {@link 
Rule#canLoadSegments()} must override this method.
+   * Note that {@param underReplicatedPerTier} is a map of tier -> { 
dataSource -> underReplicationCount }
+   */
+  default void updateUnderReplicated(
+      Map<String, Object2LongMap<String>> underReplicatedPerTier,
+      SegmentReplicantLookup segmentReplicantLookup,
+      DataSegment segment
+  )
+  {
+    Preconditions.checkArgument(!canLoadSegments());
+  }
+
+  /**
    * {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be 
called in Rule's code, because the used
    * segments are not specified for the {@link DruidCoordinatorRuntimeParams} 
passed into Rule's code. This is because
    * {@link DruidCoordinatorRuntimeParams} entangles two slightly different 
(nonexistent yet) abstractions:
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
index fcebbde..cb1ee04 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
@@ -39,7 +39,7 @@ public class ServerHolderTest
 {
   private static final List<DataSegment> SEGMENTS = ImmutableList.of(
       new DataSegment(
-          "test",
+          "src1",
           Intervals.of("2015-04-12/2015-04-13"),
           "1",
           ImmutableMap.of("containerName", "container1", "blobPath", 
"blobPath1"),
@@ -50,7 +50,7 @@ public class ServerHolderTest
           1
       ),
       new DataSegment(
-          "test",
+          "src2",
           Intervals.of("2015-04-12/2015-04-13"),
           "1",
           ImmutableMap.of("containerName", "container2", "blobPath", 
"blobPath2"),
@@ -177,4 +177,22 @@ public class ServerHolderTest
     Assert.assertNotEquals(h1, h4);
     Assert.assertNotEquals(h1, h5);
   }
+
+  @Test
+  public void testIsServingSegment()
+  {
+    final ServerHolder h1 = new ServerHolder(
+        new ImmutableDruidServer(
+            new DruidServerMetadata("name1", "host1", null, 100L, 
ServerType.HISTORICAL, "tier1", 0),
+            0L,
+            ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
+            1
+        ),
+        new LoadQueuePeonTester()
+    );
+    Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
+    Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
+    Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0).getId()));
+    Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1).getId()));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to