stefan-egli commented on code in PR #7:
URL:
https://github.com/apache/sling-org-apache-sling-discovery-oak/pull/7#discussion_r925615926
##########
src/main/java/org/apache/sling/discovery/oak/cluster/OakClusterViewService.java:
##########
@@ -181,74 +198,108 @@ private LocalClusterView
asClusterView(DiscoveryLiteDescriptor descriptor, Resou
if (activeIds==null || activeIds.length==0) {
throw new
UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, "Descriptor contained
no active ids: "+descriptor.getDescriptorStr());
}
- // convert int[] to List<Integer>
- //TODO: could use Guava's Ints class here..
- List<Integer> activeIdsList = new LinkedList<Integer>();
- for (Integer integer : activeIds) {
- activeIdsList.add(integer);
- }
+
+ final List<Integer> activeIdsList = Arrays.stream( activeIds
).boxed().collect( Collectors.toList() );
// step 1: sort activeIds by their leaderElectionId
// serves two purposes: pos[0] is then leader
// and the rest are properly sorted within the cluster
- final Map<Integer, String> leaderElectionIds = new HashMap<Integer,
String>();
- PartialStartupDetector partialStartupDetector = new
PartialStartupDetector(resourceResolver, config,
- lowestSeqNum, me, getSlingId(), seqNum,
partialStartupSuppressingTimeout, logSilencer);
+
+ final ClusterReader reader = new ClusterReader(resourceResolver,
config, idMapService, seenLocalInstances);
+ final Map<Integer,InstanceInfo> regularInstances = new HashMap<>();
+ final Set<Integer> partiallyStartedClusterNodeIds = new HashSet<>();
+ boolean suppressionEnabled = isSyncTokenEnabled() &&
isPartialSuppressionEnabled();
+
+ final InstanceReadResult myInstanceResult = reader.readInstance(me,
false);
+ final InstanceInfo myInstance = myInstanceResult.getInstanceInfo();
+ if (myInstance == null) {
+ throw new
UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
myInstanceResult.getErrorMsg());
+ }
+
+ if (partialStartupSuppressingTimeout > 0
+ && partialStartupSuppressingTimeout <
System.currentTimeMillis()) {
+ // if partial suppression timeout is set and it has passed, then
don't suppress
+ suppressionEnabled = false;
+ }
+
+ if (suppressionEnabled &&
myInstance.isSyncTokenNewerOrEqual(lowestSeqNum)) {
+ // that means that the local instance did store a synctoken ever
+ // so it did successfully once go through the syncTokenService
+ //
+ // as a result we can now start suppressing
+ } else {
+ // otherwise even the local instance hasn't done a full join ever,
+ // so we shouldn't do any suppression just yet
+ suppressionEnabled = false;
+ }
+
+ // categorize the activeIds into
+ // - partiallyStarted : added to partiallyStartedClusterNodeIds
+ // - fully started : added to fullyStartedInstances
for (Integer id : activeIdsList) {
- String slingId = idMapService.toSlingId(id, resourceResolver);
- if (slingId == null) {
- idMapService.clearCache();
- if (partialStartupDetector.suppressMissingIdMap(id)) {
- continue;
- } else {
- throw new
UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
- "no slingId mapped for clusterNodeId="+id);
- }
- }
- if (partialStartupDetector.suppressMissingSyncToken(id, slingId)) {
+ if (id == me) {
+ regularInstances.put(me, myInstance);
continue;
}
- String leaderElectionId = getLeaderElectionId(resourceResolver,
- slingId);
- // SLING-6924 : leaderElectionId can be null here
- // this means that another instance is just starting up, has
already
- // created its oak lease, thus is already visible from an oak
discover-lite
- // point of view - but upper level code here in discovery.oak has
not yet
- // set the leaderElectionId. This is rare but valid case
- if (leaderElectionId == null) {
- if
(partialStartupDetector.suppressMissingLeaderElectionId(id)) {
- continue;
+ InstanceReadResult readResult = reader.readInstance(id,
suppressionEnabled);
+ InstanceInfo instanceInfo = readResult.getInstanceInfo();
+ if (instanceInfo == null && !suppressionEnabled) {
+ // retry with a fresh idmap
+ idMapService.clearCache();
+ readResult = reader.readInstance(id, suppressionEnabled);
+ instanceInfo = readResult.getInstanceInfo();
+ }
+ if (instanceInfo == null) {
+ if (suppressionEnabled) {
+ // then suppress this instance by not adding it to the
resultingInstances map
+ partiallyStartedClusterNodeIds.add(id);
} else {
- // then at this stage the clusterView is not yet
established
- // in a few moments it will but at this point not.
- // so falling back to treating this as NO_ESTABLISHED_VIEW
- // and with the heartbeat interval this situation will
- // resolve itself upon one of the next pings
- throw new
UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
- "no leaderElectionId available yet for
slingId="+slingId);
+ throw new
UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
readResult.getErrorMsg());
}
+ } else {
+ regularInstances.put(id, instanceInfo);
}
- leaderElectionIds.put(id, leaderElectionId);
}
- Collection<Integer> partiallyStartedClusterNodeIds =
partialStartupDetector.getPartiallyStartedClusterNodeIds();
- activeIdsList.removeAll(partiallyStartedClusterNodeIds);
- leaderElectionSort(activeIdsList, leaderElectionIds);
+ if (!partiallyStartedClusterNodeIds.isEmpty()) {
+ logSilencer.infoOrDebug("asClusterView : partial instances : " +
partiallyStartedClusterNodeIds);
+ activeIdsList.removeAll(partiallyStartedClusterNodeIds);
+ }
+
+ final List<Integer> sortedIds = leaderElectionSort(regularInstances);
+
+ if (sortedIds.size() != activeIdsList.size()) {
+ logger.error("asClusterView : list size mismatch : sorted = " +
sortedIds.size()
+ + ", active = " + activeIdsList.size() + " (partial = " +
partiallyStartedClusterNodeIds.size() + ")");
+ }
- for(int i=0; i<activeIdsList.size(); i++) {
- int id = activeIdsList.get(i);
+ boolean seenAllSyncTokens = true;
+ for(int i=0; i<sortedIds.size(); i++) {
+ int id = sortedIds.get(i);
boolean isLeader = i==0; // thx to sorting above [0] is leader
indeed
boolean isOwn = id==me;
- String slingId = idMapService.toSlingId(id, resourceResolver);
- if (slingId==null) {
+ InstanceInfo in = regularInstances.get(id);
+ String slingId = in == null ? null : in.getSlingId();
+ if (slingId == null) {
idMapService.clearCache();
logger.info("asClusterView: cannot resolve oak-clusterNodeId
{} to a slingId", id);
throw new Exception("Cannot resolve oak-clusterNodeId "+id+"
to a slingId");
}
+ if (!in.isSyncTokenNewerOrEqual(seqNum)) {
+ System.out.println("Not seen syncToken (" + seqNum + ") of
this instance yet : " + in);
Review Comment:
Oops, fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]