http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java new file mode 100644 index 0000000..6fb78b8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -0,0 +1,1846 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.thread.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.timeout.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; +import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.*; + +/** + * Processor for continuous routines. + */ +public class GridContinuousProcessor extends GridProcessorAdapter { + /** Local infos. */ + private final ConcurrentMap<UUID, LocalRoutineInfo> locInfos = new ConcurrentHashMap8<>(); + + /** Remote infos. */ + private final ConcurrentMap<UUID, RemoteRoutineInfo> rmtInfos = new ConcurrentHashMap8<>(); + + /** Start futures. */ + private final ConcurrentMap<UUID, StartFuture> startFuts = new ConcurrentHashMap8<>(); + + /** Start ack wait lists. 
*/ + private final ConcurrentMap<UUID, Collection<UUID>> waitForStartAck = new ConcurrentHashMap8<>(); + + /** Stop futures. */ + private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap8<>(); + + /** Stop ack wait lists. */ + private final ConcurrentMap<UUID, Collection<UUID>> waitForStopAck = new ConcurrentHashMap8<>(); + + /** Threads started by this processor. */ + private final Collection<IgniteThread> threads = new GridConcurrentHashSet<>(); + + /** Pending start requests. */ + private final Map<UUID, Collection<GridContinuousMessage>> pending = new HashMap<>(); + + /** */ + private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>(); + + /** Stopped IDs. */ + private final Collection<UUID> stopped = new HashSet<>(); + + /** Lock for pending requests. */ + private final Lock pendingLock = new ReentrantLock(); + + /** Lock for stop process. */ + private final Lock stopLock = new ReentrantLock(); + + /** Delay in milliseconds between retries. */ + private long retryDelay = 1000; + + /** Number of retries using to send messages. */ + private int retryCnt = 3; + + /** Acknowledgement timeout. */ + private long ackTimeout; + + /** Marshaller. */ + private IgniteMarshaller marsh; + + /** + * @param ctx Kernal context. 
*/
    public GridContinuousProcessor(GridKernalContext ctx) {
        super(ctx);
    }

    /**
     * Reads send-retry and acknowledgement-timeout settings from the configuration, then registers
     * the discovery event listener and the {@code TOPIC_CONTINUOUS} message listener.
     * Returns immediately on a daemon node.
     *
     * {@inheritDoc}
     */
    @Override public void start() throws IgniteCheckedException {
        if (ctx.config().isDaemon())
            return;

        retryDelay = ctx.config().getNetworkSendRetryDelay();
        retryCnt = ctx.config().getNetworkSendRetryCount();
        ackTimeout = ctx.config().getNetworkTimeout();

        // Acknowledgement timeout must not be shorter than the total time spent on send retries.
        if (ackTimeout < retryDelay * retryCnt) {
            U.warn(log, "Acknowledgement timeout for continuous operations is less than message send " +
                "retry delay multiplied by retries count (will increase timeout value) [ackTimeout=" +
                ackTimeout + ", retryDelay=" + retryDelay + ", retryCnt=" + retryCnt + ']');

            ackTimeout = retryDelay * retryCnt;
        }

        marsh = ctx.config().getMarshaller();

        // Discovery listener: flushes pending start requests to joined nodes and
        // cleans up per-node state when a node leaves or fails.
        ctx.event().addLocalEventListener(new GridLocalEventListener() {
            @SuppressWarnings({"fallthrough", "TooBroadScope"})
            @Override public void onEvent(IgniteEvent evt) {
                assert evt instanceof IgniteDiscoveryEvent;

                UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();

                Collection<GridContinuousMessage> reqs;

                pendingLock.lock();

                try {
                    // Remove pending requests to send to joined node
                    // (if node is left or failed, they are dropped).
                    reqs = pending.remove(nodeId);
                }
                finally {
                    pendingLock.unlock();
                }

                switch (evt.type()) {
                    case EVT_NODE_JOINED:
                        if (reqs != null) {
                            // Routine of the request being sent; used to complete its start future on failure.
                            UUID routineId = null;

                            // Send pending requests.
                            try {
                                for (GridContinuousMessage req : reqs) {
                                    routineId = req.routineId();

                                    sendWithRetries(nodeId, req, null);
                                }
                            }
                            catch (ClusterTopologyException ignored) {
                                if (log.isDebugEnabled())
                                    log.debug("Failed to send pending start request to node (is node alive?): " +
                                        nodeId);
                            }
                            catch (IgniteCheckedException e) {
                                U.error(log, "Failed to send pending start request to node: " + nodeId, e);

                                completeStartFuture(routineId);
                            }
                        }

                        break;

                    case EVT_NODE_LEFT:
                    case EVT_NODE_FAILED:
                        // Do not wait for start acknowledgements from left node.
                        for (Map.Entry<UUID, Collection<UUID>> e : waitForStartAck.entrySet()) {
                            Collection<UUID> nodeIds = e.getValue();

                            for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) {
                                if (nodeId.equals(it.next())) {
                                    it.remove();

                                    break;
                                }
                            }

                            // The left node was the last one awaited for this routine.
                            if (nodeIds.isEmpty())
                                completeStartFuture(e.getKey());
                        }

                        // Do not wait for stop acknowledgements from left node.
                        for (Map.Entry<UUID, Collection<UUID>> e : waitForStopAck.entrySet()) {
                            Collection<UUID> nodeIds = e.getValue();

                            for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) {
                                if (nodeId.equals(it.next())) {
                                    it.remove();

                                    break;
                                }
                            }

                            if (nodeIds.isEmpty())
                                completeStopFuture(e.getKey());
                        }

                        // Unregister handlers created by left node.
                        for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
                            UUID routineId = e.getKey();
                            RemoteRoutineInfo info = e.getValue();

                            if (info.autoUnsubscribe && nodeId.equals(info.nodeId))
                                unregisterRemote(routineId);
                        }

                        // Fail synchronous notification futures that wait for an ack from the left node.
                        for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
                            SyncMessageAckFuture fut = e.getValue();

                            if (fut.nodeId().equals(nodeId)) {
                                SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());

                                // Null means the future was concurrently removed elsewhere.
                                if (fut0 != null) {
                                    ClusterTopologyException err = new ClusterTopologyException(
                                        "Node left grid while sending message to: " + nodeId);

                                    fut0.onDone(err);
                                }
                            }
                        }

                        break;

                    default:
                        assert false : "Unexpected event received: " + evt.shortDisplay();
                }
            }
        }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);

        // Message listener: unmarshals the payload if needed and dispatches by message type.
        ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() {
            @Override public void onMessage(UUID nodeId, Object obj) {
                GridContinuousMessage msg = (GridContinuousMessage)obj;

                // Payload arrived in marshalled form only; restore the object.
                if (msg.data() == null && msg.dataBytes() != null) {
                    try {
                        msg.data(marsh.unmarshal(msg.dataBytes(), null));
                    }
                    catch (IgniteCheckedException e) {
                        U.error(log, "Failed to process message (ignoring): " + msg, e);

                        return;
                    }
                }

                switch (msg.type()) {
                    case MSG_START_REQ:
                        processStartRequest(nodeId, msg);

                        break;

                    case MSG_START_ACK:
                        processStartAck(nodeId, msg);

                        break;

                    case MSG_STOP_REQ:
                        processStopRequest(nodeId, msg);

                        break;

                    case MSG_STOP_ACK:
                        processStopAck(nodeId, msg);

                        break;

                    case MSG_EVT_NOTIFICATION:
                        processNotification(nodeId, msg);

                        break;

                    case MSG_EVT_ACK:
                        processMessageAck(msg);

                        break;

                    default:
                        assert false : "Unexpected message received: " + msg.type();
                }
            }
        });

        if (log.isDebugEnabled())
            log.debug("Continuous processor started.");
    }

    /**
     * Removes the {@code TOPIC_CONTINUOUS} message listener, then interrupts and joins all threads
     * started by this processor. Returns immediately on a daemon node.
     *
     * {@inheritDoc}
     */
    @Override public void stop(boolean cancel) throws IgniteCheckedException {
        if (ctx.config().isDaemon())
            return;

        // NOTE(review): the discovery event listener added in start() is not removed here - confirm this is intended.
        ctx.io().removeMessageListener(TOPIC_CONTINUOUS);

        U.interrupt(threads);
        U.joinThreads(threads, log);

        if (log.isDebugEnabled())
            log.debug("Continuous processor stopped.");
    }

    /**
     * For a remote joining node: creates an empty pending-request set for it and returns a snapshot
     * of all locally started routines. Returns {@code null} when asked about the local node.
     *
     * {@inheritDoc}
     */
    @Override @Nullable public Object collectDiscoveryData(UUID nodeId) {
        if (!nodeId.equals(ctx.localNodeId())) {
            pendingLock.lock();

            try {
                // Create empty pending set.
                pending.put(nodeId, new HashSet<GridContinuousMessage>());

                DiscoveryData data = new DiscoveryData(ctx.localNodeId());

                // Collect listeners information (will be sent to
                // joining node during discovery process).
                // NOTE(review): items are built without an autoUnsubscribe value, while
                // onDiscoveryDataReceived() reads item.autoUnsubscribe - verify against DiscoveryDataItem.
                for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
                    UUID routineId = e.getKey();
                    LocalRoutineInfo info = e.getValue();

                    data.addItem(new DiscoveryDataItem(routineId, info.prjPred,
                        info.hnd, info.bufSize, info.interval));
                }

                return data;
            }
            finally {
                pendingLock.unlock();
            }
        }
        else
            return null;
    }

    /**
     * Registers handlers for the routines described by the received discovery data, skipping
     * routines whose projection predicate rejects the local node. Registration failures are
     * logged and do not prevent processing of the remaining items.
     *
     * {@inheritDoc}
     */
    @Override public void onDiscoveryDataReceived(Object obj) {
        DiscoveryData data = (DiscoveryData)obj;

        if (!ctx.isDaemon() && data != null) {
            for (DiscoveryDataItem item : data.items) {
                // Register handler only if local node passes projection predicate.
                if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) {
                    try {
                        if (ctx.config().isPeerClassLoadingEnabled())
                            item.hnd.p2pUnmarshal(data.nodeId, ctx);

                        if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
                            item.autoUnsubscribe, false))
                            item.hnd.onListenerRegistered(item.routineId, ctx);
                    }
                    catch (IgniteCheckedException e) {
                        U.error(log, "Failed to register continuous handler.", e);
                    }
                }
            }
        }
    }

    /**
     * Starts a continuous routine on the local node and/or the remote nodes accepted by the predicate.
     *
     * @param hnd Handler.
     * @param bufSize Buffer size.
     * @param interval Time interval.
     * @param autoUnsubscribe Automatic unsubscribe flag.
     * @param prjPred Projection predicate.
     * @return Future.
+ */ + @SuppressWarnings("TooBroadScope") + public IgniteFuture<UUID> startRoutine(GridContinuousHandler hnd, + int bufSize, + long interval, + boolean autoUnsubscribe, + @Nullable IgnitePredicate<ClusterNode> prjPred) { + assert hnd != null; + assert bufSize > 0; + assert interval >= 0; + + // Whether local node is included in routine. + boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode()); + + // Generate ID. + final UUID routineId = UUID.randomUUID(); + + StartRequestData reqData = new StartRequestData(prjPred, hnd, bufSize, interval, autoUnsubscribe); + + try { + if (ctx.config().isPeerClassLoadingEnabled()) { + // Handle peer deployment for projection predicate. + if (prjPred != null && !U.isGrid(prjPred.getClass())) { + Class cls = U.detectClass(prjPred); + + String clsName = cls.getName(); + + GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); + + if (dep == null) + throw new IgniteDeploymentException("Failed to deploy projection predicate: " + prjPred); + + reqData.clsName = clsName; + reqData.depInfo = new GridDeploymentInfoBean(dep); + + reqData.p2pMarshal(marsh); + } + + // Handle peer deployment for other handler-specific objects. + hnd.p2pMarshal(ctx); + } + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx, e); + } + + // Register per-routine notifications listener if ordered messaging is used. + if (hnd.orderedTopic() != null) { + ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object obj) { + GridContinuousMessage msg = (GridContinuousMessage)obj; + + // Only notification can be ordered. 
+ assert msg.type() == MSG_EVT_NOTIFICATION; + + if (msg.data() == null && msg.dataBytes() != null) { + try { + msg.data(marsh.unmarshal(msg.dataBytes(), null)); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to process message (ignoring): " + msg, e); + + return; + } + } + + processNotification(nodeId, msg); + } + }); + } + + Collection<? extends ClusterNode> nodes; + Collection<UUID> nodeIds; + + pendingLock.lock(); + + try { + // Nodes that participate in routine (request will be sent to these nodes directly). + nodes = F.view(ctx.discovery().allNodes(), F.and(prjPred, F.remoteNodes(ctx.localNodeId()))); + + // Stop with exception if projection is empty. + if (nodes.isEmpty() && !locIncluded) { + return new GridFinishedFuture<>(ctx, + new ClusterTopologyException("Failed to register remote continuous listener (projection is empty).")); + } + + // IDs of nodes where request will be sent. + nodeIds = new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id())); + + // If there are currently joining nodes, add request to their pending lists. + // Node IDs set is updated to make sure that we wait for acknowledgement from + // these nodes. + for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : pending.entrySet()) { + if (nodeIds.add(e.getKey())) + e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData)); + } + + // Register routine locally. + locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval)); + } + finally { + pendingLock.unlock(); + } + + StartFuture fut = new StartFuture(ctx, routineId); + + if (!nodeIds.isEmpty()) { + // Wait for acknowledgements. + waitForStartAck.put(routineId, nodeIds); + + startFuts.put(routineId, fut); + + // Register acknowledge timeout (timeout object will be removed when + // future is completed). + fut.addTimeoutObject(new GridTimeoutObjectAdapter(ackTimeout) { + @Override public void onTimeout() { + // Stop waiting for acknowledgements. 
+ Collection<UUID> ids = waitForStartAck.remove(routineId); + + if (ids != null) { + StartFuture f = startFuts.remove(routineId); + + assert f != null; + + // If there are still nodes without acknowledgements, + // Stop routine with exception. Continue and complete + // future otherwise. + if (!ids.isEmpty()) { + f.onDone(new IgniteCheckedException("Failed to get start acknowledgement from nodes (timeout " + + "expired): " + ids + ". Will unregister all continuous listeners.")); + + stopRoutine(routineId); + } + else + f.onRemoteRegistered(); + } + } + }); + } + + if (!nodes.isEmpty()) { + // Do not send projection predicate (nodes already filtered). + reqData.prjPred = null; + + // Send start requests. + try { + GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData); + + sendWithRetries(nodes, req, null); + } + catch (IgniteCheckedException e) { + startFuts.remove(routineId); + waitForStartAck.remove(routineId); + + fut.onDone(e); + + stopRoutine(routineId); + + locIncluded = false; + } + } + else { + // There are no remote nodes, but we didn't throw topology exception. + assert locIncluded; + + // Do not wait anything from remote nodes. + fut.onRemoteRegistered(); + } + + // Register local handler if needed. + if (locIncluded) { + try { + if (registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true)) + hnd.onListenerRegistered(routineId, ctx); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx, + new IgniteCheckedException("Failed to register handler locally: " + hnd, e)); + } + } + + // Handler is registered locally. + fut.onLocalRegistered(); + + return fut; + } + + /** + * @param routineId Consume ID. + * @return Future. + */ + public IgniteFuture<?> stopRoutine(UUID routineId) { + assert routineId != null; + + boolean doStop = false; + + StopFuture fut = stopFuts.get(routineId); + + // Only one thread will stop routine with provided ID. 
+ if (fut == null) { + StopFuture old = stopFuts.putIfAbsent(routineId, fut = new StopFuture(ctx)); + + if (old != null) + fut = old; + else + doStop = true; + } + + if (doStop) { + // Unregister routine locally. + LocalRoutineInfo routine = locInfos.remove(routineId); + + // Finish if routine is not found (wrong ID is provided). + if (routine == null) { + stopFuts.remove(routineId); + + fut.onDone(); + + return fut; + } + + // Unregister handler locally. + unregisterHandler(routineId, routine.hnd, true); + + pendingLock.lock(); + + try { + // Remove pending requests for this routine. + for (Collection<GridContinuousMessage> msgs : pending.values()) { + Iterator<GridContinuousMessage> it = msgs.iterator(); + + while (it.hasNext()) { + if (it.next().routineId().equals(routineId)) + it.remove(); + } + } + } + finally { + pendingLock.unlock(); + } + + // Nodes where to send stop requests. + Collection<? extends ClusterNode> nodes = F.view(ctx.discovery().allNodes(), + F.and(routine.prjPred, F.remoteNodes(ctx.localNodeId()))); + + if (!nodes.isEmpty()) { + // Wait for acknowledgements. + waitForStopAck.put(routineId, new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id()))); + + // Register acknowledge timeout (timeout object will be removed when + // future is completed). + fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId, + new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null))); + + // Send stop requests. 
+ try { + for (ClusterNode node : nodes) { + try { + sendWithRetries(node.id(), + new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null), + null); + } + catch (ClusterTopologyException ignored) { + U.warn(log, "Failed to send stop request (node left topology): " + node.id()); + } + } + } + catch (IgniteCheckedException e) { + stopFuts.remove(routineId); + waitForStopAck.remove(routineId); + + fut.onDone(e); + } + } + else { + stopFuts.remove(routineId); + + fut.onDone(); + } + } + + return fut; + } + + /** + * @param nodeId ID of the node that started routine. + * @param routineId Routine ID. + * @param obj Notification object. + * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. + * @param sync If {@code true} then waits for event acknowledgment. + * @throws IgniteCheckedException In case of error. + */ + public void addNotification(UUID nodeId, + UUID routineId, + @Nullable Object obj, + @Nullable Object orderedTopic, + boolean sync) + throws IgniteCheckedException { + assert nodeId != null; + assert routineId != null; + + assert !nodeId.equals(ctx.localNodeId()); + + RemoteRoutineInfo info = rmtInfos.get(routineId); + + if (info != null) { + assert info.interval == 0 || !sync; + + if (sync) { + SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, nodeId); + + IgniteUuid futId = IgniteUuid.randomUuid(); + + syncMsgFuts.put(futId, fut); + + try { + sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic); + } + catch (IgniteCheckedException e) { + syncMsgFuts.remove(futId); + + throw e; + } + + fut.get(); + } + else { + Collection<Object> toSnd = info.add(obj); + + if (toSnd != null) + sendNotification(nodeId, routineId, null, toSnd, orderedTopic); + } + } + } + + /** + * @param nodeId Node ID. + * @param routineId Routine ID. + * @param futId Future ID. + * @param toSnd Notification object to send. + * @param orderedTopic Topic for ordered notifications. 
+ * If {@code null}, non-ordered message will be sent. + * @throws IgniteCheckedException In case of error. + */ + private void sendNotification(UUID nodeId, + UUID routineId, + @Nullable IgniteUuid futId, + Collection<Object> toSnd, + @Nullable Object orderedTopic) throws IgniteCheckedException { + assert nodeId != null; + assert routineId != null; + assert toSnd != null; + assert !toSnd.isEmpty(); + + sendWithRetries(nodeId, new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd), orderedTopic); + } + + /** + * @param nodeId Sender ID. + * @param req Start request. + */ + private void processStartRequest(UUID nodeId, GridContinuousMessage req) { + assert nodeId != null; + assert req != null; + + UUID routineId = req.routineId(); + StartRequestData data = req.data(); + + GridContinuousHandler hnd = data.hnd; + + IgniteCheckedException err = null; + + try { + if (ctx.config().isPeerClassLoadingEnabled()) { + String clsName = data.clsName; + + if (clsName != null) { + GridDeploymentInfo depInfo = data.depInfo; + + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, + depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); + + if (dep == null) + throw new IgniteDeploymentException("Failed to obtain deployment for class: " + clsName); + + data.p2pUnmarshal(marsh, dep.classLoader()); + } + + hnd.p2pUnmarshal(nodeId, ctx); + } + } + catch (IgniteCheckedException e) { + err = e; + + U.error(log, "Failed to register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e); + } + + boolean registered = false; + + if (err == null) { + try { + IgnitePredicate<ClusterNode> prjPred = data.prjPred; + + if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) { + registered = registerHandler(nodeId, routineId, hnd, data.bufSize, data.interval, + data.autoUnsubscribe, false); + } + } + catch (IgniteCheckedException e) { + err = e; + + U.error(log, "Failed to 
register handler [nodeId=" + nodeId + ", routineId=" + routineId + ']', e); + } + } + + try { + sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err), null); + } + catch (ClusterTopologyException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to send start acknowledgement to node (is node alive?): " + nodeId); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send start acknowledgement to node: " + nodeId, e); + } + + if (registered) + hnd.onListenerRegistered(routineId, ctx); + } + + /** + * @param nodeId Sender ID. + * @param ack Start acknowledgement. + */ + private void processStartAck(UUID nodeId, GridContinuousMessage ack) { + assert nodeId != null; + assert ack != null; + + UUID routineId = ack.routineId(); + + final IgniteCheckedException err = ack.data(); + + if (err != null) { + if (waitForStartAck.remove(routineId) != null) { + final StartFuture fut = startFuts.remove(routineId); + + if (fut != null) { + fut.onDone(err); + + stopRoutine(routineId); + } + } + } + + Collection<UUID> nodeIds = waitForStartAck.get(routineId); + + if (nodeIds != null) { + nodeIds.remove(nodeId); + + if (nodeIds.isEmpty()) + completeStartFuture(routineId); + } + } + + /** + * @param nodeId Sender ID. + * @param req Stop request. + */ + private void processStopRequest(UUID nodeId, GridContinuousMessage req) { + assert nodeId != null; + assert req != null; + + UUID routineId = req.routineId(); + + unregisterRemote(routineId); + + try { + sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null), null); + } + catch (ClusterTopologyException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to send stop acknowledgement to node (is node alive?): " + nodeId); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send stop acknowledgement to node: " + nodeId, e); + } + } + + /** + * @param nodeId Sender ID. + * @param ack Stop acknowledgement. 
+ */ + private void processStopAck(UUID nodeId, GridContinuousMessage ack) { + assert nodeId != null; + assert ack != null; + + UUID routineId = ack.routineId(); + + Collection<UUID> nodeIds = waitForStopAck.get(routineId); + + if (nodeIds != null) { + nodeIds.remove(nodeId); + + if (nodeIds.isEmpty()) + completeStopFuture(routineId); + } + } + + /** + * @param msg Message. + */ + private void processMessageAck(GridContinuousMessage msg) { + assert msg.futureId() != null; + + SyncMessageAckFuture fut = syncMsgFuts.remove(msg.futureId()); + + if (fut != null) + fut.onDone(); + } + + /** + * @param nodeId Sender ID. + * @param msg Message. + */ + private void processNotification(UUID nodeId, GridContinuousMessage msg) { + assert nodeId != null; + assert msg != null; + + UUID routineId = msg.routineId(); + + try { + LocalRoutineInfo routine = locInfos.get(routineId); + + if (routine != null) + routine.hnd.notifyCallback(nodeId, routineId, (Collection<?>)msg.data(), ctx); + } + finally { + if (msg.futureId() != null) { + try { + sendWithRetries(nodeId, + new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null), + null); + } + catch (IgniteCheckedException e) { + log.error("Failed to send event acknowledgment to node: " + nodeId, e); + } + } + } + } + + /** + * @param routineId Consume ID. + */ + private void completeStartFuture(UUID routineId) { + assert routineId != null; + + if (waitForStartAck.remove(routineId) != null) { + StartFuture fut = startFuts.remove(routineId); + + assert fut != null; + + fut.onRemoteRegistered(); + } + } + + /** + * @param routineId Consume ID. + */ + private void completeStopFuture(UUID routineId) { + assert routineId != null; + + if (waitForStopAck.remove(routineId) != null) { + GridFutureAdapter <?> fut = stopFuts.remove(routineId); + + assert fut != null; + + fut.onDone(); + } + } + + /** + * @param nodeId Node ID. + * @param routineId Consume ID. + * @param hnd Handler. + * @param bufSize Buffer size. 
* @param interval Time interval.
     * @param autoUnsubscribe Automatic unsubscribe flag.
     * @param loc Local registration flag.
     * @return Whether listener was actually registered.
     * @throws IgniteCheckedException In case of error.
     */
    private boolean registerHandler(final UUID nodeId,
        final UUID routineId,
        final GridContinuousHandler hnd,
        int bufSize,
        final long interval,
        boolean autoUnsubscribe,
        boolean loc) throws IgniteCheckedException {
        assert nodeId != null;
        assert routineId != null;
        assert hnd != null;
        assert bufSize > 0;
        assert interval >= 0;

        final RemoteRoutineInfo info = new RemoteRoutineInfo(nodeId, hnd, bufSize, interval, autoUnsubscribe);

        // Local registration is unconditional and is not recorded in rmtInfos.
        boolean doRegister = loc;

        if (!doRegister) {
            stopLock.lock();

            try {
                // Skip registration if the routine was already marked as stopped (the mark is consumed here)
                // or if a remote info with this ID is already present.
                doRegister = !stopped.remove(routineId) && rmtInfos.putIfAbsent(routineId, info) == null;
            }
            finally {
                stopLock.unlock();
            }
        }

        if (doRegister) {
            if (interval > 0) {
                // Background worker that periodically flushes the routine buffer to the master node.
                // NOTE(review): within the visible code this thread is stopped only by stop() interrupting
                // 'threads'; it is not cancelled when the routine is unregistered - confirm.
                IgniteThread checker = new IgniteThread(new GridWorker(ctx.gridName(), "continuous-buffer-checker", log) {
                    @SuppressWarnings("ConstantConditions")
                    @Override protected void body() {
                        long interval0 = interval;

                        while (!isCancelled()) {
                            try {
                                U.sleep(interval0);
                            }
                            catch (IgniteInterruptedException ignored) {
                                break;
                            }

                            // First element: batch to send (or null); second: delay before the next check.
                            IgniteBiTuple<Collection<Object>, Long> t = info.checkInterval();

                            Collection<Object> toSnd = t.get1();

                            if (toSnd != null) {
                                try {
                                    sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic());
                                }
                                catch (ClusterTopologyException ignored) {
                                    if (log.isDebugEnabled())
                                        log.debug("Failed to send notification to node (is node alive?): " + nodeId);
                                }
                                catch (IgniteCheckedException e) {
                                    U.error(log, "Failed to send notification to node: " + nodeId, e);
                                }
                            }

                            interval0 = t.get2();
                        }
                    }
                });

                threads.add(checker);

                checker.start();
            }

            // NOTE(review): the checker thread is started before hnd.register() is called, so it keeps
            // running even if register() throws or returns false - confirm this is acceptable.
            return hnd.register(nodeId, routineId, ctx);
        }

        return false;
    }

    /**
     * Unregisters the handler of the given routine.
     *
     * @param routineId Routine ID.
+ * @param hnd Handler + * @param loc If Handler unregistered on master node. + */ + private void unregisterHandler(UUID routineId, GridContinuousHandler hnd, boolean loc) { + assert routineId != null; + assert hnd != null; + + if (loc && hnd.orderedTopic() != null) + ctx.io().removeMessageListener(hnd.orderedTopic()); + + hnd.unregister(routineId, ctx); + } + + /** + * @param routineId Routine ID. + */ + @SuppressWarnings("TooBroadScope") + private void unregisterRemote(UUID routineId) { + RemoteRoutineInfo info; + + stopLock.lock(); + + try { + info = rmtInfos.remove(routineId); + + if (info == null) + stopped.add(routineId); + } + finally { + stopLock.unlock(); + } + + if (info != null) + unregisterHandler(routineId, info.hnd, false); + } + + /** + * @param nodeId Destination node ID. + * @param msg Message. + * @param orderedTopic Topic for ordered notifications. + * If {@code null}, non-ordered message will be sent. + * @throws IgniteCheckedException In case of error. + */ + private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic) + throws IgniteCheckedException { + assert nodeId != null; + assert msg != null; + + ClusterNode node = ctx.discovery().node(nodeId); + + if (node != null) + sendWithRetries(node, msg, orderedTopic); + else + throw new ClusterTopologyException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId); + } + + /** + * @param node Destination node. + * @param msg Message. + * @param orderedTopic Topic for ordered notifications. + * If {@code null}, non-ordered message will be sent. + * @throws IgniteCheckedException In case of error. + */ + private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic) + throws IgniteCheckedException { + assert node != null; + assert msg != null; + + sendWithRetries(F.asList(node), msg, orderedTopic); + } + + /** + * @param nodes Destination nodes. + * @param msg Message. 
+ * @param orderedTopic Topic for ordered notifications. + * If {@code null}, non-ordered message will be sent. + * @throws IgniteCheckedException In case of error. + */ + private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg, + @Nullable Object orderedTopic) throws IgniteCheckedException { + assert !F.isEmpty(nodes); + assert msg != null; + + if (msg.data() != null && (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id()))) + msg.dataBytes(marsh.marshal(msg.data())); + + boolean first = true; + + for (ClusterNode node : nodes) { + msg = first ? msg : (GridContinuousMessage)msg.clone(); + + first = false; + + int cnt = 0; + + while (cnt <= retryCnt) { + try { + cnt++; + + if (orderedTopic != null) { + ctx.io().sendOrderedMessage( + node, + orderedTopic, + ctx.io().nextMessageId(orderedTopic, node.id()), + msg, + SYSTEM_POOL, + 0, + true); + } + else + ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL); + + break; + } + catch (IgniteInterruptedException e) { + throw e; + } + catch (IgniteCheckedException e) { + if (!ctx.discovery().alive(node.id())) + throw new ClusterTopologyException("Node left grid while sending message to: " + node.id(), e); + + if (cnt == retryCnt) + throw e; + else if (log.isDebugEnabled()) + log.debug("Failed to send message to node (will retry): " + node.id()); + } + + U.sleep(retryDelay); + } + } + } + + /** + * Local routine info. + */ + @SuppressWarnings("PackageVisibleInnerClass") + static class LocalRoutineInfo { + /** Projection predicate. */ + private final IgnitePredicate<ClusterNode> prjPred; + + /** Continuous routine handler. */ + private final GridContinuousHandler hnd; + + /** Buffer size. */ + private final int bufSize; + + /** Time interval. */ + private final long interval; + + /** + * @param prjPred Projection predicate. + * @param hnd Continuous routine handler. + * @param bufSize Buffer size. + * @param interval Interval. 
+ */ + LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, int bufSize, + long interval) { + assert hnd != null; + assert bufSize > 0; + assert interval >= 0; + + this.prjPred = prjPred; + this.hnd = hnd; + this.bufSize = bufSize; + this.interval = interval; + } + + /** + * @return Handler. + */ + GridContinuousHandler handler() { + return hnd; + } + } + + /** + * Remote routine info. + */ + private static class RemoteRoutineInfo { + /** Master node ID. */ + private final UUID nodeId; + + /** Continuous routine handler. */ + private final GridContinuousHandler hnd; + + /** Buffer size. */ + private final int bufSize; + + /** Time interval. */ + private final long interval; + + /** Lock. */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** Buffer. */ + private ConcurrentLinkedDeque8<Object> buf; + + /** Last send time. */ + private long lastSndTime = U.currentTimeMillis(); + + /** Automatic unsubscribe flag. */ + private boolean autoUnsubscribe; + + /** + * @param nodeId Master node ID. + * @param hnd Continuous routine handler. + * @param bufSize Buffer size. + * @param interval Interval. + * @param autoUnsubscribe Automatic unsubscribe flag. + */ + RemoteRoutineInfo(UUID nodeId, GridContinuousHandler hnd, int bufSize, long interval, + boolean autoUnsubscribe) { + assert nodeId != null; + assert hnd != null; + assert bufSize > 0; + assert interval >= 0; + + this.nodeId = nodeId; + this.hnd = hnd; + this.bufSize = bufSize; + this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; + + buf = new ConcurrentLinkedDeque8<>(); + } + + /** + * @param obj Object to add. + * @return Object to send or {@code null} if there is nothing to send for now. 
+ */ + @Nullable Collection<Object> add(@Nullable Object obj) { + Collection<Object> toSnd = null; + + // This element fills the buffer: under the write lock append it, detach the full buffer + // for sending and start a fresh one. + if (buf.sizex() >= bufSize - 1) { + lock.writeLock().lock(); + + try { + buf.add(obj); + + toSnd = buf; + + buf = new ConcurrentLinkedDeque8<>(); + + if (interval > 0) + lastSndTime = U.currentTimeMillis(); + } + finally { + lock.writeLock().unlock(); + } + } + else { + // Buffer still has room: append under the shared read lock. + lock.readLock().lock(); + + try { + buf.add(obj); + } + finally { + lock.readLock().unlock(); + } + } + + // The detached buffer is returned as a copy. + return toSnd != null ? new ArrayList<>(toSnd) : null; + } + + /** + * @return Tuple with objects to send (or {@code null} if there is nothing to + * send for now) and the time interval after which the next check is needed. + */ + @SuppressWarnings("TooBroadScope") + IgniteBiTuple<Collection<Object>, Long> checkInterval() { + assert interval > 0; + + Collection<Object> toSnd = null; + long diff; + + long now = U.currentTimeMillis(); + + lock.writeLock().lock(); + + try { + diff = now - lastSndTime; + + // Flush only if the interval has elapsed since the last send and something is buffered. + if (diff >= interval && !buf.isEmpty()) { + toSnd = buf; + + buf = new ConcurrentLinkedDeque8<>(); + + lastSndTime = now; + } + } + finally { + lock.writeLock().unlock(); + } + + // Next check is due after the remainder of the interval, or after a full interval otherwise. + return F.t(toSnd, diff < interval ? interval - diff : interval); + } + } + + /** + * Start request data. + */ + private static class StartRequestData implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Projection predicate. */ + private IgnitePredicate<ClusterNode> prjPred; + + /** Serialized projection predicate. */ + private byte[] prjPredBytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info. */ + private GridDeploymentInfo depInfo; + + /** Handler. */ + private GridContinuousHandler hnd; + + /** Buffer size. */ + private int bufSize; + + /** Time interval. */ + private long interval; + + /** Automatic unsubscribe flag. */ + private boolean autoUnsubscribe; + + /** + * Required by {@link Externalizable}. + */ + public StartRequestData() { + // No-op. 
+ } + + /** + * @param prjPred Projection predicate. + * @param hnd Handler. + * @param bufSize Buffer size. + * @param interval Time interval. + * @param autoUnsubscribe Automatic unsubscribe flag. + */ + StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, + int bufSize, long interval, boolean autoUnsubscribe) { + assert hnd != null; + assert bufSize > 0; + assert interval >= 0; + + this.prjPred = prjPred; + this.hnd = hnd; + this.bufSize = bufSize; + this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; + } + + /** + * Marshals the projection predicate into {@code prjPredBytes}. + * + * @param marsh Marshaller. + * @throws IgniteCheckedException In case of error. + */ + void p2pMarshal(IgniteMarshaller marsh) throws IgniteCheckedException { + assert marsh != null; + + prjPredBytes = marsh.marshal(prjPred); + } + + /** + * Restores the projection predicate from {@code prjPredBytes}. + * + * @param marsh Marshaller. + * @param ldr Class loader. + * @throws IgniteCheckedException In case of error. + */ + void p2pUnmarshal(IgniteMarshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + assert marsh != null; + + assert prjPred == null; + assert prjPredBytes != null; + + prjPred = marsh.unmarshal(prjPredBytes, ldr); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // Flag tells the reader which predicate form follows: marshalled bytes with + // deployment data, or the predicate object itself. + boolean b = prjPredBytes != null; + + out.writeBoolean(b); + + if (b) { + U.writeByteArray(out, prjPredBytes); + U.writeString(out, clsName); + out.writeObject(depInfo); + } + else + out.writeObject(prjPred); + + out.writeObject(hnd); + out.writeInt(bufSize); + out.writeLong(interval); + out.writeBoolean(autoUnsubscribe); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + boolean b = in.readBoolean(); + + if (b) { + prjPredBytes = U.readByteArray(in); + clsName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } + else + prjPred = 
(IgnitePredicate<ClusterNode>)in.readObject(); + + hnd = (GridContinuousHandler)in.readObject(); + bufSize = in.readInt(); + interval = in.readLong(); + autoUnsubscribe = in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRequestData.class, this); + } + } + + /** + * Discovery data. + */ + private static class DiscoveryData implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Node ID. */ + private UUID nodeId; + + /** Items. */ + @GridToStringInclude + private Collection<DiscoveryDataItem> items; + + /** + * Required by {@link Externalizable}. + */ + public DiscoveryData() { + // No-op. + } + + /** + * @param nodeId Node ID. + */ + DiscoveryData(UUID nodeId) { + assert nodeId != null; + + this.nodeId = nodeId; + + items = new ArrayList<>(); + } + + /** + * @param item Item. + */ + public void addItem(DiscoveryDataItem item) { + items.add(item); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, nodeId); + U.writeCollection(out, items); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + nodeId = U.readUuid(in); + items = U.readCollection(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryData.class, this); + } + } + + /** + * Discovery data item. + */ + private static class DiscoveryDataItem implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Consume ID. */ + private UUID routineId; + + /** Projection predicate. */ + private IgnitePredicate<ClusterNode> prjPred; + + /** Handler. */ + private GridContinuousHandler hnd; + + /** Buffer size. */ + private int bufSize; + + /** Time interval. */ + private long interval; + + /** Automatic unsubscribe flag. 
*/ + private boolean autoUnsubscribe; + + /** + * Required by {@link Externalizable}. + */ + public DiscoveryDataItem() { + // No-op. + } + + /** + * @param routineId Routine ID. + * @param prjPred Projection predicate. + * @param hnd Handler. + * @param bufSize Buffer size. + * @param interval Time interval. + */ + DiscoveryDataItem(UUID routineId, @Nullable IgnitePredicate<ClusterNode> prjPred, + GridContinuousHandler hnd, int bufSize, long interval) { + assert routineId != null; + assert hnd != null; + assert bufSize > 0; + assert interval >= 0; + + // NOTE(review): autoUnsubscribe is neither passed in nor assigned here, yet writeExternal() + // serializes it; unless it is set elsewhere, the sender always writes false - confirm. + this.routineId = routineId; + this.prjPred = prjPred; + this.hnd = hnd; + this.bufSize = bufSize; + this.interval = interval; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, routineId); + out.writeObject(prjPred); + out.writeObject(hnd); + out.writeInt(bufSize); + out.writeLong(interval); + out.writeBoolean(autoUnsubscribe); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // Fields are read in exactly the order writeExternal() writes them. + routineId = U.readUuid(in); + prjPred = (IgnitePredicate<ClusterNode>)in.readObject(); + hnd = (GridContinuousHandler)in.readObject(); + bufSize = in.readInt(); + interval = in.readLong(); + autoUnsubscribe = in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryDataItem.class, this); + } + } + + /** + * Future for start routine. + */ + private static class StartFuture extends GridFutureAdapter<UUID> { + /** */ + private static final long serialVersionUID = 0L; + + /** Routine ID. */ + private UUID routineId; + + /** Local listener is registered. */ + private volatile boolean loc; + + /** All remote listeners are registered. */ + private volatile boolean rmt; + + /** Timeout object. */ + private volatile GridTimeoutObject timeoutObj; + + /** + * Required by {@link Externalizable}. 
+ */ + public StartFuture() { + // No-op. + } + + /** + * @param ctx Kernal context. + * @param routineId Consume ID. + */ + StartFuture(GridKernalContext ctx, UUID routineId) { + super(ctx); + + this.routineId = routineId; + } + + /** + * Called when local listener is registered. + */ + public void onLocalRegistered() { + loc = true; + + if (rmt && !isDone()) + onDone(routineId); + } + + /** + * Called when all remote listeners are registered. + */ + public void onRemoteRegistered() { + rmt = true; + + if (loc && !isDone()) + onDone(routineId); + } + + /** + * @param timeoutObj Timeout object. + */ + public void addTimeoutObject(GridTimeoutObject timeoutObj) { + assert timeoutObj != null; + + this.timeoutObj = timeoutObj; + + ctx.timeout().addTimeoutObject(timeoutObj); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable UUID res, @Nullable Throwable err) { + if (timeoutObj != null) + ctx.timeout().removeTimeoutObject(timeoutObj); + + return super.onDone(res, err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartFuture.class, this); + } + } + + /** + * Future for stop routine. + */ + private static class StopFuture extends GridFutureAdapter<Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** Timeout object. */ + private volatile GridTimeoutObject timeoutObj; + + /** + * Required by {@link Externalizable}. + */ + public StopFuture() { + // No-op. + } + + /** + * @param ctx Kernal context. + */ + StopFuture(GridKernalContext ctx) { + super(ctx); + } + + /** + * @param timeoutObj Timeout object. 
+ */ + public void addTimeoutObject(GridTimeoutObject timeoutObj) { + assert timeoutObj != null; + + this.timeoutObj = timeoutObj; + + ctx.timeout().addTimeoutObject(timeoutObj); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + if (timeoutObj != null) + ctx.timeout().removeTimeoutObject(timeoutObj); + + return super.onDone(res, err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StopFuture.class, this); + } + } + + /** + * Synchronous message acknowledgement future. + */ + private static class SyncMessageAckFuture extends GridFutureAdapter<Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private UUID nodeId; + + /** + * Required by {@link Externalizable}. + */ + public SyncMessageAckFuture() { + // No-op. + } + + /** + * @param ctx Kernal context. + * @param nodeId Master node ID. + */ + SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) { + super(ctx); + + this.nodeId = nodeId; + } + + /** + * @return Master node ID. + */ + UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SyncMessageAckFuture.class, this); + } + } + + /** + * Timeout object for stop process. + */ + private class StopTimeoutObject extends GridTimeoutObjectAdapter { + /** Timeout. */ + private final long timeout; + + /** Routine ID. */ + private final UUID routineId; + + /** Request. */ + private final GridContinuousMessage req; + + /** + * @param timeout Timeout. + * @param routineId Routine ID. + * @param req Request. 
+ */ + protected StopTimeoutObject(long timeout, UUID routineId, GridContinuousMessage req) { + super(timeout); + + assert routineId != null; + assert req != null; + + this.timeout = timeout; + this.routineId = routineId; + this.req = req; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + // Take the IDs of the nodes whose stop acknowledgement is still awaited for this routine. + Collection<UUID> ids = waitForStopAck.remove(routineId); + + if (ids != null) { + U.warn(log, "Failed to get stop acknowledgement from nodes (timeout expired): " + ids + + ". Will retry."); + + StopFuture f = stopFuts.get(routineId); + + if (f != null) { + if (!ids.isEmpty()) { + // Put the wait list back before resending the request. + waitForStopAck.put(routineId, ids); + + // Resend requests. + // NOTE(review): ids is modified inside this loop while being iterated; this is only + // safe if the collection tolerates concurrent modification - confirm its type. + for (UUID id : ids) { + try { + sendWithRetries(id, req, null); + } + catch (ClusterTopologyException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to resend stop request to node (is node alive?): " + id); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to resend stop request to node: " + id, e); + + ids.remove(id); + + // No nodes left to wait for: fail the stop future with the send error. + if (ids.isEmpty()) + f.onDone(e); + } + } + + // Reschedule timeout. + ctx.timeout().addTimeoutObject(new StopTimeoutObject(timeout, routineId, req)); + } + // Wait list is empty: complete the stop future if it is still registered. + else if (stopFuts.remove(routineId) != null) + f.onDone(); + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java new file mode 100644 index 0000000..2bda02f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.dataload.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Bundled factory for cache updaters. + */ +public class GridDataLoadCacheUpdaters { + /** */ + private static final IgniteDataLoadCacheUpdater INDIVIDUAL = new Individual(); + + /** */ + private static final IgniteDataLoadCacheUpdater BATCHED = new Batched(); + + /** */ + private static final IgniteDataLoadCacheUpdater BATCHED_SORTED = new BatchedSorted(); + + /** */ + private static final IgniteDataLoadCacheUpdater GROUP_LOCKED = new GroupLocked(); + + /** + * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance + * is not the best. + * + * @return Single updater. + */ + public static <K, V> IgniteDataLoadCacheUpdater<K, V> individual() { + return INDIVIDUAL; + } + + /** + * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting + * updated concurrently. Performance is generally better than in {@link #individual()}. + * + * @return Batched updater. 
+ */ + public static <K, V> IgniteDataLoadCacheUpdater<K, V> batched() { + return BATCHED; + } + + /** + * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates + * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}. + * + * @return Batched sorted updater. + */ + public static <K extends Comparable<?>, V> IgniteDataLoadCacheUpdater<K, V> batchedSorted() { + return BATCHED_SORTED; + } + + /** + * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and + * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])} in group lock transaction. Requires that there are no + * concurrent updates other than in group lock. + * + * @return Updater with group lock. + */ + public static <K, V> IgniteDataLoadCacheUpdater<K, V> groupLocked() { + return GROUP_LOCKED; + } + + /** + * Updates cache. + * + * @param cache Cache. + * @param rmvCol Keys to remove. + * @param putMap Entries to put. + * @throws IgniteCheckedException If failed. + */ + protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Collection<K> rmvCol, + Map<K, V> putMap) throws IgniteCheckedException { + assert rmvCol != null || putMap != null; + + // Here we assume that there are no key duplicates, so the following calls are valid. + if (rmvCol != null) + ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol); + + if (putMap != null) + cache.putAll(putMap); + } + + /** + * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone. 
+ */ + private static class Individual<K, V> implements IgniteDataLoadCacheUpdater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) + throws IgniteCheckedException { + assert cache != null; + assert !F.isEmpty(entries); + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key != null; + + V val = entry.getValue(); + + // Null value means the key is removed; otherwise the value is put. + if (val == null) + cache.remove(key); + else + cache.put(key, val); + } + } + } + + /** + * Batched updater. Updates cache using batch operations thus is dead lock prone. + */ + private static class Batched<K, V> implements IgniteDataLoadCacheUpdater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) + throws IgniteCheckedException { + assert cache != null; + assert !F.isEmpty(entries); + + // Both collections are created lazily and stay null when there is nothing to put or remove. + Map<K, V> putAll = null; + Collection<K> rmvAll = null; + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key != null; + + V val = entry.getValue(); + + // Entries with null value go to the removal batch, all others to the put batch. + if (val == null) { + if (rmvAll == null) + rmvAll = new ArrayList<>(); + + rmvAll.add(key); + } + else { + if (putAll == null) + putAll = new HashMap<>(); + + putAll.put(key, val); + } + } + + updateAll(cache, rmvAll, putAll); + } + } + + /** + * Batched sorted updater. Updates cache using batch operations with keys sorted in natural order, so deadlock can not happen if all updates use the same rule. 
+ */ + private static class BatchedSorted<K, V> implements IgniteDataLoadCacheUpdater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) + throws IgniteCheckedException { + assert cache != null; + assert !F.isEmpty(entries); + + Map<K, V> putAll = null; + Collection<K> rmvAll = null; + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key instanceof Comparable; + + V val = entry.getValue(); + + if (val == null) { + if (rmvAll == null) + rmvAll = new TreeSet<>(); + + rmvAll.add(key); + } + else { + if (putAll == null) + putAll = new TreeMap<>(); + + putAll.put(key, val); + } + } + + updateAll(cache, rmvAll, putAll); + } + } + + /** + * Cache updater which uses group lock. + */ + private static class GroupLocked<K, V> implements IgniteDataLoadCacheUpdater<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) + throws IgniteCheckedException { + assert cache != null; + assert !F.isEmpty(entries); + + assert cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() != ATOMIC; + + Map<Integer, Integer> partsCounts = new HashMap<>(); + + // Group by partition ID. + Map<Integer, Collection<K>> rmvPartMap = null; + Map<Integer, Map<K, V>> putPartMap = null; + + Ignite ignite = cache.unwrap(Ignite.class); + + GridCacheAffinity<K> aff = ignite.<K, V>cache(cache.getName()).affinity(); + + for (Map.Entry<K, V> entry : entries) { + K key = entry.getKey(); + + assert key != null; + + V val = entry.getValue(); + + int part = aff.partition(key); + + Integer cnt = partsCounts.get(part); + + partsCounts.put(part, cnt == null ? 
1 : cnt + 1); + + if (val == null) { + if (rmvPartMap == null) + rmvPartMap = new HashMap<>(); + + F.addIfAbsent(rmvPartMap, part, F.<K>newList()).add(key); + } + else { + if (putPartMap == null) + putPartMap = new HashMap<>(); + + F.addIfAbsent(putPartMap, part, F.<K, V>newMap()).put(key, val); + } + } + + IgniteTransactions txs = ignite.transactions(); + + for (Map.Entry<Integer, Integer> e : partsCounts.entrySet()) { + Integer part = e.getKey(); + int cnt = e.getValue(); + + try (IgniteTx tx = txs.txStartPartition(cache.getName(), part, PESSIMISTIC, REPEATABLE_READ, 0, cnt)) { + updateAll(cache, rmvPartMap == null ? null : rmvPartMap.get(part), + putPartMap == null ? null : putPartMap.get(part)); + + tx.commit(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java new file mode 100644 index 0000000..794a02b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.jetbrains.annotations.*; + +import java.nio.*; +import java.util.*; + +/** + * + */ +public class GridDataLoadRequest extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long reqId; + + /** */ + private byte[] resTopicBytes; + + /** Cache name. */ + private String cacheName; + + /** */ + private byte[] updaterBytes; + + /** Entries to put. */ + private byte[] colBytes; + + /** {@code True} to ignore deployment ownership. */ + private boolean ignoreDepOwnership; + + /** */ + private boolean skipStore; + + /** */ + private IgniteDeploymentMode depMode; + + /** */ + private String sampleClsName; + + /** */ + private String userVer; + + /** Node class loader participants. */ + @GridToStringInclude + @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) + private Map<UUID, IgniteUuid> ldrParticipants; + + /** */ + private IgniteUuid clsLdrId; + + /** */ + private boolean forceLocDep; + + /** + * {@code Externalizable} support. + */ + public GridDataLoadRequest() { + // No-op. + } + + /** + * @param reqId Request ID. + * @param resTopicBytes Response topic. + * @param cacheName Cache name. 
+ * @param updaterBytes Cache updater. + * @param colBytes Collection bytes. + * @param ignoreDepOwnership Ignore ownership. + * @param skipStore Skip store flag. + * @param depMode Deployment mode. + * @param sampleClsName Sample class name. + * @param userVer User version. + * @param ldrParticipants Loader participants. + * @param clsLdrId Class loader ID. + * @param forceLocDep Force local deployment. + */ + public GridDataLoadRequest(long reqId, + byte[] resTopicBytes, + @Nullable String cacheName, + byte[] updaterBytes, + byte[] colBytes, + boolean ignoreDepOwnership, + boolean skipStore, + IgniteDeploymentMode depMode, + String sampleClsName, + String userVer, + Map<UUID, IgniteUuid> ldrParticipants, + IgniteUuid clsLdrId, + boolean forceLocDep) { + this.reqId = reqId; + this.resTopicBytes = resTopicBytes; + this.cacheName = cacheName; + this.updaterBytes = updaterBytes; + this.colBytes = colBytes; + this.ignoreDepOwnership = ignoreDepOwnership; + this.skipStore = skipStore; + this.depMode = depMode; + this.sampleClsName = sampleClsName; + this.userVer = userVer; + this.ldrParticipants = ldrParticipants; + this.clsLdrId = clsLdrId; + this.forceLocDep = forceLocDep; + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @return Response topic. + */ + public byte[] responseTopicBytes() { + return resTopicBytes; + } + + /** + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** + * @return Updater. + */ + public byte[] updaterBytes() { + return updaterBytes; + } + + /** + * @return Collection bytes. + */ + public byte[] collectionBytes() { + return colBytes; + } + + /** + * @return {@code True} to ignore ownership. + */ + public boolean ignoreDeploymentOwnership() { + return ignoreDepOwnership; + } + + /** + * @return Skip store flag. + */ + public boolean skipStore() { + return skipStore; + } + + /** + * @return Deployment mode. 
+ */ + public IgniteDeploymentMode deploymentMode() { + return depMode; + } + + /** + * @return Sample class name. + */ + public String sampleClassName() { + return sampleClsName; + } + + /** + * @return User version. + */ + public String userVersion() { + return userVer; + } + + /** + * @return Participants. + */ + public Map<UUID, IgniteUuid> participants() { + return ldrParticipants; + } + + /** + * @return Class loader ID. + */ + public IgniteUuid classLoaderId() { + return clsLdrId; + } + + /** + * @return {@code True} to force local deployment. + */ + public boolean forceLocalDeployment() { + return forceLocDep; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDataLoadRequest.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (!commState.putString(cacheName)) + return false; + + commState.idx++; + + case 1: + if (!commState.putGridUuid(clsLdrId)) + return false; + + commState.idx++; + + case 2: + if (!commState.putByteArray(colBytes)) + return false; + + commState.idx++; + + case 3: + if (!commState.putEnum(depMode)) + return false; + + commState.idx++; + + case 4: + if (!commState.putBoolean(forceLocDep)) + return false; + + commState.idx++; + + case 5: + if (!commState.putBoolean(ignoreDepOwnership)) + return false; + + commState.idx++; + + case 6: + if (ldrParticipants != null) { + if (commState.it == null) { + if (!commState.putInt(ldrParticipants.size())) + return false; + + commState.it = ldrParticipants.entrySet().iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + Map.Entry<UUID, IgniteUuid> e = (Map.Entry<UUID, IgniteUuid>)commState.cur; + + if (!commState.keyDone) { + if 
(!commState.putUuid(e.getKey()))
+                                return false;
+
+                            commState.keyDone = true;
+                        }
+
+                        if (!commState.putGridUuid(e.getValue()))
+                            return false;
+
+                        commState.keyDone = false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 7:
+                if (!commState.putLong(reqId))
+                    return false;
+
+                commState.idx++;
+
+            case 8:
+                if (!commState.putByteArray(resTopicBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 9:
+                if (!commState.putString(sampleClsName))
+                    return false;
+
+                commState.idx++;
+
+            case 10:
+                if (!commState.putBoolean(skipStore))
+                    return false;
+
+                commState.idx++;
+
+            case 11:
+                if (!commState.putByteArray(updaterBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 12:
+                if (!commState.putString(userVer))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads the fields of this message from {@code buf} one at a time, in the order fixed by the
+     * {@code switch} below. {@code commState.idx} selects the first field to read, and the cases
+     * deliberately fall through (there is no {@code break}), so a single call reads as many
+     * consecutive fields as it can.
+     * <p>
+     * The method returns {@code false} without advancing {@code commState.idx} when a fixed-size
+     * field has too few bytes remaining in {@code buf}, or when a getter returns its
+     * {@code *_NOT_READ} sentinel (presumably meaning the value is not completely available yet;
+     * confirm against the {@code commState} implementation). A later call with the same
+     * {@code commState} therefore re-enters the switch at the field that was not read.
+     *
+     * @param buf Buffer to read from; it is handed to {@code commState} before any field is read.
+     * @return {@code true} after the last case (12, {@code userVer}) has been processed or when
+     *      {@code commState.idx} matches no case; {@code false} if a field could not be read.
+     */
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        switch (commState.idx) {
+            case 0: // cacheName
+                String cacheName0 = commState.getString();
+
+                if (cacheName0 == STR_NOT_READ) // identity comparison against the sentinel, not equals()
+                    return false;
+
+                cacheName = cacheName0;
+
+                commState.idx++;
+
+            case 1: // clsLdrId
+                IgniteUuid clsLdrId0 = commState.getGridUuid();
+
+                if (clsLdrId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                clsLdrId = clsLdrId0;
+
+                commState.idx++;
+
+            case 2: // colBytes
+                byte[] colBytes0 = commState.getByteArray();
+
+                if (colBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                colBytes = colBytes0;
+
+                commState.idx++;
+
+            case 3: // depMode, transferred as a single byte holding the ordinal
+                if (buf.remaining() < 1)
+                    return false;
+
+                byte depMode0 = commState.getByte();
+
+                depMode = IgniteDeploymentMode.fromOrdinal(depMode0);
+
+                commState.idx++;
+
+            case 4: // forceLocDep
+                if (buf.remaining() < 1)
+                    return false;
+
+                forceLocDep = commState.getBoolean();
+
+                commState.idx++;
+
+            case 5: // ignoreDepOwnership
+                if (buf.remaining() < 1)
+                    return false;
+
+                ignoreDepOwnership = commState.getBoolean();
+
+                commState.idx++;
+
+            case 6: // ldrParticipants: an int size followed by (UUID, IgniteUuid) pairs
+                if (commState.readSize == -1) { // -1 means the size has not been read yet
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) { // a negative size skips the map and leaves ldrParticipants as it was
+                    if (ldrParticipants == null)
+                        ldrParticipants = new HashMap<>(commState.readSize, 1.0f);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) { // starts from the entries already read
+                        if (!commState.keyDone) { // key of the current entry has not been read yet
+                            UUID _val = commState.getUuid();
+
+                            if (_val == UUID_NOT_READ)
+                                return false;
+
+                            commState.cur = _val; // keep the key so it survives a failed value read below
+                            commState.keyDone = true;
+                        }
+
+                        IgniteUuid _val = commState.getGridUuid();
+
+                        if (_val == GRID_UUID_NOT_READ)
+                            return false;
+
+                        ldrParticipants.put((UUID)commState.cur, _val);
+
+                        commState.keyDone = false;
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1; // reset the collection-reading state once the map is complete
+                commState.readItems = 0;
+                commState.cur = null;
+
+                commState.idx++;
+
+            case 7: // reqId
+                if (buf.remaining() < 8)
+                    return false;
+
+                reqId = commState.getLong();
+
+                commState.idx++;
+
+            case 8: // resTopicBytes
+                byte[] resTopicBytes0 = commState.getByteArray();
+
+                if (resTopicBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                resTopicBytes = resTopicBytes0;
+
+                commState.idx++;
+
+            case 9: // sampleClsName
+                String sampleClsName0 = commState.getString();
+
+                if (sampleClsName0 == STR_NOT_READ)
+                    return false;
+
+                sampleClsName = sampleClsName0;
+
+                commState.idx++;
+
+            case 10: // skipStore
+                if (buf.remaining() < 1)
+                    return false;
+
+                skipStore = commState.getBoolean();
+
+                commState.idx++;
+
+            case 11: // updaterBytes
+                byte[] updaterBytes0 = commState.getByteArray();
+
+                if (updaterBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                updaterBytes = updaterBytes0;
+
+                commState.idx++;
+
+            case 12: // userVer
+                String userVer0 = commState.getString();
+
+                if (userVer0 == STR_NOT_READ)
+                    return false;
+
+                userVer = userVer0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return The constant {@code 61}. NOTE(review): presumably the type code identifying this
+     *      message class on the wire; confirm against the message factory.
+     */
+    @Override public byte directType() {
+        return 61;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates a new {@code GridDataLoadRequest} and fills it through
+     * {@link #clone0(GridTcpCommunicationMessageAdapter)}.
+     *
+     * @return The newly created copy.
+     */
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDataLoadRequest _clone = new GridDataLoadRequest();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Copies the fields listed below into {@code _msg} by plain assignment. Reference-typed fields
+     * (the byte arrays and the {@code ldrParticipants} map) are therefore shared between this
+     * message and the copy rather than duplicated.
+     *
+     * @param _msg Target message; it is cast to {@code GridDataLoadRequest} without a type check.
+     */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        GridDataLoadRequest _clone = (GridDataLoadRequest)_msg;
+
+        _clone.reqId = reqId;
+        _clone.resTopicBytes = resTopicBytes;
+        _clone.cacheName = cacheName;
+        _clone.updaterBytes = updaterBytes;
+        _clone.colBytes = colBytes;
+        _clone.ignoreDepOwnership = ignoreDepOwnership;
+        _clone.skipStore = skipStore;
+        _clone.depMode = depMode;
+        _clone.sampleClsName = sampleClsName;
+        _clone.userVer = userVer;
+        _clone.ldrParticipants = ldrParticipants;
+        _clone.clsLdrId = clsLdrId;
+        _clone.forceLocDep = forceLocDep;
+    }
+}
