http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java
new file mode 100644
index 0000000..e2a3add
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT;
+import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+import static org.apache.ignite.internal.GridTopic.TOPIC_SERVICES;
+import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+
+/**
+ * Service deployment manager.
+ *
+ * @see ServiceDeploymentTask
+ * @see ServiceDeploymentActions
+ */
+public class ServiceDeploymentManager {
+    /** Busy lock. Entered by both listeners around event handling and blocked for good by {@code stopProcessing}. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Services discovery messages listener. */
+    private final DiscoveryEventListener discoLsnr = new 
ServiceDiscoveryListener();
+
+    /** Services communication messages listener. */
+    private final GridMessageListener commLsnr = new 
ServiceCommunicationListener();
+
+    /** Services deployments tasks, keyed by deployment process id. */
+    private final Map<ServiceDeploymentProcessId, ServiceDeploymentTask> tasks 
= new ConcurrentHashMap<>();
+
+    /**
+     * Discovery events received while cluster state transition was in progress.
+     * <p/>
+     * NOTE(review): this is a plain {@link ArrayList}. The methods that add to it are documented as
+     * discovery-thread only, but {@code stopProcessing} also clears it; confirm which thread calls that.
+     */
+    private final List<PendingEventHolder> pendingEvts = new ArrayList<>();
+
+    /** Topology version of latest deployment task's event. Starts as {@link AffinityTopologyVersion#NONE}. */
+    private final AtomicReference<AffinityTopologyVersion> readyTopVer =
+        new AtomicReference<>(AffinityTopologyVersion.NONE);
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Deployment worker. */
+    private final ServicesDeploymentWorker depWorker;
+
+    /** Default dump operation limit: upper bound of the interval between "long operation" warnings, in ms. */
+    private final long dfltDumpTimeoutLimit;
+
+    /**
+     * Creates the deployment manager and subscribes it to discovery and communication events.
+     * <p/>
+     * All internal state (logger, deployment worker, dump timeout limit) is initialized before the
+     * listeners are registered, so a listener notified right after registration never observes a
+     * partially constructed manager.
+     *
+     * @param ctx Grid kernal context.
+     */
+    ServiceDeploymentManager(@NotNull GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+
+        // The worker's constructor reads 'ctx' and 'log', so it is created only after both are assigned.
+        depWorker = new ServicesDeploymentWorker();
+
+        long limit = getLong(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT, 0);
+
+        // A missing or non-positive property value falls back to 30 minutes.
+        dfltDumpTimeoutLimit = limit <= 0 ? 30 * 60_000 : limit;
+
+        // Listeners are registered last: their handlers reach 'depWorker' through addTask(), which would
+        // still be null if an event were delivered before the assignments above.
+        ctx.event().addDiscoveryEventListener(discoLsnr,
+            EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT);
+
+        ctx.io().addMessageListener(TOPIC_SERVICES, commLsnr);
+    }
+
+    /**
+     * Launches the thread that processes services deployments tasks.
+     * <p/>
+     * Must be invoked at most once: the worker is asserted to have no runner thread yet.
+     */
+    void startProcessing() {
+        assert depWorker.runner() == null : "Method shouldn't be called twice during lifecycle;";
+
+        IgniteThread workerThread =
+            new IgniteThread(ctx.igniteInstanceName(), "services-deployment-worker", depWorker);
+
+        workerThread.start();
+    }
+
+    /**
+     * Shuts down processing of services deployments tasks.
+     * <p/>
+     * Blocks the busy lock, unsubscribes both listeners, cancels and joins the worker, drops queued and
+     * pending work and finally completes every remaining task with the given error.
+     *
+     * @param stopErr Cause error of deployment manager stop.
+     */
+    void stopProcessing(IgniteCheckedException stopErr) {
+        // Will not release it.
+        busyLock.block();
+
+        ctx.event().removeDiscoveryEventListener(discoLsnr);
+        ctx.io().removeMessageListener(commLsnr);
+
+        U.cancel(depWorker);
+        U.join(depWorker, log);
+
+        depWorker.tasksQueue.clear();
+        pendingEvts.clear();
+
+        for (ServiceDeploymentTask remaining : tasks.values())
+            remaining.completeError(stopErr);
+
+        tasks.clear();
+    }
+
+    /**
+     * Returns the value currently held by the ready topology version reference. The reference starts as
+     * {@link AffinityTopologyVersion#NONE} and is updated by the deployment worker during task post-processing.
+     *
+     * @return Ready topology version.
+     */
+    public AffinityTopologyVersion readyTopologyVersion() {
+        return readyTopVer.get();
+    }
+
+    /**
+     * Special handler for local discovery events for which the regular events are not generated, e.g. local
+     * join and client reconnect events.
+     * <p/>
+     * Delegates to {@code checkClusterStateAndAddTask}, so the event is enqueued, postponed or ignored
+     * depending on the cluster state found in the given discovery cache.
+     *
+     * @param evt Discovery event.
+     * @param discoCache Discovery cache.
+     * @param depActions Service deployment actions.
+     */
+    void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache, 
ServiceDeploymentActions depActions) {
+        checkClusterStateAndAddTask(evt, discoCache, depActions);
+    }
+
+    /**
+     * Marks the beginning of a blocking section of the service deployment worker by delegating to
+     * {@link GridWorker#blockingSectionBegin()}.
+     * <p/>
+     * Must be invoked from the service deployment worker thread; this is asserted.
+     */
+    void deployerBlockingSectionBegin() {
+        assert depWorker != null;
+        assert Thread.currentThread() == depWorker.runner();
+
+        depWorker.blockingSectionBegin();
+    }
+
+    /**
+     * Marks the end of a blocking section of the service deployment worker by delegating to
+     * {@link GridWorker#blockingSectionEnd()}.
+     * <p/>
+     * Must be invoked from the service deployment worker thread; this is asserted.
+     */
+    void deployerBlockingSectionEnd() {
+        assert depWorker != null;
+        assert Thread.currentThread() == depWorker.runner();
+
+        depWorker.blockingSectionEnd();
+    }
+
+    /**
+     * Routes the given event according to the cluster state taken from the discovery cache.
+     * <pre>
+     * - if cluster state is in transition, then the event is added to pending events;
+     * - if cluster is active, then the event is added to the deployment queue;
+     * - if cluster is inactive, then the event is ignored;
+     * </pre>
+     * <b>Should be called from discovery thread.</b>
+     *
+     * @param evt Discovery event.
+     * @param discoCache Discovery cache.
+     * @param depActions Services deployment actions.
+     */
+    private void checkClusterStateAndAddTask(@NotNull DiscoveryEvent evt, @NotNull DiscoCache discoCache,
+        @Nullable ServiceDeploymentActions depActions) {
+        // The transition check takes precedence over the active check.
+        if (discoCache.state().transition()) {
+            pendingEvts.add(new PendingEventHolder(evt, discoCache.version(), depActions));
+
+            return;
+        }
+
+        if (discoCache.state().active()) {
+            addTask(evt, discoCache.version(), depActions);
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Ignore event, cluster is inactive, evt=" + evt);
+    }
+
+    /**
+     * Registers a deployment task for the given event and puts it into the worker's queue.
+     * <p/>
+     * The task is looked up (or created) by the deployment process id derived from the event. If the task
+     * reports that it has already been enqueued, the event is skipped with a debug message.
+     * <p/>
+     * <b>Should be called from discovery thread.</b>
+     *
+     * @param evt Discovery event.
+     * @param topVer Topology version.
+     * @param depActions Services deployment actions.
+     */
+    private void addTask(@NotNull DiscoveryEvent evt, @NotNull AffinityTopologyVersion topVer,
+        @Nullable ServiceDeploymentActions depActions) {
+        final ServiceDeploymentProcessId procId = deploymentId(evt, topVer);
+
+        ServiceDeploymentTask depTask = tasks.computeIfAbsent(procId, id -> new ServiceDeploymentTask(ctx, id));
+
+        if (depTask.onEnqueued()) {
+            assert depTask.event() == null && depTask.topologyVersion() == null;
+
+            depTask.onEvent(evt, topVer, depActions);
+
+            depWorker.tasksQueue.add(depTask);
+
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Service deployment process hasn't been started for discovery event, because of " +
+                "a task with the same deployment process id is already added (possible cause is message's" +
+                " double delivering), evt=" + evt);
+        }
+    }
+
+    /**
+     * Builds the service deployment process id for an event: custom discovery events are identified by
+     * the id of their custom message, all other events by the topology version.
+     *
+     * @param evt Discovery event.
+     * @param topVer Topology version.
+     * @return Services deployment process id.
+     */
+    private ServiceDeploymentProcessId deploymentId(@NotNull DiscoveryEvent evt,
+        @NotNull AffinityTopologyVersion topVer) {
+        if (evt instanceof DiscoveryCustomEvent) {
+            DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+
+            return new ServiceDeploymentProcessId(customEvt.customMessage().id());
+        }
+
+        return new ServiceDeploymentProcessId(topVer);
+    }
+
+    /**
+     * Captures the data of a {@link DiscoveryCustomEvent} so that it survives nullifying of the custom
+     * message by {@link GridDhtPartitionsExchangeFuture#onDone}.
+     * <p/>
+     * Events carrying a {@link ServiceChangeBatchRequest} are returned as is; for any other message a new
+     * event is created with the node, custom message, event node and affinity topology version copied.
+     *
+     * @param evt Discovery event.
+     * @return Discovery event to process.
+     */
+    private DiscoveryCustomEvent copyIfNeeded(@NotNull DiscoveryCustomEvent evt) {
+        DiscoveryCustomMessage customMsg = evt.customMessage();
+
+        assert customMsg != null : "DiscoveryCustomMessage has been nullified concurrently, evt=" + evt;
+
+        if (!(customMsg instanceof ServiceChangeBatchRequest)) {
+            DiscoveryCustomEvent copy = new DiscoveryCustomEvent();
+
+            copy.node(evt.node());
+            copy.customMessage(customMsg);
+            copy.eventNode(evt.eventNode());
+            copy.affinityTopologyVersion(evt.affinityTopologyVersion());
+
+            return copy;
+        }
+
+        return evt;
+    }
+
+    /**
+     * Services discovery messages high priority listener.
+     * <p/>
+     * The listener should be notified earlier then PME's listener because of 
a custom message of {@link
+     * DiscoveryCustomEvent} may be nullified in PME before the listener will 
be able to capture it.
+     */
+    private class ServiceDiscoveryListener implements DiscoveryEventListener, 
HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(final DiscoveryEvent evt, final 
DiscoCache discoCache) {
+            if (!enterBusy())
+                return;
+
+            final UUID snd = evt.eventNode().id();
+            final int evtType = evt.type();
+
+            assert snd != null : "Event's node id shouldn't be null.";
+            assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_LEFT || 
evtType == EVT_NODE_FAILED
+                || evtType == EVT_DISCOVERY_CUSTOM_EVT : "Unexpected event was 
received, evt=" + evt;
+
+            try {
+                if (evtType == EVT_DISCOVERY_CUSTOM_EVT) {
+                    DiscoveryCustomMessage msg = 
((DiscoveryCustomEvent)evt).customMessage();
+
+                    if (msg instanceof ChangeGlobalStateFinishMessage) {
+                        ChangeGlobalStateFinishMessage msg0 = 
(ChangeGlobalStateFinishMessage)msg;
+
+                        if (msg0.clusterActive())
+                            pendingEvts.forEach(t -> addTask(t.evt, t.topVer, 
t.depActions));
+                        else if (log.isDebugEnabled())
+                            pendingEvts.forEach(t -> log.debug("Ignore event, 
cluster is inactive: " + t.evt));
+
+                        pendingEvts.clear();
+                    }
+                    else {
+                        if (msg instanceof 
ServiceClusterDeploymentResultBatch) {
+                            ServiceClusterDeploymentResultBatch msg0 = 
(ServiceClusterDeploymentResultBatch)msg;
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Received services full deployments 
message : " +
+                                    "[locId=" + ctx.localNodeId() + ", snd=" + 
snd + ", msg=" + msg0 + ']');
+                            }
+
+                            ServiceDeploymentProcessId depId = 
msg0.deploymentId();
+
+                            assert depId != null;
+
+                            ServiceDeploymentTask task = tasks.get(depId);
+
+                            if (task != null) // May be null in case of double 
delivering
+                                task.onReceiveFullDeploymentsMessage(msg0);
+                        }
+                        else if (msg instanceof CacheAffinityChangeMessage)
+                            addTask(copyIfNeeded((DiscoveryCustomEvent)evt), 
discoCache.version(), null);
+                        else {
+                            ServiceDeploymentActions depActions = null;
+
+                            if (msg instanceof ChangeGlobalStateMessage)
+                                depActions = 
((ChangeGlobalStateMessage)msg).servicesDeploymentActions();
+                            else if (msg instanceof ServiceChangeBatchRequest) 
{
+                                depActions = ((ServiceChangeBatchRequest)msg)
+                                    .servicesDeploymentActions();
+                            }
+                            else if (msg instanceof DynamicCacheChangeBatch)
+                                depActions = 
((DynamicCacheChangeBatch)msg).servicesDeploymentActions();
+
+                            if (depActions != null)
+                                
addTask(copyIfNeeded((DiscoveryCustomEvent)evt), discoCache.version(), 
depActions);
+                        }
+                    }
+                }
+                else {
+                    if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED)
+                        tasks.values().forEach(t -> t.onNodeLeft(snd));
+
+                    checkClusterStateAndAddTask(evt, discoCache, null);
+                }
+            }
+            finally {
+                leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
+    }
+
+    /**
+     * Immutable holder of a discovery event that was postponed because cluster state transition was in
+     * progress, together with the data needed to create a deployment task for it later.
+     */
+    private static class PendingEventHolder {
+        /** Discovery event. */
+        private final DiscoveryEvent evt;
+
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Services deployment actions. */
+        private final ServiceDeploymentActions depActions;
+
+        /**
+         * @param evt Discovery event.
+         * @param topVer Topology version.
+         * @param depActions Services deployment actions.
+         */
+        private PendingEventHolder(DiscoveryEvent evt,
+            AffinityTopologyVersion topVer, ServiceDeploymentActions depActions) {
+            this.evt = evt;
+            this.topVer = topVer;
+            this.depActions = depActions;
+        }
+    }
+
+    /**
+     * Communication listener for services deployment messages.
+     * <p/>
+     * Only {@link ServiceSingleNodeDeploymentResultBatch} messages are handled: each one is handed to the
+     * task with the same deployment id, which is created on demand if it is not registered yet.
+     */
+    private class ServiceCommunicationListener implements GridMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!enterBusy())
+                return;
+
+            try {
+                // Any other message type is silently skipped.
+                if (!(msg instanceof ServiceSingleNodeDeploymentResultBatch))
+                    return;
+
+                ServiceSingleNodeDeploymentResultBatch resBatch = (ServiceSingleNodeDeploymentResultBatch)msg;
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Received services single deployments message : " +
+                        "[locId=" + ctx.localNodeId() + ", snd=" + nodeId + ", msg=" + resBatch + ']');
+                }
+
+                ServiceDeploymentTask depTask = tasks.computeIfAbsent(resBatch.deploymentId(),
+                    ignored -> new ServiceDeploymentTask(ctx, resBatch.deploymentId()));
+
+                depTask.onReceiveSingleDeploymentsMessage(nodeId, resBatch);
+            }
+            finally {
+                leaveBusy();
+            }
+        }
+    }
+
+    /**
+     * Services deployment worker.
+     * <p/>
+     * Takes tasks from {@link #tasksQueue} one at a time, initializes each of them and waits for its
+     * completion before taking the next one.
+     */
+    private class ServicesDeploymentWorker extends GridWorker {
+        /** Queue to process. Filled by {@code addTask} and cleared by {@code stopProcessing}. */
+        private final LinkedBlockingQueue<ServiceDeploymentTask> tasksQueue = 
new LinkedBlockingQueue<>();
+
+        /**
+         * Creates the worker named {@code services-deployment-worker}, passing the manager's logger and the
+         * context's workers registry to {@link GridWorker}.
+         */
+        private ServicesDeploymentWorker() {
+            super(ctx.igniteInstanceName(), "services-deployment-worker",
+                ServiceDeploymentManager.this.log, ctx.workersRegistry());
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p/>
+         * Until cancelled: takes the next task from the queue, calls {@code init()} on it and repeatedly
+         * waits for its completion with a timeout of twice the configured network timeout. On each wait
+         * timeout the worker returns if it has been cancelled; otherwise it logs a warning, but only once
+         * {@code nextDumpTime} has passed. The interval to the next warning grows with every warning and is
+         * capped by {@code dfltDumpTimeoutLimit}. A {@link ClusterTopologyServerNotFoundException} is logged
+         * and the task is post-processed in the same way as a completed one.
+         * <p/>
+         * On exit, an interruption of a cancelled worker is not treated as an error. Any other throwable, as
+         * well as an exit without cancellation, is reported to the failure processor:
+         * {@link OutOfMemoryError} as {@code CRITICAL_ERROR}, everything else as
+         * {@code SYSTEM_WORKER_TERMINATION}.
+         */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+            Throwable err = null;
+
+            try {
+                ServiceDeploymentTask task;
+
+                while (!isCancelled()) {
+                    onIdle();
+
+                    blockingSectionBegin();
+
+                    try {
+                        task = tasksQueue.take(); // Blocks until a task is available.
+                    }
+                    finally {
+                        blockingSectionEnd();
+                    }
+
+                    if (isCancelled())
+                        Thread.currentThread().interrupt(); // Presumably makes later blocking calls fail fast; confirm.
+
+                    task.init();
+
+                    final long dumpTimeout = 2 * 
ctx.config().getNetworkTimeout();
+
+                    long dumpCnt = 0;
+                    long nextDumpTime = 0; // Zero lets the first timeout produce a warning immediately.
+
+                    while (true) {
+                        try {
+                            blockingSectionBegin();
+
+                            try {
+                                task.waitForComplete(dumpTimeout);
+                            }
+                            finally {
+                                blockingSectionEnd();
+                            }
+
+                            taskPostProcessing(task);
+
+                            break;
+                        }
+                        catch (IgniteFutureTimeoutCheckedException ignored) {
+                            if (isCancelled())
+                                return; // Stop waiting for the task once the worker is cancelled.
+
+                            if (nextDumpTime <= U.currentTimeMillis()) {
+                                log.warning("Failed to wait service deployment 
process or timeout had been" +
+                                    " reached, timeout=" + dumpTimeout + ", 
task=" + task);
+
+                                long nextTimeout = dumpTimeout * (2 + 
dumpCnt++);
+
+                                nextDumpTime = U.currentTimeMillis() + 
Math.min(nextTimeout, dfltDumpTimeoutLimit);
+                            }
+                        }
+                        catch (ClusterTopologyServerNotFoundException e) {
+                            U.error(log, e);
+
+                            taskPostProcessing(task);
+
+                            break;
+                        }
+                    }
+                }
+            }
+            catch (InterruptedException | IgniteInterruptedCheckedException e) 
{
+                Thread.currentThread().interrupt(); // Keep the interrupt status set for the caller.
+
+                if (!isCancelled())
+                    err = e;
+            }
+            catch (Throwable t) {
+                err = t;
+            }
+            finally {
+                if (err == null && !isCancelled())
+                    err = new IllegalStateException("Worker " + name() + " is 
terminated unexpectedly.");
+
+                if (err instanceof OutOfMemoryError)
+                    ctx.failure().process(new FailureContext(CRITICAL_ERROR, 
err));
+                else if (err != null)
+                    ctx.failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+            }
+        }
+
+        /**
+         * Does additional actions after task's completion: makes a single compare-and-set attempt to replace
+         * the ready topology version (as read just before) with the task's topology version, and removes the
+         * task from the tasks map by its deployment id.
+         *
+         * @param task Processed deployment task.
+         */
+        private void taskPostProcessing(ServiceDeploymentTask task) {
+            AffinityTopologyVersion readyVer = readyTopVer.get();
+
+            readyTopVer.compareAndSet(readyVer, task.topologyVersion());
+
+            tasks.remove(task.deploymentId());
+        }
+    }
+
+    /**
+     * Enters busy state by delegating to the manager's busy lock.
+     * <p/>
+     * Callers in this class return immediately when {@code false} is returned and pair a successful call
+     * with {@code leaveBusy()}.
+     *
+     * @return {@code true} if entered to busy state.
+     */
+    private boolean enterBusy() {
+        return busyLock.enterBusy();
+    }
+
+    /**
+     * Leaves busy state by delegating to the manager's busy lock. Counterpart of a successful
+     * {@code enterBusy()} call.
+     */
+    private void leaveBusy() {
+        busyLock.leaveBusy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java
new file mode 100644
index 0000000..711c302
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessId.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Service deployment process' identifier.
+ * <p/>
+ * Holds two nullable components: a topology version and a request id. Each non-default constructor sets
+ * exactly one of them and leaves the other {@code null}. Both components take part in
+ * {@link #equals(Object)}, {@link #hashCode()} and message (de)serialization.
+ */
+public class ServiceDeploymentProcessId implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology version; {@code null} when the id was created from a request id. */
+    @Nullable private AffinityTopologyVersion topVer;
+
+    /** Request's id; {@code null} when the id was created from a topology version. */
+    @Nullable private IgniteUuid reqId;
+
+    /**
+     * Empty constructor for marshalling purposes. Leaves both components {@code null}.
+     */
+    public ServiceDeploymentProcessId() {
+    }
+
+    /**
+     * Creates an id identified by topology version.
+     *
+     * @param topVer Topology version.
+     */
+    ServiceDeploymentProcessId(@NotNull AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+    }
+
+    /**
+     * Creates an id identified by request id.
+     *
+     * @param reqId Request's id.
+     */
+    ServiceDeploymentProcessId(@NotNull IgniteUuid reqId) {
+        this.reqId = reqId;
+    }
+
+    /**
+     * @return Topology version, may be {@code null}.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Request's id, may be {@code null}.
+     */
+    public IgniteUuid requestId() {
+        return reqId;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Writes the header if it has not been written yet, then the fields in the order {@code topVer},
+     * {@code reqId}. Returns {@code false} as soon as a write call reports failure; the switch on the
+     * writer's state then resumes from the field that has not been written yet on the next call.
+     */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState(); // Intentionally falls through to the next field.
+
+            case 1:
+                if (!writer.writeIgniteUuid("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Reads the fields in the same order they are written: {@code topVer}, then {@code reqId}. Returns
+     * {@code false} when the reader reports that the last read was not completed; the switch on the
+     * reader's state resumes from that field on the next call.
+     */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState(); // Intentionally falls through to the next field.
+
+            case 1:
+                reqId = reader.readIgniteUuid("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(ServiceDeploymentProcessId.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 167;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2; // topVer and reqId; must match the number of cases in writeTo/readFrom.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Two ids are equal when they are of the same class and both the topology versions and the request
+     * ids are equal.
+     */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ServiceDeploymentProcessId id = (ServiceDeploymentProcessId)o;
+
+        return F.eq(topVer, id.topVer) && F.eq(reqId, id.reqId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(topVer, reqId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceDeploymentProcessId.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java
new file mode 100644
index 0000000..d41e5aa
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Service deployment request.
+ * <p/>
+ * Extends {@link ServiceChangeAbstractRequest} with the configuration of the service to deploy.
+ */
+public class ServiceDeploymentRequest extends ServiceChangeAbstractRequest {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Service configuration. Assigned once in the constructor. */
+    private final ServiceConfiguration cfg;
+
+    /**
+     * @param srvcId Service id, passed to the parent request.
+     * @param cfg Service configuration.
+     */
+    public ServiceDeploymentRequest(@NotNull IgniteUuid srvcId, @NotNull 
ServiceConfiguration cfg) {
+        super(srvcId);
+
+        this.cfg = cfg;
+    }
+
+    /**
+     * @return Service configuration given at construction.
+     */
+    public ServiceConfiguration configuration() {
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceDeploymentRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
new file mode 100644
index 0000000..ea0114b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_SERVICES;
+import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SERVICE_POOL;
+
+/**
+ * Services deployment task.
+ *
+ * @see ServiceDeploymentActions
+ * @see ServiceSingleNodeDeploymentResultBatch
+ * @see ServiceClusterDeploymentResultBatch
+ */
+class ServiceDeploymentTask {
+    /** Task's completion future; done once the task has been completed either successfully or with an error. */
+    private final GridFutureAdapter<?> completeStateFut = new 
GridFutureAdapter<>();
+
+    /** Future of the task's initialization; full deployments message and node-left handlers listen on it. */
+    private final GridFutureAdapter<?> initTaskFut = new GridFutureAdapter<>();
+
+    /** Future of the {@link #remaining} collection initialization; single deployments messages handler listens on it. */
+    private final GridFutureAdapter<?> initCrdFut = new GridFutureAdapter<>();
+
+    /** Coordinator initialization actions mutex; guards {@link #remaining} and {@link #singleDepsMsgs}. */
+    private final Object initCrdMux = new Object();
+
+    /** Ids of nodes from which a services single deployments message is still expected. */
+    @GridToStringInclude
+    private final Set<UUID> remaining = new HashSet<>();
+
+    /** Flag indicating that the task has already been added to the deployment queue. */
+    private final AtomicBoolean addedInQueue = new AtomicBoolean(false);
+
+    /** Received single deployments messages to process, mapped by sender node id. */
+    @GridToStringInclude
+    private final Map<UUID, ServiceSingleNodeDeploymentResultBatch> 
singleDepsMsgs = new HashMap<>();
+
+    /** Expected services assignments: service id -> (node id -> expected instances count). */
+    @GridToStringExclude
+    private final Map<IgniteUuid, Map<UUID, Integer>> expDeps = new 
HashMap<>();
+
+    /** Local deployment errors collected per service id. */
+    @GridToStringExclude
+    private final Map<IgniteUuid, Collection<Throwable>> depErrors = new 
HashMap<>();
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Service processor. */
+    private final IgniteServiceProcessor srvcProc;
+
+    /** Deployment process id; also defines equality and hash code of the task. */
+    @GridToStringInclude
+    private final ServiceDeploymentProcessId depId;
+
+    /** Coordinator node id; resolved during initialization and re-resolved when the coordinator leaves. */
+    @GridToStringExclude
+    private volatile UUID crdId;
+
+    /** Cause discovery event; set in {@link #onEvent}. */
+    @GridToStringInclude
+    private volatile DiscoveryEvent evt;
+
+    /** Topology version of the cause event; set in {@link #onEvent}. */
+    @GridToStringInclude
+    private volatile AffinityTopologyVersion evtTopVer;
+
+    /** Services deployment actions; either passed with the event or calculated during initialization. */
+    private volatile ServiceDeploymentActions depActions;
+
+    /**
+     * Creates a deployment task for the given deployment process.
+     *
+     * @param ctx Kernal context; its service processor is expected to be an {@link IgniteServiceProcessor}.
+     * @param depId Service deployment process id.
+     */
+    protected ServiceDeploymentTask(GridKernalContext ctx, ServiceDeploymentProcessId depId) {
+        assert ctx.service() instanceof IgniteServiceProcessor;
+
+        this.ctx = ctx;
+        this.depId = depId;
+
+        this.srvcProc = (IgniteServiceProcessor)ctx.service();
+        this.log = ctx.log(getClass());
+    }
+
+    /**
+     * Handles discovery event receiving: remembers the cause event, its topology version and the deployment
+     * actions, which are later used by {@link #init()}.
+     *
+     * @param evt Discovery event; only node joined/left/failed and discovery custom events are expected.
+     * @param evtTopVer Topology version.
+     * @param depActions Services deployment actions; if {@code null}, actions are calculated during initialization.
+     */
+    protected void onEvent(@NotNull DiscoveryEvent evt, @NotNull 
AffinityTopologyVersion evtTopVer,
+        @Nullable ServiceDeploymentActions depActions) {
+        assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || 
evt.type() == EVT_NODE_FAILED
+            || evt.type() == EVT_DISCOVERY_CUSTOM_EVT : "Unexpected event 
type, evt=" + evt;
+
+        this.evt = evt;
+        this.evtTopVer = evtTopVer;
+        this.depActions = depActions;
+    }
+
+    /**
+     * Initializes deployment task.
+     * <p>
+     * Does nothing if the task has already been completed or initialized. Otherwise handles deactivation,
+     * calculates the services to redeploy from the cause event when no deployment actions were passed to
+     * {@link #onEvent}, resolves the coordinator and processes the deployment actions locally.
+     *
+     * @throws IgniteCheckedException In case of an error; the task is completed with the error before it is rethrown.
+     */
+    protected void init() throws IgniteCheckedException {
+        if (isCompleted() || initTaskFut.isDone())
+            return;
+
+        assert evt != null && evtTopVer != null : "Illegal state to perform task's initialization: " + this;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Started services deployment task init: [depId=" + depId +
+                ", locId=" + ctx.localNodeId() + ", evt=" + evt + ']');
+        }
+
+        try {
+            // Deactivation request: delegate to the service processor and finish the task right away.
+            if (depActions != null && depActions.deactivate()) {
+                srvcProc.onDeActivate(ctx);
+
+                completeSuccess();
+
+                return;
+            }
+
+            // No explicit actions were passed: derive the services to redeploy from the cause event.
+            if (depActions == null) {
+                Map<IgniteUuid, ServiceInfo> toDeploy = new HashMap<>();
+
+                final int evtType = evt.type();
+
+                if (evtType == EVT_DISCOVERY_CUSTOM_EVT) {
+                    DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
+
+                    if (msg instanceof CacheAffinityChangeMessage) {
+                        CacheAffinityChangeMessage msg0 = (CacheAffinityChangeMessage)msg;
+
+                        Map<IgniteUuid, ServiceInfo> services = srvcProc.deployedServices();
+
+                        if (!services.isEmpty()) {
+                            Map<Integer, Map<Integer, List<UUID>>> change = msg0.assignmentChange();
+
+                            if (change != null) {
+                                // Names of caches whose group is present in the assignment change.
+                                Set<String> names = new HashSet<>();
+
+                                ctx.cache().cacheDescriptors().forEach((name, desc) -> {
+                                    if (change.containsKey(desc.groupId()))
+                                        names.add(name);
+                                });
+
+                                // Only services bound to one of the affected caches have to be redeployed.
+                                services.forEach((srvcId, desc) -> {
+                                    if (names.contains(desc.cacheName()))
+                                        toDeploy.put(srvcId, desc);
+                                });
+                            }
+                        }
+                    }
+                }
+                else {
+                    assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED;
+
+                    final ClusterNode eventNode = evt.eventNode();
+
+                    final Map<IgniteUuid, ServiceInfo> deployedServices = srvcProc.deployedServices();
+
+                    if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) {
+                        // Redeploy services which had instances on the left node and, if a server node left,
+                        // services bound to a cache.
+                        deployedServices.forEach((srvcId, desc) -> {
+                            if (desc.topologySnapshot().containsKey(eventNode.id()) ||
+                                (desc.cacheName() != null && !eventNode.isClient())) // If affinity service
+                                toDeploy.put(srvcId, desc);
+                        });
+                    }
+                    else {
+                        // Node joined: all deployed services plus the services received from the joining node.
+                        toDeploy.putAll(deployedServices);
+
+                        toDeploy.putAll(srvcProc.servicesReceivedFromJoin(eventNode.id()));
+                    }
+                }
+
+                if (toDeploy.isEmpty()) {
+                    completeSuccess();
+
+                    if (log.isDebugEnabled())
+                        log.debug("No services deployment action required.");
+
+                    return;
+                }
+
+                depActions = new ServiceDeploymentActions();
+
+                depActions.servicesToDeploy(toDeploy);
+            }
+
+            ClusterNode crd = srvcProc.coordinator();
+
+            if (crd == null) {
+                onAllServersLeft();
+
+                return;
+            }
+
+            crdId = crd.id();
+
+            if (crd.isLocal())
+                initCoordinator(evtTopVer);
+
+            processDeploymentActions(depActions);
+        }
+        catch (Exception e) {
+            log.error("Error occurred while initializing deployment task, err=" + e.getMessage(), e);
+
+            completeError(e);
+
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            // The initialization future is completed on every exit path.
+            if (!initTaskFut.isDone())
+                initTaskFut.onDone();
+
+            if (log.isDebugEnabled()) {
+                log.debug("Finished services deployment task init: [depId=" + deploymentId() +
+                    ", locId=" + ctx.localNodeId() + ']');
+            }
+        }
+    }
+
+    /**
+     * Applies the given deployment actions locally: updates the registered services, undeploys the services
+     * requested to undeploy, calculates assignments of the services to deploy, deploys missing local instances
+     * and finally sends the local results to the coordinator.
+     *
+     * @param depActions Services deployment actions.
+     */
+    @SuppressWarnings("ErrorNotRethrown")
+    private void processDeploymentActions(@NotNull ServiceDeploymentActions depActions) {
+        srvcProc.updateDeployedServices(depActions);
+
+        for (IgniteUuid srvcId : depActions.servicesToUndeploy().keySet()) {
+            srvcProc.deployment().deployerBlockingSectionBegin();
+
+            try {
+                srvcProc.undeploy(srvcId);
+            }
+            finally {
+                srvcProc.deployment().deployerBlockingSectionEnd();
+            }
+        }
+
+        if (!depActions.servicesToDeploy().isEmpty()) {
+            final Collection<UUID> evtTopNodes = F.nodeIds(ctx.discovery().nodes(evtTopVer));
+
+            for (Map.Entry<IgniteUuid, ServiceInfo> entry : depActions.servicesToDeploy().entrySet()) {
+                final IgniteUuid srvcId = entry.getKey();
+                final ServiceInfo desc = entry.getValue();
+
+                try {
+                    ServiceConfiguration cfg = desc.configuration();
+
+                    // Previous assignment restricted to the nodes of the event's topology.
+                    TreeMap<UUID, Integer> prevTop = filterDeadNodes(evtTopNodes, desc.topologySnapshot());
+
+                    Map<UUID, Integer> newTop = reassign(srvcId, cfg, evtTopVer, prevTop);
+
+                    expDeps.put(srvcId, newTop);
+
+                    Integer expCnt = newTop.getOrDefault(ctx.localNodeId(), 0);
+
+                    // Only missing instances are deployed here.
+                    if (expCnt > srvcProc.localInstancesCount(srvcId)) {
+                        srvcProc.deployment().deployerBlockingSectionBegin();
+
+                        try {
+                            srvcProc.redeploy(srvcId, cfg, newTop);
+                        }
+                        finally {
+                            srvcProc.deployment().deployerBlockingSectionEnd();
+                        }
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    depErrors.computeIfAbsent(srvcId, c -> new ArrayList<>()).add(e);
+                }
+            }
+        }
+
+        createAndSendSingleDeploymentsMessage(depId, depErrors);
+    }
+
+    /**
+     * Prepares the coordinator to manage deployment process.
+     * <p>
+     * Under {@link #initCrdMux}, fills {@link #remaining} with ids of alive nodes of the given topology version
+     * whose single deployments message has not been recorded in {@link #singleDepsMsgs}, and then completes
+     * {@link #initCrdFut}. Does nothing if {@link #initCrdFut} is already done.
+     *
+     * @param topVer Topology version to initialize {@link #remaining} collection.
+     */
+    private void initCoordinator(AffinityTopologyVersion topVer) {
+        synchronized (initCrdMux) {
+            // Already initialized, or the task has been completed.
+            if (initCrdFut.isDone())
+                return;
+
+            try {
+                for (ClusterNode node : ctx.discovery().nodes(topVer)) {
+                    if (ctx.discovery().alive(node) && 
!singleDepsMsgs.containsKey(node.id()))
+                        remaining.add(node.id());
+                }
+            }
+            catch (Exception e) {
+                log.error("Error occurred while initializing remaining 
collection.", e);
+
+                initCrdFut.onDone(e);
+            }
+            finally {
+                // Normal completion; skipped if the future has been completed with an error above.
+                if (!initCrdFut.isDone())
+                    initCrdFut.onDone();
+            }
+        }
+    }
+
+    /**
+     * Builds the local node's deployment results and passes them to the coordinator: handles the message
+     * directly if the local node is the coordinator, otherwise sends it over communication.
+     *
+     * @param depId Deployment process id.
+     * @param errors Deployment errors mapped by service id.
+     */
+    private void createAndSendSingleDeploymentsMessage(ServiceDeploymentProcessId depId,
+        final Map<IgniteUuid, Collection<Throwable>> errors) {
+        assert crdId != null : "Coordinator should be defined at this point, locId=" + ctx.localNodeId();
+
+        try {
+            Set<IgniteUuid> reportedIds = new HashSet<>();
+
+            if (evt.type() != EVT_NODE_JOINED)
+                reportedIds.addAll(expDeps.keySet());
+            else {
+                // On node join only services assigned to the joined node are reported.
+                UUID joinedNodeId = evt.eventNode().id();
+
+                for (Map.Entry<IgniteUuid, Map<UUID, Integer>> exp : expDeps.entrySet()) {
+                    if (exp.getValue().containsKey(joinedNodeId))
+                        reportedIds.add(exp.getKey());
+                }
+            }
+
+            Map<IgniteUuid, ServiceSingleNodeDeploymentResult> results = new HashMap<>();
+
+            for (IgniteUuid srvcId : reportedIds) {
+                ServiceSingleNodeDeploymentResult res = new ServiceSingleNodeDeploymentResult(
+                    srvcProc.localInstancesCount(srvcId));
+
+                attachDeploymentErrors(res, errors.get(srvcId));
+
+                results.put(srvcId, res);
+            }
+
+            // Errors of services which have not been reported above must not be lost.
+            for (Map.Entry<IgniteUuid, Collection<Throwable>> failed : errors.entrySet()) {
+                IgniteUuid srvcId = failed.getKey();
+
+                if (!results.containsKey(srvcId)) {
+                    ServiceSingleNodeDeploymentResult res = new ServiceSingleNodeDeploymentResult(
+                        srvcProc.localInstancesCount(srvcId));
+
+                    attachDeploymentErrors(res, failed.getValue());
+
+                    results.put(srvcId, res);
+                }
+            }
+
+            ServiceSingleNodeDeploymentResultBatch msg = new ServiceSingleNodeDeploymentResultBatch(depId, results);
+
+            if (ctx.localNodeId().equals(crdId))
+                onReceiveSingleDeploymentsMessage(ctx.localNodeId(), msg);
+            else
+                ctx.io().sendToGridTopic(crdId, TOPIC_SERVICES, msg, SERVICE_POOL);
+
+            if (log.isDebugEnabled())
+                log.debug("Send services single deployments message, msg=" + msg);
+        }
+        catch (IgniteCheckedException e) {
+            log.error("Failed to send services single deployments message to coordinator over communication spi.", e);
+        }
+    }
+
+    /**
+     * Handles received single node services map message.
+     * <p>
+     * Processing is deferred until {@link #initCrdFut} is done. The message is recorded only if its sender is
+     * still present in {@link #remaining}; once no nodes remain, the full deployments message is built and sent.
+     *
+     * @param snd Sender node id.
+     * @param msg Single services map message.
+     */
+    protected void onReceiveSingleDeploymentsMessage(UUID snd, 
ServiceSingleNodeDeploymentResultBatch msg) {
+        assert depId.equals(msg.deploymentId()) : "Wrong message's deployment 
process id, msg=" + msg;
+
+        initCrdFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> {
+            // Nothing to do if the task has been completed in the meantime.
+            if (isCompleted())
+                return;
+
+            synchronized (initCrdMux) {
+                if (remaining.remove(snd)) {
+                    singleDepsMsgs.put(snd, msg);
+
+                    if (remaining.isEmpty())
+                        onAllReceived();
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Unexpected service single deployments received, 
msg=" + msg);
+            }
+        });
+    }
+
+    /**
+     * Handles received full services map message.
+     * <p>
+     * Processing is deferred until {@link #initTaskFut} is done and then runs via
+     * {@code ctx.closure().runLocalSafe(...)}: the cluster-wide topologies and errors are stored in the deployment
+     * actions, services topologies are updated, excess local instances are cancelled and the task is completed.
+     *
+     * @param msg Full services map message.
+     */
+    protected void onReceiveFullDeploymentsMessage(ServiceClusterDeploymentResultBatch msg) {
+        assert depId.equals(msg.deploymentId()) : "Wrong message's deployment process id, msg=" + msg;
+
+        initTaskFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> {
+            // Nothing to do if the task has been completed in the meantime.
+            if (isCompleted())
+                return;
+
+            ctx.closure().runLocalSafe(() -> {
+                try {
+                    ServiceDeploymentActions depResults = msg.servicesDeploymentActions();
+
+                    assert depResults != null : "Services deployment actions should be attached.";
+
+                    final Map<IgniteUuid, Map<UUID, Integer>> fullTops = depResults.deploymentTopologies();
+                    final Map<IgniteUuid, Collection<byte[]>> fullErrors = depResults.deploymentErrors();
+
+                    depActions.deploymentTopologies(fullTops);
+                    depActions.deploymentErrors(fullErrors);
+
+                    srvcProc.updateServicesTopologies(fullTops);
+
+                    final Map<IgniteUuid, ServiceInfo> services = srvcProc.deployedServices();
+
+                    fullTops.forEach((srvcId, top) -> {
+                        Integer expCnt = top.getOrDefault(ctx.localNodeId(), 0);
+
+                        // More local instances than the final topology expects: undeploy the excess ones.
+                        if (expCnt < srvcProc.localInstancesCount(srvcId)) {
+                            ServiceInfo desc = services.get(srvcId);
+
+                            assert desc != null;
+
+                            ServiceConfiguration cfg = desc.configuration();
+
+                            try {
+                                srvcProc.redeploy(srvcId, cfg, top);
+                            }
+                            catch (IgniteCheckedException e) {
+                                log.error("Error occurred during cancel exceed service instances: " +
+                                    "[srvcId=" + srvcId + ", name=" + desc.name() + ']', e);
+                            }
+                        }
+                    });
+
+                    completeSuccess();
+                }
+                catch (Throwable t) {
+                    log.error("Failed to process services full deployments message, msg=" + msg, t);
+
+                    completeError(t);
+                }
+            });
+        });
+    }
+
+    /**
+     * Completes initiating futures of all services mentioned in the deployment actions.
+     * <p>
+     * Deployment futures are completed with the given error if it is not {@code null}, otherwise with the
+     * deployment errors reported for the service (the first one is used as the result, the rest are attached as
+     * suppressed). An error which can not be unmarshalled is replaced with the unmarshalling failure itself, so
+     * a reported failure is never turned into a successful completion. Undeployment futures are completed with
+     * the given error as is.
+     *
+     * @param err Error to complete initiating futures with, or {@code null} if the task has no own error.
+     */
+    private void completeInitiatingFuture(final Throwable err) {
+        if (depActions == null)
+            return;
+
+        depActions.servicesToDeploy().forEach((srvcId, desc) -> {
+            if (err != null) {
+                srvcProc.completeInitiatingFuture(true, srvcId, err);
+
+                return;
+            }
+
+            Collection<byte[]> errors = depActions.deploymentErrors().get(srvcId);
+
+            if (errors == null) {
+                srvcProc.completeInitiatingFuture(true, srvcId, null);
+
+                return;
+            }
+
+            Throwable depErr = null;
+
+            for (byte[] error : errors) {
+                Throwable t;
+
+                try {
+                    t = U.unmarshal(ctx, error, null);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to unmarshal deployment error.", e);
+
+                    // Keep the failure visible to the initiator even though the original error is lost.
+                    t = e;
+                }
+
+                if (depErr == null)
+                    depErr = t;
+                else
+                    depErr.addSuppressed(t);
+            }
+
+            srvcProc.completeInitiatingFuture(true, srvcId, depErr);
+        });
+
+        for (IgniteUuid reqSrvcId : depActions.servicesToUndeploy().keySet())
+            srvcProc.completeInitiatingFuture(false, reqSrvcId, err);
+    }
+
+    /**
+     * Builds the services full deployments message from the collected single deployments messages and sends it
+     * over discovery. A failure to send is logged only.
+     */
+    private void onAllReceived() {
+        assert !isCompleted();
+
+        Collection<ServiceClusterDeploymentResult> clusterResults = buildFullDeploymentsResults(singleDepsMsgs);
+
+        try {
+            ctx.discovery().sendCustomEvent(new ServiceClusterDeploymentResultBatch(depId, clusterResults));
+        }
+        catch (IgniteCheckedException e) {
+            log.error("Failed to send services full deployments message across the ring.", e);
+        }
+    }
+
+    /**
+     * Calculates the assignment of the service to nodes by delegating to the service processor.
+     *
+     * @param srvcId Service id.
+     * @param cfg Service configuration.
+     * @param topVer Topology version.
+     * @param oldTop Previous topology snapshot.
+     * @return Non-empty service assignment: node id -> instances count.
+     * @throws IgniteCheckedException If the calculation failed or produced an empty assignment.
+     */
+    private Map<UUID, Integer> reassign(IgniteUuid srvcId, ServiceConfiguration cfg,
+        AffinityTopologyVersion topVer, TreeMap<UUID, Integer> oldTop) throws IgniteCheckedException {
+        try {
+            final Map<UUID, Integer> assignment = srvcProc.reassign(srvcId, cfg, topVer, oldTop);
+
+            if (!assignment.isEmpty()) {
+                if (log.isDebugEnabled())
+                    log.debug("Calculated service assignment : [srvcId=" + srvcId + ", srvcTop=" + assignment + ']');
+
+                return assignment;
+            }
+
+            throw new IgniteCheckedException("Failed to determine suitable nodes to deploy service.");
+        }
+        catch (Throwable t) {
+            // Any failure, including the empty assignment one above, is wrapped with the service configuration.
+            throw new IgniteCheckedException("Failed to calculate assignments for service, cfg=" + cfg, t);
+        }
+    }
+
+    /**
+     * Builds a copy of the given service topology snapshot which contains only the nodes whose ids are present
+     * in the given collection.
+     *
+     * @param evtTopNodes Ids being used to filter.
+     * @param top Service topology snapshot.
+     * @return Filtered service topology snapshot; empty if the given snapshot is empty.
+     */
+    private TreeMap<UUID, Integer> filterDeadNodes(Collection<UUID> evtTopNodes, Map<UUID, Integer> top) {
+        TreeMap<UUID, Integer> alive = new TreeMap<>();
+
+        if (!F.isEmpty(top)) {
+            // We can't just use 'ctx.discovery().alive(UUID)', because during the deployment process discovery
+            // topology may be changed and results may be different on some set of nodes.
+            for (Map.Entry<UUID, Integer> assigned : top.entrySet()) {
+                if (evtTopNodes.contains(assigned.getKey()))
+                    alive.put(assigned.getKey(), assigned.getValue());
+            }
+        }
+
+        return alive;
+    }
+
+    /**
+     * Processes single deployments messages to build full deployment results.
+     * <p>
+     * The instances count reported by a node is limited by the count expected for that node, and is treated as
+     * zero if the node is absent in the expected assignment of the service. A node's result is included only if
+     * it has a non-zero count or errors.
+     *
+     * @param singleDepsMsgs Services single deployments messages.
+     * @return Services full deployments results.
+     */
+    private Collection<ServiceClusterDeploymentResult> buildFullDeploymentsResults(
+        Map<UUID, ServiceSingleNodeDeploymentResultBatch> singleDepsMsgs) {
+        final Map<IgniteUuid, Map<UUID, ServiceSingleNodeDeploymentResult>> resultsBySrvc = new HashMap<>();
+
+        for (Map.Entry<UUID, ServiceSingleNodeDeploymentResultBatch> nodeMsg : singleDepsMsgs.entrySet()) {
+            final UUID nodeId = nodeMsg.getKey();
+
+            nodeMsg.getValue().results().forEach((srvcId, res) -> {
+                // The service gets registered even if nothing is recorded for this node.
+                Map<UUID, ServiceSingleNodeDeploymentResult> resultsByNode =
+                    resultsBySrvc.computeIfAbsent(srvcId, r -> new HashMap<>());
+
+                int cnt = res.count();
+
+                if (cnt != 0) {
+                    Map<UUID, Integer> expTop = expDeps.get(srvcId);
+
+                    if (expTop != null) {
+                        Integer expCnt = expTop.get(nodeId);
+
+                        cnt = expCnt == null ? 0 : Math.min(cnt, expCnt);
+                    }
+                }
+
+                final boolean hasErrors = !res.errors().isEmpty();
+
+                if (cnt != 0 || hasErrors) {
+                    ServiceSingleNodeDeploymentResult nodeRes = new ServiceSingleNodeDeploymentResult(cnt);
+
+                    if (hasErrors)
+                        nodeRes.errors(res.errors());
+
+                    resultsByNode.put(nodeId, nodeRes);
+                }
+            });
+        }
+
+        final Collection<ServiceClusterDeploymentResult> fullResults = new ArrayList<>();
+
+        for (Map.Entry<IgniteUuid, Map<UUID, ServiceSingleNodeDeploymentResult>> srvc : resultsBySrvc.entrySet())
+            fullResults.add(new ServiceClusterDeploymentResult(srvc.getKey(), srvc.getValue()));
+
+        return fullResults;
+    }
+
+    /**
+     * Marshals the given errors and attaches them to the deployment result. Errors which fail to be marshalled
+     * are logged and skipped. Does nothing if there are no errors.
+     *
+     * @param depRes Service single deployments results.
+     * @param errors Deployment errors.
+     */
+    private void attachDeploymentErrors(@NotNull ServiceSingleNodeDeploymentResult depRes,
+        @Nullable Collection<Throwable> errors) {
+        if (F.isEmpty(errors))
+            return;
+
+        Collection<byte[]> marshalled = new ArrayList<>();
+
+        for (Throwable err : errors) {
+            try {
+                marshalled.add(U.marshal(ctx, err));
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to marshal deployment error, err=" + err, e);
+            }
+        }
+
+        depRes.errors(marshalled);
+    }
+
+    /**
+     * Handles a node leaves topology.
+     * <p>
+     * Processing is deferred until {@link #initTaskFut} is done. If the left node was the coordinator, a new
+     * coordinator is resolved and the local single deployments message is sent to it again. Otherwise, if the
+     * local node is the coordinator, the left node is no longer awaited.
+     *
+     * @param nodeId Left node id.
+     */
+    protected void onNodeLeft(UUID nodeId) {
+        initTaskFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> {
+            // Nothing to do if the task has been completed in the meantime.
+            if (isCompleted())
+                return;
+
+            final boolean crdChanged = nodeId.equals(crdId);
+
+            if (crdChanged) {
+                ClusterNode crd = srvcProc.coordinator();
+
+                if (crd != null) {
+                    crdId = crd.id();
+
+                    // The local node has become the coordinator: start collecting single deployments messages.
+                    if (crd.isLocal())
+                        initCoordinator(evtTopVer);
+
+                    createAndSendSingleDeploymentsMessage(depId, depErrors);
+                }
+                else
+                    onAllServersLeft();
+            }
+            else if (ctx.localNodeId().equals(crdId)) {
+                synchronized (initCrdMux) {
+                    boolean rmvd = remaining.remove(nodeId);
+
+                    // The left node was the last awaited one: proceed with the full deployments message.
+                    if (rmvd && remaining.isEmpty()) {
+                        singleDepsMsgs.remove(nodeId);
+
+                        onAllReceived();
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Handles case when all server nodes have left the grid: completes the task with
+     * {@link ClusterTopologyServerNotFoundException}. Expected to be called on a client node only.
+     */
+    private void onAllServersLeft() {
+        assert ctx.clientNode();
+
+        completeError(new ClusterTopologyServerNotFoundException("Failed to resolve coordinator to continue services " +
+            "deployment process: [locId=" + ctx.localNodeId() + ", client=" + ctx.clientNode() + ", evt=" + evt + ']'));
+    }
+
+    /**
+     * Returns the discovery event which caused this deployment process.
+     *
+     * @return Cause discovery event, or {@code null} if {@link #onEvent} has not been called yet.
+     */
+    public DiscoveryEvent event() {
+        return evt;
+    }
+
+    /**
+     * Returns the topology version of the event which caused this deployment process.
+     *
+     * @return Cause event's topology version, or {@code null} if {@link #onEvent} has not been called yet.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return evtTopVer;
+    }
+
+    /**
+     * Returns services deployment process id of the task.
+     *
+     * @return Services deployment process id the task has been created with.
+     */
+    public ServiceDeploymentProcessId deploymentId() {
+        return depId;
+    }
+
+    /**
+     * Completes the task successfully.
+     * <p>
+     * If the task has not been completed yet, the initiating futures are completed first and then the task's
+     * completion future. The initialization futures, which the message and node-left handlers of the task
+     * listen on, are completed as well if they are still pending.
+     */
+    public void completeSuccess() {
+        if (!completeStateFut.isDone()) {
+            completeInitiatingFuture(null);
+
+            completeStateFut.onDone();
+        }
+
+        if (!initTaskFut.isDone())
+            initTaskFut.onDone();
+
+        if (!initCrdFut.isDone())
+            initCrdFut.onDone();
+    }
+
+    /**
+     * Completes the task with the given error.
+     * <p>
+     * If the task has not been completed yet, the initiating futures are completed with the error first and
+     * then the task's completion future. The initialization futures are completed with the same error if they
+     * are still pending.
+     *
+     * @param err Error to complete with.
+     */
+    public void completeError(Throwable err) {
+        if (!completeStateFut.isDone()) {
+            completeInitiatingFuture(err);
+
+            completeStateFut.onDone(err);
+        }
+
+        if (!initTaskFut.isDone())
+            initTaskFut.onDone(err);
+
+        if (!initCrdFut.isDone())
+            initCrdFut.onDone(err);
+    }
+
+    /**
+     * Returns if the task completed, i.e. its completion future is done.
+     *
+     * @return {@code true} if the task completed, otherwise {@code false}.
+     */
+    protected boolean isCompleted() {
+        return completeStateFut.isDone();
+    }
+
+    /**
+     * Synchronously waits for completion of the task for up to the given timeout by delegating to the task's
+     * completion future.
+     *
+     * @param timeout The maximum time to wait in milliseconds.
+     * @throws IgniteCheckedException In case of an error.
+     */
+    protected void waitForComplete(long timeout) throws IgniteCheckedException 
{
+        completeStateFut.get(timeout);
+    }
+
+    /**
+     * Handles when this task is being added in deployment queue.
+     * <p>
+     * Introduced to avoid overhead on calling of {@link Collection#contains(Object)}.
+     *
+     * @return {@code true} if the task has not been added previously, otherwise {@code false}.
+     */
+    protected boolean onEnqueued() {
+        return addedInQueue.compareAndSet(false, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        // Tasks of exactly the same class are equal when they belong to the same deployment process.
+        return o != null && getClass() == o.getClass() && depId.equals(((ServiceDeploymentTask)o).depId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return depId.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        final Object locNodeId = ctx != null ? ctx.localNodeId() : "unknown";
+
+        return S.toString(ServiceDeploymentTask.class, this, "locNodeId", locNodeId, "crdId", crdId);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java
index 4db44cb..2e14b94 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDescriptorImpl.java
@@ -29,7 +29,11 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Service descriptor.
+ *
+ * @deprecated This implementation is based on {@code GridServiceDeployment}, which has been deprecated because
+ * services internals use messages for deployment management instead of the utility cache since Ignite 2.8.
  */
+@Deprecated
 public class ServiceDescriptorImpl implements ServiceDescriptor {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
new file mode 100644
index 0000000..5a3aefc
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceInfo.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceDescriptor;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Service's information container.
+ */
+public class ServiceInfo implements ServiceDescriptor {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Origin node ID. */
+    private final UUID originNodeId;
+
+    /** Service id. */
+    private final IgniteUuid srvcId;
+
+    /** Service configuration. */
+    private final ServiceConfiguration cfg;
+
+    /** Statically configured flag. */
+    private final boolean staticCfg;
+
+    /** Topology snapshot. */
+    @GridToStringInclude
+    private volatile Map<UUID, Integer> top = Collections.emptyMap();
+
+    /**
+     * @param originNodeId Initiating node id.
+     * @param srvcId Service id.
+     * @param cfg Service configuration.
+     */
+    public ServiceInfo(@NotNull UUID originNodeId, @NotNull IgniteUuid srvcId, 
@NotNull ServiceConfiguration cfg) {
+        this(originNodeId, srvcId, cfg, false);
+    }
+
+    /**
+     * @param originNodeId Initiating node id.
+     * @param srvcId Service id.
+     * @param cfg Service configuration.
+     * @param staticCfg Statically configured flag.
+     */
+    public ServiceInfo(@NotNull UUID originNodeId, @NotNull IgniteUuid srvcId, 
@NotNull ServiceConfiguration cfg,
+        boolean staticCfg) {
+        this.originNodeId = originNodeId;
+        this.srvcId = srvcId;
+        this.cfg = cfg;
+        this.staticCfg = staticCfg;
+    }
+
+    /**
+     * Sets service's new topology snapshot.
+     *
+     * @param top Topology snapshot.
+     */
+    public void topologySnapshot(@NotNull Map<UUID, Integer> top) {
+        this.top = top;
+    }
+
+    /**
+     * Returns service's configuration.
+     *
+     * @return Service configuration.
+     */
+    public ServiceConfiguration configuration() {
+        return cfg;
+    }
+
+    /**
+     * @return {@code true} if statically configured.
+     */
+    public boolean staticallyConfigured() {
+        return staticCfg;
+    }
+
+    /**
+     * Rerurns services id.
+     *
+     * @return Service id.
+     */
+    public IgniteUuid serviceId() {
+        return srvcId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return cfg.getName();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public Class<? extends Service> serviceClass() {
+        if (cfg instanceof LazyServiceConfiguration) {
+            String clsName = 
((LazyServiceConfiguration)cfg).serviceClassName();
+
+            try {
+                return (Class<? extends Service>)Class.forName(clsName);
+            }
+            catch (ClassNotFoundException e) {
+                throw new IgniteException("Failed to find service class: " + 
clsName, e);
+            }
+        }
+        else
+            return cfg.getService().getClass();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int totalCount() {
+        return cfg.getTotalCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int maxPerNodeCount() {
+        return cfg.getMaxPerNodeCount();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String cacheName() {
+        return cfg.getCacheName();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Nullable @Override public <K> K affinityKey() {
+        return (K)cfg.getAffinityKey();
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID originNodeId() {
+        return originNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<UUID, Integer> topologySnapshot() {
+        return Collections.unmodifiableMap(top);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceInfo.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
new file mode 100644
index 0000000..2fea452
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorAdapter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceDescriptor;
+
+/**
+ * Adapter for different service processor implementations.
+ * <p/>
+ * Declares the deployment, cancellation and lookup operations that concrete service processors must implement.
+ */
+public abstract class ServiceProcessorAdapter extends GridProcessorAdapter {
+    /**
+     * @param ctx Kernal context.
+     */
+    protected ServiceProcessorAdapter(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /**
+     * @param prj Grid projection.
+     * @param name Service name.
+     * @param srvc Service.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> deployNodeSingleton(ClusterGroup 
prj, String name, Service srvc);
+
+    /**
+     * @param prj Grid projection.
+     * @param name Service name.
+     * @param srvc Service instance.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> 
deployClusterSingleton(ClusterGroup prj, String name, Service srvc);
+
+    /**
+     * @param prj Grid projection.
+     * @param name Service name.
+     * @param srvc Service.
+     * @param totalCnt Total count.
+     * @param maxPerNodeCnt Max per-node count.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> deployMultiple(ClusterGroup prj, 
String name, Service srvc, int totalCnt,
+        int maxPerNodeCnt);
+
+    /**
+     * @param name Service name.
+     * @param srvc Service.
+     * @param cacheName Cache name.
+     * @param affKey Affinity key.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> deployKeyAffinitySingleton(String 
name, Service srvc, String cacheName,
+        Object affKey);
+
+    /**
+     * @param prj Grid projection.
+     * @param cfgs Service configurations.
+     * @return Future for deployment.
+     */
+    public abstract IgniteInternalFuture<?> deployAll(ClusterGroup prj, 
Collection<ServiceConfiguration> cfgs);
+
+    /**
+     * @param name Service name.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> cancel(String name);
+
+    /**
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> cancelAll();
+
+    /**
+     * @param servicesNames Names of services to cancel.
+     * @return Future.
+     */
+    public abstract IgniteInternalFuture<?> cancelAll(Collection<String> 
servicesNames);
+
+    /**
+     * @return Collection of service descriptors.
+     */
+    public abstract Collection<ServiceDescriptor> serviceDescriptors();
+
+    /**
+     * @param name Service name.
+     * @param <T> Service type.
+     * @return Service by specified service name.
+     */
+    public abstract <T> T service(String name);
+
+    /**
+     * @param prj Grid projection.
+     * @param name Service name.
+     * @param srvcCls Service class.
+     * @param sticky Whether multi-node request should be done.
+     * @param timeout If greater than 0 limits service acquire time. Cannot be negative.
+     * @param <T> Service interface type.
+     * @return The proxy of a service by its name and class.
+     * @throws IgniteException If failed to create proxy.
+     */
+    public abstract <T> T serviceProxy(ClusterGroup prj, String name, Class<? 
super T> srvcCls, boolean sticky,
+        long timeout) throws IgniteException;
+
+    /**
+     * @param name Service name.
+     * @param <T> Service type.
+     * @return Services by specified service name.
+     */
+    public abstract <T> Collection<T> services(String name);
+
+    /**
+     * @param name Service name.
+     * @return Service context by specified service name.
+     */
+    public abstract ServiceContextImpl serviceContext(String name);
+
+    /**
+     * @param name Service name.
+     * @param timeout If greater than 0 limits task execution time. Cannot be negative.
+     * @return Service topology.
+     * @throws IgniteCheckedException On error.
+     */
+    public abstract Map<UUID, Integer> serviceTopology(String name, long 
timeout) throws IgniteCheckedException;
+
+    /**
+     * Callback for local join events for which the regular events are not generated.
+     * <p/>
+     * Local join event is expected in cases of joining to topology or client reconnect.
+     * <p/>
+     * The default implementation does nothing.
+     *
+     * @param evt Discovery event.
+     * @param discoCache Discovery cache.
+     */
+    public void onLocalJoin(DiscoveryEvent evt, DiscoCache discoCache) {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java
new file mode 100644
index 0000000..7f9fc83
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorCommonDiscoveryData.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Initial data container to be sent to a newly joining node for initialization of
+ * {@link IgniteServiceProcessor}.
+ */
+class ServiceProcessorCommonDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Descriptors of the services registered in the cluster. */
+    private final ArrayList<ServiceInfo> registeredServices;
+
+    /**
+     * @param registeredServices Descriptors of the services registered in the cluster. The list is stored as is,
+     *      without copying.
+     */
+    public ServiceProcessorCommonDiscoveryData(@NotNull ArrayList<ServiceInfo> 
registeredServices) {
+        this.registeredServices = registeredServices;
+    }
+
+    /**
+     * Returns descriptors of the services registered in the cluster.
+     *
+     * @return Registered services descriptors (the same list instance that was passed to the constructor).
+     */
+    public ArrayList<ServiceInfo> registeredServices() {
+        return registeredServices;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceProcessorCommonDiscoveryData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java
new file mode 100644
index 0000000..5a8473d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceProcessorJoinNodeDiscoveryData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Initial data of {@link IgniteServiceProcessor} that a joining node sends to the cluster.
+ */
+public class ServiceProcessorJoinNodeDiscoveryData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Static services configurations info. Also exposed through {@link #services()}. */
+    public final ArrayList<ServiceInfo> staticServicesInfo;
+
+    /**
+     * @param staticServicesInfo Static services configurations info. The list is stored as is, without copying.
+     */
+    public ServiceProcessorJoinNodeDiscoveryData(@NotNull 
ArrayList<ServiceInfo> staticServicesInfo) {
+        this.staticServicesInfo = staticServicesInfo;
+    }
+
+    /**
+     * Returns the same list instance as the {@link #staticServicesInfo} field.
+     *
+     * @return Static services configurations info.
+     */
+    public ArrayList<ServiceInfo> services() {
+        return staticServicesInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceProcessorJoinNodeDiscoveryData.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/62c560a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java
new file mode 100644
index 0000000..1f003d9
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType.BYTE_ARR;
+
+/**
+ * Service single node deployment result.
+ * <p/>
+ * Contains the count of deployed service instances on a single node and the deployment errors, if any.
+ * The errors are kept as serialized byte arrays.
+ */
+public class ServiceSingleNodeDeploymentResult implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Count of service's instances. */
+    private int cnt;
+
+    /** Serialized exceptions. May be {@code null} when no errors were set. */
+    private Collection<byte[]> errors;
+
+    /**
+     * Empty constructor for marshalling purposes.
+     */
+    public ServiceSingleNodeDeploymentResult() {
+    }
+
+    /**
+     * @param cnt Count of service's instances.
+     */
+    public ServiceSingleNodeDeploymentResult(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Count of service's instances.
+     */
+    public int count() {
+        return cnt;
+    }
+
+    /**
+     * @param cnt Count of service's instances.
+     */
+    public void count(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * Never returns {@code null}: an empty list is returned when no errors have been set.
+     *
+     * @return Serialized exceptions.
+     */
+    @NotNull public Collection<byte[]> errors() {
+        return errors != null ? errors : Collections.emptyList();
+    }
+
+    /**
+     * @param errors Serialized exceptions.
+     */
+    public void errors(Collection<byte[]> errors) {
+        this.errors = errors;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Writes the header if it has not been written yet, then the fields in the order {@code cnt},
+     * {@code errors}. The {@code switch} on the writer state has no {@code break} statements on purpose:
+     * after a field is written, control falls through to the next one. Returns {@code false} as soon as the
+     * writer reports that the header or a field could not be written, and {@code true} once all fields are
+     * written. NOTE(review): presumably a later call resumes from the state kept by the writer; confirm
+     * against the {@code MessageWriter} contract.
+     */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("cnt", cnt))
+                    return false;
+
+                writer.incrementState(); // No break: falls through to the next field.
+
+            case 1:
+                if (!writer.writeCollection("errors", errors, BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Reads the fields in the same order in which {@link #writeTo(ByteBuffer, MessageWriter)} writes them:
+     * {@code cnt}, then {@code errors}. The {@code switch} on the reader state intentionally falls through
+     * from one field to the next. Returns {@code false} if the reader is not ready to read the message or
+     * reports that the last read did not complete; otherwise returns the result of
+     * {@code reader.afterMessageRead(...)}.
+     */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cnt = reader.readInt("cnt");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState(); // No break: falls through to the next field.
+
+            case 1:
+                errors = reader.readCollection("errors", BYTE_ARR);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return 
reader.afterMessageRead(ServiceSingleNodeDeploymentResult.class);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * This message uses the constant type value {@code 169}.
+     */
+    @Override public short directType() {
+        return 169;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Two fields are written and read: {@code cnt} and {@code errors}.
+     */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ServiceSingleNodeDeploymentResult.class, this);
+    }
+}

Reply via email to