This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c25e5ea Use ZkCacheBaseDataAccessor to cache instance configs in
PinotHelixResourceManager (#3633)
c25e5ea is described below
commit c25e5eaaab2478cf2a07d5bb438ee14e784c4113
Author: Jialiang Li <[email protected]>
AuthorDate: Mon Jan 7 10:04:35 2019 -0800
Use ZkCacheBaseDataAccessor to cache instance configs in
PinotHelixResourceManager (#3633)
* Use ZkCacheBaseDataAccessor to cache instance configs in
PinotHelixResourceManager;
* Unit tests added.
---
.../helix/core/PinotHelixResourceManager.java | 35 ++++----
.../helix/core/PinotHelixResourceManagerTest.java | 96 ++++++++++++++++++++++
2 files changed, 117 insertions(+), 14 deletions(-)
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index 423db8c..d359c6f 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -84,6 +84,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
@@ -93,7 +94,10 @@ import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -128,6 +132,7 @@ public class PinotHelixResourceManager {
private HelixAdmin _helixAdmin;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private HelixDataAccessor _helixDataAccessor;
+ private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
private Builder _keyBuilder;
private SegmentDeletionManager _segmentDeletionManager;
private TableRebalancer _tableRebalancer;
@@ -168,6 +173,13 @@ public class PinotHelixResourceManager {
_helixAdmin = _helixZkManager.getClusterManagmentTool();
_propertyStore = _helixZkManager.getHelixPropertyStore();
_helixDataAccessor = _helixZkManager.getHelixDataAccessor();
+ // Cache instance zk paths.
+ BaseDataAccessor<ZNRecord> baseDataAccessor =
_helixDataAccessor.getBaseDataAccessor();
+
+ String instanceConfigs =
PropertyPathBuilder.instanceConfig(_helixClusterName);
+ _cacheInstanceConfigsDataAccessor =
+ new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>)
baseDataAccessor, instanceConfigs,
+ null, Collections.singletonList(instanceConfigs));
_keyBuilder = _helixDataAccessor.keyBuilder();
_segmentDeletionManager = new SegmentDeletionManager(_dataDir,
_helixAdmin, _helixClusterName, _propertyStore);
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore,
_isSingleTenantCluster);
@@ -249,23 +261,17 @@ public class PinotHelixResourceManager {
*/
@Nonnull
public List<String> getAllInstances() {
- return _helixAdmin.getInstancesInCluster(_helixClusterName);
- }
-
- /**
- * Get key builder
- * @return
- */
- @Nonnull
- public Builder getKeyBuilder() {
- return _keyBuilder;
+    // Lists the child names under "/" of the cached accessor; presumably "/" is the cluster's
+    // instance-config root given when the accessor was constructed (confirm against the ctor).
+    List<String> instanceNames = _cacheInstanceConfigsDataAccessor.getChildNames("/", AccessOption.PERSISTENT);
+    if (instanceNames == null) {
+      // The accessor may return null when the parent znode does not exist; keep the @Nonnull
+      // contract and hand back a mutable list like the previous HelixAdmin-based implementation.
+      return new ArrayList<>();
+    }
+    return instanceNames;
}
/**
* Returns the config for all the Helix instances in the cluster.
*/
public List<InstanceConfig> getAllHelixInstanceConfigs() {
- return HelixHelper.getInstanceConfigs(_helixZkManager);
+    // Read every child record under the cached instance-config root ("/").
+    List<ZNRecord> znRecords = _cacheInstanceConfigsDataAccessor.getChildren("/", null, AccessOption.PERSISTENT);
+    if (znRecords == null) {
+      // The accessor may return null when the root znode is missing.
+      return new ArrayList<>();
+    }
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(znRecords.size());
+    for (ZNRecord znRecord : znRecords) {
+      // Entries may be null if a child was deleted between listing and reading; skip them instead
+      // of failing inside the InstanceConfig constructor.
+      if (znRecord != null) {
+        instanceConfigs.add(new InstanceConfig(znRecord));
+      }
+    }
+    return instanceConfigs;
}
/**
@@ -276,7 +282,8 @@ public class PinotHelixResourceManager {
*/
@Nonnull
public InstanceConfig getHelixInstanceConfig(@Nonnull String instanceId) {
- return _helixAdmin.getInstanceConfig(_helixClusterName, instanceId);
+    ZNRecord znRecord = _cacheInstanceConfigsDataAccessor.get("/" + instanceId, null, AccessOption.PERSISTENT);
+    if (znRecord == null) {
+      // The cached accessor may return null when the instance has no config znode (or the cache has
+      // not observed it yet). new InstanceConfig(null) would throw an NPE, so defer to HelixAdmin,
+      // the lookup this method used before switching to the cache, for that case.
+      return _helixAdmin.getInstanceConfig(_helixClusterName, instanceId);
+    }
+    return new InstanceConfig(znRecord);
}
/**
@@ -1519,7 +1526,7 @@ public class PinotHelixResourceManager {
ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
LOGGER.info("Sending timeboundary refresh message for segment {} of table
{}:{} to recipients {}", segmentName,
- rawTableName, refreshMessage, recipientCriteria);
+ rawTableName, refreshMessage, recipientCriteria);
// Helix sets the timeoutMs argument specified in 'send' call as the
processing timeout of the message.
int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage,
null, timeoutMs);
if (nMsgsSent > 0) {
@@ -1529,7 +1536,7 @@ public class PinotHelixResourceManager {
// May be the case when none of the brokers are up yet. That is OK,
because when they come up they will get
// the latest time boundary info.
LOGGER.warn("Unable to send timeboundary refresh message for {} of table
{}, nMsgs={}", segmentName,
- offlineTableName, nMsgsSent);
+ offlineTableName, nMsgsSent);
}
}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index f93bf73..fe0c808 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -30,9 +30,16 @@ import com.linkedin.pinot.common.utils.ZkStarter;
import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.helix.ControllerRequestBuilderUtil;
import com.linkedin.pinot.controller.helix.ControllerTest;
+import java.util.HashSet;
import java.util.List;
+import java.util.Random;
import java.util.Set;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -47,6 +54,8 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
private static final String TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+ private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
+ private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
private final String _helixClusterName = getHelixClusterName();
@@ -79,6 +88,87 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
}
@Test
+  public void testGetInstanceConfigs() throws Exception {
+    // Configs served from the cache must match what HelixAdmin reads directly.
+    Set<String> servers = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
+    for (String server : servers) {
+      InstanceConfig cachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(server);
+      InstanceConfig realInstanceConfig = _helixAdmin.getInstanceConfig(_helixClusterName, server);
+      Assert.assertEquals(cachedInstanceConfig, realInstanceConfig);
+    }
+
+    ZkClient zkClient = new ZkClient(_helixResourceManager.getHelixZkURL(), CONNECTION_TIMEOUT_IN_MILLISECOND,
+        CONNECTION_TIMEOUT_IN_MILLISECOND, new ZNRecordSerializer());
+    // Close the client even when a helper assertion fails, so a failing run does not leak a ZK session.
+    try {
+      modifyExistingInstanceConfig(zkClient);
+      addAndRemoveNewInstanceConfig(zkClient);
+    } finally {
+      zkClient.close();
+    }
+  }
+
+  /**
+   * Changes the port of a random existing server's config directly in ZK, waits for the cached view
+   * to reflect it, then restores the original port (also on failure).
+   */
+  private void modifyExistingInstanceConfig(ZkClient zkClient) throws InterruptedException {
+    String instanceName = "Server_localhost_" + new Random().nextInt(NUM_INSTANCES);
+    String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+    Assert.assertTrue(zkClient.exists(instanceConfigPath));
+    ZNRecord znRecord = zkClient.readData(instanceConfigPath, null);
+
+    InstanceConfig cachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(instanceName);
+    String originalPort = cachedInstanceConfig.getPort();
+    Assert.assertNotNull(originalPort);
+    String newPort = Long.toString(System.currentTimeMillis());
+    Assert.assertFalse(newPort.equals(originalPort));
+
+    String portField = InstanceConfig.InstanceConfigProperty.HELIX_PORT.toString();
+    try {
+      // Set new port to this instance config.
+      znRecord.setSimpleField(portField, newPort);
+      zkClient.writeData(instanceConfigPath, znRecord);
+
+      long maxTime = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+      String latestPort = _helixResourceManager.getHelixInstanceConfig(instanceName).getPort();
+      while (!newPort.equals(latestPort) && System.currentTimeMillis() < maxTime) {
+        Thread.sleep(100L);
+        latestPort = _helixResourceManager.getHelixInstanceConfig(instanceName).getPort();
+      }
+      // Assert on the observed value rather than the clock: the loop can see the new port on its
+      // last iteration after the deadline has already passed.
+      Assert.assertEquals(latestPort, newPort, "Timeout when waiting for modifying instance config");
+    } finally {
+      // Set original port back even if the assertion failed, so later tests see the original config.
+      znRecord.setSimpleField(portField, originalPort);
+      zkClient.writeData(instanceConfigPath, znRecord);
+    }
+  }
+
+  /**
+   * Creates a new instance-config znode directly in ZK, waits for getAllInstances() to list it, then
+   * deletes it and waits for it to disappear. The znode is removed on failure as well.
+   */
+  private void addAndRemoveNewInstanceConfig(ZkClient zkClient) throws Exception {
+    // Index in [NUM_INSTANCES, 2 * NUM_INSTANCES) so the name cannot collide with the existing servers.
+    int biggerRandomNumber = NUM_INSTANCES + new Random().nextInt(NUM_INSTANCES);
+    String instanceName = "Server_localhost_" + biggerRandomNumber;
+    String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+    Assert.assertFalse(zkClient.exists(instanceConfigPath));
+    Assert.assertFalse(_helixResourceManager.getAllInstances().contains(instanceName));
+
+    try {
+      // Add new ZNode.
+      zkClient.createPersistent(instanceConfigPath, new ZNRecord(instanceName));
+      Assert.assertTrue(waitForInstancePresence(instanceName, true),
+          "Timeout when waiting for adding instance config");
+
+      // Remove new ZNode.
+      zkClient.delete(instanceConfigPath);
+      Assert.assertTrue(waitForInstancePresence(instanceName, false),
+          "Timeout when waiting for removing instance config");
+    } finally {
+      // Do not leave a stray instance config in the cluster if an assertion above failed.
+      if (zkClient.exists(instanceConfigPath)) {
+        zkClient.delete(instanceConfigPath);
+      }
+    }
+  }
+
+  /**
+   * Polls getAllInstances() every 100 ms until its membership of instanceName equals expectPresent,
+   * or MAX_TIMEOUT_IN_MILLISECOND elapses. Returns true iff the expected state was observed, which
+   * avoids judging success by the clock after the loop.
+   */
+  private boolean waitForInstancePresence(String instanceName, boolean expectPresent)
+      throws InterruptedException {
+    long maxTime = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+    while (true) {
+      if (_helixResourceManager.getAllInstances().contains(instanceName) == expectPresent) {
+        return true;
+      }
+      if (System.currentTimeMillis() >= maxTime) {
+        return false;
+      }
+      Thread.sleep(100L);
+    }
+  }
+
+ @Test
public void testRebuildBrokerResourceFromHelixTags() throws Exception {
// Create broker tenant on 3 Brokers
Tenant brokerTenant =
@@ -165,6 +255,12 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
}
}
+  @Test(enabled = false)
+  public void testGetDataInstanceAdminEndpoints() {
+    // TODO: implement. The previous body only built an unused HashSet and discarded a random number,
+    // so the test always passed without checking anything. It stays disabled until it asserts on the
+    // admin-endpoint lookup (presumably PinotHelixResourceManager#getDataInstanceAdminEndpoints; confirm).
+  }
+
@AfterClass
public void tearDown() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]