This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 115cab9  The external view check in rebalancer should also check for 
no extra servers (#4498)
115cab9 is described below

commit 115cab9649378dcbd6afffafcfa5b0bbca8c24f7
Author: Sidd <[email protected]>
AuthorDate: Mon Aug 12 11:42:50 2019 -0700

    The external view check in rebalancer should also check for no extra 
servers (#4498)
    
    * Re-enable the flakey test in table rebalancer cluster integration
    test. The function to check if external view has converged now also
    ensures that servers that lost segments as part of rebalancing are
    either not present in external view or they have the segment in
    DROPPED state.
    
    * Undo print
    
    * Do not check for dropped state
    
    * Keep stats disabled
---
 .../controller/helix/core/TableRebalancer.java     | 40 +++++++++++-------
 ...eRebalancerAdminToolClusterIntegrationTest.java | 48 ++++++++++------------
 2 files changed, 48 insertions(+), 40 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/TableRebalancer.java
index 0c90d22..4e2d0e5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/TableRebalancer.java
@@ -510,11 +510,11 @@ public class TableRebalancer {
 
     boolean stable = true;
     for (String segment : mapFieldsIS.keySet()) {
-      Map<String, String> mapIS = mapFieldsIS.get(segment);
-      Map<String, String> mapEV = mapFieldsEV.get(segment);
+      Map<String, String> hostAndStatesInIdealState = mapFieldsIS.get(segment);
+      Map<String, String> hostAndStatesInExternalView = 
mapFieldsEV.get(segment);
       boolean converged = true;
 
-      if (mapEV == null) {
+      if (hostAndStatesInExternalView == null) {
         LOGGER.info("Host-state mapping of segment {} not yet available in 
external view", segment);
         // we have found that external view hasn't yet converged to ideal 
state.
         // still go on to check for other segments just so that we can dump 
debug
@@ -527,27 +527,27 @@ public class TableRebalancer {
 
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Hosts and states for segment {} in ideal state", 
segment);
-        prettyPrintMap(mapIS, Level.DEBUG);
+        prettyPrintMap(hostAndStatesInIdealState, Level.DEBUG);
         LOGGER.debug("Hosts and states for segment {} in external view", 
segment);
-        prettyPrintMap(mapEV, Level.DEBUG);
+        prettyPrintMap(hostAndStatesInExternalView, Level.DEBUG);
       }
 
-      for (String server : mapIS.keySet()) {
-        if (!mapEV.containsKey(server)) {
+      for (String server : hostAndStatesInIdealState.keySet()) {
+        if (!hostAndStatesInExternalView.containsKey(server)) {
           LOGGER.info("Host-state mapping of segment {} doesn't yet have 
server {} in external view",
               segment, server);
           // external view not yet converged
           stable = false;
           converged = false;
-        } else if (mapEV.get(server).equalsIgnoreCase("error")) {
+        } else if 
(hostAndStatesInExternalView.get(server).equalsIgnoreCase("error")) {
           LOGGER.error("Detected error state for segment {} for server {}", 
segment, server);
-          prettyPrintMap(mapIS, Level.ERROR);
-          prettyPrintMap(mapEV, Level.ERROR);
+          prettyPrintMap(hostAndStatesInIdealState, Level.ERROR);
+          prettyPrintMap(hostAndStatesInExternalView, Level.ERROR);
           throw new IllegalStateException("External view reports error state 
for segment " + segment + " for host " + server,
               new ExternalViewErrored());
         } else {
-          final String stateInIdealState = mapIS.get(server);
-          final String stateInExternalView = mapEV.get(server);
+          final String stateInIdealState = 
hostAndStatesInIdealState.get(server);
+          final String stateInExternalView = 
hostAndStatesInExternalView.get(server);
           if (!stateInIdealState.equalsIgnoreCase(stateInExternalView)) {
             LOGGER.info("Host-state mapping of segment {} has state {} in 
external view and state {} in ideal state",
                 segment, stateInExternalView, stateInIdealState);
@@ -561,10 +561,22 @@ public class TableRebalancer {
       if (!converged) {
         segmentsNotConverged++;
       }
+
+      // now do the reverse comparison
+      // if a server had lost a segment as part of rebalancing (implying ideal 
state no
+      // longer has that server for the particular segment) then the server 
should
+      // no longer be present for that segment in external view as well
+      for (String server : hostAndStatesInExternalView.keySet()) {
+        if (!hostAndStatesInIdealState.containsKey(server)) {
+            stable = false;
+            LOGGER.info("Server {} for segment {} should not be present in 
external view", server, segment);
+          }
+      }
     }
 
     LOGGER.info("{} of total {} segments from ideal state don't yet have 
external view converged",
         segmentsNotConverged, mapFieldsIS.size());
+
     return stable;
   }
 
@@ -610,9 +622,9 @@ public class TableRebalancer {
           LOGGER.info("Waiting for externalView to match idealstate for 
table:" + resourceName);
           Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
           wait += EXTERNAL_VIEW_CHECK_INTERVAL_MS;
-          }
         }
-      } catch (InterruptedException e) {
+      }
+    } catch (InterruptedException e) {
       LOGGER.error("Rebalancer got interrupted while waiting for external view 
to converge");
       Thread.currentThread().interrupt();
     }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancerAdminToolClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancerAdminToolClusterIntegrationTest.java
index 47ddecc..9677a2f 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancerAdminToolClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancerAdminToolClusterIntegrationTest.java
@@ -19,11 +19,9 @@
 package org.apache.pinot.integration.tests;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import jdk.nashorn.internal.ir.annotations.Ignore;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -47,7 +45,6 @@ import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.core.TableRebalancer;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import 
org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
 import org.apache.pinot.tools.PinotTableRebalancer;
 import org.testng.Assert;
@@ -203,9 +200,9 @@ public class TableRebalancerAdminToolClusterIntegrationTest 
extends BaseClusterI
       // as part of rebalancing, host2 lost segment1 and host3
       // lost segment2 -- so 2 transitions from ON to OFF and
       // OFF to DROP
-      // TODO: fix the flakey test behavior and re-enable these assertions
-      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn, 2);
-      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff, 2);
+      // HELIX-818: https://issues.apache.org/jira/browse/HELIX-818
+      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn.get(), 2);
+      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff.get(), 
2);
     } finally {
       stopFakeServers();
     }
@@ -322,9 +319,9 @@ public class TableRebalancerAdminToolClusterIntegrationTest 
extends BaseClusterI
       // as part of rebalancing, host2 lost segment1 and segment3,
       // host1 and host3 lost segment2 and segment4 -- so 6
       // transitions from ON to OFF and OFF to DROP
-      // TODO: fix the flakey test behavior and re-enable these assertions
-      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn, 6);
-      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff, 6);
+      // HELIX-818: https://issues.apache.org/jira/browse/HELIX-818
+      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn.get(), 6);
+      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff.get(), 
6);
     } finally {
       stopFakeServers();
     }
@@ -458,9 +455,9 @@ public class TableRebalancerAdminToolClusterIntegrationTest 
extends BaseClusterI
       // and segment4. similarly, host4 lost segment1, segment3
       // and segment4 -- total 6 transitions from ONLINE to OFFLINE
       // and OFFLINE to DROPPED
-      // TODO: fix the flakey test behavior and re-enable these assertions
-      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn, 6);
-      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff, 6);
+      // HELIX-818: https://issues.apache.org/jira/browse/HELIX-818
+      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn.get(), 6);
+      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff.get(), 
6);
     } finally {
       stopFakeServers();
     }
@@ -593,9 +590,9 @@ public class TableRebalancerAdminToolClusterIntegrationTest 
extends BaseClusterI
       // as part of rebalancing, host2 lost segment1 and segment3,
       // host1 and host3 lost segment2 and segment4 -- so 6
       // transitions from ON to OFF and OFF to DROP
-      // TODO: fix the flakey test behavior and re-enable these assertions
-      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn, 3);
-      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff, 3);
+      // HELIX-818: https://issues.apache.org/jira/browse/HELIX-818
+      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn.get(), 3);
+      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff.get(), 
3);
     } finally {
       stopFakeServers();
     }
@@ -669,9 +666,9 @@ public class TableRebalancerAdminToolClusterIntegrationTest 
extends BaseClusterI
       // as part of rebalancing, host2 lost segment1 and segment3,
       // host1 and host3 lost segment2 and segment4 -- so 6
       // transitions from ON to OFF and OFF to DROP
-      // TODO: fix the flakey test behavior and re-enable these assertions
-      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn, 3);
-      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff, 3);
+      // HELIX-818: https://issues.apache.org/jira/browse/HELIX-818
+      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn.get(), 3);
+      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff.get(), 
3);
     } finally {
       stopFakeServers();
     }
@@ -730,9 +727,8 @@ public class TableRebalancerAdminToolClusterIntegrationTest 
extends BaseClusterI
       Assert.assertEquals(stats.getIncrementalUpdatesToSegmentInstanceMap(), 
0);
       Assert.assertEquals(stats.getNumSegmentMoves(), 0);
 
-      // TODO: fix the flakey test behavior and re-enable these assertions
-      // Assert.assertEquals(_segmentStateTransitionStats.offFromOn, 0);
-      // Assert.assertEquals(_segmentStateTransitionStats.dropFromOff, 0);
+      Assert.assertEquals(_segmentStateTransitionStats.offFromOn.get(), 0);
+      Assert.assertEquals(_segmentStateTransitionStats.dropFromOff.get(), 0);
     } finally {
       stopFakeServers();
     }
@@ -841,14 +837,14 @@ public class 
TableRebalancerAdminToolClusterIntegrationTest extends BaseClusterI
       @Transition(from = "ONLINE", to = "OFFLINE")
       public void onBecomeOfflineFromOnline(Message message, 
NotificationContext context) {
         if (_segmentStateTransitionStats != null) {
-          ++_segmentStateTransitionStats.offFromOn;
+          _segmentStateTransitionStats.offFromOn.incrementAndGet();
         }
       }
 
       @Transition(from = "OFFLINE", to = "DROPPED")
       public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context) {
         if (_segmentStateTransitionStats != null) {
-          ++_segmentStateTransitionStats.dropFromOff;
+          _segmentStateTransitionStats.dropFromOff.incrementAndGet();
         }
       }
 
@@ -859,8 +855,8 @@ public class TableRebalancerAdminToolClusterIntegrationTest 
extends BaseClusterI
   }
 
   private static class SegmentStateTransitionStats {
-    private int offFromOn = 0;
-    private int dropFromOff = 0;
+    private AtomicInteger offFromOn = new AtomicInteger();
+    private AtomicInteger dropFromOff = new AtomicInteger();
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to