http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerManager.java
new file mode 100644
index 0000000..06e1b18
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerManager.java
@@ -0,0 +1,828 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.thread.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * GGFS fragmentizer manager.
+ */
+public class GridGgfsFragmentizerManager extends GridGgfsManager {
+    /** Message offer wait interval. */
+    private static final int MSG_OFFER_TIMEOUT = 1000;
+
+    /** Fragmentizer files check interval. */
+    private static final int FRAGMENTIZER_CHECK_INTERVAL = 3000;
+
+    /** Message send retry interval. */
+    private static final int MESSAGE_SEND_RETRY_INTERVAL = 1000;
+
+    /** How many times retry message send. */
+    private static final int MESSAGE_SEND_RETRY_COUNT = 3;
+
+    /** Manager stopping flag. */
+    private volatile boolean stopping;
+
+    /** Coordinator worker. */
+    private volatile FragmentizerCoordinator fragmentizerCrd;
+
+    /** This variable is used in tests only. */
+    @SuppressWarnings("FieldCanBeLocal")
+    private volatile boolean fragmentizerEnabled = true;
+
+    /** Fragmentizer worker. */
+    private FragmentizerWorker fragmentizerWorker;
+
+    /** Shutdown lock. */
+    private GridSpinReadWriteLock rw = new GridSpinReadWriteLock();
+
+    /** Message topic. */
+    private Object topic;
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        if (!ggfsCtx.configuration().isFragmentizerEnabled())
+            return;
+
+        // We care only about node leave and fail events.
+        ggfsCtx.kernalContext().event().addLocalEventListener(new 
GridLocalEventListener() {
+            @Override public void onEvent(IgniteEvent evt) {
+                assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED;
+
+                IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+                checkLaunchCoordinator(discoEvt);
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        fragmentizerWorker = new FragmentizerWorker();
+
+        String ggfsName = ggfsCtx.configuration().getName();
+
+        topic = F.isEmpty(ggfsName) ? TOPIC_GGFS : TOPIC_GGFS.topic(ggfsName);
+
+        ggfsCtx.kernalContext().io().addMessageListener(topic, 
fragmentizerWorker);
+
+        new IgniteThread(fragmentizerWorker).start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onKernalStart0() throws IgniteCheckedException {
+        if (ggfsCtx.configuration().isFragmentizerEnabled()) {
+            // Check at startup if this node is a fragmentizer coordinator.
+            IgniteDiscoveryEvent locJoinEvt = 
ggfsCtx.kernalContext().discovery().localJoinEvent();
+
+            checkLaunchCoordinator(locJoinEvt);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("BusyWait")
+    @Override protected void onKernalStop0(boolean cancel) {
+        boolean interrupted = false;
+
+        // Busy wait is intentional.
+        while (true) {
+            try {
+                if (rw.tryWriteLock(200, TimeUnit.MILLISECONDS))
+                    break;
+                else
+                    Thread.sleep(200);
+            }
+            catch (InterruptedException ignore) {
+                // Preserve interrupt status & ignore.
+                // Note that interrupted flag is cleared.
+                interrupted = true;
+            }
+        }
+
+        try {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+
+            stopping = true;
+        }
+        finally {
+            rw.writeUnlock();
+        }
+
+        synchronized (this) {
+            if (fragmentizerCrd != null)
+                fragmentizerCrd.cancel();
+        }
+
+        if (fragmentizerWorker != null)
+            fragmentizerWorker.cancel();
+
+        U.join(fragmentizerCrd, log);
+        U.join(fragmentizerWorker, log);
+    }
+
+    /**
+     * @param nodeId Node ID to send message to.
+     * @param msg Message to send.
+     * @throws IgniteCheckedException If send failed.
+     */
+    private void sendWithRetries(UUID nodeId, GridGgfsCommunicationMessage 
msg) throws IgniteCheckedException {
+        for (int i = 0; i < MESSAGE_SEND_RETRY_COUNT; i++) {
+            try {
+                ggfsCtx.send(nodeId, topic, msg, SYSTEM_POOL);
+
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                if (!ggfsCtx.kernalContext().discovery().alive(nodeId))
+                    throw new ClusterTopologyException("Failed to send message 
(node left the grid) " +
+                        "[nodeId=" + nodeId + ", msg=" + msg + ']');
+
+                if (i == MESSAGE_SEND_RETRY_COUNT - 1)
+                    throw e;
+
+                U.sleep(MESSAGE_SEND_RETRY_INTERVAL);
+            }
+        }
+    }
+
+    /**
+     * Checks if current node is the oldest node in topology and starts 
coordinator thread if so.
+     * Note that once node is the oldest one, it will be the oldest until it 
leaves grid.
+     *
+     * @param discoEvt Discovery event.
+     */
+    private void checkLaunchCoordinator(IgniteDiscoveryEvent discoEvt) {
+        rw.readLock();
+
+        try {
+            if (stopping)
+                return;
+
+            if (fragmentizerCrd == null) {
+                long minNodeOrder = Long.MAX_VALUE;
+
+                Collection<ClusterNode> nodes = discoEvt.topologyNodes();
+
+                for (ClusterNode node : nodes) {
+                    if (node.order() < minNodeOrder && ggfsCtx.ggfsNode(node))
+                        minNodeOrder = node.order();
+                }
+
+                ClusterNode locNode = 
ggfsCtx.kernalContext().grid().localNode();
+
+                if (locNode.order() == minNodeOrder) {
+                    if (log.isDebugEnabled())
+                        log.debug("Detected local node to be the eldest GGFS 
node in topology, starting fragmentizer " +
+                            "coordinator thread [discoEvt=" + discoEvt + ", 
locNode=" + locNode + ']');
+
+                    synchronized (this) {
+                        if (fragmentizerCrd == null && !stopping) {
+                            fragmentizerCrd = new FragmentizerCoordinator();
+
+                            new IgniteThread(fragmentizerCrd).start();
+                        }
+                    }
+                }
+            }
+        }
+        finally {
+            rw.readUnlock();
+        }
+    }
+
+    /**
+     * Processes fragmentizer request. For each range assigned to this node:
+     * <ul>
+     *     <li>Mark range as moving indicating that block copying started.</li>
+     *     <li>Copy blocks to non-colocated keys.</li>
+     *     <li>Update map to indicate that blocks were copied and old blocks 
should be deleted.</li>
+     *     <li>Delete old blocks.</li>
+     *     <li>Remove range from file map.</li>
+     * </ul>
+     *
+     * @param req Request.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @SuppressWarnings("fallthrough")
+    private void processFragmentizerRequest(GridGgfsFragmentizerRequest req) 
throws IgniteCheckedException {
+        req.finishUnmarshal(ggfsCtx.kernalContext().config().getMarshaller(), 
null);
+
+        Collection<GridGgfsFileAffinityRange> ranges = req.fragmentRanges();
+        IgniteUuid fileId = req.fileId();
+
+        GridGgfsFileInfo fileInfo = ggfsCtx.meta().info(fileId);
+
+        if (fileInfo == null) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to find file info for fragmentizer request: 
" + req);
+
+            return;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Moving file ranges for fragmentizer request [req=" + 
req + ", fileInfo=" + fileInfo + ']');
+
+        for (GridGgfsFileAffinityRange range : ranges) {
+            try {
+                GridGgfsFileInfo updated;
+
+                switch (range.status()) {
+                    case RANGE_STATUS_INITIAL: {
+                        // Mark range as moving.
+                        updated = ggfsCtx.meta().updateInfo(fileId, 
updateRange(range, RANGE_STATUS_MOVING));
+
+                        if (updated == null) {
+                            ggfsCtx.data().cleanBlocks(fileInfo, range, true);
+
+                            continue;
+                        }
+
+                        // Fall-through.
+                    }
+
+                    case RANGE_STATUS_MOVING: {
+                        // Move colocated blocks.
+                        ggfsCtx.data().spreadBlocks(fileInfo, range);
+
+                        // Mark range as moved.
+                        updated = ggfsCtx.meta().updateInfo(fileId, 
updateRange(range, RANGE_STATUS_MOVED));
+
+                        if (updated == null) {
+                            ggfsCtx.data().cleanBlocks(fileInfo, range, true);
+
+                            continue;
+                        }
+
+                        // Fall-through.
+                    }
+
+                    case RANGE_STATUS_MOVED: {
+                        // Remove old blocks.
+                        ggfsCtx.data().cleanBlocks(fileInfo, range, false);
+
+                        // Remove range from map.
+                        updated = ggfsCtx.meta().updateInfo(fileId, 
deleteRange(range));
+
+                        if (updated == null)
+                            ggfsCtx.data().cleanBlocks(fileInfo, range, true);
+                    }
+                }
+            }
+            catch (GridGgfsInvalidRangeException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to update file range " +
+                        "[range=" + range + "fileId=" + fileId + ", err=" + 
e.getMessage() + ']');
+            }
+        }
+    }
+
+    /**
+     * Creates update info closure that will mark given range as moving.
+     *
+     * @param range Range to mark as moving.
+     * @param status Status.
+     * @return Update closure.
+     */
+    private IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo> 
updateRange(final GridGgfsFileAffinityRange range,
+        final int status) {
+        return new CX1<GridGgfsFileInfo, GridGgfsFileInfo>() {
+            @Override public GridGgfsFileInfo applyx(GridGgfsFileInfo info) 
throws IgniteCheckedException {
+                GridGgfsFileMap map = new GridGgfsFileMap(info.fileMap());
+
+                map.updateRangeStatus(range, status);
+
+                if (log.isDebugEnabled())
+                    log.debug("Updated file map for range [fileId=" + 
info.id() + ", range=" + range +
+                        ", status=" + status + ", oldMap=" + info.fileMap() + 
", newMap=" + map + ']');
+
+                GridGgfsFileInfo updated = new GridGgfsFileInfo(info, 
info.length());
+
+                updated.fileMap(map);
+
+                return updated;
+            }
+        };
+    }
+
+    /**
+     * Creates update info closure that will mark given range as moving.
+     *
+     * @param range Range to mark as moving.
+     * @return Update closure.
+     */
+    private IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo> 
deleteRange(final GridGgfsFileAffinityRange range) {
+        return new CX1<GridGgfsFileInfo, GridGgfsFileInfo>() {
+            @Override public GridGgfsFileInfo applyx(GridGgfsFileInfo info) 
throws IgniteCheckedException {
+                GridGgfsFileMap map = new GridGgfsFileMap(info.fileMap());
+
+                map.deleteRange(range);
+
+                if (log.isDebugEnabled())
+                    log.debug("Deleted range from file map [fileId=" + 
info.id() + ", range=" + range +
+                        ", oldMap=" + info.fileMap() + ", newMap=" + map + 
']');
+
+                GridGgfsFileInfo updated = new GridGgfsFileInfo(info, 
info.length());
+
+                updated.fileMap(map);
+
+                return updated;
+            }
+        };
+    }
+
+    /**
+     * Fragmentizer coordinator thread.
+     */
+    private class FragmentizerCoordinator extends GridWorker implements 
GridLocalEventListener, GridMessageListener {
+        /** Files being fragmented. */
+        private ConcurrentMap<IgniteUuid, Collection<UUID>> fragmentingFiles = 
new ConcurrentHashMap<>();
+
+        /** Node IDs captured on start. */
+        private volatile Collection<UUID> startSync;
+
+        /** Wait lock. */
+        private Lock lock = new ReentrantLock();
+
+        /** Wait condition. */
+        private Condition cond = lock.newCondition();
+
+        /**
+         * Constructor.
+         */
+        protected FragmentizerCoordinator() {
+            super(ggfsCtx.kernalContext().gridName(), 
"fragmentizer-coordinator", ggfsCtx.kernalContext().log());
+
+            ggfsCtx.kernalContext().event().addLocalEventListener(this, 
EVT_NODE_LEFT, EVT_NODE_FAILED);
+            ggfsCtx.kernalContext().io().addMessageListener(topic, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedException {
+            // Wait for all previous fragmentizer tasks to complete.
+            syncStart();
+
+            while (!isCancelled()) {
+                // If we have room for files, add them to fragmentizer.
+                try {
+                    while (fragmentingFiles.size() < 
ggfsCtx.configuration().getFragmentizerConcurrentFiles()) {
+                        GridGgfsFileInfo fileInfo = 
fileForFragmentizer(fragmentingFiles.keySet());
+
+                        // If no colocated files found, exit loop.
+                        if (fileInfo == null)
+                            break;
+
+                        requestFragmenting(fileInfo);
+                    }
+                }
+                catch (IgniteCheckedException | IgniteException e) {
+                    if (!X.hasCause(e, InterruptedException.class) && 
!X.hasCause(e, IgniteInterruptedException.class))
+                        LT.warn(log, e, "Failed to get fragmentizer file info 
(will retry).");
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Got interrupted exception in 
fragmentizer coordinator (grid is stopping).");
+
+                        break; // While.
+                    }
+                }
+
+                lock.lock();
+
+                try {
+                    cond.await(FRAGMENTIZER_CHECK_INTERVAL, MILLISECONDS);
+                }
+                finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onEvent(IgniteEvent evt) {
+            assert evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED;
+
+            IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+            if (log.isDebugEnabled())
+                log.debug("Processing node leave event: " + discoEvt);
+
+            boolean signal = false;
+
+            Collection<UUID> startSync0 = startSync;
+
+            if (startSync0 != null && !startSync0.isEmpty()) {
+                startSync0.remove(discoEvt.eventNode().id());
+
+                if (startSync0.isEmpty()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Completed fragmentizer coordinator sync 
start.");
+
+                    signal = true;
+                }
+            }
+
+            if (!signal) {
+                Iterator<Map.Entry<IgniteUuid, Collection<UUID>>> it = 
fragmentingFiles.entrySet().iterator();
+
+                while (it.hasNext()) {
+                    Map.Entry<IgniteUuid, Collection<UUID>> entry = it.next();
+
+                    Collection<UUID> nodeIds = entry.getValue();
+
+                    if (nodeIds.remove(discoEvt.eventNode().id())) {
+                        if (nodeIds.isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received all responses for 
fragmentizer task [fileId=" + entry.getKey() +
+                                    ']');
+
+                            it.remove();
+
+                            signal = true;
+                        }
+                    }
+                }
+            }
+
+            if (signal)
+                wakeUp();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            if (msg instanceof GridGgfsFragmentizerResponse) {
+                GridGgfsFragmentizerResponse res = 
(GridGgfsFragmentizerResponse)msg;
+
+                IgniteUuid fileId = res.fileId();
+
+                Collection<UUID> nodeIds = fragmentingFiles.get(fileId);
+
+                if (nodeIds != null) {
+                    if (nodeIds.remove(nodeId)) {
+                        if (nodeIds.isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Received all responses for 
fragmentizer task [fileId=" + fileId + ']');
+
+                            fragmentingFiles.remove(fileId, nodeIds);
+
+                            wakeUp();
+                        }
+                    }
+                }
+                else
+                    log.warning("Received fragmentizer response for file ID 
which was not requested (will ignore) " +
+                        "[nodeId=" + nodeId + ", fileId=" + res.fileId() + 
']');
+            }
+            else if (msg instanceof GridGgfsSyncMessage) {
+                GridGgfsSyncMessage sync = (GridGgfsSyncMessage)msg;
+
+                if (sync.response() && sync.order() == 
ggfsCtx.kernalContext().grid().localNode().order()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received fragmentizer sync response from 
remote node: " + nodeId);
+
+                    Collection<UUID> startSync0 = startSync;
+
+                    if (startSync0 != null) {
+                        startSync0.remove(nodeId);
+
+                        if (startSync0.isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Completed fragmentizer coordinator 
sync start: " + startSync0);
+
+                            wakeUp();
+                        }
+                    }
+                }
+            }
+        }
+
+        /**
+         * Signals condition.
+         */
+        private void wakeUp() {
+            lock.lock();
+
+            try {
+                cond.signalAll();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Sends sync message to remote nodes and awaits for response from all 
nodes.
+         *
+         * @throws InterruptedException If waiting was interrupted.
+         */
+        private void syncStart() throws InterruptedException {
+            Collection<UUID> startSync0 = startSync = new 
GridConcurrentHashSet<>(
+                F.viewReadOnly(
+                    ggfsCtx.kernalContext().discovery().allNodes(),
+                    F.node2id(),
+                    new P1<ClusterNode>() {
+                        @Override public boolean apply(ClusterNode n) {
+                            return ggfsCtx.ggfsNode(n);
+                        }
+                    }));
+
+            ClusterNode locNode = ggfsCtx.kernalContext().grid().localNode();
+
+            while (!startSync0.isEmpty()) {
+                for (UUID nodeId : startSync0) {
+                    GridGgfsSyncMessage syncReq = new 
GridGgfsSyncMessage(locNode.order(), false);
+
+                    try {
+                        if (log.isDebugEnabled())
+                            log.debug("Sending fragmentizer sync start request 
to remote node [nodeId=" + nodeId +
+                                ", syncReq=" + syncReq + ']');
+
+                        sendWithRetries(nodeId, syncReq);
+
+                        // Close window between message sending and discovery 
event.
+                        if (!ggfsCtx.kernalContext().discovery().alive(nodeId))
+                            startSync0.remove(nodeId);
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (e.hasCause(ClusterTopologyException.class)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to send sync message to 
remote node (node has left the grid): " +
+                                    nodeId);
+                        }
+                        else
+                            U.error(log, "Failed to send synchronize message 
to remote node (will not wait for reply): " +
+                                nodeId, e);
+
+                        startSync0.remove(nodeId);
+                    }
+                }
+
+                lock.lock();
+
+                try {
+                    if (!startSync0.isEmpty())
+                        cond.await(10000, MILLISECONDS);
+                }
+                finally {
+                    lock.unlock();
+                }
+            }
+        }
+
+        /**
+         * Starts file fragmenting. Will group file affinity ranges by nodes 
and send requests to each node.
+         * File will be considered processed when each node replied with 
success (or error) or left the grid.
+         *
+         * @param fileInfo File info to process.
+         */
+        private void requestFragmenting(GridGgfsFileInfo fileInfo) {
+            GridGgfsFileMap map = fileInfo.fileMap();
+
+            assert map != null && !map.ranges().isEmpty();
+
+            Map<UUID, Collection<GridGgfsFileAffinityRange>> grpMap = 
U.newHashMap(map.ranges().size());
+
+            for (GridGgfsFileAffinityRange range : map.ranges()) {
+                UUID nodeId = 
ggfsCtx.data().affinityNode(range.affinityKey()).id();
+
+                Collection<GridGgfsFileAffinityRange> nodeRanges = 
grpMap.get(nodeId);
+
+                if (nodeRanges == null) {
+                    nodeRanges = new LinkedList<>();
+
+                    grpMap.put(nodeId, nodeRanges);
+                }
+
+                
nodeRanges.addAll(range.split(ggfsCtx.data().groupBlockSize()));
+            }
+
+            Collection<UUID> nodeIds = new IdentityHashSet(grpMap.keySet());
+
+            if (log.isDebugEnabled())
+                log.debug("Calculating fragmentizer groups for file 
[fileInfo=" + fileInfo +
+                    ", nodeIds=" + nodeIds + ']');
+
+            // Put assignment to map first.
+            Object old = fragmentingFiles.putIfAbsent(fileInfo.id(), nodeIds);
+
+            assert old == null;
+
+            for (Map.Entry<UUID, Collection<GridGgfsFileAffinityRange>> entry 
: grpMap.entrySet()) {
+                UUID nodeId = entry.getKey();
+
+                GridGgfsFragmentizerRequest msg = new 
GridGgfsFragmentizerRequest(fileInfo.id(), entry.getValue());
+
+                try {
+                    if (log.isDebugEnabled())
+                        log.debug("Sending fragmentizer request to remote node 
[nodeId=" + nodeId +
+                            ", fileId=" + fileInfo.id() + ", msg=" + msg + 
']');
+
+                    sendWithRetries(nodeId, msg);
+                }
+                catch (IgniteCheckedException e) {
+                    if (e.hasCause(ClusterTopologyException.class)) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send fragmentizer request to 
remote node (node left grid): " +
+                                nodeId);
+                    }
+                    else
+                        U.error(log, "Failed to send fragmentizer request to 
remote node [nodeId=" + nodeId +
+                            ", msg=" + msg + ']', e);
+
+                    nodeIds.remove(nodeId);
+                }
+            }
+
+            if (nodeIds.isEmpty()) {
+                if (log.isDebugEnabled())
+                    log.debug("Got empty wait set for fragmentized file: " + 
fileInfo);
+
+                fragmentingFiles.remove(fileInfo.id(), nodeIds);
+            }
+        }
+    }
+
+    /**
+     * Gets next file for fragmentizer to be processed.
+     *
+     * @param exclude File IDs to exclude (the ones that are currently being 
processed).
+     * @return File ID to process or {@code null} if there are no such files.
+     * @throws IgniteCheckedException In case of error.
+     */
+    @Nullable private GridGgfsFileInfo 
fileForFragmentizer(Collection<IgniteUuid> exclude) throws 
IgniteCheckedException {
+        return fragmentizerEnabled ? 
ggfsCtx.meta().fileForFragmentizer(exclude) : null;
+    }
+
+    /**
+     * Fragmentizer worker thread.
+     */
+    private class FragmentizerWorker extends GridWorker implements 
GridMessageListener {
+        /** Requests for this worker. */
+        private BlockingQueue<IgniteBiTuple<UUID, 
GridGgfsCommunicationMessage>> msgs = new LinkedBlockingDeque<>();
+
+        /**
+         * Constructor.
+         */
+        protected FragmentizerWorker() {
+            super(ggfsCtx.kernalContext().gridName(), "fragmentizer-worker", 
ggfsCtx.kernalContext().log());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            if (msg instanceof GridGgfsFragmentizerRequest ||
+                msg instanceof GridGgfsSyncMessage) {
+                if (log.isDebugEnabled())
+                    log.debug("Received fragmentizer request from remote node 
[nodeId=" + nodeId +
+                        ", msg=" + msg + ']');
+
+                IgniteBiTuple<UUID, GridGgfsCommunicationMessage> tup = 
F.t(nodeId, (GridGgfsCommunicationMessage)msg);
+
+                try {
+                    if (!msgs.offer(tup, MSG_OFFER_TIMEOUT, 
TimeUnit.MILLISECONDS)) {
+                        U.error(log, "Failed to process fragmentizer 
communication message (will discard) " +
+                            "[nodeId=" + nodeId + ", msg=" + msg + ']');
+                    }
+                }
+                catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
+
+                    U.warn(log, "Failed to process fragmentizer communication 
message (thread was interrupted) "+
+                        "[nodeId=" + nodeId + ", msg=" + msg + ']');
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedException {
+            while (!isCancelled()) {
+                IgniteBiTuple<UUID, GridGgfsCommunicationMessage> req = 
msgs.take();
+
+                UUID nodeId = req.get1();
+
+                if (req.get2() instanceof GridGgfsFragmentizerRequest) {
+                    GridGgfsFragmentizerRequest fragmentizerReq = 
(GridGgfsFragmentizerRequest)req.get2();
+
+                    if (!rw.tryReadLock()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Received fragmentizing request while 
stopping grid (will ignore) " +
+                                "[nodeId=" + nodeId + ", req=" + req.get2() + 
']');
+
+                        continue; // while.
+                    }
+
+                    try {
+                        try {
+                            processFragmentizerRequest(fragmentizerReq);
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (e.hasCause(ClusterTopologyException.class)) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to process fragmentizer 
request (remote node left the grid) " +
+                                        "[req=" + req + ", err=" + 
e.getMessage() + ']');
+                            }
+                            else
+                                U.error(log, "Failed to process fragmentizer 
request [nodeId=" + nodeId +
+                                    ", req=" + req + ']', e);
+                        }
+                        finally {
+                            sendResponse(nodeId, new 
GridGgfsFragmentizerResponse(fragmentizerReq.fileId()));
+                        }
+                    }
+                    finally {
+                        rw.readUnlock();
+                    }
+                }
+                else {
+                    assert req.get2() instanceof GridGgfsSyncMessage;
+
+                    GridGgfsSyncMessage syncMsg = 
(GridGgfsSyncMessage)req.get2();
+
+                    if (!syncMsg.response()) {
+                        GridGgfsSyncMessage res = new 
GridGgfsSyncMessage(syncMsg.order(), true);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Sending fragmentizer sync response to 
remote node [nodeId=" + nodeId +
+                                ", res=" + res + ']');
+
+                        sendResponse(nodeId, res);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Sends response to remote node.
+         *
+         * @param nodeId Node ID to send response to.
+         * @param msg Message to send.
+         */
+        private void sendResponse(UUID nodeId, GridGgfsCommunicationMessage 
msg) {
+            try {
+                sendWithRetries(nodeId, msg);
+            }
+            catch (IgniteCheckedException e) {
+                if (e.hasCause(ClusterTopologyException.class)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send sync response to GGFS 
fragmentizer coordinator " +
+                            "(originating node left the grid): " + nodeId);
+                }
+                else
+                    U.error(log, "Failed to send sync response to GGFS 
fragmentizer coordinator: " + nodeId, e);
+            }
+        }
+    }
+
+    /**
+     * Hash set that overrides equals to use identity comparison.
+     */
+    private static class IdentityHashSet extends GridConcurrentHashSet<UUID> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Constructor.
+         *
+         * @param c Collection to add.
+         */
+        private IdentityHashSet(Collection<UUID> c) {
+            super(c);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            // Identity comparison.
+            return this == o;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java
new file mode 100644
index 0000000..3224340
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerRequest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Fragmentizer request. Sent from coordinator to other GGFS nodes when 
colocated part of file
+ * should be fragmented.
+ */
+public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** File id. */
+    private IgniteUuid fileId;
+
+    /** Ranges to fragment. */
+    @GridToStringInclude
+    @GridDirectCollection(GridGgfsFileAffinityRange.class)
+    private Collection<GridGgfsFileAffinityRange> fragmentRanges;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridGgfsFragmentizerRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param fileId File id to fragment.
+     * @param fragmentRanges Ranges to fragment.
+     */
+    public GridGgfsFragmentizerRequest(IgniteUuid fileId, 
Collection<GridGgfsFileAffinityRange> fragmentRanges) {
+        this.fileId = fileId;
+        this.fragmentRanges = fragmentRanges;
+    }
+
+    /**
+     * @return File ID.
+     */
+    public IgniteUuid fileId() {
+        return fileId;
+    }
+
+    /**
+     * @return Fragment ranges.
+     */
+    public Collection<GridGgfsFileAffinityRange> fragmentRanges() {
+        return fragmentRanges;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridGgfsFragmentizerRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridGgfsFragmentizerRequest _clone = new GridGgfsFragmentizerRequest();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridGgfsFragmentizerRequest _clone = (GridGgfsFragmentizerRequest)_msg;
+
+        _clone.fileId = fileId;
+        _clone.fragmentRanges = fragmentRanges;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 0:
+                if (!commState.putGridUuid(fileId))
+                    return false;
+
+                commState.idx++;
+
+            case 1:
+                if (fragmentRanges != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(fragmentRanges.size()))
+                            return false;
+
+                        commState.it = fragmentRanges.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if 
(!commState.putMessage((GridGgfsFileAffinityRange)commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (commState.idx) {
+            case 0:
+                IgniteUuid fileId0 = commState.getGridUuid();
+
+                if (fileId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                fileId = fileId0;
+
+                commState.idx++;
+
+            case 1:
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (fragmentRanges == null)
+                        fragmentRanges = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; 
i++) {
+                        Object _val = commState.getMessage();
+
+                        if (_val == MSG_NOT_READ)
+                            return false;
+
+                        fragmentRanges.add((GridGgfsFileAffinityRange)_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 70;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java
new file mode 100644
index 0000000..641cdcc
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsFragmentizerResponse.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.util.direct.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Fragmentizer response.
+ */
+public class GridGgfsFragmentizerResponse extends GridGgfsCommunicationMessage 
{
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** File ID. */
+    private IgniteUuid fileId;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridGgfsFragmentizerResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param fileId File ID.
+     */
+    public GridGgfsFragmentizerResponse(IgniteUuid fileId) {
+        this.fileId = fileId;
+    }
+
+    /**
+     * @return File ID.
+     */
+    public IgniteUuid fileId() {
+        return fileId;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridGgfsFragmentizerResponse _clone = new 
GridGgfsFragmentizerResponse();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridGgfsFragmentizerResponse _clone = 
(GridGgfsFragmentizerResponse)_msg;
+
+        _clone.fileId = fileId;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 0:
+                if (!commState.putGridUuid(fileId))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (commState.idx) {
+            case 0:
+                IgniteUuid fileId0 = commState.getGridUuid();
+
+                if (fileId0 == GRID_UUID_NOT_READ)
+                    return false;
+
+                fileId = fileId0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 71;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHandshakeResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHandshakeResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHandshakeResponse.java
new file mode 100644
index 0000000..363ac30
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHandshakeResponse.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Handshake message.
+ */
+public class GridGgfsHandshakeResponse implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** GGFS name. */
+    private String ggfsName;
+
+    /** SECONDARY paths. */
+    private GridGgfsPaths paths;
+
+    /** Server block size. */
+    private long blockSize;
+
+    /** Whether to force sampling on client's side. */
+    private Boolean sampling;
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public GridGgfsHandshakeResponse() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param paths Secondary paths.
+     * @param blockSize Server default block size.
+     */
+    public GridGgfsHandshakeResponse(String ggfsName, GridGgfsPaths paths, 
long blockSize, Boolean sampling) {
+        assert paths != null;
+
+        this.ggfsName = ggfsName;
+        this.paths = paths;
+        this.blockSize = blockSize;
+        this.sampling = sampling;
+    }
+
+    /**
+     * @return GGFS name.
+     */
+    public String ggfsName() {
+        return ggfsName;
+    }
+
+    /**
+     * @return SECONDARY paths configured on server.
+     */
+    public GridGgfsPaths secondaryPaths() {
+        return paths;
+    }
+
+    /**
+     * @return Server default block size.
+     */
+    public long blockSize() {
+        return blockSize;
+    }
+
+    /**
+     * @return Sampling flag.
+     */
+    public Boolean sampling() {
+        return sampling;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, ggfsName);
+
+        paths.writeExternal(out);
+
+        out.writeLong(blockSize);
+
+        if (sampling != null) {
+            out.writeBoolean(true);
+            out.writeBoolean(sampling);
+        }
+        else
+            out.writeBoolean(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        ggfsName = U.readString(in);
+
+        paths = new GridGgfsPaths();
+
+        paths.readExternal(in);
+
+        blockSize = in.readLong();
+
+        if (in.readBoolean())
+            sampling = in.readBoolean();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java
new file mode 100644
index 0000000..c11e2b4
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+
+/**
+ * GGFS utility processor adapter.
+ */
+public interface GridGgfsHelper {
+    /**
+     * Pre-process cache configuration.
+     *
+     * @param cfg Cache configuration.
+     */
+    public abstract void preProcessCacheConfiguration(CacheConfiguration cfg);
+
+    /**
+     * Validate cache configuration for GGFS.
+     *
+     * @param cfg Cache configuration.
+     * @throws IgniteCheckedException If validation failed.
+     */
+    public abstract void validateCacheConfiguration(CacheConfiguration cfg) 
throws IgniteCheckedException;
+
+    /**
+     * Check whether object is of type {@code GridGgfsBlockKey}
+     *
+     * @param key Key.
+     * @return {@code True} if GGFS block key.
+     */
+    public abstract boolean isGgfsBlockKey(Object key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java
new file mode 100644
index 0000000..84b1eaf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsHelperImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.eviction.*;
+import org.apache.ignite.cache.eviction.ggfs.*;
+
+/**
+ * GGFS utils processor.
+ */
+public class GridGgfsHelperImpl implements GridGgfsHelper {
+    /** {@inheritDoc} */
+    @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) 
{
+        GridCacheEvictionPolicy evictPlc = cfg.getEvictionPolicy();
+
+        if (evictPlc instanceof GridCacheGgfsPerBlockLruEvictionPolicy && 
cfg.getEvictionFilter() == null)
+            cfg.setEvictionFilter(new GridCacheGgfsEvictionFilter());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateCacheConfiguration(CacheConfiguration cfg) 
throws IgniteCheckedException {
+        GridCacheEvictionPolicy evictPlc =  cfg.getEvictionPolicy();
+
+        if (evictPlc != null && evictPlc instanceof 
GridCacheGgfsPerBlockLruEvictionPolicy) {
+            GridCacheEvictionFilter evictFilter = cfg.getEvictionFilter();
+
+            if (evictFilter != null && !(evictFilter instanceof 
GridCacheGgfsEvictionFilter))
+                throw new IgniteCheckedException("Eviction filter cannot be 
set explicitly when using " +
+                    "GridCacheGgfsPerBlockLruEvictionPolicy:" + cfg.getName());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isGgfsBlockKey(Object key) {
+        return key instanceof GridGgfsBlockKey;
+    }
+}

Reply via email to