This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a3bed7c06d Fix the server admin endpoint cache to reflect the config
changes (#9734)
a3bed7c06d is described below
commit a3bed7c06d06786377ec7ad6d84e9f769cb8f3dd
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Nov 4 20:01:02 2022 -0700
Fix the server admin endpoint cache to reflect the config changes (#9734)
---
.../helix/core/PinotHelixResourceManager.java | 110 ++++++++++++---------
.../PinotHelixResourceManagerStatelessTest.java | 50 ++++++++--
2 files changed, 103 insertions(+), 57 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 86f7b896f9..3e7ed24d5d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -34,7 +34,6 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -46,7 +45,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
@@ -68,9 +66,13 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.api.listeners.BatchMode;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
@@ -260,6 +262,30 @@ public class PinotHelixResourceManager {
_deletedSegmentsRetentionInDays);
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore,
_isSingleTenantCluster);
+ // Add listener on instance config changes to invalidate
_instanceAdminEndpointCache
+ try {
+ helixZkManager.addInstanceConfigChangeListener(new
InstanceConfigChangeListener() {
+ @BatchMode(enabled = false)
+ @PreFetch(enabled = false)
+ @Override
+ public void onInstanceConfigChange(List<InstanceConfig>
instanceConfigs, NotificationContext context) {
+ NotificationContext.Type type = context.getType();
+ if (type == NotificationContext.Type.INIT || type ==
NotificationContext.Type.FINALIZE
+ || context.getIsChildChange()) {
+ // Invalidate all entries when the change is not within the instance
config (e.g. set up the listener, add or
+ // delete an instance config)
+ _instanceAdminEndpointCache.invalidateAll();
+ } else {
+ String pathChanged = context.getPathChanged();
+ String instanceName =
pathChanged.substring(pathChanged.lastIndexOf('/') + 1);
+ _instanceAdminEndpointCache.invalidate(instanceName);
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while adding
InstanceConfigChangeListener");
+ }
+
// Initialize TableCache
HelixConfigScope helixConfigScope =
new
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(_helixClusterName).build();
@@ -1336,7 +1362,7 @@ public class PinotHelixResourceManager {
} else {
// TODO: Add the reason of the incompatibility
throw new SchemaBackwardIncompatibleException(
- String.format("New schema: %s is not backward-compatible with
the existing schema", schemaName));
+ String.format("New schema: %s is not backward-compatible with the
existing schema", schemaName));
}
}
ZKMetadataProvider.setSchema(_propertyStore, schema);
@@ -1706,18 +1732,18 @@ public class PinotHelixResourceManager {
InstanceAssignmentDriver instanceAssignmentDriver = new
InstanceAssignmentDriver(tableConfig);
List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
for (InstancePartitionsType instancePartitionsType :
instancePartitionsTypesToAssign) {
- boolean hasPreConfiguredInstancePartitions =
TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
- instancePartitionsType);
+ boolean hasPreConfiguredInstancePartitions =
+ TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType);
InstancePartitions instancePartitions;
if (!hasPreConfiguredInstancePartitions) {
instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType,
instanceConfigs, null);
LOGGER.info("Persisting instance partitions: {}",
instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore,
instancePartitions);
} else {
- String referenceInstancePartitionsName =
-
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
- instancePartitions =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
- referenceInstancePartitionsName,
instancePartitionsType.getInstancePartitionsName(rawTableName));
+ String referenceInstancePartitionsName =
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
+ instancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
referenceInstancePartitionsName,
+
instancePartitionsType.getInstancePartitionsName(rawTableName));
LOGGER.info("Persisting instance partitions: {} (referencing {})",
instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore,
instancePartitions);
@@ -1919,7 +1945,6 @@ public class PinotHelixResourceManager {
// Remove all stored segments for the table
-
// Remove all stored segments for the table
Long retentionPeriodMs = retentionPeriod != null ?
TimeUtils.convertPeriodToMillis(retentionPeriod) : null;
_segmentDeletionManager.removeSegmentsFromStore(realtimeTableName,
getSegmentsFor(realtimeTableName, false),
@@ -2037,8 +2062,8 @@ public class PinotHelixResourceManager {
ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null,
-1);
Map<String, Map<String, String>> controllerJobs =
tableJobsRecord.getMapFields();
return controllerJobs.entrySet().stream().filter(
- job ->
job.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
-
.equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ job ->
job.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE).equals(tableNameWithType))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} catch (ZkNoNodeException e) {
LOGGER.warn("Could not find controller job node for table : {}",
tableNameWithType, e);
}
@@ -2060,10 +2085,8 @@ public class PinotHelixResourceManager {
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.RELOAD_SEGMENT.toString());
- jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
- Long.toString(System.currentTimeMillis()));
- jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
- Integer.toString(numMessagesSent));
+ jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
+ jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numMessagesSent));
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME,
segmentName);
return addReloadJobToZK(jobId, jobMetadata);
}
@@ -2079,12 +2102,9 @@ public class PinotHelixResourceManager {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
- ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
- jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
- Long.toString(System.currentTimeMillis()));
- jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
- Integer.toString(numberOfMessagesSent));
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+ jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
+ jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numberOfMessagesSent));
return addReloadJobToZK(jobId, jobMetadata);
}
@@ -2096,14 +2116,10 @@ public class PinotHelixResourceManager {
Map<String, Map<String, String>> tasks =
tableJobsZnRecord.getMapFields();
tasks.put(jobId, jobMetadata);
if (tasks.size() >
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
- tasks = tasks.entrySet().stream().sorted(new
Comparator<Map.Entry<String, Map<String, String>>>() {
- @Override
- public int compare(Map.Entry<String, Map<String, String>> v1,
Map.Entry<String, Map<String, String>> v2) {
- return Long.compare(
-
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
-
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)));
- }
- }).collect(Collectors.toList()).subList(0,
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
+ tasks = tasks.entrySet().stream().sorted((v1, v2) -> Long.compare(
+
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
+
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
+ .collect(Collectors.toList()).subList(0,
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
.stream().collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
tableJobsZnRecord.setMapFields(tasks);
@@ -2281,8 +2297,7 @@ public class PinotHelixResourceManager {
}
}
- public void updateZkTimeInterval(SegmentZKMetadata segmentZKMetadata,
- DateTimeFieldSpec timeColumnFieldSpec) {
+ public void updateZkTimeInterval(SegmentZKMetadata segmentZKMetadata,
DateTimeFieldSpec timeColumnFieldSpec) {
ZKMetadataUtils.updateSegmentZKTimeInterval(segmentZKMetadata,
timeColumnFieldSpec);
}
@@ -2432,8 +2447,8 @@ public class PinotHelixResourceManager {
private static Set<String> parseInstanceSet(IdealState idealState, String
segmentName,
@Nullable String targetInstance) {
Set<String> instanceSet = idealState.getInstanceSet(segmentName);
- Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
- "Could not find segment: %s in ideal state", segmentName);
+ Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet), "Could
not find segment: %s in ideal state",
+ segmentName);
if (targetInstance != null) {
return instanceSet.contains(targetInstance) ?
Collections.singleton(targetInstance) : Collections.emptySet();
} else {
@@ -2445,11 +2460,9 @@ public class PinotHelixResourceManager {
* This util is similar to {@link HelixAdmin#resetPartition(String, String,
String, List)}.
* However instead of resetting only the ERROR state to its initial state.
we reset all state regardless.
*/
- private void resetPartitionAllState(String instanceName, String resourceName,
- Set<String> resetPartitionNames) {
+ private void resetPartitionAllState(String instanceName, String
resourceName, Set<String> resetPartitionNames) {
LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster
{}.",
- resetPartitionNames == null ? "NULL" : resetPartitionNames,
resourceName,
- instanceName, _helixClusterName);
+ resetPartitionNames == null ? "NULL" : resetPartitionNames,
resourceName, instanceName, _helixClusterName);
HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -2470,20 +2483,19 @@ public class PinotHelixResourceManager {
// get current state.
String sessionId = liveInstance.getEphemeralOwner();
- CurrentState curState =
- accessor.getProperty(keyBuilder.currentState(instanceName, sessionId,
resourceName));
+ CurrentState curState =
accessor.getProperty(keyBuilder.currentState(instanceName, sessionId,
resourceName));
// check there is no pending messages for the partitions exist
List<Message> messages =
accessor.getChildValues(keyBuilder.messages(instanceName), true);
for (Message message : messages) {
- if
(!Message.MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
|| !sessionId
- .equals(message.getTgtSessionId()) ||
!resourceName.equals(message.getResourceName())
+ if
(!Message.MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
|| !sessionId.equals(
+ message.getTgtSessionId()) ||
!resourceName.equals(message.getResourceName())
|| !resetPartitionNames.contains(message.getPartitionName())) {
continue;
}
- throw new RuntimeException(String.format("Can't reset state for %s.%s on
%s, "
- + "because a pending message %s exists for resource %s",
resourceName, resetPartitionNames, instanceName,
- message.toString(), message.getResourceName()));
+ throw new RuntimeException(
+ String.format("Can't reset state for %s.%s on %s, because a pending
message %s exists for resource %s",
+ resourceName, resetPartitionNames, instanceName, message,
message.getResourceName()));
}
String adminName = null;
@@ -2495,8 +2507,8 @@ public class PinotHelixResourceManager {
adminName = "UNKNOWN";
}
- List<Message> resetMessages = new ArrayList<Message>();
- List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
+ List<Message> resetMessages = new ArrayList<>();
+ List<PropertyKey> messageKeys = new ArrayList<>();
for (String partitionName : resetPartitionNames) {
// send currentState to initialState message
String msgId = UUID.randomUUID().toString();
@@ -3113,9 +3125,9 @@ public class PinotHelixResourceManager {
String instanceAdminEndpoint;
try {
instanceAdminEndpoint = _instanceAdminEndpointCache.get(instance);
- } catch (ExecutionException e) {
+ } catch (Exception e) {
String errorMessage =
- String.format("ExecutionException when getting instance admin
endpoint for instance: %s. Error message: %s",
+ String.format("Caught exception while getting instance admin
endpoint for instance: %s. Error message: %s",
instance, e.getMessage());
LOGGER.error(errorMessage, e);
throw new InvalidConfigException(errorMessage);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 6d81f4d064..5e9def8a56 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -39,6 +39,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
@@ -143,24 +144,57 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
}
@Test
- public void testGetInstanceEndpoints()
+ public void testGetDataInstanceAdminEndpoints()
throws Exception {
Set<String> servers =
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
- BiMap<String, String> endpoints =
_helixResourceManager.getDataInstanceAdminEndpoints(servers);
- // Check that we have endpoints for all instances.
- assertEquals(endpoints.size(), NUM_SERVER_INSTANCES);
-
- // Check actual endpoint names
- for (Map.Entry<String, String> entry : endpoints.entrySet()) {
+ BiMap<String, String> adminEndpoints =
_helixResourceManager.getDataInstanceAdminEndpoints(servers);
+ assertEquals(adminEndpoints.size(), NUM_SERVER_INSTANCES);
+ for (Map.Entry<String, String> entry : adminEndpoints.entrySet()) {
String key = entry.getKey();
int port = Server.DEFAULT_ADMIN_API_PORT +
Integer.parseInt(key.substring("Server_localhost_".length()));
assertEquals(entry.getValue(), "http://localhost:" + port);
}
+
+ // Add a new server
+ String serverName = "Server_localhost_" + NUM_SERVER_INSTANCES;
+ Instance instance = new Instance("localhost", NUM_SERVER_INSTANCES,
InstanceType.SERVER,
+ Collections.singletonList(Helix.UNTAGGED_SERVER_INSTANCE), null, 0,
12345, 0, 0, false);
+ _helixResourceManager.addInstance(instance, false);
+ adminEndpoints =
_helixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(serverName));
+ assertEquals(adminEndpoints.size(), 1);
+ assertEquals(adminEndpoints.get(serverName), "http://localhost:12345");
+
+ // Modify the admin port for the newly added server
+ instance = new Instance("localhost", NUM_SERVER_INSTANCES,
InstanceType.SERVER,
+ Collections.singletonList(Helix.UNTAGGED_SERVER_INSTANCE), null, 0,
23456, 0, 0, false);
+ _helixResourceManager.updateInstance(serverName, instance, false);
+ // Admin endpoint is updated through the instance config change callback,
which happens asynchronously
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ BiMap<String, String> endpoints =
+
_helixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(serverName));
+ assertEquals(endpoints.size(), 1);
+ return endpoints.get(serverName).equals("http://localhost:23456");
+ } catch (InvalidConfigException e) {
+ throw new RuntimeException(e);
+ }
+ }, 60_000L, "Failed to update the admin port");
+
+ // Remove the newly added server
+ _helixResourceManager.dropInstance(serverName);
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+
_helixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(serverName));
+ return false;
+ } catch (InvalidConfigException e) {
+ return true;
+ }
+ }, 60_000L, "Failed to remove the admin endpoint");
}
@Test
- public void testGetInstanceConfigs() {
+ public void testAddRemoveInstance() {
Set<String> serverInstances =
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
for (String instanceName : serverInstances) {
InstanceConfig instanceConfig =
_helixResourceManager.getHelixInstanceConfig(instanceName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]