Repository: apex-core Updated Branches: refs/heads/master 1f78515fc -> 2ba608444
APEXCORE-602: group events by cause Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2ba60844 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2ba60844 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2ba60844 Branch: refs/heads/master Commit: 2ba6084440bdfc23d246cbb397ef46662d3a1688 Parents: 1f78515 Author: priya <[email protected]> Authored: Thu Feb 23 17:25:56 2017 +0530 Committer: priya <[email protected]> Committed: Wed Jun 28 22:17:43 2017 +0530 ---------------------------------------------------------------------- .../stram/StreamingAppMasterService.java | 13 +- .../stram/StreamingContainerManager.java | 29 ++- .../stram/StreamingContainerParent.java | 20 +- .../com/datatorrent/stram/api/StramEvent.java | 83 ++++--- .../org/apache/apex/stram/GroupingManager.java | 232 +++++++++++++++++++ .../org/apache/apex/stram/GroupingRequest.java | 184 +++++++++++++++ .../apach/apex/stram/GroupingManagerTest.java | 148 ++++++++++++ .../apach/apex/stram/GroupingRequestTest.java | 81 +++++++ 8 files changed, 746 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index ed9248a..09478eb 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -49,6 +49,8 @@ import org.apache.apex.engine.plugin.DefaultApexPluginDispatcher; import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator; import 
org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator; import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator; +import org.apache.apex.stram.GroupingManager; +import org.apache.apex.stram.GroupingRequest.EventGroupId; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.MutablePair; @@ -170,6 +172,7 @@ public class StreamingAppMasterService extends CompositeService private StramDelegationTokenManager delegationTokenManager = null; private AppDataPushAgent appDataPushAgent; private ApexPluginDispatcher apexPluginDispatcher; + private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance(); public StreamingAppMasterService(ApplicationAttemptId appAttemptID) { @@ -972,7 +975,8 @@ public class StreamingAppMasterService extends CompositeService launchContainer.run(); // communication with NMs is now async // record container start event - StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString()); + StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(), + allocatedContainer.getNodeId().toString(), groupingManager.getEventGroupIdForAffectedContainer(allocatedContainer.getId().toString())); ev.setTimestamp(timestamp); dnmgr.recordEventAsync(ev); } @@ -997,6 +1001,7 @@ public class StreamingAppMasterService extends CompositeService UserGroupInformation ugi = UserGroupInformation.getLoginUser(); delegationTokenManager.cancelToken(allocatedContainer.delegationToken, ugi.getUserName()); } + EventGroupId groupId = null; int exitStatus = containerStatus.getExitStatus(); if (0 != exitStatus) { if (allocatedContainer != null) { @@ -1039,7 +1044,9 @@ public class StreamingAppMasterService extends CompositeService // Recoverable failure or process killed (externally or via stop request by AM) // also occurs when a container was released by the 
application but never assigned/launched LOG.debug("Container {} failed or killed.", containerStatus.getContainerId()); - dnmgr.scheduleContainerRestart(containerStatus.getContainerId().toString()); + String containerIdStr = containerStatus.getContainerId().toString(); + dnmgr.scheduleContainerRestart(containerIdStr); + groupId = groupingManager.getEventGroupIdForAffectedContainer(containerIdStr); // } } else { // container completed successfully @@ -1057,7 +1064,7 @@ public class StreamingAppMasterService extends CompositeService dnmgr.removeContainerAgent(containerIdStr); // record container stop event - StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus()); + StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus(), groupId); ev.setReason(containerStatus.getDiagnostics()); dnmgr.recordEventAsync(ev); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 510a146..8d2406f 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -69,6 +69,8 @@ import org.apache.apex.engine.api.plugin.DAGExecutionEvent; import org.apache.apex.engine.plugin.ApexPluginDispatcher; import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher; import org.apache.apex.engine.util.CascadeStorageAgent; +import org.apache.apex.stram.GroupingManager; +import org.apache.apex.stram.GroupingRequest.EventGroupId; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -260,6 +262,7 @@ public class 
StreamingContainerManager implements PlanContext //logical operator name to latest counters. exists for backward compatibility. private final Map<String, Object> latestLogicalCounters = Maps.newHashMap(); public transient ApexPluginDispatcher apexPluginDispatcher = new NoOpApexPluginDispatcher(); + private final GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance(); private final LinkedHashMap<String, ContainerInfo> completedContainers = new LinkedHashMap<String, ContainerInfo>() { @@ -799,7 +802,7 @@ public class StreamingContainerManager implements PlanContext if (sca.lastHeartbeatMillis != -1) { String msg = String.format("Container %s@%s heartbeat timeout (%d%n ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis); LOG.warn(msg); - StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), msg, null); + StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), msg, null, null); stramEvent.setReason(msg); recordEventAsync(stramEvent); sca.lastHeartbeatMillis = -1; @@ -1163,6 +1166,9 @@ public class StreamingContainerManager implements PlanContext } includeLocalUpstreamOperators(ctx); + groupingManager.addOrModifyGroupingRequest(containerId, ctx.visited); + groupingManager.removeProcessedGroupingRequests(); + // redeploy cycle for all affected operators LOG.info("Affected operators {}", ctx.visited); deploy(Collections.<PTContainer>emptySet(), ctx.visited, Sets.newHashSet(cs.container), ctx.visited); @@ -1204,7 +1210,7 @@ public class StreamingContainerManager implements PlanContext if (containerAgent != null) { // record operator stop for this container for (PTOperator oper : containerAgent.container.getOperators()) { - StramEvent ev = new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), containerId); + StramEvent ev = new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), containerId, groupingManager.getEventGroupIdForContainer(containerId)); 
recordEventAsync(ev); } containerAgent.container.setFinishedTime(System.currentTimeMillis()); @@ -1279,6 +1285,8 @@ public class StreamingContainerManager implements PlanContext return null; } + groupingManager.addNewContainerToGroupingRequest(container.getExternalId(), resource.containerId); + pendingAllocation.remove(container); container.setState(PTContainer.State.ALLOCATED); if (container.getExternalId() != null) { @@ -1381,13 +1389,16 @@ public class StreamingContainerManager implements PlanContext sca.undeployOpers.add(oper.getId()); slowestUpstreamOp.remove(oper); // record operator stop event - recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); + recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId(), null)); break; case FAILED: processOperatorFailure(oper); sca.undeployOpers.add(oper.getId()); slowestUpstreamOp.remove(oper); - recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); + + EventGroupId groupId = groupingManager.getEventGroupIdForContainer(oper.getContainer().getExternalId()); + recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), + oper.getContainer().getExternalId(), groupId)); break; case ACTIVE: default: @@ -1397,8 +1408,9 @@ public class StreamingContainerManager implements PlanContext break; case PENDING_UNDEPLOY: if (ds == null) { + EventGroupId groupId = groupingManager.moveOperatorFromUndeployListToDeployList(oper); // operator no longer deployed in container - recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); + recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId(), groupId)); oper.setState(State.PENDING_DEPLOY); sca.deployOpers.add(oper); } else { @@ -1418,7 +1430,9 @@ public class 
StreamingContainerManager implements PlanContext oper.setState(PTOperator.State.ACTIVE); oper.stats.lastHeartbeat = null; // reset on redeploy oper.stats.lastWindowIdChangeTms = clock.getTime(); - recordEventAsync(new StramEvent.StartOperatorEvent(oper.getName(), oper.getId(), container.getExternalId())); + EventGroupId groupId = groupingManager.getEventGroupIdForOperatorToDeploy(oper.getId()); + recordEventAsync(new StramEvent.StartOperatorEvent(oper.getName(), oper.getId(), container.getExternalId(), groupId)); + groupingManager.removeOperatorFromGroupingRequest(oper.getId()); } break; default: @@ -1427,7 +1441,7 @@ public class StreamingContainerManager implements PlanContext // operator was removed and needs to be undeployed from container sca.undeployOpers.add(oper.getId()); slowestUpstreamOp.remove(oper); - recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId())); + recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId(), null)); } } } @@ -2421,6 +2435,7 @@ public class StreamingContainerManager implements PlanContext // operator will be deployed after it has been undeployed, if still referenced by the container if (oper.getState() != PTOperator.State.PENDING_UNDEPLOY) { oper.setState(PTOperator.State.PENDING_DEPLOY); + groupingManager.addOperatorToDeploy(oper.getContainer().getExternalId(), oper); } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java index 76f89bd..8401931 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java +++ 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java @@ -20,12 +20,15 @@ package com.datatorrent.stram; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.apex.log.LogFileInformation; - +import org.apache.apex.stram.GroupingManager; +import org.apache.apex.stram.GroupingRequest; +import org.apache.apex.stram.GroupingRequest.EventGroupId; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.ProtocolSignature; @@ -176,20 +179,29 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit @Override public void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException { + EventGroupId groupId = getGroupIdForNewGroupingRequest(containerId); if (operators == null || operators.length == 0) { - dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg, logFileInfo)); + dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg, logFileInfo, groupId)); } else { for (int operator : operators) { OperatorInfo operatorInfo = dagManager.getOperatorInfo(operator); if (operatorInfo != null) { - dagManager.recordEventAsync(new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg, - logFileInfo)); + dagManager.recordEventAsync( + new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg, logFileInfo, groupId)); } } } log(containerId, msg); } + //create new group the deploy request, request data will be populated when sub-dag restart happens + private EventGroupId getGroupIdForNewGroupingRequest(String containerId) + { + GroupingManager groupingManager = GroupingManager.getGroupingManagerInstance(); + GroupingRequest groupingRequest = groupingManager.addOrModifyGroupingRequest(containerId, Collections.EMPTY_SET); + return 
groupingRequest.getEventGroupId(); + } + @Override public StreamingContainerContext getInitContext(String containerId) throws IOException http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java index 8af90bc..6224856 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java +++ b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java @@ -21,6 +21,7 @@ package com.datatorrent.stram.api; import java.util.concurrent.atomic.AtomicLong; import org.apache.apex.log.LogFileInformation; +import org.apache.apex.stram.GroupingRequest.EventGroupId; import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest; @@ -38,6 +39,7 @@ public abstract class StramEvent private String reason; private LogLevel logLevel; private LogFileInformation logFileInformation; + private EventGroupId groupId; public abstract String getType(); @@ -48,9 +50,15 @@ public abstract class StramEvent protected StramEvent(LogLevel logLevel, LogFileInformation logFileInformation) { + this(logLevel, logFileInformation, null); + } + + protected StramEvent(LogLevel logLevel, LogFileInformation logFileInformation, EventGroupId groupId) + { id = nextId.getAndIncrement(); this.logLevel = logLevel; this.logFileInformation = logFileInformation; + this.groupId = groupId; } public long getId() @@ -98,6 +106,16 @@ public abstract class StramEvent this.logFileInformation = logFileInformation; } + public EventGroupId getGroupId() + { + return groupId; + } + + public void setGroupId(EventGroupId groupId) + { + this.groupId = groupId; + } + public static enum LogLevel { TRACE, @@ -114,12 +132,12 @@ public abstract class StramEvent public OperatorEvent(String operatorName, LogLevel logLevel) { - this(operatorName, 
logLevel, null); + this(operatorName, logLevel, null, null); } - public OperatorEvent(String operatorName, LogLevel logLevel, LogFileInformation logFileInformation) + public OperatorEvent(String operatorName, LogLevel logLevel, LogFileInformation logFileInformation, EventGroupId groupId) { - super(logLevel, logFileInformation); + super(logLevel, logFileInformation, groupId); this.operatorName = operatorName; } @@ -231,13 +249,18 @@ public abstract class StramEvent public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel) { - this(operatorName, operatorId, logLevel, null); + this(operatorName, operatorId, logLevel, null, null); + } + + public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel, EventGroupId groupId) + { + this(operatorName, operatorId, logLevel, null, groupId); } public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel, - LogFileInformation logFileInformation) + LogFileInformation logFileInformation, EventGroupId groupId) { - super(operatorName, logLevel, logFileInformation); + super(operatorName, logLevel, logFileInformation, groupId); this.operatorId = operatorId; } @@ -292,14 +315,14 @@ public abstract class StramEvent { private String containerId; - public StartOperatorEvent(String operatorName, int operatorId, String containerId) + public StartOperatorEvent(String operatorName, int operatorId, String containerId, EventGroupId groupId) { - this(operatorName, operatorId, containerId, LogLevel.INFO); + this(operatorName, operatorId, containerId, LogLevel.INFO, groupId); } - public StartOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel logLevel) + public StartOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel logLevel, EventGroupId groupId) { - super(operatorName, operatorId, logLevel); + super(operatorName, operatorId, logLevel, groupId); this.containerId = containerId; } @@ -325,14 +348,14 @@ public abstract 
class StramEvent { private String containerId; - public StopOperatorEvent(String operatorName, int operatorId, String containerId) + public StopOperatorEvent(String operatorName, int operatorId, String containerId, EventGroupId groupId) { - this(operatorName, operatorId, containerId, LogLevel.WARN); + this(operatorName, operatorId, containerId, LogLevel.WARN, groupId); } - public StopOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel logLevel) + public StopOperatorEvent(String operatorName, int operatorId, String containerId, LogLevel logLevel, EventGroupId groupId) { - super(operatorName, operatorId, logLevel); + super(operatorName, operatorId, logLevel, groupId); this.containerId = containerId; } @@ -404,14 +427,14 @@ public abstract class StramEvent String containerId; String containerNodeId; - public StartContainerEvent(String containerId, String containerNodeId) + public StartContainerEvent(String containerId, String containerNodeId, EventGroupId groupId) { - this(containerId, containerNodeId, LogLevel.INFO); + this(containerId, containerNodeId, LogLevel.INFO, groupId); } - public StartContainerEvent(String containerId, String containerNodeId, LogLevel logLevel) + public StartContainerEvent(String containerId, String containerNodeId, LogLevel logLevel, EventGroupId groupId) { - super(logLevel); + super(logLevel, null, groupId); this.containerId = containerId; this.containerNodeId = containerNodeId; } @@ -449,14 +472,14 @@ public abstract class StramEvent String containerId; int exitStatus; - public StopContainerEvent(String containerId, int exitStatus) + public StopContainerEvent(String containerId, int exitStatus, EventGroupId groupId) { - this(containerId, exitStatus, LogLevel.WARN); + this(containerId, exitStatus, LogLevel.WARN, groupId); } - public StopContainerEvent(String containerId, int exitStatus, LogLevel logLevel) + public StopContainerEvent(String containerId, int exitStatus, LogLevel logLevel, EventGroupId groupId) { - 
super(logLevel); + super(logLevel, null, groupId); this.containerId = containerId; this.exitStatus = exitStatus; } @@ -528,15 +551,15 @@ public abstract class StramEvent private String errorMessage; public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage, - LogFileInformation logFileInformation) + LogFileInformation logFileInformation, EventGroupId groupId) { - this(operatorName, operatorId, containerId, errorMessage, logFileInformation, LogLevel.ERROR); + this(operatorName, operatorId, containerId, errorMessage, logFileInformation, groupId, LogLevel.ERROR); } public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage, - LogFileInformation logFileInformation, LogLevel logLevel) + LogFileInformation logFileInformation, EventGroupId groupId, LogLevel logLevel) { - super(operatorName, operatorId, logLevel, logFileInformation); + super(operatorName, operatorId, logLevel, logFileInformation, groupId); this.containerId = containerId; this.errorMessage = errorMessage; } @@ -574,15 +597,15 @@ public abstract class StramEvent private String containerId; private String errorMessage; - public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation) + public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation, EventGroupId groupId) { - this(containerId, errorMessage, logFileInformation, LogLevel.ERROR); + this(containerId, errorMessage, logFileInformation, groupId, LogLevel.ERROR); } - public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation, + public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation, EventGroupId groupId, LogLevel logLevel) { - super(logLevel, logFileInformation); + super(logLevel, logFileInformation, groupId); this.containerId = containerId; this.errorMessage = errorMessage; } 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.stram;

import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.stram.GroupingRequest.EventGroupId;

import com.google.common.collect.Maps;

import com.datatorrent.stram.plan.physical.PTOperator;

/**
 * Tracks the event group ids used to tie together the deploy/undeploy events
 * of containers and operators that were triggered by a common cause.
 *
 * <p>Requests are stored in a map keyed by the container id that was passed to
 * {@link #addOrModifyGroupingRequest(String, Set)}. A single shared instance is
 * handed out by {@link #getGroupingManagerInstance()}.
 *
 * <p>This class performs no synchronization of its own.
 * NOTE(review): confirm that all callers of the shared instance run on one
 * thread, or guard access externally.
 */
public class GroupingManager
{
  private static final Logger LOG = LoggerFactory.getLogger(GroupingManager.class);
  private static final GroupingManager groupingManager = new GroupingManager();

  /** Grouping requests keyed by the container id they were registered for. */
  private final Map<String, GroupingRequest> groupingRequests = Maps.newHashMap();

  /**
   * Returns the shared grouping manager.
   * @return the single instance created when this class is initialized
   */
  public static GroupingManager getGroupingManagerInstance()
  {
    return groupingManager;
  }

  /**
   * Returns all grouping requests currently held by this manager.
   * @return the live backing map (not a copy); changes made through it are
   *         visible to this manager
   */
  public Map<String, GroupingRequest> getGroupingRequests()
  {
    return groupingRequests;
  }

  /**
   * Returns the grouping request registered for a container.
   * @param containerId id the request was registered under
   * @return the request, or {@code null} if none is registered for that id
   */
  public GroupingRequest getGroupingRequest(String containerId)
  {
    return groupingRequests.get(containerId);
  }

  /**
   * Returns the event group id of the request registered for a container.
   * @param containerId id the request was registered under
   * @return the group id, or {@code null} if no request is registered for the
   *         container, i.e. the event is independent and belongs to no group
   */
  public EventGroupId getEventGroupIdForContainer(String containerId)
  {
    GroupingRequest request = groupingRequests.get(containerId);
    return request == null ? null : request.getEventGroupId();
  }

  /**
   * Returns the event group id for a container that either has its own request
   * or is listed as an affected container of some other request (for example a
   * container newly allocated during a redeploy).
   * @param containerId id of the container to look up
   * @return the group id, or {@code null} if the container belongs to no group.
   *         When several requests list the container as affected, the one
   *         encountered last in map iteration order wins.
   */
  public EventGroupId getEventGroupIdForAffectedContainer(String containerId)
  {
    EventGroupId groupId = getEventGroupIdForContainer(containerId);
    if (groupId != null) {
      return groupId;
    }
    for (GroupingRequest request : groupingRequests.values()) {
      if (request.getAffectedContainers().contains(containerId)) {
        // no early exit: a later matching request overrides an earlier one
        groupId = request.getEventGroupId();
      }
    }
    return groupId;
  }

  /**
   * Returns the event group id of the first request that lists the operator as
   * pending deploy.
   * @param operatorId id of the operator about to be deployed
   * @return the group id, or {@code null} if no request lists the operator in
   *         its operators-to-deploy set
   */
  public EventGroupId getEventGroupIdForOperatorToDeploy(int operatorId)
  {
    for (GroupingRequest request : groupingRequests.values()) {
      if (request.getOperatorsToDeploy().contains(operatorId)) {
        return request.getEventGroupId();
      }
    }
    return null;
  }

  /**
   * Adds an operator to the operators-to-deploy set of the request registered
   * for the given container. Does nothing if no such request exists.
   * @param containerId id the request was registered under
   * @param oper operator whose id is recorded
   */
  public void addOperatorToDeploy(String containerId, PTOperator oper)
  {
    GroupingRequest request = getGroupingRequest(containerId);
    if (request != null) {
      request.addOperatorToDeploy(oper.getId());
    }
  }

  /**
   * Removes an operator from the operators-to-deploy set of the first request
   * that contains it.
   * @param operatorId id of the operator to remove
   * @return {@code true} if the operator was found and removed, {@code false}
   *         if no request listed it
   */
  public boolean removeOperatorFromGroupingRequest(int operatorId)
  {
    for (GroupingRequest request : groupingRequests.values()) {
      if (request.removeOperatorToDeploy(operatorId)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Drops every grouping request that has neither operators pending deploy nor
   * operators pending undeploy.
   */
  public void removeProcessedGroupingRequests()
  {
    // Remove through the iterator: calling Map.remove() while iterating the
    // entry set of a HashMap throws ConcurrentModificationException.
    Iterator<Entry<String, GroupingRequest>> entries = groupingRequests.entrySet().iterator();
    while (entries.hasNext()) {
      Entry<String, GroupingRequest> entry = entries.next();
      GroupingRequest request = entry.getValue();
      if (request.getOperatorsToDeploy().isEmpty() && request.getOperatorsToUndeploy().isEmpty()) {
        LOG.info("Removing for :{}", entry.getKey());
        entries.remove();
      }
    }
  }

  /**
   * Creates the grouping request for a container if it does not exist yet and
   * records the affected operators in it. Each operator id is added to the
   * request's operators-to-undeploy set and the external id of the operator's
   * container is added to the request's affected containers; operators move to
   * the operators-to-deploy set later via
   * {@link #moveOperatorFromUndeployListToDeployList(PTOperator)}.
   * @param containerId id to register the request under
   * @param affectedOperators operators to record; may be empty
   * @return the new or already existing request for the container
   */
  public GroupingRequest addOrModifyGroupingRequest(String containerId, Set<PTOperator> affectedOperators)
  {
    GroupingRequest request = groupingRequests.get(containerId);
    if (request == null) {
      request = new GroupingRequest();
      groupingRequests.put(containerId, request);
    }
    for (PTOperator oper : affectedOperators) {
      request.addOperatorToUndeploy(oper.getId());
      request.addAffectedContainer(oper.getContainer().getExternalId());
    }
    return request;
  }

  /**
   * Adds a container to the affected containers of the request registered for
   * the group leader container. Does nothing if either id is {@code null} or
   * no request is registered for the group leader.
   * @param groupLeaderContainerId id the request was registered under
   * @param affectedContainerId id of the container to add
   */
  public void addNewContainerToGroupingRequest(String groupLeaderContainerId, String affectedContainerId)
  {
    if (groupLeaderContainerId == null || affectedContainerId == null) {
      return;
    }
    GroupingRequest request = getGroupingRequest(groupLeaderContainerId);
    if (request != null) {
      request.addAffectedContainer(affectedContainerId);
    }
  }

  /**
   * Moves an operator from the operators-to-undeploy set to the
   * operators-to-deploy set in every request that lists it as pending undeploy.
   * @param oper operator to move
   * @return the group id of the request the operator was moved in (the last one
   *         in map iteration order if several matched), or {@code null} if no
   *         request listed the operator as pending undeploy
   */
  public EventGroupId moveOperatorFromUndeployListToDeployList(PTOperator oper)
  {
    EventGroupId groupId = null;
    int operatorId = oper.getId();
    for (GroupingRequest request : groupingRequests.values()) {
      if (request.getOperatorsToUndeploy().contains(operatorId)) {
        groupId = request.getEventGroupId();
        request.removeOperatorToUndeploy(operatorId);
        request.addOperatorToDeploy(operatorId);
      }
    }
    return groupId;
  }

  /**
   * Removes all grouping requests.
   */
  public void clearAllGroupingRequests()
  {
    groupingRequests.clear();
  }
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java b/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java new file mode 100644 index 0000000..d107a38 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.stram; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.Sets; + +import com.datatorrent.stram.util.AbstractWritableAdapter; + +/** + * Grouping request keeps track of operators whose start/stop events should be grouped. 
+ */ +public class GroupingRequest +{ + private EventGroupId eventGroupId; + private Set<Integer> operatorsToDeploy = Sets.newHashSet(); + private Set<Integer> operatorsToUndeploy = Sets.newHashSet(); + private Set<String> affectedContainers = Sets.newHashSet(); + + public GroupingRequest() + { + eventGroupId = EventGroupId.newEventGroupId(); + } + + public GroupingRequest(EventGroupId groupId) + { + this.eventGroupId = groupId; + } + + /** + * Gets the EventGroupId assigned to this grouping request + * @return eventGroupId + */ + public EventGroupId getEventGroupId() + { + return eventGroupId; + } + + /** + * Gets ids of operators to deploy as part of this grouping request + * @return operatorsToDeploy, the internal mutable set (not a copy) + */ + public Set<Integer> getOperatorsToDeploy() + { + return operatorsToDeploy; + } + + /** + * Gets ids of operators to undeploy as part of this grouping request + * @return operatorsToUndeploy, the internal mutable set (not a copy) + */ + public Set<Integer> getOperatorsToUndeploy() + { + return operatorsToUndeploy; + } + + /** + * Gets ids of containers affected by this grouping request + * @return affectedContainers, the internal mutable set (not a copy) + */ + public Set<String> getAffectedContainers() + { + return affectedContainers; + } + + /** + * Adds operator to this grouping request's set of operators to deploy + * @param operatorId id of the operator to add + */ + public void addOperatorToDeploy(int operatorId) + { + operatorsToDeploy.add(operatorId); + } + + /** + * Removes operator from this grouping request's set of operators to deploy + * @param operatorId id of the operator to remove + * @return true if the id was present in the set and has been removed + */ + public boolean removeOperatorToDeploy(int operatorId) + { + return operatorsToDeploy.remove(operatorId); + } + + /** + * Adds operator to this grouping request's set of operators to undeploy + * @param operatorId id of the operator to add + */ + public void addOperatorToUndeploy(int operatorId) + { + operatorsToUndeploy.add(operatorId); + } + + /** + * Removes operator from this grouping request's set of operators to undeploy + * @param operatorId id of the operator to remove + * @return true if the id was present in the set and has been removed + */ + public boolean removeOperatorToUndeploy(int operatorId) + { + return operatorsToUndeploy.remove(operatorId); + } + + /** + 
* Adds container to this grouping request's set of affected containers. + * @param containerId id of the affected container + */ + public void addAffectedContainer(String containerId) + { + affectedContainers.add(containerId); + } + + /** + * EventGroupId is used to group related events. Events triggered by a common + * cause are considered related. + * New ids are numbered from a static AtomicInteger counter (see newEventGroupId). + */ + public static class EventGroupId extends AbstractWritableAdapter + { + private static final long serialVersionUID = 1L; + private static final AtomicInteger idSequence = new AtomicInteger(); + private int groupId; + + public static EventGroupId newEventGroupId() + { + EventGroupId id = new EventGroupId(); + id.groupId = idSequence.incrementAndGet(); + return id; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + groupId; + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + EventGroupId other = (EventGroupId)obj; + if (groupId != other.groupId) { + return false; + } + return true; + } + + @Override + public String toString() + { + return "EventGroupId [groupId=" + groupId + "]"; + } + + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java b/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java new file mode 100644 index 0000000..7e33e97 --- /dev/null +++ b/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apach.apex.stram; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.apache.apex.stram.GroupingManager; +import org.apache.apex.stram.GroupingRequest; + +import com.google.common.collect.ImmutableSet; + +import com.datatorrent.stram.plan.physical.PTContainer; +import com.datatorrent.stram.plan.physical.PTOperator; + +import static org.mockito.Mockito.when; +/** Tests GroupingManager via the instance from getGroupingManagerInstance(); teardown() clears all grouping requests after each test. */ +public class GroupingManagerTest +{ + + @Mock + private PTOperator oper1; + @Mock + private PTOperator oper2; + @Mock + private PTContainer testContainer; + private String affectedContainerId = "container_4"; + private GroupingManager underTest; + + @Before + public void setup() + { + underTest = GroupingManager.getGroupingManagerInstance(); + MockitoAnnotations.initMocks(this); + // Both mocked operators (ids 1 and 2) report the same mocked container, whose external id is affectedContainerId. + when(oper1.getId()).thenReturn(1); + when(oper2.getId()).thenReturn(2); + when(oper1.getContainer()).thenReturn(testContainer); + when(oper2.getContainer()).thenReturn(testContainer); + when(testContainer.getExternalId()).thenReturn(affectedContainerId); + } + + @Test + public void testAddNewDeploy() + { + String failedContainerId = "container_1"; + underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1, oper2)); + Assert.assertEquals(1, underTest.getGroupingRequests().size()); + 
GroupingRequest request = underTest.getGroupingRequest(failedContainerId); + Assert.assertTrue(request.getAffectedContainers().contains(affectedContainerId)); + Assert.assertTrue(request.getOperatorsToUndeploy().contains(oper1.getId())); + Assert.assertTrue(request.getOperatorsToUndeploy().contains(oper2.getId())); + } + + @Test + public void testAddOperatorToGroupingRequest() + { + String failedContainerId = "container_1"; + underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1)); + GroupingRequest request = underTest.getGroupingRequest(failedContainerId); + Assert.assertFalse(request.getOperatorsToDeploy().contains(oper2.getId())); + underTest.addOperatorToDeploy(failedContainerId, oper2); + Assert.assertTrue(request.getOperatorsToDeploy().contains(oper2.getId())); + } + + @Test + public void testGetDeployGroupIdForContainer() + { + String failedContainerId = "container_1"; + underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1)); + GroupingRequest request = underTest.getGroupingRequest(failedContainerId); + + Assert.assertEquals(request.getEventGroupId(), underTest.getEventGroupIdForContainer(failedContainerId)); + } + + @Test + public void testGetDeployGroupIdForOperator() + { + String failedContainerId = "container_1"; + underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1)); + underTest.addOperatorToDeploy(failedContainerId, oper1); // simulates the operator moving from PENDING_UNDEPLOY to PENDING_DEPLOY state + GroupingRequest request = underTest.getGroupingRequest(failedContainerId); + + Assert.assertEquals(request.getEventGroupId(), underTest.getEventGroupIdForOperatorToDeploy(oper1.getId())); + } + + @Test + public void testMoveOperatorFromUndeployListToDeployList() + { + String failedContainerId = "container_1"; + underTest.addOrModifyGroupingRequest(failedContainerId, ImmutableSet.of(oper1)); + underTest.moveOperatorFromUndeployListToDeployList(oper1); + GroupingRequest request = 
underTest.getGroupingRequest(failedContainerId); + + Assert.assertFalse(request.getOperatorsToUndeploy().contains(oper1.getId())); + Assert.assertTrue(request.getOperatorsToDeploy().contains(oper1.getId())); + } + + @Test + public void testAddNewContainerToGroupingRequest() + { + String groupLeaderContainerId = "container_1"; + String newAffectedContainerId = "container_11"; + underTest.addOrModifyGroupingRequest(groupLeaderContainerId, ImmutableSet.of(oper1)); + underTest.addNewContainerToGroupingRequest(groupLeaderContainerId, newAffectedContainerId); + + GroupingRequest request = underTest.getGroupingRequest(groupLeaderContainerId); + Assert.assertTrue(request.getAffectedContainers().contains(newAffectedContainerId)); + } + + @Test + public void testRemoveProcessedGroupingRequest() + { + underTest.addOrModifyGroupingRequest(affectedContainerId, ImmutableSet.of(oper1)); + Assert.assertEquals(1, underTest.getGroupingRequests().size()); + underTest.moveOperatorFromUndeployListToDeployList(oper1); // move from undeploy to deploy list + underTest.removeOperatorFromGroupingRequest(oper1.getId()); + underTest.removeProcessedGroupingRequests(); + Assert.assertEquals(0, underTest.getGroupingRequests().size()); + + } + + @After + public void teardown() + { + underTest.clearAllGroupingRequests(); + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ba60844/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java b/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java new file mode 100644 index 0000000..3417715 --- /dev/null +++ b/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apach.apex.stram; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.apex.stram.GroupingRequest; +/** Checks that ids added to or removed from a GroupingRequest are reflected in the sets returned by its getters. */ +public class GroupingRequestTest +{ + private GroupingRequest underTest; + + @Before + public void setup() + { + underTest = new GroupingRequest(); + } + + @Test + public void testAddAffectedContainer() + { + String affectedContainerId = "container_000001"; + underTest.addAffectedContainer(affectedContainerId); + Assert.assertTrue(underTest.getAffectedContainers().contains(affectedContainerId)); + } + + @Test + public void testAddOperatorToUndeploy() + { + int operatorId = 1; + underTest.addOperatorToUndeploy(operatorId); + Assert.assertTrue(underTest.getOperatorsToUndeploy().contains(operatorId)); + } + + @Test + public void testAddOperatorToDeploy() + { + int operatorId = 1; + underTest.addOperatorToDeploy(operatorId); + Assert.assertTrue(underTest.getOperatorsToDeploy().contains(operatorId)); + } + + @Test + public void testRemoveOperatorToUndeploy() + { + int operatorId = 1; + underTest.addOperatorToUndeploy(operatorId); + Assert.assertTrue(underTest.getOperatorsToUndeploy().contains(operatorId)); + underTest.removeOperatorToUndeploy(operatorId); + 
Assert.assertFalse(underTest.getOperatorsToUndeploy().contains(operatorId)); + } + + @Test + public void testRemoveOperatorToDeploy() + { + int operatorId = 1; + underTest.addOperatorToDeploy(operatorId); + Assert.assertTrue(underTest.getOperatorsToDeploy().contains(operatorId)); + underTest.removeOperatorToDeploy(operatorId); + Assert.assertFalse(underTest.getOperatorsToDeploy().contains(operatorId)); + } + +}
