// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerManager.java
// ----------------------------------------------------------------------
// diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerManager.java
// new file mode 100644
// index 0000000..06e1b18
// --- /dev/null
// +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerManager.java
// @@ -0,0 +1,828 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.thread.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
import org.jetbrains.annotations.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.events.IgniteEventType.*;
import static org.apache.ignite.internal.GridTopic.*;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;

/**
 * GGFS fragmentizer manager.
 * <p>
 * Every node with the fragmentizer enabled runs a {@link FragmentizerWorker} that serves fragmentizer
 * and sync requests. The node with the minimal order among GGFS nodes additionally runs a
 * {@link FragmentizerCoordinator} that picks files and distributes their ranges between nodes.
 */
public class GridGgfsFragmentizerManager extends GridGgfsManager {
    /** Message offer wait interval. */
    private static final int MSG_OFFER_TIMEOUT = 1000;

    /** Fragmentizer files check interval. */
    private static final int FRAGMENTIZER_CHECK_INTERVAL = 3000;

    /** Message send retry interval. */
    private static final int MESSAGE_SEND_RETRY_INTERVAL = 1000;

    /** How many times retry message send. */
    private static final int MESSAGE_SEND_RETRY_COUNT = 3;

    /** Manager stopping flag. */
    private volatile boolean stopping;

    /** Coordinator worker. */
    private volatile FragmentizerCoordinator fragmentizerCrd;

    /** This variable is used in tests only. */
    @SuppressWarnings("FieldCanBeLocal")
    private volatile boolean fragmentizerEnabled = true;

    /** Fragmentizer worker. */
    private FragmentizerWorker fragmentizerWorker;

    /** Shutdown lock. Write lock is taken on stop, read lock is taken by request processing. */
    private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();

    /** Message topic. */
    private Object topic;

    /** {@inheritDoc} */
    @Override protected void start0() throws IgniteCheckedException {
        if (!ggfsCtx.configuration().isFragmentizerEnabled())
            return;

        // We care only about node leave and fail events.
        ggfsCtx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
            @Override public void onEvent(IgniteEvent evt) {
                assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;

                IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;

                checkLaunchCoordinator(discoEvt);
            }
        }, EVT_NODE_LEFT, EVT_NODE_FAILED);

        fragmentizerWorker = new FragmentizerWorker();

        String ggfsName = ggfsCtx.configuration().getName();

        // Unnamed GGFS uses the base topic, named GGFS uses a topic derived from its name.
        topic = F.isEmpty(ggfsName) ? TOPIC_GGFS : TOPIC_GGFS.topic(ggfsName);

        ggfsCtx.kernalContext().io().addMessageListener(topic, fragmentizerWorker);

        new IgniteThread(fragmentizerWorker).start();
    }

    /** {@inheritDoc} */
    @Override protected void onKernalStart0() throws IgniteCheckedException {
        if (ggfsCtx.configuration().isFragmentizerEnabled()) {
            // Check at startup if this node is a fragmentizer coordinator.
            IgniteDiscoveryEvent locJoinEvt = ggfsCtx.kernalContext().discovery().localJoinEvent();

            checkLaunchCoordinator(locJoinEvt);
        }
    }

    /** {@inheritDoc} */
    @SuppressWarnings("BusyWait")
    @Override protected void onKernalStop0(boolean cancel) {
        boolean interrupted = false;

        // Busy wait is intentional.
        while (true) {
            try {
                if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
                    break;
                else
                    Thread.sleep(200);
            }
            catch (InterruptedException ignore) {
                // Preserve interrupt status & ignore.
                // Note that interrupted flag is cleared.
                interrupted = true;
            }
        }

        try {
            // Restore the interrupt status that was swallowed while acquiring the lock.
            if (interrupted)
                Thread.currentThread().interrupt();

            stopping = true;
        }
        finally {
            rw.writeUnlock();
        }

        synchronized (this) {
            if (fragmentizerCrd != null)
                fragmentizerCrd.cancel();
        }

        if (fragmentizerWorker != null)
            fragmentizerWorker.cancel();

        U.join(fragmentizerCrd, log);
        U.join(fragmentizerWorker, log);
    }

    /**
     * Sends message to the given node, retrying up to {@link #MESSAGE_SEND_RETRY_COUNT} times with
     * {@link #MESSAGE_SEND_RETRY_INTERVAL} pause between attempts while the node stays alive.
     *
     * @param nodeId Node ID to send message to.
     * @param msg Message to send.
     * @throws IgniteCheckedException If send failed. {@link ClusterTopologyException} is thrown when
     *      the target node is no longer alive.
     */
    private void sendWithRetries(UUID nodeId, GridGgfsCommunicationMessage msg) throws IgniteCheckedException {
        for (int i = 0; i < MESSAGE_SEND_RETRY_COUNT; i++) {
            try {
                ggfsCtx.send(nodeId, topic, msg, SYSTEM_POOL);

                return;
            }
            catch (IgniteCheckedException e) {
                if (!ggfsCtx.kernalContext().discovery().alive(nodeId))
                    throw new ClusterTopologyException("Failed to send message (node left the grid) " +
                        "[nodeId=" + nodeId + ", msg=" + msg + ']');

                // Last attempt failed, propagate the original error.
                if (i == MESSAGE_SEND_RETRY_COUNT - 1)
                    throw e;

                U.sleep(MESSAGE_SEND_RETRY_INTERVAL);
            }
        }
    }

    /**
     * Checks if current node is the oldest node in topology and starts coordinator thread if so.
     * Note that once node is the oldest one, it will be the oldest until it leaves grid.
     *
     * @param discoEvt Discovery event.
     */
    private void checkLaunchCoordinator(IgniteDiscoveryEvent discoEvt) {
        rw.readLock();

        try {
            if (stopping)
                return;

            if (fragmentizerCrd == null) {
                long minNodeOrder = Long.MAX_VALUE;

                Collection<ClusterNode> nodes = discoEvt.topologyNodes();

                // Only GGFS nodes take part in coordinator election.
                for (ClusterNode node : nodes) {
                    if (node.order() < minNodeOrder && ggfsCtx.ggfsNode(node))
                        minNodeOrder = node.order();
                }

                ClusterNode locNode = ggfsCtx.kernalContext().grid().localNode();

                if (locNode.order() == minNodeOrder) {
                    if (log.isDebugEnabled())
                        log.debug("Detected local node to be the eldest GGFS node in topology, starting fragmentizer " +
                            "coordinator thread [discoEvt=" + discoEvt + ", locNode=" + locNode + ']');

                    // Re-check under the monitor so that only one coordinator is ever started.
                    synchronized (this) {
                        if (fragmentizerCrd == null && !stopping) {
                            fragmentizerCrd = new FragmentizerCoordinator();

                            new IgniteThread(fragmentizerCrd).start();
                        }
                    }
                }
            }
        }
        finally {
            rw.readUnlock();
        }
    }

    /**
     * Processes fragmentizer request. For each range assigned to this node:
     * <ul>
     *      <li>Mark range as moving indicating that block copying started.</li>
     *      <li>Copy blocks to non-colocated keys.</li>
     *      <li>Update map to indicate that blocks were copied and old blocks should be deleted.</li>
     *      <li>Delete old blocks.</li>
     *      <li>Remove range from file map.</li>
     * </ul>
     * Processing of a range starts from the step matching its current status and falls through
     * the remaining steps.
     *
     * @param req Request.
     * @throws IgniteCheckedException In case of error.
     */
    @SuppressWarnings("fallthrough")
    private void processFragmentizerRequest(GridGgfsFragmentizerRequest req) throws IgniteCheckedException {
        req.finishUnmarshal(ggfsCtx.kernalContext().config().getMarshaller(), null);

        Collection<GridGgfsFileAffinityRange> ranges = req.fragmentRanges();
        IgniteUuid fileId = req.fileId();

        GridGgfsFileInfo fileInfo = ggfsCtx.meta().info(fileId);

        if (fileInfo == null) {
            if (log.isDebugEnabled())
                log.debug("Failed to find file info for fragmentizer request: " + req);

            return;
        }

        if (log.isDebugEnabled())
            log.debug("Moving file ranges for fragmentizer request [req=" + req + ", fileInfo=" + fileInfo + ']');

        for (GridGgfsFileAffinityRange range : ranges) {
            try {
                GridGgfsFileInfo updated;

                switch (range.status()) {
                    case RANGE_STATUS_INITIAL: {
                        // Mark range as moving.
                        updated = ggfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVING));

                        // NOTE(review): null presumably means the file info is gone - confirm against updateInfo().
                        if (updated == null) {
                            ggfsCtx.data().cleanBlocks(fileInfo, range, true);

                            continue;
                        }

                        // Fall-through.
                    }

                    case RANGE_STATUS_MOVING: {
                        // Move colocated blocks.
                        ggfsCtx.data().spreadBlocks(fileInfo, range);

                        // Mark range as moved.
                        updated = ggfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVED));

                        if (updated == null) {
                            ggfsCtx.data().cleanBlocks(fileInfo, range, true);

                            continue;
                        }

                        // Fall-through.
                    }

                    case RANGE_STATUS_MOVED: {
                        // Remove old blocks.
                        ggfsCtx.data().cleanBlocks(fileInfo, range, false);

                        // Remove range from map.
                        updated = ggfsCtx.meta().updateInfo(fileId, deleteRange(range));

                        if (updated == null)
                            ggfsCtx.data().cleanBlocks(fileInfo, range, true);
                    }
                }
            }
            catch (GridGgfsInvalidRangeException e) {
                // Invalid range is not fatal for the request: skip it and proceed with the next one.
                if (log.isDebugEnabled())
                    log.debug("Failed to update file range " +
                        "[range=" + range + ", fileId=" + fileId + ", err=" + e.getMessage() + ']');
            }
        }
    }

    /**
     * Creates update info closure that will set given status for the given range in the file map.
     * The closure works on a copy of the file map and returns a new file info holding that copy.
     *
     * @param range Range to update status for.
     * @param status New range status.
     * @return Update closure.
     */
    private IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo> updateRange(final GridGgfsFileAffinityRange range,
        final int status) {
        return new CX1<GridGgfsFileInfo, GridGgfsFileInfo>() {
            @Override public GridGgfsFileInfo applyx(GridGgfsFileInfo info) throws IgniteCheckedException {
                GridGgfsFileMap map = new GridGgfsFileMap(info.fileMap());

                map.updateRangeStatus(range, status);

                if (log.isDebugEnabled())
                    log.debug("Updated file map for range [fileId=" + info.id() + ", range=" + range +
                        ", status=" + status + ", oldMap=" + info.fileMap() + ", newMap=" + map + ']');

                GridGgfsFileInfo updated = new GridGgfsFileInfo(info, info.length());

                updated.fileMap(map);

                return updated;
            }
        };
    }

    /**
     * Creates update info closure that will delete given range from the file map.
     * The closure works on a copy of the file map and returns a new file info holding that copy.
     *
     * @param range Range to delete.
     * @return Update closure.
     */
    private IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo> deleteRange(final GridGgfsFileAffinityRange range) {
        return new CX1<GridGgfsFileInfo, GridGgfsFileInfo>() {
            @Override public GridGgfsFileInfo applyx(GridGgfsFileInfo info) throws IgniteCheckedException {
                GridGgfsFileMap map = new GridGgfsFileMap(info.fileMap());

                map.deleteRange(range);

                if (log.isDebugEnabled())
                    log.debug("Deleted range from file map [fileId=" + info.id() + ", range=" + range +
                        ", oldMap=" + info.fileMap() + ", newMap=" + map + ']');

                GridGgfsFileInfo updated = new GridGgfsFileInfo(info, info.length());

                updated.fileMap(map);

                return updated;
            }
        };
    }

    /**
     * Fragmentizer coordinator thread.
     * <p>
     * Listens for node leave/fail events and for fragmentizer/sync responses on the manager topic.
     */
    private class FragmentizerCoordinator extends GridWorker implements GridLocalEventListener, GridMessageListener {
        /** Files being fragmented, mapped to IDs of nodes which have not replied yet. */
        private final ConcurrentMap<IgniteUuid, Collection<UUID>> fragmentingFiles = new ConcurrentHashMap<>();

        /** Node IDs captured on start. */
        private volatile Collection<UUID> startSync;

        /** Wait lock. */
        private final Lock lock = new ReentrantLock();

        /** Wait condition. */
        private final Condition cond = lock.newCondition();

        /**
         * Constructor.
         */
        protected FragmentizerCoordinator() {
            super(ggfsCtx.kernalContext().gridName(), "fragmentizer-coordinator", ggfsCtx.kernalContext().log());

            ggfsCtx.kernalContext().event().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED);
            ggfsCtx.kernalContext().io().addMessageListener(topic, this);
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException, IgniteInterruptedException {
            // Wait for all previous fragmentizer tasks to complete.
            syncStart();

            while (!isCancelled()) {
                // If we have room for files, add them to fragmentizer.
                try {
                    while (fragmentingFiles.size() < ggfsCtx.configuration().getFragmentizerConcurrentFiles()) {
                        GridGgfsFileInfo fileInfo = fileForFragmentizer(fragmentingFiles.keySet());

                        // If no colocated files found, exit loop.
                        if (fileInfo == null)
                            break;

                        requestFragmenting(fileInfo);
                    }
                }
                catch (IgniteCheckedException | IgniteException e) {
                    if (!X.hasCause(e, InterruptedException.class) && !X.hasCause(e, IgniteInterruptedException.class))
                        LT.warn(log, e, "Failed to get fragmentizer file info (will retry).");
                    else {
                        if (log.isDebugEnabled())
                            log.debug("Got interrupted exception in fragmentizer coordinator (grid is stopping).");

                        break; // While.
                    }
                }

                lock.lock();

                try {
                    // Sleep until the next check or until wakeUp() signals that a file was finished.
                    cond.await(FRAGMENTIZER_CHECK_INTERVAL, MILLISECONDS);
                }
                finally {
                    lock.unlock();
                }
            }
        }

        /** {@inheritDoc} */
        @Override public void onEvent(IgniteEvent evt) {
            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;

            IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;

            if (log.isDebugEnabled())
                log.debug("Processing node leave event: " + discoEvt);

            boolean signal = false;

            Collection<UUID> startSync0 = startSync;

            // A node that has left will never answer the sync request, stop waiting for it.
            if (startSync0 != null && !startSync0.isEmpty()) {
                startSync0.remove(discoEvt.eventNode().id());

                if (startSync0.isEmpty()) {
                    if (log.isDebugEnabled())
                        log.debug("Completed fragmentizer coordinator sync start.");

                    signal = true;
                }
            }

            if (!signal) {
                Iterator<Map.Entry<IgniteUuid, Collection<UUID>>> it = fragmentingFiles.entrySet().iterator();

                while (it.hasNext()) {
                    Map.Entry<IgniteUuid, Collection<UUID>> entry = it.next();

                    Collection<UUID> nodeIds = entry.getValue();

                    if (nodeIds.remove(discoEvt.eventNode().id())) {
                        if (nodeIds.isEmpty()) {
                            if (log.isDebugEnabled())
                                log.debug("Received all responses for fragmentizer task [fileId=" + entry.getKey() +
                                    ']');

                            it.remove();

                            signal = true;
                        }
                    }
                }
            }

            if (signal)
                wakeUp();
        }

        /** {@inheritDoc} */
        @Override public void onMessage(UUID nodeId, Object msg) {
            if (msg instanceof GridGgfsFragmentizerResponse) {
                GridGgfsFragmentizerResponse res = (GridGgfsFragmentizerResponse)msg;

                IgniteUuid fileId = res.fileId();

                Collection<UUID> nodeIds = fragmentingFiles.get(fileId);

                if (nodeIds != null) {
                    if (nodeIds.remove(nodeId)) {
                        if (nodeIds.isEmpty()) {
                            if (log.isDebugEnabled())
                                log.debug("Received all responses for fragmentizer task [fileId=" + fileId + ']');

                            // Removes the mapping only if it still holds this very wait set instance.
                            fragmentingFiles.remove(fileId, nodeIds);

                            wakeUp();
                        }
                    }
                }
                else
                    log.warning("Received fragmentizer response for file ID which was not requested (will ignore) " +
                        "[nodeId=" + nodeId + ", fileId=" + res.fileId() + ']');
            }
            else if (msg instanceof GridGgfsSyncMessage) {
                GridGgfsSyncMessage sync = (GridGgfsSyncMessage)msg;

                // Only responses carrying the local node order are accepted here.
                if (sync.response() && sync.order() == ggfsCtx.kernalContext().grid().localNode().order()) {
                    if (log.isDebugEnabled())
                        log.debug("Received fragmentizer sync response from remote node: " + nodeId);

                    Collection<UUID> startSync0 = startSync;

                    if (startSync0 != null) {
                        startSync0.remove(nodeId);

                        if (startSync0.isEmpty()) {
                            if (log.isDebugEnabled())
                                log.debug("Completed fragmentizer coordinator sync start: " + startSync0);

                            wakeUp();
                        }
                    }
                }
            }
        }

        /**
         * Signals condition.
         */
        private void wakeUp() {
            lock.lock();

            try {
                cond.signalAll();
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Sends sync message to remote nodes and awaits for response from all nodes.
         *
         * @throws InterruptedException If waiting was interrupted.
         */
        private void syncStart() throws InterruptedException {
            Collection<UUID> startSync0 = startSync = new GridConcurrentHashSet<>(
                F.viewReadOnly(
                    ggfsCtx.kernalContext().discovery().allNodes(),
                    F.node2id(),
                    new P1<ClusterNode>() {
                        @Override public boolean apply(ClusterNode n) {
                            return ggfsCtx.ggfsNode(n);
                        }
                    }));

            ClusterNode locNode = ggfsCtx.kernalContext().grid().localNode();

            while (!startSync0.isEmpty()) {
                for (UUID nodeId : startSync0) {
                    GridGgfsSyncMessage syncReq = new GridGgfsSyncMessage(locNode.order(), false);

                    try {
                        if (log.isDebugEnabled())
                            log.debug("Sending fragmentizer sync start request to remote node [nodeId=" + nodeId +
                                ", syncReq=" + syncReq + ']');

                        sendWithRetries(nodeId, syncReq);

                        // Close window between message sending and discovery event.
                        if (!ggfsCtx.kernalContext().discovery().alive(nodeId))
                            startSync0.remove(nodeId);
                    }
                    catch (IgniteCheckedException e) {
                        if (e.hasCause(ClusterTopologyException.class)) {
                            if (log.isDebugEnabled())
                                log.debug("Failed to send sync message to remote node (node has left the grid): " +
                                    nodeId);
                        }
                        else
                            U.error(log, "Failed to send synchronize message to remote node (will not wait for reply): " +
                                nodeId, e);

                        startSync0.remove(nodeId);
                    }
                }

                lock.lock();

                try {
                    // Requests are re-sent to nodes which have not replied after the wait expires.
                    if (!startSync0.isEmpty())
                        cond.await(10000, MILLISECONDS);
                }
                finally {
                    lock.unlock();
                }
            }
        }

        /**
         * Starts file fragmenting. Will group file affinity ranges by nodes and send requests to each node.
         * File will be considered processed when each node replied with success (or error) or left the grid.
         *
         * @param fileInfo File info to process.
         */
        private void requestFragmenting(GridGgfsFileInfo fileInfo) {
            GridGgfsFileMap map = fileInfo.fileMap();

            assert map != null && !map.ranges().isEmpty();

            Map<UUID, Collection<GridGgfsFileAffinityRange>> grpMap = U.newHashMap(map.ranges().size());

            for (GridGgfsFileAffinityRange range : map.ranges()) {
                UUID nodeId = ggfsCtx.data().affinityNode(range.affinityKey()).id();

                Collection<GridGgfsFileAffinityRange> nodeRanges = grpMap.get(nodeId);

                if (nodeRanges == null) {
                    nodeRanges = new LinkedList<>();

                    grpMap.put(nodeId, nodeRanges);
                }

                nodeRanges.addAll(range.split(ggfsCtx.data().groupBlockSize()));
            }

            Collection<UUID> nodeIds = new IdentityHashSet(grpMap.keySet());

            if (log.isDebugEnabled())
                log.debug("Calculating fragmentizer groups for file [fileInfo=" + fileInfo +
                    ", nodeIds=" + nodeIds + ']');

            // Put assignment to map first.
            Object old = fragmentingFiles.putIfAbsent(fileInfo.id(), nodeIds);

            assert old == null;

            for (Map.Entry<UUID, Collection<GridGgfsFileAffinityRange>> entry : grpMap.entrySet()) {
                UUID nodeId = entry.getKey();

                GridGgfsFragmentizerRequest msg = new GridGgfsFragmentizerRequest(fileInfo.id(), entry.getValue());

                try {
                    if (log.isDebugEnabled())
                        log.debug("Sending fragmentizer request to remote node [nodeId=" + nodeId +
                            ", fileId=" + fileInfo.id() + ", msg=" + msg + ']');

                    sendWithRetries(nodeId, msg);
                }
                catch (IgniteCheckedException e) {
                    if (e.hasCause(ClusterTopologyException.class)) {
                        if (log.isDebugEnabled())
                            log.debug("Failed to send fragmentizer request to remote node (node left grid): " +
                                nodeId);
                    }
                    else
                        U.error(log, "Failed to send fragmentizer request to remote node [nodeId=" + nodeId +
                            ", msg=" + msg + ']', e);

                    // No response will come from a node the request was not delivered to.
                    nodeIds.remove(nodeId);
                }
            }

            if (nodeIds.isEmpty()) {
                if (log.isDebugEnabled())
                    log.debug("Got empty wait set for fragmentized file: " + fileInfo);

                fragmentingFiles.remove(fileInfo.id(), nodeIds);
            }
        }
    }

    /**
     * Gets next file for fragmentizer to be processed.
     *
     * @param exclude File IDs to exclude (the ones that are currently being processed).
     * @return File info to process or {@code null} if there are no such files or fragmentizer is disabled.
     * @throws IgniteCheckedException In case of error.
     */
    @Nullable private GridGgfsFileInfo fileForFragmentizer(Collection<IgniteUuid> exclude) throws IgniteCheckedException {
        return fragmentizerEnabled ? ggfsCtx.meta().fileForFragmentizer(exclude) : null;
    }

    /**
     * Fragmentizer worker thread.
     * <p>
     * Incoming messages are queued by the communication listener and processed one by one in
     * the worker body.
     */
    private class FragmentizerWorker extends GridWorker implements GridMessageListener {
        /** Requests for this worker. */
        private final BlockingQueue<IgniteBiTuple<UUID, GridGgfsCommunicationMessage>> msgs =
            new LinkedBlockingDeque<>();

        /**
         * Constructor.
         */
        protected FragmentizerWorker() {
            super(ggfsCtx.kernalContext().gridName(), "fragmentizer-worker", ggfsCtx.kernalContext().log());
        }

        /** {@inheritDoc} */
        @Override public void onMessage(UUID nodeId, Object msg) {
            if (msg instanceof GridGgfsFragmentizerRequest ||
                msg instanceof GridGgfsSyncMessage) {
                if (log.isDebugEnabled())
                    log.debug("Received fragmentizer request from remote node [nodeId=" + nodeId +
                        ", msg=" + msg + ']');

                IgniteBiTuple<UUID, GridGgfsCommunicationMessage> tup = F.t(nodeId, (GridGgfsCommunicationMessage)msg);

                try {
                    if (!msgs.offer(tup, MSG_OFFER_TIMEOUT, TimeUnit.MILLISECONDS)) {
                        U.error(log, "Failed to process fragmentizer communication message (will discard) " +
                            "[nodeId=" + nodeId + ", msg=" + msg + ']');
                    }
                }
                catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();

                    U.warn(log, "Failed to process fragmentizer communication message (thread was interrupted) "+
                        "[nodeId=" + nodeId + ", msg=" + msg + ']');
                }
            }
        }

        /** {@inheritDoc} */
        @Override protected void body() throws InterruptedException, IgniteInterruptedException {
            while (!isCancelled()) {
                IgniteBiTuple<UUID, GridGgfsCommunicationMessage> req = msgs.take();

                UUID nodeId = req.get1();

                if (req.get2() instanceof GridGgfsFragmentizerRequest) {
                    GridGgfsFragmentizerRequest fragmentizerReq = (GridGgfsFragmentizerRequest)req.get2();

                    // Read lock cannot be taken once the manager holds the write lock on stop.
                    if (!rw.tryReadLock()) {
                        if (log.isDebugEnabled())
                            log.debug("Received fragmentizing request while stopping grid (will ignore) " +
                                "[nodeId=" + nodeId + ", req=" + req.get2() + ']');

                        continue; // while.
                    }

                    try {
                        try {
                            processFragmentizerRequest(fragmentizerReq);
                        }
                        catch (IgniteCheckedException e) {
                            if (e.hasCause(ClusterTopologyException.class)) {
                                if (log.isDebugEnabled())
                                    log.debug("Failed to process fragmentizer request (remote node left the grid) " +
                                        "[req=" + req + ", err=" + e.getMessage() + ']');
                            }
                            else
                                U.error(log, "Failed to process fragmentizer request [nodeId=" + nodeId +
                                    ", req=" + req + ']', e);
                        }
                        finally {
                            // Response is sent regardless of the processing outcome.
                            sendResponse(nodeId, new GridGgfsFragmentizerResponse(fragmentizerReq.fileId()));
                        }
                    }
                    finally {
                        rw.readUnlock();
                    }
                }
                else {
                    assert req.get2() instanceof GridGgfsSyncMessage;

                    GridGgfsSyncMessage syncMsg = (GridGgfsSyncMessage)req.get2();

                    // Only sync requests are answered here, responses are ignored by the worker.
                    if (!syncMsg.response()) {
                        GridGgfsSyncMessage res = new GridGgfsSyncMessage(syncMsg.order(), true);

                        if (log.isDebugEnabled())
                            log.debug("Sending fragmentizer sync response to remote node [nodeId=" + nodeId +
                                ", res=" + res + ']');

                        sendResponse(nodeId, res);
                    }
                }
            }
        }

        /**
         * Sends response to remote node. Send failures are logged and not propagated.
         *
         * @param nodeId Node ID to send response to.
         * @param msg Message to send.
         */
        private void sendResponse(UUID nodeId, GridGgfsCommunicationMessage msg) {
            try {
                sendWithRetries(nodeId, msg);
            }
            catch (IgniteCheckedException e) {
                if (e.hasCause(ClusterTopologyException.class)) {
                    if (log.isDebugEnabled())
                        log.debug("Failed to send sync response to GGFS fragmentizer coordinator " +
                            "(originating node left the grid): " + nodeId);
                }
                else
                    U.error(log, "Failed to send sync response to GGFS fragmentizer coordinator: " + nodeId, e);
            }
        }
    }

    /**
     * Hash set that overrides equals to use identity comparison.
     * <p>
     * Instances are stored as values of the coordinator's file map and removed with
     * {@code remove(key, value)}, so only the very same wait set instance is ever removed.
     */
    private static class IdentityHashSet extends GridConcurrentHashSet<UUID> {
        /** */
        private static final long serialVersionUID = 0L;

        /**
         * Constructor.
         *
         * @param c Collection to add.
         */
        private IdentityHashSet(Collection<UUID> c) {
            super(c);
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            // Identity comparison.
            return this == o;
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java
// ----------------------------------------------------------------------
// diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java
// new file mode 100644
// index 0000000..3224340
// --- /dev/null
// +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java
// @@ -0,0 +1,212 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.tostring.*;

import java.io.*;
import java.nio.*;
import java.util.*;

/**
 * Fragmentizer request. Sent from coordinator to other GGFS nodes when colocated part of file
 * should be fragmented.
 */
public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
    /** */
    private static final long serialVersionUID = 0L;

    /** ID of the file to fragment. */
    private IgniteUuid fileId;

    /** Affinity ranges of the file to fragment. */
    @GridToStringInclude
    @GridDirectCollection(GridGgfsFileAffinityRange.class)
    private Collection<GridGgfsFileAffinityRange> fragmentRanges;

    /**
     * Empty constructor required by {@link Externalizable}.
     */
    public GridGgfsFragmentizerRequest() {
        // No-op.
    }

    /**
     * Creates request for the given file and ranges.
     *
     * @param fileId File id to fragment.
     * @param fragmentRanges Ranges to fragment.
     */
    public GridGgfsFragmentizerRequest(IgniteUuid fileId, Collection<GridGgfsFileAffinityRange> fragmentRanges) {
        this.fileId = fileId;
        this.fragmentRanges = fragmentRanges;
    }

    /**
     * @return File ID.
     */
    public IgniteUuid fileId() {
        return fileId;
    }

    /**
     * @return Fragment ranges.
     */
    public Collection<GridGgfsFileAffinityRange> fragmentRanges() {
        return fragmentRanges;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridGgfsFragmentizerRequest.class, this);
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridGgfsFragmentizerRequest copy = new GridGgfsFragmentizerRequest();

        clone0(copy);

        return copy;
    }

    /** {@inheritDoc} */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        super.clone0(_msg);

        GridGgfsFragmentizerRequest target = (GridGgfsFragmentizerRequest)_msg;

        // Field references are shared with the source message, ranges collection is not copied.
        target.fileId = fileId;
        target.fragmentRanges = fragmentRanges;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.writeTo(buf))
            return false;

        if (!commState.typeWritten) {
            if (!commState.putByte(directType()))
                return false;

            commState.typeWritten = true;
        }

        // Write is resumable: commState.idx holds the first field which is not fully written yet.
        switch (commState.idx) {
            case 0:
                if (!commState.putGridUuid(fileId))
                    return false;

                commState.idx++;

                // Fall through to the next field.

            case 1:
                if (fragmentRanges == null) {
                    // Size of -1 is written for a null collection.
                    if (!commState.putInt(-1))
                        return false;
                }
                else {
                    if (commState.it == null) {
                        if (!commState.putInt(fragmentRanges.size()))
                            return false;

                        commState.it = fragmentRanges.iterator();
                    }

                    // A non-NULL current element is one that did not fit on the previous call.
                    while (commState.it.hasNext() || commState.cur != NULL) {
                        if (commState.cur == NULL)
                            commState.cur = commState.it.next();

                        if (!commState.putMessage((GridGgfsFileAffinityRange)commState.cur))
                            return false;

                        commState.cur = NULL;
                    }

                    commState.it = null;
                }

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.readFrom(buf))
            return false;

        // Read is resumable: commState.idx holds the first field which is not fully read yet.
        switch (commState.idx) {
            case 0:
                IgniteUuid readId = commState.getGridUuid();

                if (readId == GRID_UUID_NOT_READ)
                    return false;

                fileId = readId;

                commState.idx++;

                // Fall through to the next field.

            case 1:
                if (commState.readSize == -1) {
                    if (buf.remaining() < 4)
                        return false;

                    commState.readSize = commState.getInt();
                }

                if (commState.readSize >= 0) {
                    if (fragmentRanges == null)
                        fragmentRanges = new ArrayList<>(commState.readSize);

                    // Continue from the element count reached on the previous call.
                    for (int idx = commState.readItems; idx < commState.readSize; idx++) {
                        Object item = commState.getMessage();

                        if (item == MSG_NOT_READ)
                            return false;

                        fragmentRanges.add((GridGgfsFileAffinityRange)item);

                        commState.readItems++;
                    }
                }

                commState.readSize = -1;
                commState.readItems = 0;

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 70;
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java
// ----------------------------------------------------------------------
// diff --git
// a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java
// new file mode 100644
// index 0000000..641cdcc
// --- /dev/null
// +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java
// @@ -0,0 +1,131 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.fs;

import org.apache.ignite.lang.*;
import org.apache.ignite.internal.util.direct.*;

import java.io.*;
import java.nio.*;

/**
 * Fragmentizer response.
 */
public class GridGgfsFragmentizerResponse extends GridGgfsCommunicationMessage {
    /** */
    private static final long serialVersionUID = 0L;

    /** ID of the file this response relates to. */
    private IgniteUuid fileId;

    /**
     * Empty constructor required for {@link Externalizable}.
     */
    public GridGgfsFragmentizerResponse() {
        // No-op.
    }

    /**
     * Creates response for the given file.
     *
     * @param fileId File ID.
     */
    public GridGgfsFragmentizerResponse(IgniteUuid fileId) {
        this.fileId = fileId;
    }

    /**
     * @return File ID.
     */
    public IgniteUuid fileId() {
        return fileId;
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridGgfsFragmentizerResponse copy = new GridGgfsFragmentizerResponse();

        clone0(copy);

        return copy;
    }

    /** {@inheritDoc} */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        super.clone0(_msg);

        GridGgfsFragmentizerResponse target = (GridGgfsFragmentizerResponse)_msg;

        target.fileId = fileId;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.writeTo(buf))
            return false;

        if (!commState.typeWritten) {
            if (!commState.putByte(directType()))
                return false;

            commState.typeWritten = true;
        }

        // Write is resumable: commState.idx holds the first field which is not fully written yet.
        switch (commState.idx) {
            case 0:
                if (!commState.putGridUuid(fileId))
                    return false;

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!super.readFrom(buf))
            return false;

        // Read is resumable: commState.idx holds the first field which is not fully read yet.
        switch (commState.idx) {
            case 0:
                IgniteUuid readId = commState.getGridUuid();

                if (readId == GRID_UUID_NOT_READ)
                    return false;

                fileId = readId;

                commState.idx++;

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 71;
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHandshakeResponse.java
// ----------------------------------------------------------------------
// diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHandshakeResponse.java
// new file mode 100644
// index 0000000..363ac30
// ---
/dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHandshakeResponse.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Handshake message. + */ +public class GridGgfsHandshakeResponse implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** GGFS name. */ + private String ggfsName; + + /** SECONDARY paths. */ + private GridGgfsPaths paths; + + /** Server block size. */ + private long blockSize; + + /** Whether to force sampling on client's side. */ + private Boolean sampling; + + /** + * {@link Externalizable} support. + */ + public GridGgfsHandshakeResponse() { + // No-op. + } + + /** + * Constructor. + * + * @param paths Secondary paths. + * @param blockSize Server default block size. + */ + public GridGgfsHandshakeResponse(String ggfsName, GridGgfsPaths paths, long blockSize, Boolean sampling) { + assert paths != null; + + this.ggfsName = ggfsName; + this.paths = paths; + this.blockSize = blockSize; + this.sampling = sampling; + } + + /** + * @return GGFS name. 
+ */ + public String ggfsName() { + return ggfsName; + } + + /** + * @return SECONDARY paths configured on server. + */ + public GridGgfsPaths secondaryPaths() { + return paths; + } + + /** + * @return Server default block size. + */ + public long blockSize() { + return blockSize; + } + + /** + * @return Sampling flag. + */ + public Boolean sampling() { + return sampling; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, ggfsName); + + paths.writeExternal(out); + + out.writeLong(blockSize); + + if (sampling != null) { + out.writeBoolean(true); + out.writeBoolean(sampling); + } + else + out.writeBoolean(false); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ggfsName = U.readString(in); + + paths = new GridGgfsPaths(); + + paths.readExternal(in); + + blockSize = in.readLong(); + + if (in.readBoolean()) + sampling = in.readBoolean(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java new file mode 100644 index 0000000..c11e2b4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; + +/** + * GGFS utility processor adapter. + */ +public interface GridGgfsHelper { + /** + * Pre-process cache configuration. + * + * @param cfg Cache configuration. + */ + public abstract void preProcessCacheConfiguration(CacheConfiguration cfg); + + /** + * Validate cache configuration for GGFS. + * + * @param cfg Cache configuration. + * @throws IgniteCheckedException If validation failed. + */ + public abstract void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException; + + /** + * Check whether object is of type {@code GridGgfsBlockKey} + * + * @param key Key. + * @return {@code True} if GGFS block key. + */ + public abstract boolean isGgfsBlockKey(Object key); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java new file mode 100644 index 0000000..84b1eaf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.cache.eviction.ggfs.*; + +/** + * GGFS utils processor. + */ +public class GridGgfsHelperImpl implements GridGgfsHelper { + /** {@inheritDoc} */ + @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) { + GridCacheEvictionPolicy evictPlc = cfg.getEvictionPolicy(); + + if (evictPlc instanceof GridCacheGgfsPerBlockLruEvictionPolicy && cfg.getEvictionFilter() == null) + cfg.setEvictionFilter(new GridCacheGgfsEvictionFilter()); + } + + /** {@inheritDoc} */ + @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException { + GridCacheEvictionPolicy evictPlc = cfg.getEvictionPolicy(); + + if (evictPlc != null && evictPlc instanceof GridCacheGgfsPerBlockLruEvictionPolicy) { + GridCacheEvictionFilter evictFilter = cfg.getEvictionFilter(); + + if (evictFilter != null && !(evictFilter instanceof GridCacheGgfsEvictionFilter)) + throw new IgniteCheckedException("Eviction filter cannot be set explicitly when using " + + "GridCacheGgfsPerBlockLruEvictionPolicy:" + cfg.getName()); + } + } + + /** {@inheritDoc} */ + @Override public boolean 
isGgfsBlockKey(Object key) { + return key instanceof GridGgfsBlockKey; + } +}
