This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d3d02a  Within a partition, only allow querying the first CONSUMING 
segment for the real-time table routing (#4338)
4d3d02a is described below

commit 4d3d02aa4df3738b9cae3e4eddc0b78c62b006a7
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Jun 20 15:20:10 2019 -0700

    Within a partition, only allow querying the first CONSUMING segment for the 
real-time table routing (#4338)
    
    Within a partition, we only allow querying the first CONSUMING segment 
(segment with instances in CONSUMING state) for the following reasons:
    - Within a partition, there could be multiple CONSUMING segments (typically 
caused by the delay of CONSUMING to ONLINE state transition). We should only 
query the first CONSUMING segment because it might contain records that 
overlapped with the records in the next segment (over-consumed).
    - If the instance states for a segment are partial CONSUMING (instances can 
be ONLINE if they have finished the CONSUMING to ONLINE state transition; 
instances can be OFFLINE if they encountered error while consuming and 
controller set the IdealState to OFFLINE; instances can be ERROR if they 
encountered error during the state transition), we count the segment as 
CONSUMING segment. If we don't count the segment as CONSUMING segment, then 
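this segment is not allowed to be in the CONSUMING state for routing purpose, 
and we will not route queries to this segment if there is no ONLINE instances, 
or route all queries to the ONLINE instances which can potentially overwhelm 
instances.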
    - It is possible that the latest CONSUMING segment is not allowed for 
routing purpose and we won't query it, but it should only last for a short 
period of time. Once the older CONSUMING segment becomes ONLINE (all instances 
finished the CONSUMING to ONLINE state transition), the latest CONSUMING 
segment will become the first CONSUMING segment and will be allowed for routing 
purpose.
---
 .../LowLevelConsumerRoutingTableBuilder.java       |  6 +-
 .../builder/LowLevelRoutingTableBuilderUtil.java   | 83 ++++++++++------------
 .../LowLevelConsumerRoutingTableBuilderTest.java   | 74 +++++++++++--------
 3 files changed, 83 insertions(+), 80 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
index f6c40b2..4a39da2 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilder.java
@@ -122,9 +122,9 @@ public class LowLevelConsumerRoutingTableBuilder extends 
GeneratorBasedRoutingTa
             continue;
           }
 
-          // Replicas in CONSUMING state are only allowed on the last segment
-          if 
(state.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING)
-              && segmentName.equals(validConsumingSegment)) {
+          // If the server is in CONSUMING status, the segment has to be match 
with the valid consuming segment
+          if 
(state.equals(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING)
+              && validConsumingSegment != null && 
segmentNameStr.equals(validConsumingSegment.getSegmentName())) {
             validServers.add(instance);
           }
         }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelRoutingTableBuilderUtil.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelRoutingTableBuilderUtil.java
index 8490397..dcc4fcd 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelRoutingTableBuilderUtil.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/LowLevelRoutingTableBuilderUtil.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedSet;
 import org.apache.helix.model.ExternalView;
-import org.apache.pinot.common.utils.CommonConstants;
+import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.SegmentName;
 
 
@@ -32,59 +32,50 @@ import org.apache.pinot.common.utils.SegmentName;
 public class LowLevelRoutingTableBuilderUtil {
 
   /**
-   * Compute the map of allowed 'consuming' segments for each partition.
+   * Compute the map of allowed CONSUMING segments for each partition for 
routing purpose.
+   * <p>Within a partition, we only allow querying the first CONSUMING segment 
(segment with instances in CONSUMING
+   * state) for the following reasons:
+   * <ul>
+   *   <li>
+   *     Within a partition, there could be multiple CONSUMING segments 
(typically caused by the delay of CONSUMING to
+   *     ONLINE state transition). We should only query the first CONSUMING 
segment because it might contain records
+   *     that overlapped with the records in the next segment (over-consumed).
+   *   </li>
+   *   <li>
+   *     If the instance states for a segment are partial CONSUMING (instances 
can be ONLINE if they have finished
+   *     the CONSUMING to ONLINE state transition; instances can be OFFLINE if 
they encountered error while consuming
+   *     and controller set the IdealState to OFFLINE; instances can be ERROR 
if they encountered error during the state
+   *     transition), we count the segment as CONSUMING segment. If we don't 
count the segment as CONSUMING segment,
+   *     then this segment is not allowed to be in the CONSUMING state for 
routing purpose, and we will not route
+   *     queries to this segment if there is no ONLINE instances, or route all 
queries to the ONLINE instances which can
+   *     potentially overwhelm instances.
+   *   </li>
+   *   <li>
+   *     It is possible that the latest CONSUMING segment is not allowed for 
routing purpose and we won't query it, but
+   *     it should only last for a short period of time. Once the older 
CONSUMING segment becomes ONLINE (all instances
+   *     finished the CONSUMING to ONLINE state transition), the latest 
CONSUMING segment will become the first
+   *     CONSUMING segment and will be allowed for routing purpose.
+   *   </li>
+   * </ul>
    *
-   * @param externalView helix external view
-   * @param sortedSegmentsByPartition map of partition to sorted set of 
segment names.
-   * @return map of allowed consuming segment for each partition for routing.
+   * @param externalView External view for the real-time table
+   * @param sortedSegmentsByPartition Map from partition to segments
+   * @return Map from partition to allowed CONSUMING segment for routing 
purpose
    */
   public static Map<String, SegmentName> 
getAllowedConsumingStateSegments(ExternalView externalView,
       Map<String, SortedSet<SegmentName>> sortedSegmentsByPartition) {
-    Map<String, SegmentName> allowedSegmentInConsumingStateByPartition = new 
HashMap<>();
-    for (String partition : sortedSegmentsByPartition.keySet()) {
-      SortedSet<SegmentName> sortedSegmentsForPartition = 
sortedSegmentsByPartition.get(partition);
-      SegmentName lastAllowedSegmentInConsumingState = null;
-
+    Map<String, SegmentName> allowedConsumingSegments = new HashMap<>();
+    for (Map.Entry<String, SortedSet<SegmentName>> entry : 
sortedSegmentsByPartition.entrySet()) {
+      String partitionId = entry.getKey();
+      SortedSet<SegmentName> sortedSegmentsForPartition = entry.getValue();
       for (SegmentName segmentName : sortedSegmentsForPartition) {
-        Map<String, String> helixPartitionState = 
externalView.getStateMap(segmentName.getSegmentName());
-        boolean allInConsumingState = true;
-        int replicasInConsumingState = 0;
-
-        // Only keep the segment if all replicas have it in CONSUMING state
-        for (String externalViewState : helixPartitionState.values()) {
-          // Ignore ERROR state
-          if (externalViewState
-              
.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ERROR))
 {
-            continue;
-          }
-
-          // Not all segments are in CONSUMING state, therefore don't consider 
the last segment assignable to CONSUMING
-          // replicas
-          if (externalViewState
-              
.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.ONLINE))
 {
-            allInConsumingState = false;
-            break;
-          }
-
-          // Otherwise count the replica as being in CONSUMING state
-          if (externalViewState
-              
.equalsIgnoreCase(CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING))
 {
-            replicasInConsumingState++;
-          }
-        }
-
-        // If all replicas have this segment in consuming state (and not all 
of them are in ERROR state), then pick this
-        // segment to be the last allowed segment to be in CONSUMING state
-        if (allInConsumingState && 0 < replicasInConsumingState) {
-          lastAllowedSegmentInConsumingState = segmentName;
+        if (externalView.getStateMap(segmentName.getSegmentName())
+            .containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
+          allowedConsumingSegments.put(partitionId, segmentName);
           break;
         }
       }
-
-      if (lastAllowedSegmentInConsumingState != null) {
-        allowedSegmentInConsumingStateByPartition.put(partition, 
lastAllowedSegmentInConsumingState);
-      }
     }
-    return allowedSegmentInConsumingStateByPartition;
+    return allowedConsumingSegments;
   }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
index 23be4be..7538108 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/LowLevelConsumerRoutingTableBuilderTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.broker.routing.builder;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -28,7 +29,9 @@ import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.utils.CommonConstants;
+import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SegmentName;
 import org.slf4j.Logger;
@@ -157,48 +160,57 @@ public class LowLevelConsumerRoutingTableBuilderTest {
 
   @Test
   public void testMultipleConsumingSegments() {
-    final int SEGMENT_COUNT = 10;
-    final int ONLINE_SEGMENT_COUNT = 8;
-    final int CONSUMING_SEGMENT_COUNT = SEGMENT_COUNT - ONLINE_SEGMENT_COUNT;
+    String rawTableName = "testTable";
+    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+    String instance1 = "Server_localhost_1234";
+    String instance2 = "Server_localhost_5678";
+    int numSegments = 10;
+    int numConsumingSegments = 2;
+    int numOnlineSegments = numSegments - numConsumingSegments;
 
-    TableConfig tableConfig =
-        new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName("tableName").build();
     LowLevelConsumerRoutingTableBuilder routingTableBuilder = new 
LowLevelConsumerRoutingTableBuilder();
+    TableConfig tableConfig =
+        new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(rawTableName).build();
     routingTableBuilder.init(new BaseConfiguration(), tableConfig, null, null);
 
-    List<SegmentName> segmentNames = new ArrayList<>();
-    for (int i = 0; i < SEGMENT_COUNT; ++i) {
-      segmentNames.add(new LLCSegmentName("table", 0, i, 
System.currentTimeMillis()));
-    }
-
-    List<InstanceConfig> instanceConfigs = new ArrayList<>();
-    InstanceConfig instanceConfig = new 
InstanceConfig("Server_localhost_1234");
-    instanceConfigs.add(instanceConfig);
-    
instanceConfig.getRecord().setSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS,
 "false");
+    List<InstanceConfig> instanceConfigs = Arrays.asList(new 
InstanceConfig(instance1), new InstanceConfig(instance2));
 
-    // Generate an external view for a single server with some consuming 
segments
-    ExternalView externalView = new ExternalView("table_REALTIME");
-    for (int i = 0; i < ONLINE_SEGMENT_COUNT; i++) {
-      externalView.setState(segmentNames.get(i).getSegmentName(), 
"Server_localhost_1234", "ONLINE");
-    }
-    for (int i = ONLINE_SEGMENT_COUNT; i < SEGMENT_COUNT; ++i) {
-      externalView.setState(segmentNames.get(i).getSegmentName(), 
"Server_localhost_1234", "CONSUMING");
+    List<String> segments = new ArrayList<>();
+    for (int i = 0; i < numSegments; i++) {
+      segments.add(new LLCSegmentName(rawTableName, 0, i, 
System.currentTimeMillis()).getSegmentName());
     }
 
-    routingTableBuilder.computeOnExternalViewChange("table", externalView, 
instanceConfigs);
+    // Generate an external view for two servers with some CONSUMING segments
+    ExternalView externalView = new ExternalView(realtimeTableName);
+    for (int i = 0; i < numOnlineSegments; i++) {
+      String segmentName = segments.get(i);
+      externalView.setState(segmentName, instance1, 
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+      externalView.setState(segmentName, instance2, 
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+    }
+    // The first CONSUMING segment has one instance in ONLINE state and the 
other in CONSUMING state (only one instance
+    // finished the CONSUMING -> ONLINE state transition)
+    String consumingSegment1 = segments.get(numOnlineSegments);
+    externalView.setState(consumingSegment1, instance1, 
RealtimeSegmentOnlineOfflineStateModel.ONLINE);
+    externalView.setState(consumingSegment1, instance2, 
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+    // The second CONSUMING segment has both instances in CONSUMING state
+    String consumingSegment2 = segments.get(numOnlineSegments + 1);
+    externalView.setState(consumingSegment2, instance1, 
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+    externalView.setState(consumingSegment2, instance2, 
RealtimeSegmentOnlineOfflineStateModel.CONSUMING);
+
+    routingTableBuilder.computeOnExternalViewChange(realtimeTableName, 
externalView, instanceConfigs);
     List<Map<String, List<String>>> routingTables = 
routingTableBuilder.getRoutingTables();
     for (Map<String, List<String>> routingTable : routingTables) {
+      ArrayList<String> segmentsInRoutingTable = new ArrayList<>();
       for (List<String> segmentsForServer : routingTable.values()) {
-        assertEquals(segmentsForServer.size(), ONLINE_SEGMENT_COUNT + 1);
-
-        // Should only contain the first consuming segment, not the second
-        
assertTrue(segmentsForServer.contains(segmentNames.get(ONLINE_SEGMENT_COUNT).getSegmentName()),
-            "Segment set does not contain the first segment in consuming 
state");
-        for (int i = ONLINE_SEGMENT_COUNT + 1; i < SEGMENT_COUNT; i++) {
-          
assertFalse(segmentsForServer.contains(segmentNames.get(i).getSegmentName()),
-              "Segment set contains a segment in consuming state that should 
not be there");
-        }
+        segmentsInRoutingTable.addAll(segmentsForServer);
       }
+      assertEquals(segmentsInRoutingTable.size(), numOnlineSegments + 1);
+
+      // Should only contain the first consuming segment, not the second
+      
assertTrue(segmentsInRoutingTable.contains(segments.get(numOnlineSegments)),
+          "Segment set does not contain the first segment in consuming state");
+      
assertFalse(segmentsInRoutingTable.contains(segments.get(numOnlineSegments + 
1)),
+          "Segment set contains a segment in consuming state that should not 
be there");
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to