http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java new file mode 100644 index 0000000..e2a3add --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT; +import static org.apache.ignite.IgniteSystemProperties.getLong; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; +import static org.apache.ignite.internal.GridTopic.TOPIC_SERVICES; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; + +/** + * Service deployment manager. + * + * @see ServiceDeploymentTask + * @see ServiceDeploymentActions + */ +public class ServiceDeploymentManager { + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** Services discovery messages listener. */ + private final DiscoveryEventListener discoLsnr = new ServiceDiscoveryListener(); + + /** Services communication messages listener. */ + private final GridMessageListener commLsnr = new ServiceCommunicationListener(); + + /** Services deployments tasks. */ + private final Map<ServiceDeploymentProcessId, ServiceDeploymentTask> tasks = new ConcurrentHashMap<>(); + + /** Discovery events received while cluster state transition was in progress. */ + private final List<PendingEventHolder> pendingEvts = new ArrayList<>(); + + /** Topology version of latest deployment task's event. */ + private final AtomicReference<AffinityTopologyVersion> readyTopVer = + new AtomicReference<>(AffinityTopologyVersion.NONE); + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Deployment worker. */ + private final ServicesDeploymentWorker depWorker; + + /** Default dump operation limit. */ + private final long dfltDumpTimeoutLimit; + + /** + * @param ctx Grid kernal context. + */ + ServiceDeploymentManager(@NotNull GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(getClass()); + + ctx.event().addDiscoveryEventListener(discoLsnr, + EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); + + ctx.io().addMessageListener(TOPIC_SERVICES, commLsnr); + + depWorker = new ServicesDeploymentWorker(); + + long limit = getLong(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT, 0); + + dfltDumpTimeoutLimit = limit <= 0 ? 30 * 60_000 : limit; + } + + /** + * Starts processing of services deployments tasks. + */ + void startProcessing() { + assert depWorker.runner() == null : "Method shouldn't be called twice during lifecycle;"; + + new IgniteThread(ctx.igniteInstanceName(), "services-deployment-worker", depWorker).start(); + } + + /** + * Stops processing of services deployments tasks. + * + * @param stopErr Cause error of deployment manager stop. + */ + void stopProcessing(IgniteCheckedException stopErr) { + busyLock.block(); // Will not release it. + + ctx.event().removeDiscoveryEventListener(discoLsnr); + + ctx.io().removeMessageListener(commLsnr); + + U.cancel(depWorker); + + U.join(depWorker, log); + + depWorker.tasksQueue.clear(); + + pendingEvts.clear(); + + tasks.values().forEach(t -> t.completeError(stopErr)); + + tasks.clear(); + } + + /** + * @return Ready topology version. + */ + public AffinityTopologyVersion readyTopologyVersion() { + return readyTopVer.get(); + } + + /** + * Special handler for local discovery events for which the regular events are not generated, e.g. local join and + * client reconnect events. + * + * @param evt Discovery event. + * @param discoCache Discovery cache. + * @param depActions Service deployment actions. + */ + void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache, ServiceDeploymentActions depActions) { + checkClusterStateAndAddTask(evt, discoCache, depActions); + } + + /** + * Invokes {@link GridWorker#blockingSectionBegin()} for service deployment worker. + * <p/> + * Should be called from service deployment worker thread. + */ + void deployerBlockingSectionBegin() { + assert depWorker != null && Thread.currentThread() == depWorker.runner(); + + depWorker.blockingSectionBegin(); + } + + /** + * Invokes {@link GridWorker#blockingSectionEnd()} for service deployment worker. + * <p/> + * Should be called from service deployment worker thread. + */ + void deployerBlockingSectionEnd() { + assert depWorker != null && Thread.currentThread() == depWorker.runner(); + + depWorker.blockingSectionEnd(); + } + + /** + * Checks cluster state and handles given event. + * <pre> + * - if cluster is active, then adds event in deployment queue; + * - if cluster state in transition, them adds to pending events; + * - if cluster is inactive, then ignore event; + * </pre> + * <b>Should be called from discovery thread.</b> + * + * @param evt Discovery event. + * @param discoCache Discovery cache. + * @param depActions Services deployment actions. + */ + private void checkClusterStateAndAddTask(@NotNull DiscoveryEvent evt, @NotNull DiscoCache discoCache, + @Nullable ServiceDeploymentActions depActions) { + if (discoCache.state().transition()) + pendingEvts.add(new PendingEventHolder(evt, discoCache.version(), depActions)); + else if (discoCache.state().active()) + addTask(evt, discoCache.version(), depActions); + else if (log.isDebugEnabled()) + log.debug("Ignore event, cluster is inactive, evt=" + evt); + } + + /** + * Adds deployment task with given deployment process id. + * </p> + * <b>Should be called from discovery thread.</b> + * + * @param evt Discovery event. + * @param topVer Topology version. + * @param depActions Services deployment actions. + */ + private void addTask(@NotNull DiscoveryEvent evt, @NotNull AffinityTopologyVersion topVer, + @Nullable ServiceDeploymentActions depActions) { + final ServiceDeploymentProcessId depId = deploymentId(evt, topVer); + + ServiceDeploymentTask task = tasks.computeIfAbsent(depId, + t -> new ServiceDeploymentTask(ctx, depId)); + + if (!task.onEnqueued()) { + if (log.isDebugEnabled()) { + log.debug("Service deployment process hasn't been started for discovery event, because of " + + "a task with the same deployment process id is already added (possible cause is message's" + + " double delivering), evt=" + evt); + } + + return; + } + + assert task.event() == null && task.topologyVersion() == null; + + task.onEvent(evt, topVer, depActions); + + depWorker.tasksQueue.add(task); + } + + /** + * Creates service deployment process id. + * + * @param evt Discovery event. + * @param topVer Topology version. + * @return Services deployment process id. + */ + private ServiceDeploymentProcessId deploymentId(@NotNull DiscoveryEvent evt, + @NotNull AffinityTopologyVersion topVer) { + return evt instanceof DiscoveryCustomEvent ? + new ServiceDeploymentProcessId(((DiscoveryCustomEvent)evt).customMessage().id()) : + new ServiceDeploymentProcessId(topVer); + } + + /** + * Clones some instances of {@link DiscoveryCustomEvent} to capture necessary data, to avoid custom messages's + * nullifying by {@link GridDhtPartitionsExchangeFuture#onDone}. + * + * @param evt Discovery event. + * @return Discovery event to process. + */ + private DiscoveryCustomEvent copyIfNeeded(@NotNull DiscoveryCustomEvent evt) { + DiscoveryCustomMessage msg = evt.customMessage(); + + assert msg != null : "DiscoveryCustomMessage has been nullified concurrently, evt=" + evt; + + if (msg instanceof ServiceChangeBatchRequest) + return evt; + + DiscoveryCustomEvent cp = new DiscoveryCustomEvent(); + + cp.node(evt.node()); + cp.customMessage(msg); + cp.eventNode(evt.eventNode()); + cp.affinityTopologyVersion(evt.affinityTopologyVersion()); + + return cp; + } + + /** + * Services discovery messages high priority listener. + * <p/> + * The listener should be notified earlier then PME's listener because of a custom message of {@link + * DiscoveryCustomEvent} may be nullified in PME before the listener will be able to capture it. + */ + private class ServiceDiscoveryListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(final DiscoveryEvent evt, final DiscoCache discoCache) { + if (!enterBusy()) + return; + + final UUID snd = evt.eventNode().id(); + final int evtType = evt.type(); + + assert snd != null : "Event's node id shouldn't be null."; + assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED + || evtType == EVT_DISCOVERY_CUSTOM_EVT : "Unexpected event was received, evt=" + evt; + + try { + if (evtType == EVT_DISCOVERY_CUSTOM_EVT) { + DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage(); + + if (msg instanceof ChangeGlobalStateFinishMessage) { + ChangeGlobalStateFinishMessage msg0 = (ChangeGlobalStateFinishMessage)msg; + + if (msg0.clusterActive()) + pendingEvts.forEach(t -> addTask(t.evt, t.topVer, t.depActions)); + else if (log.isDebugEnabled()) + pendingEvts.forEach(t -> log.debug("Ignore event, cluster is inactive: " + t.evt)); + + pendingEvts.clear(); + } + else { + if (msg instanceof ServiceClusterDeploymentResultBatch) { + ServiceClusterDeploymentResultBatch msg0 = (ServiceClusterDeploymentResultBatch)msg; + + if (log.isDebugEnabled()) { + log.debug("Received services full deployments message : " + + "[locId=" + ctx.localNodeId() + ", snd=" + snd + ", msg=" + msg0 + ']'); + } + + ServiceDeploymentProcessId depId = msg0.deploymentId(); + + assert depId != null; + + ServiceDeploymentTask task = tasks.get(depId); + + if (task != null) // May be null in case of double delivering + task.onReceiveFullDeploymentsMessage(msg0); + } + else if (msg instanceof CacheAffinityChangeMessage) + addTask(copyIfNeeded((DiscoveryCustomEvent)evt), discoCache.version(), null); + else { + ServiceDeploymentActions depActions = null; + + if (msg instanceof ChangeGlobalStateMessage) + depActions = ((ChangeGlobalStateMessage)msg).servicesDeploymentActions(); + else if (msg instanceof ServiceChangeBatchRequest) { + depActions = ((ServiceChangeBatchRequest)msg) + .servicesDeploymentActions(); + } + else if (msg instanceof DynamicCacheChangeBatch) + depActions = ((DynamicCacheChangeBatch)msg).servicesDeploymentActions(); + + if (depActions != null) + addTask(copyIfNeeded((DiscoveryCustomEvent)evt), discoCache.version(), depActions); + } + } + } + else { + if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) + tasks.values().forEach(t -> t.onNodeLeft(snd)); + + checkClusterStateAndAddTask(evt, discoCache, null); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public int order() { + return 0; + } + } + + /** + * Pending event's holder. + */ + private static class PendingEventHolder { + /** Discovery event. */ + private DiscoveryEvent evt; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** Services deployemnt actions. */ + private ServiceDeploymentActions depActions; + + /** + * @param evt Discovery event. + * @param topVer Topology version. + * @param depActions Services deployment actions. + */ + private PendingEventHolder(DiscoveryEvent evt, + AffinityTopologyVersion topVer, ServiceDeploymentActions depActions) { + this.evt = evt; + this.topVer = topVer; + this.depActions = depActions; + } + } + + /** + * Services messages communication listener. + */ + private class ServiceCommunicationListener implements GridMessageListener { + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (!enterBusy()) + return; + + try { + if (msg instanceof ServiceSingleNodeDeploymentResultBatch) { + ServiceSingleNodeDeploymentResultBatch msg0 = (ServiceSingleNodeDeploymentResultBatch)msg; + + if (log.isDebugEnabled()) { + log.debug("Received services single deployments message : " + + "[locId=" + ctx.localNodeId() + ", snd=" + nodeId + ", msg=" + msg0 + ']'); + } + + tasks.computeIfAbsent(msg0.deploymentId(), + t -> new ServiceDeploymentTask(ctx, msg0.deploymentId())) + .onReceiveSingleDeploymentsMessage(nodeId, msg0); + } + } + finally { + leaveBusy(); + } + } + } + + /** + * Services deployment worker. + */ + private class ServicesDeploymentWorker extends GridWorker { + /** Queue to process. */ + private final LinkedBlockingQueue<ServiceDeploymentTask> tasksQueue = new LinkedBlockingQueue<>(); + + /** {@inheritDoc} */ + private ServicesDeploymentWorker() { + super(ctx.igniteInstanceName(), "services-deployment-worker", + ServiceDeploymentManager.this.log, ctx.workersRegistry()); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + Throwable err = null; + + try { + ServiceDeploymentTask task; + + while (!isCancelled()) { + onIdle(); + + blockingSectionBegin(); + + try { + task = tasksQueue.take(); + } + finally { + blockingSectionEnd(); + } + + if (isCancelled()) + Thread.currentThread().interrupt(); + + task.init(); + + final long dumpTimeout = 2 * ctx.config().getNetworkTimeout(); + + long dumpCnt = 0; + long nextDumpTime = 0; + + while (true) { + try { + blockingSectionBegin(); + + try { + task.waitForComplete(dumpTimeout); + } + finally { + blockingSectionEnd(); + } + + taskPostProcessing(task); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignored) { + if (isCancelled()) + return; + + if (nextDumpTime <= U.currentTimeMillis()) { + log.warning("Failed to wait service deployment process or timeout had been" + + " reached, timeout=" + dumpTimeout + ", task=" + task); + + long nextTimeout = dumpTimeout * (2 + dumpCnt++); + + nextDumpTime = U.currentTimeMillis() + Math.min(nextTimeout, dfltDumpTimeoutLimit); + } + } + catch (ClusterTopologyServerNotFoundException e) { + U.error(log, e); + + taskPostProcessing(task); + + break; + } + } + } + } + catch (InterruptedException | IgniteInterruptedCheckedException e) { + Thread.currentThread().interrupt(); + + if (!isCancelled()) + err = e; + } + catch (Throwable t) { + err = t; + } + finally { + if (err == null && !isCancelled()) + err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly."); + + if (err instanceof OutOfMemoryError) + ctx.failure().process(new FailureContext(CRITICAL_ERROR, err)); + else if (err != null) + ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); + } + } + + /** + * Does additional actions after task's completion. + */ + private void taskPostProcessing(ServiceDeploymentTask task) { + AffinityTopologyVersion readyVer = readyTopVer.get(); + + readyTopVer.compareAndSet(readyVer, task.topologyVersion()); + + tasks.remove(task.deploymentId()); + } + } + + /** + * Enters busy state. + * + * @return {@code true} if entered to busy state. + */ + private boolean enterBusy() { + return busyLock.enterBusy(); + } + + /** + * Leaves busy state. + */ + private void leaveBusy() { + busyLock.leaveBusy(); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java new file mode 100644 index 0000000..711c302 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.nio.ByteBuffer; +import java.util.Objects; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Service deployment process' identifier. + */ +public class ServiceDeploymentProcessId implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Topology version. */ + @Nullable private AffinityTopologyVersion topVer; + + /** Request's id. */ + @Nullable private IgniteUuid reqId; + + /** + * Empty constructor for marshalling purposes. + */ + public ServiceDeploymentProcessId() { + } + + /** + * @param topVer Topology version. + */ + ServiceDeploymentProcessId(@NotNull AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + + /** + * @param reqId Request's id. + */ + ServiceDeploymentProcessId(@NotNull IgniteUuid reqId) { + this.reqId = reqId; + } + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Requests id. + */ + public IgniteUuid requestId() { + return reqId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeIgniteUuid("reqId", reqId)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + reqId = reader.readIgniteUuid("reqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(ServiceDeploymentProcessId.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 167; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + ServiceDeploymentProcessId id = (ServiceDeploymentProcessId)o; + + return F.eq(topVer, id.topVer) && F.eq(reqId, id.reqId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(topVer, reqId); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceDeploymentProcessId.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java new file mode 100644 index 0000000..d41e5aa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.services.ServiceConfiguration; +import org.jetbrains.annotations.NotNull; + +/** + * Service deployment request. + */ +public class ServiceDeploymentRequest extends ServiceChangeAbstractRequest { + /** */ + private static final long serialVersionUID = 0L; + + /** Service configuration. */ + private final ServiceConfiguration cfg; + + /** + * @param srvcId Service id. + * @param cfg Service configuration. + */ + public ServiceDeploymentRequest(@NotNull IgniteUuid srvcId, @NotNull ServiceConfiguration cfg) { + super(srvcId); + + this.cfg = cfg; + } + + /** + * @return Service configuration. + */ + public ServiceConfiguration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceDeploymentRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java new file mode 100644 index 0000000..ea0114b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java @@ -0,0 +1,859 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.services.ServiceConfiguration; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.GridTopic.TOPIC_SERVICES; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SERVICE_POOL; + +/** + * Services deployment task. + * + * @see ServiceDeploymentActions + * @see ServiceSingleNodeDeploymentResultBatch + * @see ServiceClusterDeploymentResultBatch + */ +class ServiceDeploymentTask { + /** Task's completion future. */ + private final GridFutureAdapter<?> completeStateFut = new GridFutureAdapter<>(); + + /** Task's completion of initialization future. */ + private final GridFutureAdapter<?> initTaskFut = new GridFutureAdapter<>(); + + /** Task's completion of remaining nodes ids initialization future. */ + private final GridFutureAdapter<?> initCrdFut = new GridFutureAdapter<>(); + + /** Coordinator initialization actions mutex. */ + private final Object initCrdMux = new Object(); + + /** Remaining nodes to received services single deployments message. */ + @GridToStringInclude + private final Set<UUID> remaining = new HashSet<>(); + + /** Added in deployment queue flag. */ + private final AtomicBoolean addedInQueue = new AtomicBoolean(false); + + /** Single deployments messages to process. */ + @GridToStringInclude + private final Map<UUID, ServiceSingleNodeDeploymentResultBatch> singleDepsMsgs = new HashMap<>(); + + /** Expected services assignments. */ + @GridToStringExclude + private final Map<IgniteUuid, Map<UUID, Integer>> expDeps = new HashMap<>(); + + /** Deployment errors. */ + @GridToStringExclude + private final Map<IgniteUuid, Collection<Throwable>> depErrors = new HashMap<>(); + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Service processor. */ + private final IgniteServiceProcessor srvcProc; + + /** Deployment process id. */ + @GridToStringInclude + private final ServiceDeploymentProcessId depId; + + /** Coordinator node id. */ + @GridToStringExclude + private volatile UUID crdId; + + /** Cause discovery event. */ + @GridToStringInclude + private volatile DiscoveryEvent evt; + + /** Topology version. */ + @GridToStringInclude + private volatile AffinityTopologyVersion evtTopVer; + + /** Services deployment actions. */ + private volatile ServiceDeploymentActions depActions; + + /** + * @param ctx Kernal context. + * @param depId Service deployment process id. + */ + protected ServiceDeploymentTask(GridKernalContext ctx, ServiceDeploymentProcessId depId) { + assert ctx.service() instanceof IgniteServiceProcessor; + + this.depId = depId; + this.ctx = ctx; + + srvcProc = (IgniteServiceProcessor)ctx.service(); + log = ctx.log(getClass()); + } + + /** + * Handles discovery event receiving. + * + * @param evt Discovery event. + * @param evtTopVer Topology version. + * @param depActions Services deployment actions. + */ + protected void onEvent(@NotNull DiscoveryEvent evt, @NotNull AffinityTopologyVersion evtTopVer, + @Nullable ServiceDeploymentActions depActions) { + assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED + || evt.type() == EVT_DISCOVERY_CUSTOM_EVT : "Unexpected event type, evt=" + evt; + + this.evt = evt; + this.evtTopVer = evtTopVer; + this.depActions = depActions; + } + + /** + * Initializes deployment task. + * + * @throws IgniteCheckedException In case of an error. + */ + protected void init() throws IgniteCheckedException { + if (isCompleted() || initTaskFut.isDone()) + return; + + assert evt != null && evtTopVer != null : "Illegal state to perform task's initialization :" + this; + + if (log.isDebugEnabled()) { + log.debug("Started services deployment task init: [depId=" + depId + + ", locId=" + ctx.localNodeId() + ", evt=" + evt + ']'); + } + + try { + if (depActions != null && depActions.deactivate()) { + srvcProc.onDeActivate(ctx); + + completeSuccess(); + + return; + } + + if (depActions == null) { + Map<IgniteUuid, ServiceInfo> toDeploy = new HashMap<>(); + + final int evtType = evt.type(); + + if (evtType == EVT_DISCOVERY_CUSTOM_EVT) { + DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage(); + + if (msg instanceof CacheAffinityChangeMessage) { + CacheAffinityChangeMessage msg0 = (CacheAffinityChangeMessage)msg; + + Map<IgniteUuid, ServiceInfo> services = srvcProc.deployedServices(); + + if (!services.isEmpty()) { + Map<Integer, Map<Integer, List<UUID>>> change = msg0.assignmentChange(); + + if (change != null) { + Set<String> names = new HashSet<>(); + + ctx.cache().cacheDescriptors().forEach((name, desc) -> { + if (change.containsKey(desc.groupId())) + names.add(name); + }); + + services.forEach((srvcId, desc) -> { + if (names.contains(desc.cacheName())) + toDeploy.put(srvcId, desc); + }); + } + } + } + } + else { + assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED; + + final ClusterNode eventNode = evt.eventNode(); + + final Map<IgniteUuid, ServiceInfo> deployedServices = srvcProc.deployedServices(); + + if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) { + deployedServices.forEach((srvcId, desc) -> { + if (desc.topologySnapshot().containsKey(eventNode.id()) || + (desc.cacheName() != null && !eventNode.isClient())) // If affinity service + toDeploy.put(srvcId, desc); + }); + } + else { + toDeploy.putAll(deployedServices); + + toDeploy.putAll(srvcProc.servicesReceivedFromJoin(eventNode.id())); + } + } + + if (toDeploy.isEmpty()) { + completeSuccess(); + + if (log.isDebugEnabled()) + log.debug("No services deployment deployment action required."); + + return; + } + + depActions = new ServiceDeploymentActions(); + + depActions.servicesToDeploy(toDeploy); + } + + ClusterNode crd = srvcProc.coordinator(); + + if (crd == null) { + onAllServersLeft(); + + return; + } + + crdId = crd.id(); + + if (crd.isLocal()) + initCoordinator(evtTopVer); + + processDeploymentActions(depActions); + } + catch (Exception e) { + log.error("Error occurred while initializing deployment task, err=" + e.getMessage(), e); + + completeError(e); + + throw new IgniteCheckedException(e); + } + finally { + if (!initTaskFut.isDone()) + initTaskFut.onDone(); + + if (log.isDebugEnabled()) { + log.debug("Finished services deployment future init: [depId=" + deploymentId() + + ", locId=" + ctx.localNodeId() + ']'); + } + } + } + + /** + * @param depActions Services deployment actions. + */ + @SuppressWarnings("ErrorNotRethrown") + private void processDeploymentActions(@NotNull ServiceDeploymentActions depActions) { + srvcProc.updateDeployedServices(depActions); + + depActions.servicesToUndeploy().forEach((srvcId, desc) -> { + srvcProc.deployment().deployerBlockingSectionBegin(); + + try { + srvcProc.undeploy(srvcId); + } + finally { + srvcProc.deployment().deployerBlockingSectionEnd(); + } + }); + + if (!depActions.servicesToDeploy().isEmpty()) { + final Collection<UUID> evtTopNodes = F.nodeIds(ctx.discovery().nodes(evtTopVer)); + + depActions.servicesToDeploy().forEach((srvcId, desc) -> { + try { + ServiceConfiguration cfg = desc.configuration(); + + TreeMap<UUID, Integer> oldTop = filterDeadNodes(evtTopNodes, desc.topologySnapshot()); + + Map<UUID, Integer> top = reassign(srvcId, cfg, evtTopVer, oldTop); + + expDeps.put(srvcId, top); + + Integer expCnt = top.getOrDefault(ctx.localNodeId(), 0); + + if (expCnt > srvcProc.localInstancesCount(srvcId)) { + srvcProc.deployment().deployerBlockingSectionBegin(); + + try { + srvcProc.redeploy(srvcId, cfg, top); + } + finally { + srvcProc.deployment().deployerBlockingSectionEnd(); + } + } + } + catch (IgniteCheckedException e) { + depErrors.computeIfAbsent(srvcId, c -> new ArrayList<>()).add(e); + } + }); + } + + createAndSendSingleDeploymentsMessage(depId, depErrors); + } + + /** + * Prepares the coordinator to manage deployment process. + * + * @param topVer Topology version to initialize {@link #remaining} collection. + */ + private void initCoordinator(AffinityTopologyVersion topVer) { + synchronized (initCrdMux) { + if (initCrdFut.isDone()) + return; + + try { + for (ClusterNode node : ctx.discovery().nodes(topVer)) { + if (ctx.discovery().alive(node) && !singleDepsMsgs.containsKey(node.id())) + remaining.add(node.id()); + } + } + catch (Exception e) { + log.error("Error occurred while initializing remaining collection.", e); + + initCrdFut.onDone(e); + } + finally { + if (!initCrdFut.isDone()) + initCrdFut.onDone(); + } + } + } + + /** + * @param depId Deployment process id. + * @param errors Deployment errors. + */ + private void createAndSendSingleDeploymentsMessage(ServiceDeploymentProcessId depId, + final Map<IgniteUuid, Collection<Throwable>> errors) { + assert crdId != null : "Coordinator should be defined at this point, locId=" + ctx.localNodeId(); + + try { + Set<IgniteUuid> depServicesIds = new HashSet<>(); + + if (evt.type() == EVT_NODE_JOINED) { + UUID evtNodeId = evt.eventNode().id(); + + expDeps.forEach((srvcId, top) -> { + if (top.containsKey(evtNodeId)) + depServicesIds.add(srvcId); + }); + } + else + depServicesIds.addAll(expDeps.keySet()); + + Map<IgniteUuid, ServiceSingleNodeDeploymentResult> results = new HashMap<>(); + + for (IgniteUuid srvcId : depServicesIds) { + ServiceSingleNodeDeploymentResult depRes = new ServiceSingleNodeDeploymentResult( + srvcProc.localInstancesCount(srvcId)); + + attachDeploymentErrors(depRes, errors.get(srvcId)); + + results.put(srvcId, depRes); + } + + errors.forEach((srvcId, err) -> { + if (results.containsKey(srvcId)) + return; + + ServiceSingleNodeDeploymentResult depRes = new ServiceSingleNodeDeploymentResult( + srvcProc.localInstancesCount(srvcId)); + + attachDeploymentErrors(depRes, err); + + results.put(srvcId, depRes); + }); + + ServiceSingleNodeDeploymentResultBatch msg = new ServiceSingleNodeDeploymentResultBatch(depId, results); + + if (ctx.localNodeId().equals(crdId)) + onReceiveSingleDeploymentsMessage(ctx.localNodeId(), msg); + else + ctx.io().sendToGridTopic(crdId, TOPIC_SERVICES, msg, SERVICE_POOL); + + if (log.isDebugEnabled()) + log.debug("Send services single deployments message, msg=" + msg); + } + catch (IgniteCheckedException e) { + log.error("Failed to send services single deployments message to coordinator over communication spi.", e); + } + } + + /** + * Handles received single node services map message. + * + * @param snd Sender node id. + * @param msg Single services map message. + */ + protected void onReceiveSingleDeploymentsMessage(UUID snd, ServiceSingleNodeDeploymentResultBatch msg) { + assert depId.equals(msg.deploymentId()) : "Wrong message's deployment process id, msg=" + msg; + + initCrdFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> { + if (isCompleted()) + return; + + synchronized (initCrdMux) { + if (remaining.remove(snd)) { + singleDepsMsgs.put(snd, msg); + + if (remaining.isEmpty()) + onAllReceived(); + } + else if (log.isDebugEnabled()) + log.debug("Unexpected service single deployments received, msg=" + msg); + } + }); + } + + /** + * Handles received full services map message. + * + * @param msg Full services map message. + */ + protected void onReceiveFullDeploymentsMessage(ServiceClusterDeploymentResultBatch msg) { + assert depId.equals(msg.deploymentId()) : "Wrong message's deployment process id, msg=" + msg; + + initTaskFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> { + if (isCompleted()) + return; + + ctx.closure().runLocalSafe(() -> { + try { + ServiceDeploymentActions depResults = msg.servicesDeploymentActions(); + + assert depResults != null : "Services deployment actions should be attached."; + + final Map<IgniteUuid, Map<UUID, Integer>> fullTops = depResults.deploymentTopologies(); + final Map<IgniteUuid, Collection<byte[]>> fullErrors = depResults.deploymentErrors(); + + depActions.deploymentTopologies(fullTops); + depActions.deploymentErrors(fullErrors); + + srvcProc.updateServicesTopologies(fullTops); + + final Map<IgniteUuid, ServiceInfo> services = srvcProc.deployedServices(); + + fullTops.forEach((srvcId, top) -> { + Integer expCnt = top.getOrDefault(ctx.localNodeId(), 0); + + if (expCnt < srvcProc.localInstancesCount(srvcId)) { // Undeploy exceed instances + ServiceInfo desc = services.get(srvcId); + + assert desc != null; + + ServiceConfiguration cfg = desc.configuration(); + + try { + srvcProc.redeploy(srvcId, cfg, top); + } + catch (IgniteCheckedException e) { + log.error("Error occured during cancel exceed service instances: " + + "[srvcId=" + srvcId + ", name=" + desc.name() + ']', e); + } + } + }); + + completeSuccess(); + } + catch (Throwable t) { + log.error("Failed to process services full deployments message, msg=" + msg, t); + + completeError(t); + } + }); + }); + } + + /** + * Completes initiating futures. + * + * @param err Error to complete initiating. + */ + private void completeInitiatingFuture(final Throwable err) { + if (depActions == null) + return; + + depActions.servicesToDeploy().forEach((srvcId, desc) -> { + if (err != null) { + srvcProc.completeInitiatingFuture(true, srvcId, err); + + return; + } + + Collection<byte[]> errors = depActions.deploymentErrors().get(srvcId); + + if (errors == null) { + srvcProc.completeInitiatingFuture(true, srvcId, null); + + return; + } + + Throwable depErr = null; + + for (byte[] error : errors) { + try { + Throwable t = U.unmarshal(ctx, error, null); + + if (depErr == null) + depErr = t; + else + depErr.addSuppressed(t); + } + catch (IgniteCheckedException e) { + log.error("Failed to unmarshal deployment error.", e); + } + } + + srvcProc.completeInitiatingFuture(true, srvcId, depErr); + }); + + for (IgniteUuid reqSrvcId : depActions.servicesToUndeploy().keySet()) + srvcProc.completeInitiatingFuture(false, reqSrvcId, err); + } + + /** + * Creates services full deployments message and send it over discovery. + */ + private void onAllReceived() { + assert !isCompleted(); + + Collection<ServiceClusterDeploymentResult> fullResults = buildFullDeploymentsResults(singleDepsMsgs); + + try { + ServiceClusterDeploymentResultBatch msg = new ServiceClusterDeploymentResultBatch(depId, fullResults); + + ctx.discovery().sendCustomEvent(msg); + } + catch (IgniteCheckedException e) { + log.error("Failed to send services full deployments message across the ring.", e); + } + } + + /** + * Reassigns service to nodes. + * + * @param cfg Service configuration. + * @param topVer Topology version. + * @param oldTop Previous topology snapshot. + * @throws IgniteCheckedException In case of an error. + */ + private Map<UUID, Integer> reassign(IgniteUuid srvcId, ServiceConfiguration cfg, + AffinityTopologyVersion topVer, TreeMap<UUID, Integer> oldTop) throws IgniteCheckedException { + try { + Map<UUID, Integer> top = srvcProc.reassign(srvcId, cfg, topVer, oldTop); + + if (top.isEmpty()) + throw new IgniteCheckedException("Failed to determine suitable nodes to deploy service."); + + if (log.isDebugEnabled()) + log.debug("Calculated service assignment : [srvcId=" + srvcId + ", srvcTop=" + top + ']'); + + return top; + } + catch (Throwable e) { + throw new IgniteCheckedException("Failed to calculate assignments for service, cfg=" + cfg, e); + } + } + + /** + * Filters dead nodes from given service topology snapshot using given ids. + * + * @param evtTopNodes Ids being used to filter. + * @param top Service topology snapshot. + * @return Filtered service topology snapshot. + */ + private TreeMap<UUID, Integer> filterDeadNodes(Collection<UUID> evtTopNodes, Map<UUID, Integer> top) { + TreeMap<UUID, Integer> filtered = new TreeMap<>(); + + if (F.isEmpty(top)) + return filtered; + + top.forEach((nodeId, cnt) -> { + // We can't just use 'ctx.discovery().alive(UUID)', because during the deployment process discovery + // topology may be changed and results may be different on some set of nodes. + if (evtTopNodes.contains(nodeId)) + filtered.put(nodeId, cnt); + }); + + return filtered; + } + + /** + * Processes single deployments messages to build full deployment results. + * + * @param singleDepsMsgs Services single deployments messages. + * @return Services full deployments results. + */ + private Collection<ServiceClusterDeploymentResult> buildFullDeploymentsResults( + Map<UUID, ServiceSingleNodeDeploymentResultBatch> singleDepsMsgs) { + final Map<IgniteUuid, Map<UUID, ServiceSingleNodeDeploymentResult>> singleResults = new HashMap<>(); + + singleDepsMsgs.forEach((nodeId, msg) -> msg.results().forEach((srvcId, res) -> { + Map<UUID, ServiceSingleNodeDeploymentResult> depResults = singleResults + .computeIfAbsent(srvcId, r -> new HashMap<>()); + + int cnt = res.count(); + + if (cnt != 0) { + Map<UUID, Integer> expTop = expDeps.get(srvcId); + + if (expTop != null) { + Integer expCnt = expTop.get(nodeId); + + cnt = expCnt == null ? 0 : Math.min(cnt, expCnt); + } + } + + if (cnt == 0 && res.errors().isEmpty()) + return; + + ServiceSingleNodeDeploymentResult singleDepRes = new ServiceSingleNodeDeploymentResult(cnt); + + if (!res.errors().isEmpty()) + singleDepRes.errors(res.errors()); + + depResults.put(nodeId, singleDepRes); + })); + + final Collection<ServiceClusterDeploymentResult> fullResults = new ArrayList<>(); + + singleResults.forEach((srvcId, dep) -> { + ServiceClusterDeploymentResult res = new ServiceClusterDeploymentResult(srvcId, dep); + + fullResults.add(res); + }); + + return fullResults; + } + + /** + * @param depRes Service single deployments results. + * @param errors Deployment errors. + */ + private void attachDeploymentErrors(@NotNull ServiceSingleNodeDeploymentResult depRes, + @Nullable Collection<Throwable> errors) { + if (F.isEmpty(errors)) + return; + + Collection<byte[]> errorsBytes = new ArrayList<>(); + + for (Throwable th : errors) { + try { + byte[] arr = U.marshal(ctx, th); + + errorsBytes.add(arr); + } + catch (IgniteCheckedException e) { + log.error("Failed to marshal deployment error, err=" + th, e); + } + } + + depRes.errors(errorsBytes); + } + + /** + * Handles a node leaves topology. + * + * @param nodeId Left node id. + */ + protected void onNodeLeft(UUID nodeId) { + initTaskFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> { + if (isCompleted()) + return; + + final boolean crdChanged = nodeId.equals(crdId); + + if (crdChanged) { + ClusterNode crd = srvcProc.coordinator(); + + if (crd != null) { + crdId = crd.id(); + + if (crd.isLocal()) + initCoordinator(evtTopVer); + + createAndSendSingleDeploymentsMessage(depId, depErrors); + } + else + onAllServersLeft(); + } + else if (ctx.localNodeId().equals(crdId)) { + synchronized (initCrdMux) { + boolean rmvd = remaining.remove(nodeId); + + if (rmvd && remaining.isEmpty()) { + singleDepsMsgs.remove(nodeId); + + onAllReceived(); + } + } + } + }); + } + + /** + * Handles case when all server nodes have left the grid. + */ + private void onAllServersLeft() { + assert ctx.clientNode(); + + completeError(new ClusterTopologyServerNotFoundException("Failed to resolve coordinator to continue services " + + "deployment process: [locId=" + ctx.localNodeId() + "client=" + ctx.clientNode() + "evt=" + evt + ']')); + } + + /** + * @return Cause discovery event. + */ + public DiscoveryEvent event() { + return evt; + } + + /** + * Returns cause of deployment process topology version. + * + * @return Cause of deployment process topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return evtTopVer; + } + + /** + * Returns services deployment process id of the task. + * + * @return Services deployment process id. + */ + public ServiceDeploymentProcessId deploymentId() { + return depId; + } + + /** + * Completes the task. + */ + public void completeSuccess() { + if (!completeStateFut.isDone()) { + completeInitiatingFuture(null); + + completeStateFut.onDone(); + } + + if (!initTaskFut.isDone()) + initTaskFut.onDone(); + + if (!initCrdFut.isDone()) + initCrdFut.onDone(); + } + + /** + * @param err Error to complete with. + */ + public void completeError(Throwable err) { + if (!completeStateFut.isDone()) { + completeInitiatingFuture(err); + + completeStateFut.onDone(err); + } + + if (!initTaskFut.isDone()) + initTaskFut.onDone(err); + + if (!initCrdFut.isDone()) + initCrdFut.onDone(err); + } + + /** + * Returns if the task completed. + * + * @return {@code true} if the task completed, otherwise {@code false}. + */ + protected boolean isCompleted() { + return completeStateFut.isDone(); + } + + /** + * Synchronously waits for completion of the task for up to the given timeout. + * + * @param timeout The maximum time to wait in milliseconds. + * @throws IgniteCheckedException In case of an error. + */ + protected void waitForComplete(long timeout) throws IgniteCheckedException { + completeStateFut.get(timeout); + } + + /** + * Handles when this task is being added in deployment queue. + * <p/> + * Introduced to avoid overhead on calling of {@link Collection#contains(Object)}}. + * + * @return {@code true} if task is has not been added previously, otherwise {@code false}. + */ + protected boolean onEnqueued() { + return addedInQueue.compareAndSet(false, true); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + ServiceDeploymentTask task = (ServiceDeploymentTask)o; + + return depId.equals(task.depId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return depId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceDeploymentTask.class, this, + "locNodeId", (ctx != null ? ctx.localNodeId() : "unknown"), + "crdId", crdId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java index 4db44cb..2e14b94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java @@ -29,7 +29,11 @@ import org.jetbrains.annotations.Nullable; /** * Service descriptor. + * + * @deprecated This implementation is based on {@code GridServiceDeployment} which has been deprecated because of + * services internals use messages for deployment management instead of the utility cache, since Ignite 2.8. */ +@Deprecated public class ServiceDescriptorImpl implements ServiceDescriptor { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java new file mode 100644 index 0000000..5a3aefc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceConfiguration; +import org.apache.ignite.services.ServiceDescriptor; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Service's information container. + */ +public class ServiceInfo implements ServiceDescriptor { + /** */ + private static final long serialVersionUID = 0L; + + /** Origin node ID. */ + private final UUID originNodeId; + + /** Service id. */ + private final IgniteUuid srvcId; + + /** Service configuration. */ + private final ServiceConfiguration cfg; + + /** Statically configured flag. */ + private final boolean staticCfg; + + /** Topology snapshot. */ + @GridToStringInclude + private volatile Map<UUID, Integer> top = Collections.emptyMap(); + + /** + * @param originNodeId Initiating node id. + * @param srvcId Service id. + * @param cfg Service configuration. + */ + public ServiceInfo(@NotNull UUID originNodeId, @NotNull IgniteUuid srvcId, @NotNull ServiceConfiguration cfg) { + this(originNodeId, srvcId, cfg, false); + } + + /** + * @param originNodeId Initiating node id. + * @param srvcId Service id. + * @param cfg Service configuration. + * @param staticCfg Statically configured flag. + */ + public ServiceInfo(@NotNull UUID originNodeId, @NotNull IgniteUuid srvcId, @NotNull ServiceConfiguration cfg, + boolean staticCfg) { + this.originNodeId = originNodeId; + this.srvcId = srvcId; + this.cfg = cfg; + this.staticCfg = staticCfg; + } + + /** + * Sets service's new topology snapshot. + * + * @param top Topology snapshot. + */ + public void topologySnapshot(@NotNull Map<UUID, Integer> top) { + this.top = top; + } + + /** + * Returns service's configuration. + * + * @return Service configuration. + */ + public ServiceConfiguration configuration() { + return cfg; + } + + /** + * @return {@code true} if statically configured. + */ + public boolean staticallyConfigured() { + return staticCfg; + } + + /** + * Rerurns services id. + * + * @return Service id. + */ + public IgniteUuid serviceId() { + return srvcId; + } + + /** {@inheritDoc} */ + @Override public String name() { + return cfg.getName(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Class<? extends Service> serviceClass() { + if (cfg instanceof LazyServiceConfiguration) { + String clsName = ((LazyServiceConfiguration)cfg).serviceClassName(); + + try { + return (Class<? extends Service>)Class.forName(clsName); + } + catch (ClassNotFoundException e) { + throw new IgniteException("Failed to find service class: " + clsName, e); + } + } + else + return cfg.getService().getClass(); + } + + /** {@inheritDoc} */ + @Override public int totalCount() { + return cfg.getTotalCount(); + } + + /** {@inheritDoc} */ + @Override public int maxPerNodeCount() { + return cfg.getMaxPerNodeCount(); + } + + /** {@inheritDoc} */ + @Nullable @Override public String cacheName() { + return cfg.getCacheName(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <K> K affinityKey() { + return (K)cfg.getAffinityKey(); + } + + /** {@inheritDoc} */ + @Override public UUID originNodeId() { + return originNodeId; + } + + /** {@inheritDoc} */ + @Override public Map<UUID, Integer> topologySnapshot() { + return Collections.unmodifiableMap(top); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceInfo.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java new file mode 100644 index 0000000..2fea452 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceConfiguration; +import org.apache.ignite.services.ServiceDescriptor; + +/** + * Adapter for different service processor implementations. + */ +public abstract class ServiceProcessorAdapter extends GridProcessorAdapter { + /** + * @param ctx Kernal context. + */ + protected ServiceProcessorAdapter(GridKernalContext ctx) { + super(ctx); + } + + /** + * @param prj Grid projection. + * @param name Service name. + * @param srvc Service. + * @return Future. + */ + public abstract IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup prj, String name, Service srvc); + + /** + * @param name Service name. + * @param srvc Service instance. + * @return Future. + */ + public abstract IgniteInternalFuture<?> deployClusterSingleton(ClusterGroup prj, String name, Service srvc); + + /** + * @param name Service name. + * @param srvc Service. + * @param totalCnt Total count. + * @param maxPerNodeCnt Max per-node count. + * @return Future. + */ + public abstract IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, String name, Service srvc, int totalCnt, + int maxPerNodeCnt); + + /** + * @param name Service name. + * @param srvc Service. + * @param cacheName Cache name. + * @param affKey Affinity key. + * @return Future. + */ + public abstract IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service srvc, String cacheName, + Object affKey); + + /** + * @param prj Grid projection. + * @param cfgs Service configurations. + * @return Future for deployment. + */ + public abstract IgniteInternalFuture<?> deployAll(ClusterGroup prj, Collection<ServiceConfiguration> cfgs); + + /** + * @param name Service name. + * @return Future. + */ + public abstract IgniteInternalFuture<?> cancel(String name); + + /** + * @return Future. + */ + public abstract IgniteInternalFuture<?> cancelAll(); + + /** + * @param servicesNames Name of services to deploy. + * @return Future. + */ + public abstract IgniteInternalFuture<?> cancelAll(Collection<String> servicesNames); + + /** + * @return Collection of service descriptors. + */ + public abstract Collection<ServiceDescriptor> serviceDescriptors(); + + /** + * @param name Service name. + * @param <T> Service type. + * @return Service by specified service name. + */ + public abstract <T> T service(String name); + + /** + * @param prj Grid projection. + * @param name Service name. + * @param srvcCls Service class. + * @param sticky Whether multi-node request should be done. + * @param timeout If greater than 0 limits service acquire time. Cannot be negative. + * @param <T> Service interface type. + * @return The proxy of a service by its name and class. + * @throws IgniteException If failed to create proxy. + */ + public abstract <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> srvcCls, boolean sticky, + long timeout) throws IgniteException; + + /** + * @param name Service name. + * @param <T> Service type. + * @return Services by specified service name. + */ + public abstract <T> Collection<T> services(String name); + + /** + * @param name Service name. + * @return Service by specified service name. + */ + public abstract ServiceContextImpl serviceContext(String name); + + /** + * @param name Service name. + * @param timeout If greater than 0 limits task execution time. Cannot be negative. + * @return Service topology. + * @throws IgniteCheckedException On error. + */ + public abstract Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException; + + /** + * Callback for local join events for which the regular events are not generated. + * <p/> + * Local join event is expected in cases of joining to topology or client reconnect. + * + * @param evt Discovery event. + * @param discoCache Discovery cache. + */ + public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java new file mode 100644 index 0000000..7f9fc83 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.io.Serializable; +import java.util.ArrayList; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; + +/** + * Initial data container to be sent to newly joining node for initialization of {@link IgniteServiceProcessor}. + */ +class ServiceProcessorCommonDiscoveryData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Clusters registered services descriptors. */ + private final ArrayList<ServiceInfo> registeredServices; + + /** + * @param registeredServices Clusters registered services descriptors. + */ + public ServiceProcessorCommonDiscoveryData(@NotNull ArrayList<ServiceInfo> registeredServices) { + this.registeredServices = registeredServices; + } + + /** + * Returns clusters registered services descriptors. + * + * @return Registered services descriptors. + */ + public ArrayList<ServiceInfo> registeredServices() { + return registeredServices; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceProcessorCommonDiscoveryData.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java new file mode 100644 index 0000000..5a8473d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.io.Serializable; +import java.util.ArrayList; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; + +/** + * Initial data of {@link IgniteServiceProcessor} to send in cluster on joining node. + */ +public class ServiceProcessorJoinNodeDiscoveryData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Static services configurations info. */ + public final ArrayList<ServiceInfo> staticServicesInfo; + + /** + * @param staticServicesInfo Static services configurations info. + */ + public ServiceProcessorJoinNodeDiscoveryData(@NotNull ArrayList<ServiceInfo> staticServicesInfo) { + this.staticServicesInfo = staticServicesInfo; + } + + /** + * @return Static services configurations info. + */ + public ArrayList<ServiceInfo> services() { + return staticServicesInfo; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceProcessorJoinNodeDiscoveryData.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java new file mode 100644 index 0000000..1f003d9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType.BYTE_ARR; + +/** + * Service single node deployment result. + * <p/> + * Contains count of deployed service instances on single node and deployment errors if exist. + */ +public class ServiceSingleNodeDeploymentResult implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Count of service's instances. */ + private int cnt; + + /** Serialized exceptions. */ + private Collection<byte[]> errors; + + /** + * Empty constructor for marshalling purposes. + */ + public ServiceSingleNodeDeploymentResult() { + } + + /** + * @param cnt Count of service's instances. + */ + public ServiceSingleNodeDeploymentResult(int cnt) { + this.cnt = cnt; + } + + /** + * @return Count of service's instances. + */ + public int count() { + return cnt; + } + + /** + * @param cnt Count of service's instances. + */ + public void count(int cnt) { + this.cnt = cnt; + } + + /** + * @return Serialized exceptions. + */ + @NotNull public Collection<byte[]> errors() { + return errors != null ? errors : Collections.emptyList(); + } + + /** + * @param errors Serialized exceptions. + */ + public void errors(Collection<byte[]> errors) { + this.errors = errors; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("cnt", cnt)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeCollection("errors", errors, BYTE_ARR)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cnt = reader.readInt("cnt"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + errors = reader.readCollection("errors", BYTE_ARR); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(ServiceSingleNodeDeploymentResult.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 169; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ServiceSingleNodeDeploymentResult.class, this); + } +}