siddharthteotia commented on a change in pull request #4446: Add support in the 
rebalancer for the user to provide minimum number of serving replicas
URL: https://github.com/apache/incubator-pinot/pull/4446#discussion_r307190283
 
 

 ##########
 File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancerAdminToolClusterIntegrationTest.java
 ##########
 @@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.NetUtil;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.helix.core.TableRebalancer;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.server.realtime.ControllerLeaderLocator;
+import 
org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
+import org.apache.pinot.tools.PinotTableRebalancer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Class to test {@link TableRebalancer} using {@link PinotTableRebalancer}
+ * as the entry point to test the end-to-end path from pinot-admin tool.
+ */
+public class TableRebalancerAdminToolClusterIntegrationTest extends 
BaseClusterIntegrationTestSet {
+
+  // ZooKeeper connection string handed to PinotTableRebalancer by the tests.
+  private static final String ZKSTR = ZkStarter.DEFAULT_ZK_STR;
+  // Number of fake servers started in setup(); the tests also pass it to createIdealState()
+  // and use it as the port offset for the additional servers they start.
+  private static final int NUM_INITIAL_SERVERS = 3;
+  // Helix managers of every fake server started so far; disconnected in stopFakeServers().
+  private final List<HelixManager> _helixManagers = new ArrayList<>();
+  // Instance ids of fake servers already started, so startFakeServers() never starts one twice.
+  private final Set<String> servers = new HashSet<>();
+  // Replaced with a fresh instance at the start of each test and read by that test's assertions.
+  // NOTE(review): presumably updated by the fake server state model -- not visible here, confirm.
+  private StateTransitionStats _stateTransitionStats;
+
+  /**
+   * Always returns {@code true}.
+   * NOTE(review): presumably makes the base test class use the new table config
+   * format -- confirm against the overridden method.
+   */
+  @Override
+  protected boolean isUsingNewConfigFormat() {
+    return true;
+  }
+
+  /**
+   * Starts ZK, the controller, the broker and {@link #NUM_INITIAL_SERVERS} fake servers,
+   * then adds the offline table that the tests rebalance.
+   *
+   * @throws Exception if any of the startup or table-creation calls fails
+   */
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    startZk();
+    startController();
+    startBroker();
+    // fake servers get consecutive ports starting at the default server netty port
+    startFakeServers(NUM_INITIAL_SERVERS, 
CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);
+    addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), 
SegmentVersion.v1, getInvertedIndexColumns(),
+        getBloomFilterIndexColumns(), getTaskConfig(), null, null);
+    completeTableConfiguration();
+  }
+
+  /**
+   * Shuts the test cluster down in the reverse order of {@code setup()}:
+   * fake servers first, then broker, controller and finally ZK.
+   */
+  @AfterClass
+  public void tearDown() {
+    stopFakeServers();
+    stopBroker();
+    stopController();
+    stopZk();
+  }
+
+  /**
+   * Starts fake servers as Helix participants of the test cluster.
+   *
+   * Each server gets an instance id built from the server instance prefix, the
+   * local host address and a port; ports are {@code basePort},
+   * {@code basePort + 1}, ... An id that was already started by an earlier
+   * call is skipped. Every newly started participant registers a
+   * {@link FakeServerSegmentStateModelFactory}, connects, is tagged with the
+   * OFFLINE-typed default tenant name and is remembered for
+   * {@code stopFakeServers()}.
+   *
+   * @param numServers number of consecutive ports (and hence instance ids) to consider
+   * @param basePort port used for the first instance id
+   * @throws Exception if creating, connecting or tagging a participant fails
+   */
+  private void startFakeServers(final int numServers, final int basePort)
+      throws Exception {
+    for (int offset = 0; offset < numServers; offset++) {
+      final int port = basePort + offset;
+      final String serverId =
+          CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE + NetUtil.getHostAddress() + "_" + port;
+      // Set.add returns false when the id is already present, i.e. that server was started before
+      if (!servers.add(serverId)) {
+        continue;
+      }
+      final HelixManager participant =
+          HelixManagerFactory.getZKHelixManager(_clusterName, serverId, InstanceType.PARTICIPANT, ZKSTR);
+      // the state model factory has to be registered before the participant connects
+      participant.getStateMachineEngine()
+          .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
+              new FakeServerSegmentStateModelFactory());
+      participant.connect();
+      participant.getClusterManagmentTool().addInstanceTag(_clusterName, serverId,
+          TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
+      _helixManagers.add(participant);
+      ControllerLeaderLocator.create(participant);
+    }
+  }
+
+  private void stopFakeServers() {
+    for (HelixManager helixManager : _helixManagers) {
+      helixManager.disconnect();
+    }
+  }
+
+  /**
+   * Test in no-downtime mode when there are sufficient
+   * common hosts between current and target ideal states
+   * to keep up the min number of serving replicas. In this
+   * case, the algorithm in {@link TableRebalancer} that
+   * makes changes to ideal state to get to a target ideal
+   * state can do the transition in one go.
+   *
+   * Scenario:
+   *
+   * Current ideal state
+   *
+   * segment1: {host1:online, host2:online, host3:online}
+   * segment2: {host1:online, host2:online, host3:online}
+   *
+   * We add 2 additional hosts: host4 and host5.
+   * The rebalancer will use the table config to get the
+   * appropriate rebalance strategy (default strategy in this case)
+   * and come up with the target ideal state
+   *
+   * segment1: {host1:online, host3:online, host5:online}
+   * segment2: {host1:online, host2:online, host4:online}
+   *
+   * The test specifies the min replicas to keep up as 2.
+   * With the above setup, the rebalancer can set the target
+   * partition state for each segment with a direct update
+   * (because of common hosts that still satisfy the min
+   * replica requirement). Since this happens once for each
+   * segment before we get to the target ideal state, the
+   * number of direct transitions is 2.
+   *
+   * @throws Exception if writing the ideal state, starting the
+   *         additional servers or running the rebalancer fails
+   */
+  @Test
+  public void testPinotTableRebalancerWithDirectTransitions()
+      throws Exception {
+    _stateTransitionStats = new StateTransitionStats();
+
+    // write ideal state
+    createIdealState(2, NUM_INITIAL_SERVERS);
+
+    // add additional servers to trigger the rebalance strategy,
+    // otherwise rebalance will be a NO-OP
+    startFakeServers(2, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT + NUM_INITIAL_SERVERS);
+
+    // rebalance
+    final PinotTableRebalancer tableRebalancer =
+        new PinotTableRebalancer(ZKSTR, _clusterName, false, true, false, 2);
+    tableRebalancer.rebalance(getTableName(), "OFFLINE");
+    final TableRebalancer.RebalancerStats stats = tableRebalancer.getRebalancerStats();
+
+    // check the algorithm stats on how we moved from current
+    // to target ideal state
+    // (TestNG's assertEquals takes the actual value first and the expected
+    // value second, so that failure messages label the two correctly)
+    Assert.assertEquals(stats.getDryRun(), 0);
+    Assert.assertEquals(stats.getDirectTransitions(), 2);
+    Assert.assertEquals(stats.getIncrementalTransitions(), 0);
+
+    // as part of rebalancing, host2 lost segment1 and host3
+    // lost segment2 -- so 2 transitions from ON to OFF and
+    // OFF to DROP
+    Assert.assertEquals(_stateTransitionStats.offFromOn, 2);
+    Assert.assertEquals(_stateTransitionStats.dropFromOff, 2);
+
+    // clear for next test to avoid mixing state transition stats
+    clearIdealState(2);
+  }
+
+  /**
+   * Test in no-downtime mode when there are not sufficient
+   * common hosts between current and target ideal states
+   * to keep up the min number of serving replicas. In this
+   * case, the algorithm in {@link TableRebalancer} that
+   * makes changes to ideal state to get to a target ideal
+   * state does the changes incrementally while satisfying
+   * the min replica requirement.
+   *
+   * Scenario:
+   *
+   * Current ideal state
+   *
+   * segment1: {host1:online, host2:online, host3:online}
+   * segment2: {host1:online, host2:online, host3:online}
+   * segment3: {host1:online, host2:online, host3:online}
+   * segment4: {host1:online, host2:online, host3:online}
+   *
+   * We add 3 additional hosts: host4, host5 and host6.
+   * The rebalancer will use the table config to get the
+   * appropriate rebalance strategy (default strategy in this case)
+   * and come up with the target ideal state
+   *
+   * segment1: {host1:online, host3:online, host5:online}
+   * segment2: {host2:online, host4:online, host6:online}
+   * segment3: {host1:online, host3:online, host5:online}
+   * segment4: {host2:online, host4:online, host6:online}
+   *
+   * The test specifies the min replicas to keep up as 2.
+   * With the above setup, the rebalancer can set the target
+   * partition state for each segment with a direct update
+   * (because of common hosts that still satisfy the min
+   * replica requirement) only for segments 1 and 3.
+   *
+   * So far: direct transitions 2.
+   *
+   * For segments 2 and 4, we have to do one incremental
+   * change each by removing a current server and adding
+   * a new server while keeping up with the requirement of
+   * 2 min serving replicas.
+   *
+   * So far: incremental transitions 2.
+   *
+   * Since we have looked at all segments once, the updated
+   * ideal state is persisted in ZK, we wait for external view
+   * to converge and the next iteration of rebalancing begins by
+   * checking if we have reached the target.
+   * We haven't, so we go over each segment again.
+   *
+   * Now for each segment we can do a direct update -- once for
+   * each of the 4 segments while still satisfying
+   * the min replica requirement.
+   *
+   * So, direct transitions: 2+4 = 6, incremental transitions = 2.
+   *
+   * @throws Exception if writing the ideal state, starting the
+   *         additional servers or running the rebalancer fails
+   */
+  @Test
+  public void testPinotTableRebalancerWithIncrementalTransitions()
+      throws Exception {
+    _stateTransitionStats = new StateTransitionStats();
+
+    // write ideal state
+    createIdealState(4, NUM_INITIAL_SERVERS);
+
+    // add additional servers to trigger the rebalance strategy,
+    // otherwise rebalance will be a NO-OP
+    startFakeServers(3, CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT + NUM_INITIAL_SERVERS);
+
+    // rebalance
+    final PinotTableRebalancer tableRebalancer =
+        new PinotTableRebalancer(ZKSTR, _clusterName, false, true, false, 2);
+    tableRebalancer.rebalance(getTableName(), "OFFLINE");
+    final TableRebalancer.RebalancerStats stats = tableRebalancer.getRebalancerStats();
+
+    // verify the algorithm stats on how we moved from
+    // current to target ideal state
+    // (TestNG's assertEquals takes the actual value first and the expected
+    // value second, so that failure messages label the two correctly)
+    Assert.assertEquals(stats.getDryRun(), 0);
+    Assert.assertEquals(stats.getDirectTransitions(), 6);
+    Assert.assertEquals(stats.getIncrementalTransitions(), 2);
+
+    // as part of rebalancing, host2 lost segment1 and segment3,
+    // host1 and host3 lost segment2 and segment4 -- so 6
+    // transitions from ON to OFF and OFF to DROP
+    Assert.assertEquals(_stateTransitionStats.offFromOn, 6);
+    Assert.assertEquals(_stateTransitionStats.dropFromOff, 6);
+  }
+
+  private void clearIdealState(final int numSegments) throws Exception {
+    final HelixDataAccessor dataAccessor = 
_helixManagers.get(0).getHelixDataAccessor();
+    final String tableNameWithType = getTableName() + "_OFFLINE";
+    final PropertyKey idealStateKey = 
dataAccessor.keyBuilder().idealStates(tableNameWithType);
+    final IdealState idealState = dataAccessor.getProperty(idealStateKey);
+    for (int i = 0; i < numSegments; i++) {
+      final String segmentID = "segment" + i;
+      if (idealState.getInstanceStateMap(segmentID) != null) {
+        idealState.getInstanceStateMap(segmentID).clear();
+      }
+    }
+    final ZkBaseDataAccessor zkBaseDataAccessor = (ZkBaseDataAccessor) 
dataAccessor.getBaseDataAccessor();
+    zkBaseDataAccessor.set(idealStateKey.getPath(), idealState.getRecord(), 
idealState.getRecord().getVersion(),
+        AccessOption.PERSISTENT);
+
+    // should be enough for the callbacks to come and go away
+    Thread.sleep(2000);
 
 Review comment:
   done -- using a new table in each test

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to