This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9316610 Clean up test utils, disconnect fake instances (#4476)
9316610 is described below
commit 93166101f7d1d97c6042f519c2b019106361268b
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Jul 29 18:50:18 2019 -0700
Clean up test utils, disconnect fake instances (#4476)
- Remove redundant util classes
- Move controller test related methods into ControllerTest for easier usage
- Disconnect fake instances created in the test
- Update state model to include the missing transitions
- Fix the flakiness in ValidationManagerTest
- No change for the test logic
---
.../broker/broker/HelixBrokerStarterTest.java | 9 +-
.../helix/ControllerRequestBuilderUtil.java | 152 -----------
.../EmptyBrokerOnlineOfflineStateModelFactory.java | 70 -----
...EmptySegmentOnlineOfflineStateModelFactory.java | 81 ------
.../pinot/controller/api/PinotFileUploadTest.java | 39 ++-
.../api/PinotSegmentRestletResourceTest.java | 29 +--
.../api/PinotTableRestletResourceTest.java | 11 +-
.../api/PinotTenantRestletResourceTest.java | 12 +-
.../pinot/controller/api/TableViewsTest.java | 12 +-
.../helix/ControllerInstanceToggleTest.java | 33 ++-
.../controller/helix/ControllerSentinelTestV2.java | 45 ++--
.../controller/helix/ControllerTenantTest.java | 111 ++++----
.../pinot/controller/helix/ControllerTest.java | 281 +++++++++++++++++++--
.../pinot/controller/helix/HelixHelperTest.java | 6 +-
.../controller/helix/PinotResourceManagerTest.java | 43 ++--
.../helix/core/PinotHelixResourceManagerTest.java | 57 ++---
.../ReplicaGroupRebalanceStrategyTest.java | 33 +--
.../sharding/SegmentAssignmentStrategyTest.java | 133 ++++------
.../validation/ValidationManagerTest.java | 49 ++--
.../tests/BaseClusterIntegrationTest.java | 20 +-
.../tests/BaseClusterIntegrationTestSet.java | 17 +-
.../pinot/integration/tests/ClusterTest.java | 22 +-
.../ControllerPeriodicTasksIntegrationTests.java | 85 ++++---
.../tests/OfflineClusterIntegrationTest.java | 18 +-
.../tests/SegmentCompletionIntegrationTest.java | 10 +-
.../server/starter/helix/HelixServerStarter.java | 6 +-
26 files changed, 605 insertions(+), 779 deletions(-)
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 8ecff02..df53801 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -41,7 +41,6 @@ import org.apache.pinot.common.utils.CommonConstants.Broker;
import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.util.TestUtils;
@@ -77,11 +76,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
_brokerStarter = new HelixBrokerStarter(brokerConf, getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR);
_brokerStarter.start();
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, NUM_BROKERS - 1,
- true);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, NUM_SERVERS, true);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKERS - 1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true);
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addTime(TIME_COLUMN_NAME, TimeUnit.DAYS,
FieldSpec.DataType.INT).build();
@@ -207,6 +203,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
@AfterClass
public void tearDown() {
+ stopFakeInstances();
_brokerStarter.shutdown();
stopController();
stopZk();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestBuilderUtil.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestBuilderUtil.java
deleted file mode 100644
index bedc11a..0000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestBuilderUtil.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.pinot.common.config.TableNameBuilder;
-import org.apache.pinot.common.config.TagNameUtils;
-import org.apache.pinot.common.config.Tenant;
-import org.apache.pinot.common.config.Tenant.TenantBuilder;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.TenantRole;
-
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
-import static
org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
-
-
-public class ControllerRequestBuilderUtil {
- private ControllerRequestBuilderUtil() {
- }
-
- public static void addFakeBrokerInstancesToAutoJoinHelixCluster(String
helixClusterName, String zkServer,
- int numInstances)
- throws Exception {
- addFakeBrokerInstancesToAutoJoinHelixCluster(helixClusterName, zkServer,
numInstances, false);
- }
-
- public static void addFakeBrokerInstancesToAutoJoinHelixCluster(String
helixClusterName, String zkServer,
- int numInstances, boolean isSingleTenant)
- throws Exception {
- for (int i = 0; i < numInstances; ++i) {
- final String brokerId = "Broker_localhost_" + i;
- final HelixManager helixZkManager =
- HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId,
InstanceType.PARTICIPANT, zkServer);
- final StateMachineEngine stateMachineEngine =
helixZkManager.getStateMachineEngine();
- final StateModelFactory<?> stateModelFactory = new
EmptyBrokerOnlineOfflineStateModelFactory();
- stateMachineEngine
-
.registerStateModelFactory(EmptyBrokerOnlineOfflineStateModelFactory.getStateModelDef(),
stateModelFactory);
- helixZkManager.connect();
- if (isSingleTenant) {
-
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName,
brokerId,
-
TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
- } else {
-
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName,
brokerId, UNTAGGED_BROKER_INSTANCE);
- }
- }
- }
-
- public static void addFakeDataInstancesToAutoJoinHelixCluster(String
helixClusterName, String zkServer,
- int numInstances)
- throws Exception {
- addFakeDataInstancesToAutoJoinHelixCluster(helixClusterName, zkServer,
numInstances, false,
- CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
- }
-
- public static void addFakeDataInstancesToAutoJoinHelixCluster(String
helixClusterName, String zkServer,
- int numInstances, boolean isSingleTenant)
- throws Exception {
- addFakeDataInstancesToAutoJoinHelixCluster(helixClusterName, zkServer,
numInstances, isSingleTenant,
- CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
- }
-
- public static void addFakeDataInstancesToAutoJoinHelixCluster(String
helixClusterName, String zkServer,
- int numInstances, boolean isSingleTenant, int adminPort)
- throws Exception {
-
- for (int i = 0; i < numInstances; ++i) {
- final String instanceId = "Server_localhost_" + i;
- addFakeDataInstanceToAutoJoinHelixCluster(helixClusterName, zkServer,
instanceId, isSingleTenant, adminPort + i);
- }
- }
-
- public static void addFakeDataInstanceToAutoJoinHelixCluster(String
helixClusterName, String zkServer,
- String instanceId)
- throws Exception {
- addFakeDataInstanceToAutoJoinHelixCluster(helixClusterName, zkServer,
instanceId, false,
- CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
- }
-
- public static void addFakeDataInstanceToAutoJoinHelixCluster(String
helixClusterName, String zkServer,
- String instanceId, boolean isSingleTenant)
- throws Exception {
- addFakeDataInstanceToAutoJoinHelixCluster(helixClusterName, zkServer,
instanceId, isSingleTenant,
- CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
- }
-
- public static void addFakeDataInstanceToAutoJoinHelixCluster(String
helixClusterName, String zkServer,
- String instanceId, boolean isSingleTenant, int adminPort)
- throws Exception {
- final HelixManager helixZkManager =
- HelixManagerFactory.getZKHelixManager(helixClusterName, instanceId,
InstanceType.PARTICIPANT, zkServer);
- final StateMachineEngine stateMachineEngine =
helixZkManager.getStateMachineEngine();
- final StateModelFactory<?> stateModelFactory = new
EmptySegmentOnlineOfflineStateModelFactory();
- stateMachineEngine
-
.registerStateModelFactory(EmptySegmentOnlineOfflineStateModelFactory.getStateModelDef(),
stateModelFactory);
- helixZkManager.connect();
- if (isSingleTenant) {
-
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName,
instanceId,
-
TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
-
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName,
instanceId,
-
TableNameBuilder.REALTIME.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
- } else {
-
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName,
instanceId, UNTAGGED_SERVER_INSTANCE);
- }
- HelixConfigScope scope =
- new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
helixClusterName)
- .forParticipant(instanceId).build();
- Map<String, String> props = new HashMap<>();
- props.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY,
String.valueOf(adminPort));
- helixZkManager.getClusterManagmentTool().setConfig(scope, props);
- }
-
- public static String buildBrokerTenantCreateRequestJSON(String tenantName,
int numberOfInstances)
- throws JsonProcessingException {
- Tenant tenant = new
TenantBuilder(tenantName).setRole(TenantRole.BROKER).setTotalInstances(numberOfInstances)
- .setOfflineInstances(0).setRealtimeInstances(0).build();
- return JsonUtils.objectToString(tenant);
- }
-
- public static String buildServerTenantCreateRequestJSON(String tenantName,
int numberOfInstances,
- int offlineInstances, int realtimeInstances)
- throws JsonProcessingException {
- Tenant tenant = new
TenantBuilder(tenantName).setRole(TenantRole.SERVER).setTotalInstances(numberOfInstances)
-
.setOfflineInstances(offlineInstances).setRealtimeInstances(realtimeInstances).build();
- return JsonUtils.objectToString(tenant);
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptyBrokerOnlineOfflineStateModelFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptyBrokerOnlineOfflineStateModelFactory.java
deleted file mode 100644
index b7ae591..0000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptyBrokerOnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class EmptyBrokerOnlineOfflineStateModelFactory extends
StateModelFactory<StateModel> {
-
- @Override
- public StateModel createNewStateModel(String partitionName) {
- final EmptyBrokerOnlineOfflineStateModel SegmentOnlineOfflineStateModel =
new EmptyBrokerOnlineOfflineStateModel();
- return SegmentOnlineOfflineStateModel;
- }
-
- public EmptyBrokerOnlineOfflineStateModelFactory() {
- }
-
- public static String getStateModelDef() {
- return "BrokerResourceOnlineOfflineStateModel";
- }
-
- @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState =
"OFFLINE")
- public static class EmptyBrokerOnlineOfflineStateModel extends StateModel {
- private static final Logger LOGGER =
LoggerFactory.getLogger(EmptyBrokerOnlineOfflineStateModel.class);
-
- @Transition(from = "OFFLINE", to = "ONLINE")
- public void onBecomeOnlineFromOffline(Message message, NotificationContext
context) {
-
LOGGER.info("EmptyBrokerOnlineOfflineStateModel.onBecomeOnlineFromOffline() : "
+ message);
- }
-
- @Transition(from = "ONLINE", to = "OFFLINE")
- public void onBecomeOfflineFromOnline(Message message, NotificationContext
context) {
-
LOGGER.info("EmptyBrokerOnlineOfflineStateModel.onBecomeOfflineFromOnline() : "
+ message);
- }
-
- @Transition(from = "OFFLINE", to = "DROPPED")
- public void onBecomeDroppedFromOffline(Message message,
NotificationContext context) {
-
LOGGER.info("EmptyBrokerOnlineOfflineStateModel.onBecomeDroppedFromOffline() :
" + message);
- }
-
- @Transition(from = "ONLINE", to = "DROPPED")
- public void onBecomeDroppedFromOnline(Message message, NotificationContext
context) {
-
LOGGER.info("EmptyBrokerOnlineOfflineStateModel.onBecomeDroppedFromOnline() : "
+ message);
- }
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptySegmentOnlineOfflineStateModelFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptySegmentOnlineOfflineStateModelFactory.java
deleted file mode 100644
index 48b8f05..0000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptySegmentOnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class EmptySegmentOnlineOfflineStateModelFactory extends
StateModelFactory<StateModel> {
-
- @Override
- public StateModel createNewStateModel(String partitionName) {
- final EmptySegmentOnlineOfflineStateModel SegmentOnlineOfflineStateModel =
- new EmptySegmentOnlineOfflineStateModel();
- return SegmentOnlineOfflineStateModel;
- }
-
- public EmptySegmentOnlineOfflineStateModelFactory() {
- }
-
- public static String getStateModelDef() {
- return "SegmentOnlineOfflineStateModel";
- }
-
- @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState =
"OFFLINE")
- public static class EmptySegmentOnlineOfflineStateModel extends StateModel {
- private static final Logger LOGGER =
LoggerFactory.getLogger(EmptySegmentOnlineOfflineStateModel.class);
-
- @Transition(from = "OFFLINE", to = "ONLINE")
- public void onBecomeOnlineFromOffline(Message message, NotificationContext
context) {
-
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() :
" + message);
- }
-
- @Transition(from = "OFFLINE", to = "CONSUMING")
- public void onBecomeConsumingFromOffline(Message message,
NotificationContext context) {
-
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline()
: " + message);
- }
-
- @Transition(from = "ONLINE", to = "OFFLINE")
- public void onBecomeOfflineFromOnline(Message message, NotificationContext
context) {
-
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() :
" + message);
- }
-
- @Transition(from = "OFFLINE", to = "DROPPED")
- public void onBecomeDroppedFromOffline(Message message,
NotificationContext context) {
-
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() :
" + message);
- }
-
- @Transition(from = "ONLINE", to = "DROPPED")
- public void onBecomeDroppedFromOnline(Message message, NotificationContext
context) {
-
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() :
" + message);
- }
-
- @Transition(from = "CONSUMING", to = "DROPPED")
- public void onBecomeDroppedFromConsuming(Message message,
NotificationContext context) {
-
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming()
: " + message);
- }
- }
-}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
index 6d1ba67..d0c7511 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
@@ -25,8 +25,6 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -41,6 +39,20 @@ import org.testng.annotations.Test;
public class PinotFileUploadTest extends ControllerTest {
private static final String TABLE_NAME = "testTable";
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ startZk();
+ startController();
+ addFakeBrokerInstancesToAutoJoinHelixCluster(5, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(5, true);
+
+ // Adding table
+ TableConfig tableConfig = new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME)
+
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(2).build();
+ _helixResourceManager.addTable(tableConfig);
+ }
+
@Test
public void testUploadBogusData()
throws Exception {
@@ -54,28 +66,9 @@ public class PinotFileUploadTest extends ControllerTest {
Assert.assertTrue(statusCode >= 400 && statusCode < 500, "Status code = "
+ statusCode);
}
- @BeforeClass
- public void setUp()
- throws Exception {
- startZk();
- startController();
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, 5, true);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, 5, true);
-
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_BROKER").size(),
- 5);
-
- // Adding table
- TableConfig tableConfig = new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME)
-
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(2).build();
- _helixResourceManager.addTable(tableConfig);
- }
-
@AfterClass
- public void tearDown()
- throws Exception {
+ public void tearDown() {
+ stopFakeInstances();
stopController();
stopZk();
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
index 65d3d00..5f1dd94 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
@@ -25,8 +25,6 @@ import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.testng.Assert;
@@ -36,22 +34,15 @@ import org.testng.annotations.Test;
public class PinotSegmentRestletResourceTest extends ControllerTest {
- private final static String ZK_SERVER = ZkStarter.DEFAULT_ZK_STR;
- private final static String TABLE_NAME = "testTable";
+ private static final String TABLE_NAME = "testTable";
@BeforeClass
public void setUp()
throws Exception {
startZk();
startController();
-
-
ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZK_SERVER, 1, true);
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZK_SERVER, 1, true);
-
- while (_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_OFFLINE").size() == 0) {
- Thread.sleep(100);
- }
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_OFFLINE").size(),
1);
@@ -59,13 +50,6 @@ public class PinotSegmentRestletResourceTest extends
ControllerTest {
1);
}
- @AfterClass
- public void tearDown()
- throws Exception {
- stopController();
- stopZk();
- }
-
@Test
public void testSegmentCrcApi()
throws Exception {
@@ -123,4 +107,11 @@ public class PinotSegmentRestletResourceTest extends
ControllerTest {
}
Assert.assertEquals(crcMap.size(), expectedSize);
}
+
+ @AfterClass
+ public void tearDown() {
+ stopFakeInstances();
+ stopController();
+ stopZk();
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 71dd4c0..6e4155d 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -25,9 +25,7 @@ import org.apache.pinot.common.config.QuotaConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.realtime.stream.StreamConfig;
@@ -60,12 +58,8 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
startController(config);
_createTableUrl = _controllerRequestURLBuilder.forTableCreate();
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR,
- NUM_BROKER_INSTANCES, true);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR,
- NUM_SERVER_INSTANCES, true);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
_offlineBuilder.setTableName("testOfflineTable").setTimeColumnName("timeColumn").setTimeType("DAYS")
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");
@@ -308,6 +302,7 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
@AfterClass
public void tearDown() {
+ stopFakeInstances();
stopController();
stopZk();
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
index b5aa4cc..fa38d8a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
@@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -81,12 +79,8 @@ public class PinotTenantRestletResourceTest extends
ControllerTest {
// Add a table to the server
String createTableUrl = _controllerRequestURLBuilder.forTableCreate();
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR,
- NUM_BROKER_INSTANCES, true);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR,
- NUM_SERVER_INSTANCES, true);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
_offlineBuilder.setTableName("testOfflineTable").setTimeColumnName("timeColumn").setTimeType("DAYS")
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setServerTenant("DefaultTenant");
@@ -101,6 +95,8 @@ public class PinotTenantRestletResourceTest extends
ControllerTest {
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("DefaultTenant")));
assertEquals(tableList.get("tables").size(), 1, "Expected 1 table");
assertEquals(tableList.get("tables").get(0).asText(), "mytable_OFFLINE");
+
+ stopFakeInstances();
}
@AfterClass
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 0194d42..5ce8ced 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -26,9 +26,7 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.api.resources.TableViews;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
@@ -52,13 +50,8 @@ public class TableViewsTest extends ControllerTest {
throws Exception {
startZk();
startController();
-
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR,
- NUM_BROKER_INSTANCES, true);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR,
- NUM_SERVER_INSTANCES, true);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
// Create the offline table and add one segment
TableConfig tableConfig =
@@ -178,6 +171,7 @@ public class TableViewsTest extends ControllerTest {
@AfterClass
public void tearDown() {
+ stopFakeInstances();
stopController();
stopZk();
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
index 731fa3f..da923c6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
@@ -24,7 +24,6 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.testng.Assert;
@@ -40,17 +39,13 @@ public class ControllerInstanceToggleTest extends
ControllerTest {
private static final String BROKER_TAG_NAME =
TagNameUtils.getBrokerTagForTenant(null);
private static final int NUM_INSTANCES = 3;
- private final String _helixClusterName = getHelixClusterName();
-
@BeforeClass
- public void setup()
+ public void setUp()
throws Exception {
startZk();
startController();
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, true);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, true);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, true);
}
@Test
@@ -62,41 +57,42 @@ public class ControllerInstanceToggleTest extends
ControllerTest {
.setNumReplicas(NUM_INSTANCES).build().toJsonConfigString();
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(),
tableJSONConfigString);
Assert.assertEquals(
- _helixAdmin.getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
.getPartitionSet().size(), 1);
Assert.assertEquals(
- _helixAdmin.getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
.getInstanceSet(OFFLINE_TABLE_NAME).size(), NUM_INSTANCES);
// Add segments
for (int i = 0; i < NUM_INSTANCES; i++) {
_helixResourceManager
.addNewSegment(RAW_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
- Assert.assertEquals(_helixAdmin.getResourceIdealState(_helixClusterName,
OFFLINE_TABLE_NAME).getNumPartitions(),
- i + 1);
+ Assert
+
.assertEquals(_helixAdmin.getResourceIdealState(getHelixClusterName(),
OFFLINE_TABLE_NAME).getNumPartitions(),
+ i + 1);
}
// Disable server instances
int numEnabledInstances = NUM_INSTANCES;
- for (String instanceName :
_helixAdmin.getInstancesInClusterWithTag(_helixClusterName, SERVER_TAG_NAME)) {
+ for (String instanceName :
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
SERVER_TAG_NAME)) {
sendPostRequest(_controllerRequestURLBuilder.forInstanceState(instanceName),
"disable");
checkNumOnlineInstancesFromExternalView(OFFLINE_TABLE_NAME,
--numEnabledInstances);
}
// Enable server instances
- for (String instanceName :
_helixAdmin.getInstancesInClusterWithTag(_helixClusterName, SERVER_TAG_NAME)) {
+ for (String instanceName :
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
SERVER_TAG_NAME)) {
sendPostRequest(_controllerRequestURLBuilder.forInstanceState(instanceName),
"ENABLE");
checkNumOnlineInstancesFromExternalView(OFFLINE_TABLE_NAME,
++numEnabledInstances);
}
// Disable broker instances
- for (String instanceName :
_helixAdmin.getInstancesInClusterWithTag(_helixClusterName, BROKER_TAG_NAME)) {
+ for (String instanceName :
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
BROKER_TAG_NAME)) {
sendPostRequest(_controllerRequestURLBuilder.forInstanceState(instanceName),
"Disable");
checkNumOnlineInstancesFromExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
--numEnabledInstances);
}
// Enable broker instances
- for (String instanceName :
_helixAdmin.getInstancesInClusterWithTag(_helixClusterName, BROKER_TAG_NAME)) {
+ for (String instanceName :
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
BROKER_TAG_NAME)) {
sendPostRequest(_controllerRequestURLBuilder.forInstanceState(instanceName),
"Enable");
checkNumOnlineInstancesFromExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
++numEnabledInstances);
}
@@ -104,7 +100,7 @@ public class ControllerInstanceToggleTest extends
ControllerTest {
// Delete table
sendDeleteRequest(_controllerRequestURLBuilder.forTableDelete(RAW_TABLE_NAME));
Assert.assertEquals(
- _helixAdmin.getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
.getPartitionSet().size(), 0);
}
@@ -112,7 +108,7 @@ public class ControllerInstanceToggleTest extends
ControllerTest {
throws InterruptedException {
long endTime = System.currentTimeMillis() + 10_000L;
while (System.currentTimeMillis() < endTime) {
- ExternalView resourceExternalView =
_helixAdmin.getResourceExternalView(_helixClusterName, resourceName);
+ ExternalView resourceExternalView =
_helixAdmin.getResourceExternalView(getHelixClusterName(), resourceName);
Set<String> instanceSet =
HelixHelper.getOnlineInstanceFromExternalView(resourceExternalView);
if (instanceSet.size() == expectedNumOnlineInstances) {
return;
@@ -124,6 +120,7 @@ public class ControllerInstanceToggleTest extends
ControllerTest {
@AfterClass
public void tearDown() {
+ stopFakeInstances();
stopController();
stopZk();
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
index 97bf934..e00eea8 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -31,23 +30,14 @@ import org.testng.annotations.Test;
public class ControllerSentinelTestV2 extends ControllerTest {
- private final String _helixClusterName = getHelixClusterName();
@BeforeClass
- public void setup()
+ public void setUp()
throws Exception {
startZk();
startController();
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, 20, true);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, 20, true);
- }
-
- @AfterClass
- public void tearDown() {
- stopController();
- stopZk();
+ addFakeBrokerInstancesToAutoJoinHelixCluster(20, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(20, true);
}
@Test
@@ -60,35 +50,40 @@ public class ControllerSentinelTestV2 extends
ControllerTest {
.build().toJsonConfigString();
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(),
tableJSONConfigString);
Assert.assertEquals(
- _helixAdmin.getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
.getPartitionSet().size(), 1);
Assert.assertEquals(
- _helixAdmin.getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
.getInstanceSet(tableName + "_OFFLINE").size(), 20);
// Adding segments
for (int i = 0; i < 10; ++i) {
- Assert
- .assertEquals(_helixAdmin.getResourceIdealState(_helixClusterName,
tableName + "_OFFLINE").getNumPartitions(),
- i);
+ Assert.assertEquals(
+ _helixAdmin.getResourceIdealState(getHelixClusterName(), tableName +
"_OFFLINE").getNumPartitions(), i);
_helixResourceManager
.addNewSegment(tableName,
SegmentMetadataMockUtils.mockSegmentMetadata(tableName), "downloadUrl");
- Assert
- .assertEquals(_helixAdmin.getResourceIdealState(_helixClusterName,
tableName + "_OFFLINE").getNumPartitions(),
- i + 1);
+ Assert.assertEquals(
+ _helixAdmin.getResourceIdealState(getHelixClusterName(), tableName +
"_OFFLINE").getNumPartitions(), i + 1);
}
// Delete table
sendDeleteRequest(_controllerRequestURLBuilder.forTableDelete(tableName));
Assert.assertEquals(
- _helixAdmin.getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
.getPartitionSet().size(), 0);
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
+
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)).size(),
20);
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
+
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getRealtimeTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)).size(),
20);
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
+
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getOfflineTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)).size(),
20);
}
+
+ @AfterClass
+ public void tearDown() {
+ stopFakeInstances();
+ stopController();
+ stopZk();
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTenantTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTenantTest.java
index 59923d5..7bd3c56 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTenantTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTenantTest.java
@@ -24,7 +24,6 @@ import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -42,18 +41,14 @@ public class ControllerTenantTest extends ControllerTest {
private static final int NUM_REALTIME_SERVERS_PER_TAG = 1;
private static final int NUM_SERVERS_PER_TAG = NUM_OFFLINE_SERVERS_PER_TAG +
NUM_REALTIME_SERVERS_PER_TAG;
- private final String _helixClusterName = getHelixClusterName();
-
@BeforeClass
public void setUp()
throws Exception {
startZk();
startController();
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, false);
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
}
@Test
@@ -62,15 +57,13 @@ public class ControllerTenantTest extends ControllerTest {
// Create broker tenants
for (int i = 1; i <= NUM_BROKER_TAGS; i++) {
String brokerTenant = BROKER_TAG_PREFIX + i;
- String payload =
-
ControllerRequestBuilderUtil.buildBrokerTenantCreateRequestJSON(brokerTenant,
NUM_BROKERS_PER_TAG);
- sendPostRequest(_controllerRequestURLBuilder.forTenantCreate(), payload);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
TagNameUtils.getBrokerTagForTenant(brokerTenant))
- .size(), NUM_BROKERS_PER_TAG);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE)
- .size(), NUM_INSTANCES - i * NUM_BROKERS_PER_TAG);
+ createBrokerTenant(brokerTenant, NUM_BROKERS_PER_TAG);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getBrokerTagForTenant(brokerTenant)).size(),
+ NUM_BROKERS_PER_TAG);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE).size(),
+ NUM_INSTANCES - i * NUM_BROKERS_PER_TAG);
}
// Get broker tenants
@@ -90,26 +83,25 @@ public class ControllerTenantTest extends ControllerTest {
// Update broker tenants
for (int i = 0; i <= NUM_INSTANCES - (NUM_BROKER_TAGS - 1) *
NUM_BROKERS_PER_TAG; i++) {
String brokerTenant = BROKER_TAG_PREFIX + 1;
- String payload =
ControllerRequestBuilderUtil.buildBrokerTenantCreateRequestJSON(brokerTenant,
i);
- sendPutRequest(_controllerRequestURLBuilder.forTenantCreate(), payload);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
TagNameUtils.getBrokerTagForTenant(brokerTenant))
- .size(), i);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE)
- .size(), NUM_INSTANCES - (NUM_BROKER_TAGS - 1) *
NUM_BROKERS_PER_TAG - i);
+ updateBrokerTenant(brokerTenant, i);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getBrokerTagForTenant(brokerTenant)).size(),
+ i);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE).size(),
+ NUM_INSTANCES - (NUM_BROKER_TAGS - 1) * NUM_BROKERS_PER_TAG - i);
}
// Delete broker tenants
for (int i = 1; i <= NUM_BROKER_TAGS; i++) {
String brokerTenant = BROKER_TAG_PREFIX + i;
sendDeleteRequest(_controllerRequestURLBuilder.forBrokerTenantDelete(brokerTenant));
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
TagNameUtils.getBrokerTagForTenant(brokerTenant))
- .size(), 0);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE)
- .size(), NUM_INSTANCES - (NUM_BROKER_TAGS - i) *
NUM_BROKERS_PER_TAG);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getBrokerTagForTenant(brokerTenant)).size(),
+ 0);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE).size(),
+ NUM_INSTANCES - (NUM_BROKER_TAGS - i) * NUM_BROKERS_PER_TAG);
}
}
@@ -119,19 +111,16 @@ public class ControllerTenantTest extends ControllerTest {
// Create server tenants
for (int i = 1; i <= NUM_SERVER_TAGS; i++) {
String serverTenant = SERVER_TAG_PREFIX + i;
- String payload = ControllerRequestBuilderUtil
- .buildServerTenantCreateRequestJSON(serverTenant,
NUM_SERVERS_PER_TAG, NUM_OFFLINE_SERVERS_PER_TAG,
- NUM_REALTIME_SERVERS_PER_TAG);
- sendPostRequest(_controllerRequestURLBuilder.forTenantCreate(), payload);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
TagNameUtils.getOfflineTagForTenant(serverTenant))
- .size(), NUM_OFFLINE_SERVERS_PER_TAG);
+ createServerTenant(serverTenant, NUM_OFFLINE_SERVERS_PER_TAG,
NUM_REALTIME_SERVERS_PER_TAG);
Assert.assertEquals(_helixAdmin
- .getInstancesInClusterWithTag(_helixClusterName,
TagNameUtils.getRealtimeTagForTenant(serverTenant)).size(),
- NUM_REALTIME_SERVERS_PER_TAG);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE)
- .size(), NUM_INSTANCES - i * NUM_SERVERS_PER_TAG);
+ .getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getOfflineTagForTenant(serverTenant))
+ .size(), NUM_OFFLINE_SERVERS_PER_TAG);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getRealtimeTagForTenant(serverTenant))
+ .size(), NUM_REALTIME_SERVERS_PER_TAG);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE).size(),
+ NUM_INSTANCES - i * NUM_SERVERS_PER_TAG);
}
// Get server tenants
@@ -152,39 +141,37 @@ public class ControllerTenantTest extends ControllerTest {
// Note: server tenants cannot scale down
for (int i = 0; i <= (NUM_INSTANCES - NUM_SERVER_TAGS *
NUM_SERVERS_PER_TAG) / 2; i++) {
String serverTenant = SERVER_TAG_PREFIX + 1;
- String payload = ControllerRequestBuilderUtil
- .buildServerTenantCreateRequestJSON(serverTenant,
NUM_SERVERS_PER_TAG + i * 2,
- NUM_OFFLINE_SERVERS_PER_TAG + i, NUM_REALTIME_SERVERS_PER_TAG +
i);
- sendPutRequest(_controllerRequestURLBuilder.forTenantCreate(), payload);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
TagNameUtils.getOfflineTagForTenant(serverTenant))
- .size(), NUM_OFFLINE_SERVERS_PER_TAG + i);
+ updateServerTenant(serverTenant, NUM_OFFLINE_SERVERS_PER_TAG + i,
NUM_REALTIME_SERVERS_PER_TAG + i);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getOfflineTagForTenant(serverTenant))
+ .size(), NUM_OFFLINE_SERVERS_PER_TAG + i);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getRealtimeTagForTenant(serverTenant))
+ .size(), NUM_REALTIME_SERVERS_PER_TAG + i);
Assert.assertEquals(_helixAdmin
- .getInstancesInClusterWithTag(_helixClusterName,
TagNameUtils.getRealtimeTagForTenant(serverTenant)).size(),
- NUM_REALTIME_SERVERS_PER_TAG + i);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE)
- .size(), NUM_INSTANCES - NUM_SERVER_TAGS * NUM_SERVERS_PER_TAG -
i * 2);
+ .getInstancesInClusterWithTag(getHelixClusterName(),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE).size(),
+ NUM_INSTANCES - NUM_SERVER_TAGS * NUM_SERVERS_PER_TAG - i * 2);
}
// Delete server tenants
for (int i = 1; i < NUM_SERVER_TAGS; i++) {
String serverTenant = SERVER_TAG_PREFIX + i;
sendDeleteRequest(_controllerRequestURLBuilder.forServerTenantDelete(serverTenant));
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
TagNameUtils.getOfflineTagForTenant(serverTenant))
- .size(), 0);
Assert.assertEquals(_helixAdmin
- .getInstancesInClusterWithTag(_helixClusterName,
TagNameUtils.getRealtimeTagForTenant(serverTenant)).size(),
- 0);
- Assert.assertEquals(
- _helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE)
- .size(), NUM_INSTANCES - (NUM_SERVER_TAGS - i) *
NUM_SERVERS_PER_TAG);
+ .getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getOfflineTagForTenant(serverTenant))
+ .size(), 0);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
TagNameUtils.getRealtimeTagForTenant(serverTenant))
+ .size(), 0);
+ Assert.assertEquals(_helixAdmin
+ .getInstancesInClusterWithTag(getHelixClusterName(),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE).size(),
+ NUM_INSTANCES - (NUM_SERVER_TAGS - i) * NUM_SERVERS_PER_TAG);
}
}
@AfterClass
public void tearDown() {
+ stopFakeInstances();
stopController();
stopZk();
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 4d49192..ab1ea56 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@@ -28,7 +29,10 @@ import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.PutMethod;
@@ -39,21 +43,40 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.config.Tenant;
import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.MetricFieldSpec;
import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.TenantRole;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerStarter;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.Instance.ADMIN_PORT_KEY;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
+import static
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+
/**
* Base class for controller tests.
@@ -63,6 +86,10 @@ public abstract class ControllerTest {
protected static final int DEFAULT_CONTROLLER_PORT = 18998;
protected static final String DEFAULT_DATA_DIR =
new File(FileUtils.getTempDirectoryPath(), "test-controller-" +
System.currentTimeMillis()).getAbsolutePath();
+ protected static final String BROKER_INSTANCE_ID_PREFIX =
"Broker_localhost_";
+ protected static final String SERVER_INSTANCE_ID_PREFIX =
"Server_localhost_";
+
+ protected final List<HelixManager> _fakeInstanceHelixManagers = new
ArrayList<>();
protected int _controllerPort;
protected String _controllerBaseApiUrl;
@@ -120,7 +147,10 @@ public abstract class ControllerTest {
_controllerRequestURLBuilder =
ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
_controllerDataDir = config.getDataDir();
- startControllerStarter(config);
+ _controllerStarter = getControllerStarter(config);
+ _controllerStarter.start();
+ _helixResourceManager = _controllerStarter.getHelixResourceManager();
+ _helixManager = _controllerStarter.getHelixControllerManager();
// HelixResourceManager is null in Helix only mode, while HelixManager is
null in Pinot only mode.
switch (_controllerStarter.getControllerMode()) {
@@ -143,27 +173,205 @@ public abstract class ControllerTest {
}
}
- protected void startControllerStarter(ControllerConf config) {
- _controllerStarter = getControllerStarter(config);
- _controllerStarter.start();
- _helixResourceManager = _controllerStarter.getHelixResourceManager();
- _helixManager = _controllerStarter.getHelixControllerManager();
- }
-
protected ControllerStarter getControllerStarter(ControllerConf config) {
return new ControllerStarter(config);
}
protected void stopController() {
- stopControllerStarter();
+ _controllerStarter.stop();
+ _controllerStarter = null;
FileUtils.deleteQuietly(new File(_controllerDataDir));
}
- protected void stopControllerStarter() {
- Assert.assertNotNull(_controllerStarter);
+ protected void addFakeBrokerInstancesToAutoJoinHelixCluster(int
numInstances, boolean isSingleTenant)
+ throws Exception {
+ for (int i = 0; i < numInstances; i++) {
+ addFakeBrokerInstanceToAutoJoinHelixCluster(BROKER_INSTANCE_ID_PREFIX +
i, isSingleTenant);
+ }
+ }
- _controllerStarter.stop();
- _controllerStarter = null;
+ protected void addFakeBrokerInstanceToAutoJoinHelixCluster(String
instanceId, boolean isSingleTenant)
+ throws Exception {
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(getHelixClusterName(), instanceId,
InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
+ helixManager.getStateMachineEngine()
+
.registerStateModelFactory(FakeBrokerResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
+ FakeBrokerResourceOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
+ helixManager.connect();
+ HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+ if (isSingleTenant) {
+ helixAdmin.addInstanceTag(getHelixClusterName(), instanceId,
TagNameUtils.getBrokerTagForTenant(null));
+ } else {
+ helixAdmin.addInstanceTag(getHelixClusterName(), instanceId,
UNTAGGED_BROKER_INSTANCE);
+ }
+ _fakeInstanceHelixManagers.add(helixManager);
+ }
+
+ public static class FakeBrokerResourceOnlineOfflineStateModelFactory extends
StateModelFactory<StateModel> {
+ private static final String STATE_MODEL_DEF =
"BrokerResourceOnlineOfflineStateModel";
+ private static final FakeBrokerResourceOnlineOfflineStateModelFactory
FACTORY_INSTANCE =
+ new FakeBrokerResourceOnlineOfflineStateModelFactory();
+ private static final FakeBrokerResourceOnlineOfflineStateModel
STATE_MODEL_INSTANCE =
+ new FakeBrokerResourceOnlineOfflineStateModel();
+
+ private FakeBrokerResourceOnlineOfflineStateModelFactory() {
+ }
+
+ @Override
+ public StateModel createNewStateModel(String resourceName, String
partitionName) {
+ return STATE_MODEL_INSTANCE;
+ }
+
+ @SuppressWarnings("unused")
+ @StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'DROPPED'}", initialState
= "OFFLINE")
+ public static class FakeBrokerResourceOnlineOfflineStateModel extends
StateModel {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FakeBrokerResourceOnlineOfflineStateModel.class);
+
+ private FakeBrokerResourceOnlineOfflineStateModel() {
+ }
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void onBecomeOnlineFromOffline(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeOnlineFromOffline(): {}", message);
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOffline(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeDroppedFromOffline(): {}", message);
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void onBecomeOfflineFromOnline(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeOfflineFromOnline(): {}", message);
+ }
+
+ @Transition(from = "ONLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOnline(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeDroppedFromOnline(): {}", message);
+ }
+
+ @Transition(from = "ERROR", to = "OFFLINE")
+ public void onBecomeOfflineFromError(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeOfflineFromError(): {}", message);
+ }
+ }
+ }
+
+ protected void addFakeServerInstancesToAutoJoinHelixCluster(int
numInstances, boolean isSingleTenant)
+ throws Exception {
+ addFakeServerInstancesToAutoJoinHelixCluster(numInstances, isSingleTenant,
DEFAULT_ADMIN_API_PORT);
+ }
+
+ protected void addFakeServerInstancesToAutoJoinHelixCluster(int
numInstances, boolean isSingleTenant,
+ int baseAdminPort)
+ throws Exception {
+ for (int i = 0; i < numInstances; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, isSingleTenant, baseAdminPort + i);
+ }
+ }
+
+ protected void addFakeServerInstanceToAutoJoinHelixCluster(String
instanceId, boolean isSingleTenant)
+ throws Exception {
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, isSingleTenant,
DEFAULT_ADMIN_API_PORT);
+ }
+
+ protected void addFakeServerInstanceToAutoJoinHelixCluster(String
instanceId, boolean isSingleTenant, int adminPort)
+ throws Exception {
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(getHelixClusterName(), instanceId,
InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
+ helixManager.getStateMachineEngine()
+
.registerStateModelFactory(FakeSegmentOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
+ FakeSegmentOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
+ helixManager.connect();
+ HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+ if (isSingleTenant) {
+ helixAdmin.addInstanceTag(getHelixClusterName(), instanceId,
TagNameUtils.getOfflineTagForTenant(null));
+ helixAdmin.addInstanceTag(getHelixClusterName(), instanceId,
TagNameUtils.getRealtimeTagForTenant(null));
+ } else {
+ helixAdmin.addInstanceTag(getHelixClusterName(), instanceId,
UNTAGGED_SERVER_INSTANCE);
+ }
+ HelixConfigScope configScope =
+ new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
getHelixClusterName())
+ .forParticipant(instanceId).build();
+ helixAdmin.setConfig(configScope, Collections.singletonMap(ADMIN_PORT_KEY,
Integer.toString(adminPort)));
+ _fakeInstanceHelixManagers.add(helixManager);
+ }
+
+ public static class FakeSegmentOnlineOfflineStateModelFactory extends
StateModelFactory<StateModel> {
+ private static final String STATE_MODEL_DEF =
"SegmentOnlineOfflineStateModel";
+ private static final FakeSegmentOnlineOfflineStateModelFactory
FACTORY_INSTANCE =
+ new FakeSegmentOnlineOfflineStateModelFactory();
+ private static final FakeSegmentOnlineOfflineStateModel
STATE_MODEL_INSTANCE =
+ new FakeSegmentOnlineOfflineStateModel();
+
+ private FakeSegmentOnlineOfflineStateModelFactory() {
+ }
+
+ @Override
+ public StateModel createNewStateModel(String resourceName, String
partitionName) {
+ return STATE_MODEL_INSTANCE;
+ }
+
+ @SuppressWarnings("unused")
+ @StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'CONSUMING', 'DROPPED'}",
initialState = "OFFLINE")
+ public static class FakeSegmentOnlineOfflineStateModel extends StateModel {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FakeSegmentOnlineOfflineStateModel.class);
+
+ private FakeSegmentOnlineOfflineStateModel() {
+ }
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void onBecomeOnlineFromOffline(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeOnlineFromOffline(): {}", message);
+ }
+
+ @Transition(from = "OFFLINE", to = "CONSUMING")
+ public void onBecomeConsumingFromOffline(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeConsumingFromOffline(): {}", message);
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOffline(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeDroppedFromOffline(): {}", message);
+ }
+
+ @Transition(from = "ONLINE", to = "OFFLINE")
+ public void onBecomeOfflineFromOnline(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeOfflineFromOnline(): {}", message);
+ }
+
+ @Transition(from = "ONLINE", to = "DROPPED")
+ public void onBecomeDroppedFromOnline(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeDroppedFromOnline(): {}", message);
+ }
+
+ @Transition(from = "CONSUMING", to = "OFFLINE")
+ public void onBecomeOfflineFromConsuming(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeOfflineFromConsuming(): {}", message);
+ }
+
+ @Transition(from = "CONSUMING", to = "ONLINE")
+ public void onBecomeOnlineFromConsuming(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeOnlineFromConsuming(): {}", message);
+ }
+
+ @Transition(from = "CONSUMING", to = "DROPPED")
+ public void onBecomeDroppedFromConsuming(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeDroppedFromConsuming(): {}", message);
+ }
+
+ @Transition(from = "ERROR", to = "OFFLINE")
+ public void onBecomeOfflineFromError(Message message,
NotificationContext context) {
+ LOGGER.debug("onBecomeOfflineFromError(): {}", message);
+ }
+ }
+ }
+
+ protected void stopFakeInstances() {
+ for (HelixManager helixManager : _fakeInstanceHelixManagers) {
+ helixManager.disconnect();
+ }
+ _fakeInstanceHelixManagers.clear();
}
protected Schema createDummySchema(String tableName) {
@@ -193,6 +401,45 @@ public abstract class ControllerTest {
Assert.assertEquals(postMethod.getStatusCode(), 200);
}
+ protected String getBrokerTenantRequestPayload(String tenantName, int
numBrokers)
+ throws JsonProcessingException {
+ Tenant tenant =
+ new
Tenant.TenantBuilder(tenantName).setRole(TenantRole.BROKER).setTotalInstances(numBrokers).build();
+ return JsonUtils.objectToString(tenant);
+ }
+
+ protected void createBrokerTenant(String tenantName, int numBrokers)
+ throws IOException {
+ sendPostRequest(_controllerRequestURLBuilder.forTenantCreate(),
+ getBrokerTenantRequestPayload(tenantName, numBrokers));
+ }
+
+ protected void updateBrokerTenant(String tenantName, int numBrokers)
+ throws IOException {
+ sendPutRequest(_controllerRequestURLBuilder.forTenantCreate(),
+ getBrokerTenantRequestPayload(tenantName, numBrokers));
+ }
+
+ protected String getServerTenantRequestPayload(String tenantName, int
numOfflineServers, int numRealtimeServers)
+ throws JsonProcessingException {
+ Tenant tenant = new
Tenant.TenantBuilder(tenantName).setRole(TenantRole.SERVER)
+ .setTotalInstances(numOfflineServers +
numRealtimeServers).setOfflineInstances(numOfflineServers)
+ .setRealtimeInstances(numRealtimeServers).build();
+ return JsonUtils.objectToString(tenant);
+ }
+
+ protected void createServerTenant(String tenantName, int numOfflineServers,
int numRealtimeServers)
+ throws IOException {
+ sendPostRequest(_controllerRequestURLBuilder.forTenantCreate(),
+ getServerTenantRequestPayload(tenantName, numOfflineServers,
numRealtimeServers));
+ }
+
+ protected void updateServerTenant(String tenantName, int numOfflineServers,
int numRealtimeServers)
+ throws IOException {
+ sendPutRequest(_controllerRequestURLBuilder.forTenantCreate(),
+ getServerTenantRequestPayload(tenantName, numOfflineServers,
numRealtimeServers));
+ }
+
public static String sendGetRequest(String urlString)
throws IOException {
return constructResponse(new URL(urlString).openStream());
@@ -211,7 +458,7 @@ public abstract class ControllerTest {
if (payload != null && !payload.isEmpty()) {
httpConnection.setDoOutput(true);
try (BufferedWriter writer = new BufferedWriter(
- new OutputStreamWriter(httpConnection.getOutputStream(), "UTF-8"))) {
+ new OutputStreamWriter(httpConnection.getOutputStream(),
StandardCharsets.UTF_8))) {
writer.write(payload, 0, payload.length());
writer.flush();
}
@@ -226,8 +473,8 @@ public abstract class ControllerTest {
httpConnection.setDoOutput(true);
httpConnection.setRequestMethod("PUT");
- try (
- BufferedWriter writer = new BufferedWriter(new
OutputStreamWriter(httpConnection.getOutputStream(), "UTF-8"))) {
+ try (BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(httpConnection.getOutputStream(),
StandardCharsets.UTF_8))) {
writer.write(payload);
writer.flush();
}
@@ -246,7 +493,7 @@ public abstract class ControllerTest {
private static String constructResponse(InputStream inputStream)
throws IOException {
- try (BufferedReader reader = new BufferedReader(new
InputStreamReader(inputStream, "UTF-8"))) {
+ try (BufferedReader reader = new BufferedReader(new
InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
StringBuilder responseBuilder = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
index 191241d..a297b27 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
@@ -35,7 +35,6 @@ import org.testng.annotations.Test;
public class HelixHelperTest extends ControllerTest {
public static final String RESOURCE_NAME = "potato_OFFLINE";
public static final String INSTANCE_NAME = "Server_1.2.3.4_1234";
- private String helixClusterName;
@BeforeClass
public void setUp() {
@@ -46,8 +45,7 @@ public class HelixHelperTest extends ControllerTest {
idealState.setStateModelDefRef("OnlineOffline");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
idealState.setReplicas("0");
- helixClusterName = getHelixClusterName();
- _helixAdmin.addResource(helixClusterName, RESOURCE_NAME, idealState);
+ _helixAdmin.addResource(getHelixClusterName(), RESOURCE_NAME, idealState);
}
/**
@@ -68,7 +66,7 @@ public class HelixHelperTest extends ControllerTest {
}
}, RetryPolicies.noDelayRetryPolicy(1));
- IdealState resourceIdealState =
_helixAdmin.getResourceIdealState(helixClusterName, RESOURCE_NAME);
+ IdealState resourceIdealState =
_helixAdmin.getResourceIdealState(getHelixClusterName(), RESOURCE_NAME);
for (int i = 0; i < numSegments; i++) {
Assert.assertEquals(resourceIdealState.getInstanceStateMap("segment_" +
i).get(INSTANCE_NAME), "ONLINE");
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index d65e818..06b5de8 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -22,13 +22,11 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -37,28 +35,23 @@ import org.testng.annotations.Test;
public class PinotResourceManagerTest extends ControllerTest {
- private final static String TABLE_NAME = "testTable";
-
- private ZkStarter.ZookeeperInstance _zookeeperInstance;
- private ZkClient _zkClient;
+ private static final String TABLE_NAME = "testTable";
@BeforeClass
public void setUp()
throws Exception {
- _zookeeperInstance = ZkStarter.startLocalZkServer();
- _zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
-
+ startZk();
startController();
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, 1, true);
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, 1, true);
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_BROKER").size(), 1);
- Assert
-
.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_OFFLINE").size(), 1);
+
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_BROKER").size(),
+ 1);
+
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_OFFLINE").size(),
+ 1);
Assert
-
.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_REALTIME").size(), 1);
+
.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_REALTIME").size(),
+ 1);
// Adding table
TableConfig tableConfig =
@@ -78,11 +71,13 @@ public class PinotResourceManagerTest extends
ControllerTest {
Assert.assertTrue(_helixResourceManager.updateZkMetadata("testTable_OFFLINE",
segmentZKMetadata));
// Update ZK metadata
- Assert.assertEquals(
- _helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE",
"testSegment").getVersion(), 0);
+ Assert
+
.assertEquals(_helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE",
"testSegment").getVersion(),
+ 0);
Assert.assertTrue(_helixResourceManager.updateZkMetadata("testTable_OFFLINE",
segmentZKMetadata, 0));
- Assert.assertEquals(
- _helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE",
"testSegment").getVersion(), 1);
+ Assert
+
.assertEquals(_helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE",
"testSegment").getVersion(),
+ 1);
Assert.assertFalse(_helixResourceManager.updateZkMetadata("testTable_OFFLINE",
segmentZKMetadata, 0));
}
@@ -151,8 +146,8 @@ public class PinotResourceManagerTest extends
ControllerTest {
@AfterClass
public void tearDown() {
- _helixResourceManager.stop();
- _zkClient.close();
- ZkStarter.stopLocalZkServer(_zookeeperInstance);
+ stopFakeInstances();
+ stopController();
+ stopZk();
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index fc07811..d130e31 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -51,9 +51,7 @@ import
org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.TenantRole;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
@@ -82,8 +80,6 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
private static final long TIMEOUT_IN_MS = 10_000L;
- private final String _helixClusterName = getHelixClusterName();
-
@BeforeClass
public void setUp()
throws Exception {
@@ -91,13 +87,8 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
ControllerConf config = getDefaultControllerConfiguration();
config.setTenantIsolationEnabled(false);
startController(config);
-
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES,
- false);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false,
- BASE_SERVER_ADMIN_PORT);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false,
BASE_SERVER_ADMIN_PORT);
// Create server tenant on all Servers
Tenant serverTenant =
@@ -114,7 +105,7 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
Set<String> servers =
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
BiMap<String, String> endpoints =
_helixResourceManager.getDataInstanceAdminEndpoints(servers);
for (int i = 0; i < NUM_INSTANCES; i++) {
- Assert.assertTrue(endpoints.inverse().containsKey("localhost:" +
String.valueOf(BASE_SERVER_ADMIN_PORT + i)));
+ Assert.assertTrue(endpoints.inverse().containsKey("localhost:" +
(BASE_SERVER_ADMIN_PORT + i)));
}
}
@@ -124,7 +115,7 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
Set<String> servers =
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
for (String server : servers) {
InstanceConfig cachedInstanceConfig =
_helixResourceManager.getHelixInstanceConfig(server);
- InstanceConfig realInstanceConfig =
_helixAdmin.getInstanceConfig(_helixClusterName, server);
+ InstanceConfig realInstanceConfig =
_helixAdmin.getInstanceConfig(getHelixClusterName(), server);
Assert.assertEquals(cachedInstanceConfig, realInstanceConfig);
}
@@ -140,7 +131,7 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
private void modifyExistingInstanceConfig(ZkClient zkClient)
throws InterruptedException {
String instanceName = "Server_localhost_" + new
Random().nextInt(NUM_INSTANCES);
- String instanceConfigPath =
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+ String instanceConfigPath =
PropertyPathBuilder.instanceConfig(getHelixClusterName(), instanceName);
Assert.assertTrue(zkClient.exists(instanceConfigPath));
ZNRecord znRecord = zkClient.readData(instanceConfigPath, null);
@@ -172,8 +163,8 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
private void addAndRemoveNewInstanceConfig(ZkClient zkClient)
throws Exception {
int biggerRandomNumber = NUM_INSTANCES + new
Random().nextInt(NUM_INSTANCES);
- String instanceName = "Server_localhost_" +
String.valueOf(biggerRandomNumber);
- String instanceConfigPath =
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+ String instanceName = "Server_localhost_" + biggerRandomNumber;
+ String instanceConfigPath =
PropertyPathBuilder.instanceConfig(getHelixClusterName(), instanceName);
Assert.assertFalse(zkClient.exists(instanceConfigPath));
List<String> instances = _helixResourceManager.getAllInstances();
Assert.assertFalse(instances.contains(instanceName));
@@ -217,19 +208,20 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
// Check that the BrokerResource ideal state has 3 Brokers assigned to the
table
IdealState idealState = _helixResourceManager.getHelixAdmin()
- .getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ .getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(idealState.getInstanceStateMap(OFFLINE_TABLE_NAME).size(),
3);
// Untag all Brokers assigned to broker tenant
for (String brokerInstance :
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
- _helixAdmin
- .removeInstanceTag(_helixClusterName, brokerInstance,
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
- _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+ _helixAdmin.removeInstanceTag(getHelixClusterName(), brokerInstance,
+ TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+ _helixAdmin.addInstanceTag(getHelixClusterName(), brokerInstance,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
// Rebuilding the broker tenant should update the ideal state size
_helixResourceManager.rebuildBrokerResourceFromHelixTags(OFFLINE_TABLE_NAME);
- idealState = _helixAdmin.getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ idealState =
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(idealState.getInstanceStateMap(OFFLINE_TABLE_NAME).size(),
0);
// Create broker tenant on 5 Brokers
@@ -238,7 +230,8 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
// Rebuilding the broker tenant should update the ideal state size
_helixResourceManager.rebuildBrokerResourceFromHelixTags(OFFLINE_TABLE_NAME);
- idealState = _helixAdmin.getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ idealState =
+ _helixAdmin.getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(idealState.getInstanceStateMap(OFFLINE_TABLE_NAME).size(),
5);
// Delete the table
@@ -289,13 +282,13 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
String testBrokerInstance =
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME).iterator().next();
- _helixAdmin.addInstanceTag(_helixClusterName, testBrokerInstance,
"wrong_tag");
+ _helixAdmin.addInstanceTag(getHelixClusterName(), testBrokerInstance,
"wrong_tag");
brokerTenantNames = _helixResourceManager.getAllBrokerTenantNames();
Assert.assertEquals(brokerTenantNames.size(), 1);
Assert.assertEquals(brokerTenantNames.iterator().next(),
BROKER_TENANT_NAME);
- _helixAdmin.removeInstanceTag(_helixClusterName, testBrokerInstance,
"wrong_tag");
+ _helixAdmin.removeInstanceTag(getHelixClusterName(), testBrokerInstance,
"wrong_tag");
// Server tenant is already created during setup.
Set<String> serverTenantNames =
_helixResourceManager.getAllServerTenantNames();
@@ -304,13 +297,13 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
String testServerInstance =
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME).iterator().next();
- _helixAdmin.addInstanceTag(_helixClusterName, testServerInstance,
"wrong_tag");
+ _helixAdmin.addInstanceTag(getHelixClusterName(), testServerInstance,
"wrong_tag");
serverTenantNames = _helixResourceManager.getAllServerTenantNames();
Assert.assertEquals(serverTenantNames.size(), 1);
Assert.assertEquals(serverTenantNames.iterator().next(),
SERVER_TENANT_NAME);
- _helixAdmin.removeInstanceTag(_helixClusterName, testServerInstance,
"wrong_tag");
+ _helixAdmin.removeInstanceTag(getHelixClusterName(), testServerInstance,
"wrong_tag");
}
@Test
@@ -465,8 +458,9 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
}
for (String brokerInstance :
_helixResourceManager.getAllInstancesForBrokerTenant(brokerTag)) {
- _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
TagNameUtils.getBrokerTagForTenant(brokerTag));
- _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+ _helixAdmin
+ .removeInstanceTag(getHelixClusterName(), brokerInstance,
TagNameUtils.getBrokerTagForTenant(brokerTag));
+ _helixAdmin.addInstanceTag(getHelixClusterName(), brokerInstance,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
}
@@ -572,14 +566,15 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
public void cleanUpBrokerTags() {
// Untag all Brokers for other tests
for (String brokerInstance :
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
- _helixAdmin
- .removeInstanceTag(_helixClusterName, brokerInstance,
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
- _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+ _helixAdmin.removeInstanceTag(getHelixClusterName(), brokerInstance,
+ TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+ _helixAdmin.addInstanceTag(getHelixClusterName(), brokerInstance,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
}
@AfterClass
public void tearDown() {
+ stopFakeInstances();
stopController();
stopZk();
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
index 84c8b50..2350b87 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
@@ -31,9 +31,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignment;
import
org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerator;
import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.ReplicaGroupTestUtils;
import org.testng.Assert;
@@ -49,9 +47,9 @@ public class ReplicaGroupRebalanceStrategyTest extends
ControllerTest {
private static final int INITIAL_NUM_SEGMENTS = 20;
private static final String TABLE_NAME = "testReplicaRebalanceReplace";
- private final static String PARTITION_COLUMN = "memberId";
- private final static String OFFLINE_TENENT_NAME = "DefaultTenant_OFFLINE";
- private final static String NEW_SEGMENT_PREFIX = "new_segment_";
+ private static final String PARTITION_COLUMN = "memberId";
+ private static final String OFFLINE_TENENT_NAME = "DefaultTenant_OFFLINE";
+ private static final String NEW_SEGMENT_PREFIX = "new_segment_";
private final TableConfig.Builder _offlineBuilder = new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE);
@@ -63,12 +61,8 @@ public class ReplicaGroupRebalanceStrategyTest extends
ControllerTest {
ControllerConf config = getDefaultControllerConfiguration();
config.setTableMinReplicas(MIN_NUM_REPLICAS);
startController(config);
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR,
- NUM_BROKER_INSTANCES, true);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR,
- NUM_SERVER_INSTANCES, true);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
_offlineBuilder.setTableName("testOfflineTable").setTimeColumnName("timeColumn").setTimeType("DAYS")
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");
@@ -78,9 +72,7 @@ public class ReplicaGroupRebalanceStrategyTest extends
ControllerTest {
// Join 4 more servers as untagged
String[] instanceNames = {"Server_localhost_a", "Server_localhost_b",
"Server_localhost_c", "Server_localhost_d"};
for (String instanceName : instanceNames) {
- ControllerRequestBuilderUtil
- .addFakeDataInstanceToAutoJoinHelixCluster(getHelixClusterName(),
ZkStarter.DEFAULT_ZK_STR, instanceName,
- true);
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceName, true);
_helixAdmin.removeInstanceTag(getHelixClusterName(), instanceName,
OFFLINE_TENENT_NAME);
}
} catch (Exception e) {
@@ -88,12 +80,6 @@ public class ReplicaGroupRebalanceStrategyTest extends
ControllerTest {
}
}
- @AfterClass
- public void tearDown() {
- stopController();
- stopZk();
- }
-
@Test
public void testReplicaGroupRebalanceStrategy()
throws Exception {
@@ -324,4 +310,11 @@ public class ReplicaGroupRebalanceStrategyTest extends
ControllerTest {
_helixResourceManager
.setExistingTableConfig(tableConfig, tableNameWithType,
CommonConstants.Helix.TableType.OFFLINE);
}
+
+ @AfterClass
+ public void tearDown() {
+ stopFakeInstances();
+ stopController();
+ stopZk();
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
index 836824c..5376911 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
@@ -22,8 +22,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.ColumnPartitionConfig;
import org.apache.pinot.common.config.IndexingConfig;
@@ -35,71 +33,39 @@ import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignment;
import
org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerator;
import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.ReplicaGroupTestUtils;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
import org.testng.Assert;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class SegmentAssignmentStrategyTest extends ControllerTest {
- private final static String ZK_SERVER = ZkStarter.DEFAULT_ZK_STR;
- private final static String TABLE_NAME_BALANCED = "testResourceBalanced";
- private final static String TABLE_NAME_RANDOM = "testResourceRandom";
- private final static String TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT =
"testReplicaGroupPartitionAssignment";
- private final static String TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP =
"testTableLevelReplicaGroup";
- private final static String TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP =
"testPartitionLevelReplicaGroup";
-
- private static final Random random = new Random();
- private final static String PARTITION_COLUMN = "memberId";
- private final static int NUM_REPLICA = 2;
- private ZkClient _zkClient;
- private final int _numServerInstance = 10;
- private final int _numBrokerInstance = 1;
- private ZkStarter.ZookeeperInstance _zookeeperInstance;
+ private static final String TABLE_NAME_BALANCED = "testResourceBalanced";
+ private static final String TABLE_NAME_RANDOM = "testResourceRandom";
+ private static final String TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT =
"testReplicaGroupPartitionAssignment";
+ private static final String TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP =
"testTableLevelReplicaGroup";
+ private static final String TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP =
"testPartitionLevelReplicaGroup";
+ private static final String PARTITION_COLUMN = "memberId";
+ private static final int NUM_REPLICAS = 2;
+ private static final int NUM_SERVERS = 10;
+ private static final int NUM_BROKERS = 1;
+ private static final Random RANDOM = new Random();
+
private ReplicaGroupPartitionAssignmentGenerator
_partitionAssignmentGenerator;
- @BeforeTest
- public void setup()
+ @BeforeClass
+ public void setUp()
throws Exception {
- _zookeeperInstance = ZkStarter.startLocalZkServer();
- _zkClient = new ZkClient(ZK_SERVER);
- final String zkPath = "/" + getHelixClusterName();
- if (_zkClient.exists(zkPath)) {
- _zkClient.deleteRecursive(zkPath);
- }
+ startZk();
startController();
- HelixManager helixZkManager = _helixResourceManager.getHelixZkManager();
-
- _partitionAssignmentGenerator =
- new
ReplicaGroupPartitionAssignmentGenerator(helixZkManager.getHelixPropertyStore());
-
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZK_SERVER, _numServerInstance, true);
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZK_SERVER, _numBrokerInstance, true);
- Thread.sleep(100);
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_OFFLINE").size(),
- _numServerInstance);
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_REALTIME").size(),
- _numServerInstance);
-
-
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_BROKER").size(),
- _numBrokerInstance);
- }
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKERS, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true);
- @AfterTest
- public void tearDown() {
- _helixResourceManager.stop();
- _zkClient.close();
- ZkStarter.stopLocalZkServer(_zookeeperInstance);
+ _partitionAssignmentGenerator = new
ReplicaGroupPartitionAssignmentGenerator(_propertyStore);
}
@Test
@@ -108,7 +74,7 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
// Adding table
TableConfig tableConfig =
new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME_RANDOM)
-
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICA).build();
+
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICAS).build();
_helixResourceManager.addTable(tableConfig);
// Wait for the table addition
@@ -125,8 +91,7 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
while (!allSegmentsPushedToIdealState(TABLE_NAME_RANDOM, i + 1)) {
Thread.sleep(100);
}
- final Set<String> taggedInstances =
-
_helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
+ final Set<String> taggedInstances =
_helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
final Map<String, Integer> instanceToNumSegmentsMap = new HashMap<>();
for (final String instance : taggedInstances) {
instanceToNumSegmentsMap.put(instance, 0);
@@ -135,7 +100,7 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
.getResourceIdealState(getHelixClusterName(),
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_RANDOM));
Assert.assertEquals(idealState.getPartitionSet().size(), i + 1);
for (final String segmentId : idealState.getPartitionSet()) {
- Assert.assertEquals(idealState.getInstanceStateMap(segmentId).size(),
NUM_REPLICA);
+ Assert.assertEquals(idealState.getInstanceStateMap(segmentId).size(),
NUM_REPLICAS);
}
}
}
@@ -163,8 +128,7 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
Thread.sleep(100);
}
- final Set<String> taggedInstances =
-
_helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
+ final Set<String> taggedInstances =
_helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
final Map<String, Integer> instance2NumSegmentsMap = new HashMap<>();
for (final String instance : taggedInstances) {
instance2NumSegmentsMap.put(instance, 0);
@@ -177,9 +141,9 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
}
}
final int totalSegments = (numSegments) * numReplicas;
- final int minNumSegmentsPerInstance = totalSegments / _numServerInstance;
+ final int minNumSegmentsPerInstance = totalSegments / NUM_SERVERS;
int maxNumSegmentsPerInstance = minNumSegmentsPerInstance;
- if ((minNumSegmentsPerInstance * _numServerInstance) < totalSegments) {
+ if ((minNumSegmentsPerInstance * NUM_SERVERS) < totalSegments) {
maxNumSegmentsPerInstance = maxNumSegmentsPerInstance + 1;
}
for (final String instance : instance2NumSegmentsMap.keySet()) {
@@ -200,7 +164,7 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
// Adding a table without replica group
TableConfig tableConfig = new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
.setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
-
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICA).build();
+
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICAS).build();
_helixResourceManager.addTable(tableConfig);
// Check that partition assignment does not exist
@@ -216,7 +180,7 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
replicaGroupStrategyConfig.setMirrorAssignmentAcrossReplicaGroups(true);
TableConfig replicaGroupTableConfig = new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
-
.setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT).setNumReplicas(NUM_REPLICA)
+
.setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT).setNumReplicas(NUM_REPLICAS)
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").build();
replicaGroupTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
@@ -257,13 +221,10 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
Map<String, String> streamConfigMap =
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
// Adding a table without replica group
- TableConfig tableConfig = new
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(
- TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
- .setSegmentAssignmentStrategy("RandomAssignmentStrategy")
- .setNumReplicas(NUM_REPLICA)
- .setStreamConfigs(streamConfigMap)
- .setLLC(true)
- .build();
+ TableConfig tableConfig = new
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME)
+ .setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
+
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICAS)
+ .setStreamConfigs(streamConfigMap).setLLC(true).build();
try {
_helixResourceManager.addTable(tableConfig);
} catch (Exception e) {
@@ -282,20 +243,17 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
replicaGroupStrategyConfig.setNumInstancesPerPartition(numInstancesPerPartition);
replicaGroupStrategyConfig.setMirrorAssignmentAcrossReplicaGroups(true);
- TableConfig replicaGroupTableConfig =
- new
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(
- TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
- .setNumReplicas(NUM_REPLICA)
-
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy")
- .setStreamConfigs(streamConfigMap)
- .setLLC(true)
- .build();
+ TableConfig replicaGroupTableConfig = new
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME)
+
.setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT).setNumReplicas(NUM_REPLICAS)
+
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").setStreamConfigs(streamConfigMap)
+ .setLLC(true).build();
replicaGroupTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
// Check that the replica group partition assignment is created
try {
- _helixResourceManager.setExistingTableConfig(replicaGroupTableConfig,
tableNameWithType, CommonConstants.Helix.TableType.REALTIME);
+ _helixResourceManager
+ .setExistingTableConfig(replicaGroupTableConfig, tableNameWithType,
CommonConstants.Helix.TableType.REALTIME);
} catch (Exception e) {
// ignore
}
@@ -333,7 +291,7 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
// Create table config
TableConfig tableConfig = new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
-
.setTableName(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP).setNumReplicas(NUM_REPLICA)
+
.setTableName(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP).setNumReplicas(NUM_REPLICAS)
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").build();
tableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
@@ -373,9 +331,9 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
@Test
public void testPartitionLevelReplicaGroupSegmentAssignmentStrategy()
throws Exception {
- int totalPartitionNumber = random.nextInt(8) + 2;
- int numInstancesPerPartition = random.nextInt(5) + 1;
- int numSegments = random.nextInt(10) + 10;
+ int totalPartitionNumber = RANDOM.nextInt(8) + 2;
+ int numInstancesPerPartition = RANDOM.nextInt(5) + 1;
+ int numSegments = RANDOM.nextInt(10) + 10;
// Create the configuration for segment assignment strategy.
ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new
ReplicaGroupStrategyConfig();
@@ -392,7 +350,7 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
// Create table config
TableConfig tableConfig = new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
-
.setTableName(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP).setNumReplicas(NUM_REPLICA)
+
.setTableName(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP).setNumReplicas(NUM_REPLICAS)
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").build();
tableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
@@ -436,4 +394,11 @@ public class SegmentAssignmentStrategyTest extends
ControllerTest {
return idealState != null && idealState.getPartitionSet() != null
&& idealState.getPartitionSet().size() == segmentNum;
}
+
+ @AfterClass
+ public void tearDown() {
+ stopFakeInstances();
+ stopController();
+ stopZk();
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index 55c13a5..3f7097e 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -21,11 +21,11 @@ package org.apache.pinot.controller.validation;
import java.util.ArrayList;
import java.util.List;
import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixManager;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -33,11 +33,10 @@ import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.util.TestUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
@@ -54,33 +53,23 @@ import static org.testng.Assert.assertEquals;
* Tests for the ValidationManagers.
*/
public class ValidationManagerTest extends ControllerTest {
- private static final String ZK_STR = ZkStarter.DEFAULT_ZK_STR;
private static final String TEST_TABLE_NAME = "testTable";
private static final String TEST_TABLE_TWO = "testTable2";
private static final String TEST_SEGMENT_NAME = "testSegment";
- private ZkClient _zkClient;
-
- private ZkStarter.ZookeeperInstance _zookeeperInstance;
private TableConfig _offlineTableConfig;
- private HelixManager _helixManager;
@BeforeClass
public void setUp()
throws Exception {
- _zookeeperInstance = ZkStarter.startLocalZkServer();
- _zkClient = new ZkClient(ZK_STR);
- Thread.sleep(1000);
-
+ startZk();
startController();
-
-
ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZK_STR, 2, true);
-
ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
ZK_STR, 2, true);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(2, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(2, true);
_offlineTableConfig =
new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setNumReplicas(2)
.build();
- _helixManager = _helixResourceManager.getHelixZkManager();
_helixResourceManager.addTable(_offlineTableConfig);
}
@@ -124,8 +113,7 @@ public class ValidationManagerTest extends ControllerTest {
}
@Test
- public void testPushTimePersistence()
- throws Exception {
+ public void testPushTimePersistence() {
SegmentMetadata segmentMetadata =
SegmentMetadataMockUtils.mockSegmentMetadata(TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
_helixResourceManager.addNewSegment(TEST_TABLE_NAME, segmentMetadata,
"http://dummy/");
@@ -138,11 +126,16 @@ public class ValidationManagerTest extends ControllerTest
{
assertEquals(offlineSegmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
// Refresh the segment
+ // NOTE: In order to send the refresh message, the segment need to be in
the ExternalView
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView externalView = _helixAdmin
+ .getResourceExternalView(getHelixClusterName(),
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME));
+ return externalView != null &&
externalView.getPartitionSet().contains(TEST_SEGMENT_NAME);
+ }, 30_000L, "Failed to find the segment in the ExternalView");
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
_helixResourceManager.refreshSegment(TEST_TABLE_NAME, segmentMetadata,
offlineSegmentZKMetadata);
- offlineSegmentZKMetadata =
- _helixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
+ offlineSegmentZKMetadata =
_helixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
// Check that the segment still has the same push time
assertEquals(offlineSegmentZKMetadata.getPushTime(), pushTime);
// Check that the refresh time is in the last 30 seconds
@@ -186,13 +179,6 @@ public class ValidationManagerTest extends ControllerTest {
15);
}
- @AfterClass
- public void shutDown() {
- _helixResourceManager.stop();
- _zkClient.close();
- ZkStarter.stopLocalZkServer(_zookeeperInstance);
- }
-
@Test
public void testComputeNumMissingSegments() {
Interval jan1st = new Interval(new DateTime(2015, 1, 1, 0, 0, 0), new
DateTime(2015, 1, 1, 23, 59, 59));
@@ -226,4 +212,11 @@ public class ValidationManagerTest extends ControllerTest {
jan1st2nd4th5th.add(jan5th);
assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd4th5th,
Duration.standardDays(1)), 1);
}
+
+ @AfterClass
+ public void tearDown() {
+ stopFakeInstances();
+ stopController();
+ stopZk();
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 6bb7acd..c5e93bf 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -43,8 +43,8 @@ import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
@@ -84,7 +84,6 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
private Connection _h2Connection;
private QueryGenerator _queryGenerator;
-
/**
* The following getters can be overridden to change default settings.
*/
@@ -324,9 +323,8 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
@Override
public void run() {
try {
- ClusterIntegrationTestUtils
- .pushAvroIntoKafka(avroFiles,
KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn());
+ ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles,
KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
+ getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn());
} catch (Exception e) {
// Ignored
}
@@ -336,8 +334,8 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected void startKafka() {
- _kafkaStarters =
- KafkaStarterUtils.startServers(getNumKafkaBrokers(),
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
+ _kafkaStarters = KafkaStarterUtils
+ .startServers(getNumKafkaBrokers(),
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
KafkaStarterUtils.getDefaultKafkaConfiguration());
_kafkaStarters.get(0)
.createTopic(getKafkaTopic(),
KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
@@ -423,14 +421,14 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
addRealtimeTable(getTableName(), useLlc(),
KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile,
timeColumnName, timeType, schemaName,
- getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(),
- getInvertedIndexColumns(), getBloomFilterIndexColumns(),
getRawIndexColumns(), getTaskConfig(),
- getStreamConsumerFactoryClassName());
+ getBrokerTenant(), getServerTenant(), getLoadMode(),
getSortedColumn(), getInvertedIndexColumns(),
+ getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(),
getStreamConsumerFactoryClassName());
completeTableConfiguration();
}
- protected void completeTableConfiguration() throws IOException {
+ protected void completeTableConfiguration()
+ throws IOException {
if (isUsingNewConfigFormat()) {
CombinedConfig combinedConfig = new CombinedConfig(_offlineTableConfig,
_realtimeTableConfig, _schema);
sendPostRequest(_controllerRequestURLBuilder.forNewTableCreate(),
Serializer.serializeToString(combinedConfig));
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 857a16d..83b46ba 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -141,7 +141,6 @@ public abstract class BaseClusterIntegrationTestSet extends
BaseClusterIntegrati
+ "HAVING SUM(ArrDelay) <> 6325.973 AND AVG(CAST(CRSDepTime AS
DOUBLE)) <= 1569.8755 OR SUM(TaxiIn) = 1003.87274"));
}
-
/**
* Test hardcoded queries.
* <p>NOTE:
@@ -361,14 +360,14 @@ public abstract class BaseClusterIntegrationTestSet
extends BaseClusterIntegrati
*/
public void testInstanceShutdown()
throws Exception {
- List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
+ List<String> instances =
_helixAdmin.getInstancesInCluster(getHelixClusterName());
Assert.assertFalse(instances.isEmpty(), "List of instances should not be
empty");
// Mark all instances in the cluster as shutting down
for (String instance : instances) {
- InstanceConfig instanceConfig =
_helixAdmin.getInstanceConfig(_clusterName, instance);
+ InstanceConfig instanceConfig =
_helixAdmin.getInstanceConfig(getHelixClusterName(), instance);
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS,
true);
- _helixAdmin.setInstanceConfig(_clusterName, instance, instanceConfig);
+ _helixAdmin.setInstanceConfig(getHelixClusterName(), instance,
instanceConfig);
}
// Check that the routing table is empty
@@ -376,9 +375,9 @@ public abstract class BaseClusterIntegrationTestSet extends
BaseClusterIntegrati
// Mark all instances as not shutting down
for (String instance : instances) {
- InstanceConfig instanceConfig =
_helixAdmin.getInstanceConfig(_clusterName, instance);
+ InstanceConfig instanceConfig =
_helixAdmin.getInstanceConfig(getHelixClusterName(), instance);
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS,
false);
- _helixAdmin.setInstanceConfig(_clusterName, instance, instanceConfig);
+ _helixAdmin.setInstanceConfig(getHelixClusterName(), instance,
instanceConfig);
}
// Check that the routing table is not empty
@@ -394,16 +393,16 @@ public abstract class BaseClusterIntegrationTestSet
extends BaseClusterIntegrati
checkForInstanceInRoutingTable(true, instanceName);
// Mark the server instance as shutting down
- InstanceConfig instanceConfig =
_helixAdmin.getInstanceConfig(_clusterName, instanceName);
+ InstanceConfig instanceConfig =
_helixAdmin.getInstanceConfig(getHelixClusterName(), instanceName);
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS,
true);
- _helixAdmin.setInstanceConfig(_clusterName, instanceName,
instanceConfig);
+ _helixAdmin.setInstanceConfig(getHelixClusterName(), instanceName,
instanceConfig);
// Check that it is not in the routing table
checkForInstanceInRoutingTable(false, instanceName);
// Re-enable the server instance
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS,
false);
- _helixAdmin.setInstanceConfig(_clusterName, instanceName,
instanceConfig);
+ _helixAdmin.setInstanceConfig(getHelixClusterName(), instanceName,
instanceConfig);
// Check that it is in the routing table
checkForInstanceInRoutingTable(true, instanceName);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 60c6c64..a9dbd1d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -60,7 +60,6 @@ import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
@@ -88,7 +87,6 @@ public abstract class ClusterTest extends ControllerTest {
private static final Random RANDOM = new Random();
private static final int DEFAULT_BROKER_PORT = 18099;
- protected final String _clusterName = getHelixClusterName();
protected String _brokerBaseApiUrl;
private List<HelixBrokerStarter> _brokerStarters = new ArrayList<>();
@@ -129,7 +127,7 @@ public abstract class ClusterTest extends ControllerTest {
brokerConf.setProperty(Broker.CONFIG_OF_REQUEST_HANDLER_TYPE,
Broker.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
}
overrideBrokerConf(brokerConf);
- HelixBrokerStarter brokerStarter = new HelixBrokerStarter(brokerConf,
_clusterName, zkStr);
+ HelixBrokerStarter brokerStarter = new HelixBrokerStarter(brokerConf,
getHelixClusterName(), zkStr, LOCAL_HOST);
brokerStarter.start();
_brokerStarters.add(brokerStarter);
}
@@ -171,7 +169,7 @@ public abstract class ClusterTest extends ControllerTest {
.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR,
Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT,
baseAdminApiPort - i);
configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort +
i);
- _serverStarters.add(new HelixServerStarter(_clusterName, zkStr,
configuration));
+ _serverStarters.add(new HelixServerStarter(getHelixClusterName(),
zkStr, configuration));
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -191,7 +189,7 @@ public abstract class ClusterTest extends ControllerTest {
config.setProperty(Helix.Instance.INSTANCE_ID_KEY,
Minion.INSTANCE_PREFIX + "minion" + i + "_" +
(Minion.DEFAULT_HELIX_PORT + i));
config.setProperty(Helix.Instance.DATA_DIR_KEY,
Minion.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
- MinionStarter minionStarter = new
MinionStarter(ZkStarter.DEFAULT_ZK_STR, _clusterName, config);
+ MinionStarter minionStarter = new
MinionStarter(ZkStarter.DEFAULT_ZK_STR, getHelixClusterName(), config);
// Register task executor factories
if (taskExecutorFactoryRegistry != null) {
@@ -499,20 +497,6 @@ public abstract class ClusterTest extends ControllerTest {
bloomFilterColumns, noDictionaryColumns, taskConfig,
streamConsumerFactoryName);
}
- protected void createBrokerTenant(String tenantName, int brokerCount)
- throws Exception {
- String request =
ControllerRequestBuilderUtil.buildBrokerTenantCreateRequestJSON(tenantName,
brokerCount);
- sendPostRequest(_controllerRequestURLBuilder.forBrokerTenantCreate(),
request);
- }
-
- protected void createServerTenant(String tenantName, int offlineServerCount,
int realtimeServerCount)
- throws Exception {
- String request = ControllerRequestBuilderUtil
- .buildServerTenantCreateRequestJSON(tenantName, offlineServerCount +
realtimeServerCount, offlineServerCount,
- realtimeServerCount);
- sendPostRequest(_controllerRequestURLBuilder.forServerTenantCreate(),
request);
- }
-
protected JsonNode getDebugInfo(final String uri)
throws Exception {
return JsonUtils.stringToJsonNode(sendGetRequest(_brokerBaseApiUrl + "/" +
uri));
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index 64e2470..106ad78 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -90,15 +90,19 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
}
+
public long getStatusCheckerInitialDelayInSeconds() {
return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
}
+
public long getRealtimeSegmentRelocationInitialDelayInSeconds() {
return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
}
+
public long getBrokerResourceValidationInitialDelayInSeconds() {
return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
}
+
public long getOfflineSegmentIntervalCheckerInitialDelayInSeconds() {
return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
}
@@ -119,7 +123,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
* @throws Exception
*/
@BeforeClass
- public void setUp() throws Exception {
+ public void setUp()
+ throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
startZk();
@@ -159,7 +164,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
/**
* Setup offline table, but no segments
*/
- private void setupOfflineTable(String table) throws Exception {
+ private void setupOfflineTable(String table)
+ throws Exception {
_realtimeTableConfig = null;
addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null,
SegmentVersion.v1, null, null, null, null, null);
completeTableConfiguration();
@@ -168,7 +174,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
/**
* Setup offline table, with segments from avro
*/
- private void setupOfflineTableAndSegments(String tableName, List<File>
avroFiles) throws Exception {
+ private void setupOfflineTableAndSegments(String tableName, List<File>
avroFiles)
+ throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
setTableName(tableName);
_realtimeTableConfig = null;
@@ -184,12 +191,13 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
Assert.assertNotNull(outgoingTimeUnit);
String timeType = outgoingTimeUnit.toString();
- addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME,
TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
+ addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME,
TENANT_NAME, null, SegmentVersion.v1, null, null,
+ null, null, null);
completeTableConfiguration();
ExecutorService executor = Executors.newCachedThreadPool();
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0,
_segmentDir, _tarDir, tableName, false,
- null, null, null, executor);
+ ClusterIntegrationTestUtils
+ .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName,
false, null, null, null, executor);
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
uploadSegments(getTableName(), _tarDir);
@@ -199,7 +207,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
/**
* Setup realtime table for given tablename and topic
*/
- private void setupRealtimeTable(String table, String topic, File avroFile)
throws Exception {
+ private void setupRealtimeTable(String table, String topic, File avroFile)
+ throws Exception {
_offlineTableConfig = null;
File schemaFile = getSchemaFile();
Schema schema = Schema.fromFile(schemaFile);
@@ -233,7 +242,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
* @throws Exception
*/
@BeforeGroups(groups = "segmentStatusChecker")
- public void beforeTestSegmentStatusCheckerTest(ITestContext context) throws
Exception {
+ public void beforeTestSegmentStatusCheckerTest(ITestContext context)
+ throws Exception {
String emptyTable = "table1_OFFLINE";
String disabledOfflineTable = "table2_OFFLINE";
String basicOfflineTable = getDefaultOfflineTableName();
@@ -253,7 +263,7 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
// table with disabled ideal state
setupOfflineTable(disabledOfflineTable);
- _helixAdmin.enableResource(_clusterName, disabledOfflineTable, false);
+ _helixAdmin.enableResource(getHelixClusterName(), disabledOfflineTable,
false);
// some segments offline
setupOfflineTableAndSegments(errorOfflineTable, _avroFiles);
@@ -282,7 +292,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
* Validate that we are seeing the expected numbers
*/
@Test(groups = "segmentStatusChecker")
- public void testSegmentStatusChecker(ITestContext context) throws Exception {
+ public void testSegmentStatusChecker(ITestContext context)
+ throws Exception {
String emptyTable = (String) context.getAttribute("emptyTable");
String disabledOfflineTable = (String)
context.getAttribute("disabledOfflineTable");
String basicOfflineTable = (String)
context.getAttribute("basicOfflineTable");
@@ -292,9 +303,9 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
ControllerMetrics controllerMetrics =
_controllerStarter.getControllerMetrics();
- TestUtils.waitForCondition(input ->
-
controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
- "SegmentStatusChecker") >= numTables, 240_000, "Timed out waiting
for SegmentStatusChecker");
+ TestUtils.waitForCondition(input -> controllerMetrics
+
.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
"SegmentStatusChecker") >= numTables,
+ 240_000, "Timed out waiting for SegmentStatusChecker");
// empty table - table1_OFFLINE
// num replicas set from ideal state
@@ -344,7 +355,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
}
@AfterGroups(groups = "segmentStatusChecker")
- public void afterTestSegmentStatusChecker(ITestContext context) throws
Exception {
+ public void afterTestSegmentStatusChecker(ITestContext context)
+ throws Exception {
String emptyTable = (String) context.getAttribute("emptyTable");
String disabledOfflineTable = (String)
context.getAttribute("disabledOfflineTable");
String errorOfflineTable = (String)
context.getAttribute("errorOfflineTable");
@@ -361,7 +373,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
* @throws Exception
*/
@BeforeGroups(groups = "realtimeSegmentRelocator", dependsOnGroups =
"segmentStatusChecker")
- public void beforeRealtimeSegmentRelocatorTest(ITestContext context) throws
Exception {
+ public void beforeRealtimeSegmentRelocatorTest(ITestContext context)
+ throws Exception {
String relocationTable = getDefaultRealtimeTableName();
context.setAttribute("relocationTable", relocationTable);
@@ -377,20 +390,22 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
}
@Test(groups = "realtimeSegmentRelocator", dependsOnGroups =
"segmentStatusChecker")
- public void testRealtimeSegmentRelocator(ITestContext context) throws
Exception {
+ public void testRealtimeSegmentRelocator(ITestContext context)
+ throws Exception {
String relocationTable = (String) context.getAttribute("relocationTable");
ControllerMetrics controllerMetrics =
_controllerStarter.getControllerMetrics();
- long taskRunCount =
controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator",
- ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
+ long taskRunCount =
+ controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator",
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
+ .count();
TestUtils.waitForCondition(input ->
controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator",
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
.count() > taskRunCount, 60_000, "Timed out waiting for
RealtimeSegmentRelocation to run");
-
Assert.assertTrue(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
- "RealtimeSegmentRelocator") > 0);
+ Assert.assertTrue(controllerMetrics
+
.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
"RealtimeSegmentRelocator") > 0);
// check servers for ONLINE segment and CONSUMING segments are disjoint
sets
Set<String> consuming = new HashSet<>();
@@ -425,7 +440,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
// Check that the first table we added doesn't need to be rebuilt(case
where ideal state brokers and brokers in broker resource are the same.
String table1 = (String) context.getAttribute("testTableOne");
String table2 = (String) context.getAttribute("testTableTwo");
- TableConfig tableConfigOne = new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(table1).build();
+ TableConfig tableConfigOne =
+ new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(table1).build();
String partitionNameOne = tableConfigOne.getTableName();
// Ensure that the broker resource is not rebuilt.
@@ -517,27 +533,29 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
* @throws Exception
*/
- @Test(groups="realtimeSegmentValidationManager", dependsOnGroups =
"offlineSegmentIntervalChecker")
- public void testRealtimeSegmentValidationManager(ITestContext context)
throws Exception {
+ @Test(groups = "realtimeSegmentValidationManager", dependsOnGroups =
"offlineSegmentIntervalChecker")
+ public void testRealtimeSegmentValidationManager(ITestContext context)
+ throws Exception {
ControllerMetrics controllerMetrics =
_controllerStarter.getControllerMetrics();
- long taskRunCount =
controllerMetrics.getMeteredTableValue("RealtimeSegmentValidationManager",
- ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
+ long taskRunCount = controllerMetrics
+ .getMeteredTableValue("RealtimeSegmentValidationManager",
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
// Wait until the RealtimeSegmentValidationManager runs at least once.
Most likely it already ran once
// on the realtime table (default one) already setup, so we should have
the total document count on that
// realtime table.
- TestUtils.waitForCondition(input ->
-
controllerMetrics.getMeteredTableValue("RealtimeSegmentValidationManager",
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
- .count() > taskRunCount, 60_000, "Timed out waiting for
RealtimeSegmentValidationManager to run");
+ TestUtils.waitForCondition(input -> controllerMetrics
+ .getMeteredTableValue("RealtimeSegmentValidationManager",
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count()
+ > taskRunCount, 60_000, "Timed out waiting for
RealtimeSegmentValidationManager to run");
-
Assert.assertTrue(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
- "RealtimeSegmentValidationManager") > 0);
+ Assert.assertTrue(controllerMetrics
+
.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
"RealtimeSegmentValidationManager")
+ > 0);
RealtimeSegmentValidationManager validationManager =
_controllerStarter.getRealtimeSegmentValidationManager();
ValidationMetrics validationMetrics =
validationManager.getValidationMetrics();
// Make sure we processed the realtime table to get the total document
count. Should have been done the first
// time RealtimeSegmentValidationManager ran on the default realtime table.
-
Assert.assertTrue(validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(getDefaultRealtimeTableName(),
- "TotalDocumentCount")) > 0);
+ Assert.assertTrue(validationMetrics
+
.getValueOfGauge(ValidationMetrics.makeGaugeName(getDefaultRealtimeTableName(),
"TotalDocumentCount")) > 0);
}
// TODO: tests for other ControllerPeriodicTasks (RetentionManagert ,
RealtimeSegmentValidationManager)
@@ -565,7 +583,8 @@ public class ControllerPeriodicTasksIntegrationTests
extends BaseClusterIntegrat
* @throws Exception
*/
@AfterClass
- public void tearDown() throws Exception {
+ public void tearDown()
+ throws Exception {
stopServer();
stopBroker();
stopController();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index dce6a5e..714986c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -136,17 +136,17 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}
private void registerCallbackHandlers() {
- List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
+ List<String> instances =
_helixAdmin.getInstancesInCluster(getHelixClusterName());
instances.removeIf(instance ->
(!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) &&
!instance
.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)));
- List<String> resourcesInCluster =
_helixAdmin.getResourcesInCluster(_clusterName);
+ List<String> resourcesInCluster =
_helixAdmin.getResourcesInCluster(getHelixClusterName());
resourcesInCluster.removeIf(
resource -> (!TableNameBuilder.isTableResource(resource) &&
!CommonConstants.Helix.BROKER_RESOURCE_INSTANCE
.equals(resource)));
for (String instance : instances) {
List<String> resourcesToMonitor = new ArrayList<>();
for (String resourceName : resourcesInCluster) {
- IdealState idealState =
_helixAdmin.getResourceIdealState(_clusterName, resourceName);
+ IdealState idealState =
_helixAdmin.getResourceIdealState(getHelixClusterName(), resourceName);
for (String partitionName : idealState.getPartitionSet()) {
if (idealState.getInstanceSet(partitionName).contains(instance)) {
resourcesToMonitor.add(resourceName);
@@ -155,10 +155,10 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
}
}
_serviceStatusCallbacks.add(new
ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
- .of(new
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager,
_clusterName,
- instance, resourcesToMonitor, 100.0),
- new
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager,
_clusterName,
- instance, resourcesToMonitor, 100.0))));
+ .of(new
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager,
+ getHelixClusterName(), instance, resourcesToMonitor, 100.0),
+ new
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager,
+ getHelixClusterName(), instance, resourcesToMonitor,
100.0))));
}
}
@@ -741,11 +741,11 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
// TODO: Add test to delete broker instance. Currently, stopBroker() does
not work correctly.
// Check if '/INSTANCES/<serverName>' has been erased correctly
- String instancePath = "/" + _clusterName + "/INSTANCES/" + serverName;
+ String instancePath = "/" + getHelixClusterName() + "/INSTANCES/" +
serverName;
assertFalse(_propertyStore.exists(instancePath, 0));
// Check if '/CONFIGS/PARTICIPANT/<serverName>' has been erased correctly
- String configPath = "/" + _clusterName + "/CONFIGS/PARTICIPANT/" +
serverName;
+ String configPath = "/" + getHelixClusterName() + "/CONFIGS/PARTICIPANT/"
+ serverName;
assertFalse(_propertyStore.exists(configPath, 0));
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
index 560bf5a..84ea948 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
@@ -97,14 +97,14 @@ public class SegmentCompletionIntegrationTest extends
BaseClusterIntegrationTest
// Create server instance with the fake server state model
_serverHelixManager = HelixManagerFactory
- .getZKHelixManager(_clusterName, _serverInstance,
InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
+ .getZKHelixManager(getHelixClusterName(), _serverInstance,
InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
_serverHelixManager.getStateMachineEngine()
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
new FakeServerSegmentStateModelFactory());
_serverHelixManager.connect();
// Add Helix tag to the server
- _serverHelixManager.getClusterManagmentTool().addInstanceTag(_clusterName,
_serverInstance,
+
_serverHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
_serverInstance,
TableNameBuilder.REALTIME.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
// Initialize controller leader locator
@@ -127,7 +127,7 @@ public class SegmentCompletionIntegrationTest extends
BaseClusterIntegrationTest
@Override
public Boolean apply(@Nullable Void aVoid) {
try {
- ExternalView externalView =
_helixAdmin.getResourceExternalView(_clusterName, realtimeTableName);
+ ExternalView externalView =
_helixAdmin.getResourceExternalView(getHelixClusterName(), realtimeTableName);
Map<String, String> stateMap =
externalView.getStateMap(_currentSegment);
return stateMap.get(_serverInstance)
.equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE);
@@ -152,7 +152,7 @@ public class SegmentCompletionIntegrationTest extends
BaseClusterIntegrationTest
@Override
public Boolean apply(@Nullable Void aVoid) {
try {
- ExternalView externalView =
_helixAdmin.getResourceExternalView(_clusterName, realtimeTableName);
+ ExternalView externalView =
_helixAdmin.getResourceExternalView(getHelixClusterName(), realtimeTableName);
Map<String, String> stateMap =
externalView.getStateMap(_currentSegment);
return
stateMap.get(_serverInstance).equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.OFFLINE_STATE);
} catch (Exception e) {
@@ -175,7 +175,7 @@ public class SegmentCompletionIntegrationTest extends
BaseClusterIntegrationTest
public Boolean apply(@Nullable Void aVoid) {
try {
if (!_currentSegment.equals(oldSegment)) {
- ExternalView externalView =
_helixAdmin.getResourceExternalView(_clusterName, realtimeTableName);
+ ExternalView externalView =
_helixAdmin.getResourceExternalView(getHelixClusterName(), realtimeTableName);
Map<String, String> stateMap =
externalView.getStateMap(_currentSegment);
return
stateMap.get(_serverInstance).equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index 6fd2ec6..e69f275 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -300,10 +300,8 @@ public class HelixServerStarter {
List<String> instanceTags = instanceConfig.getTags();
if (instanceTags == null || instanceTags.size() == 0) {
if
(ZKMetadataProvider.getClusterTenantIsolationEnabled(_helixManager.getHelixPropertyStore()))
{
- _helixAdmin.addInstanceTag(clusterName, instanceName,
-
TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
- _helixAdmin.addInstanceTag(clusterName, instanceName,
-
TableNameBuilder.REALTIME.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
+ _helixAdmin.addInstanceTag(clusterName, instanceName,
TagNameUtils.getOfflineTagForTenant(null));
+ _helixAdmin.addInstanceTag(clusterName, instanceName,
TagNameUtils.getRealtimeTagForTenant(null));
} else {
_helixAdmin.addInstanceTag(clusterName, instanceName,
UNTAGGED_SERVER_INSTANCE);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]