http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cfcf46df/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java new file mode 100644 index 0000000..b17eb41 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsMetaManager.java @@ -0,0 +1,2991 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.fs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Cache based structure (meta data) manager. + */ +@SuppressWarnings("all") +public class GridGgfsMetaManager extends GridGgfsManager { + /** GGFS configuration. */ + private IgniteFsConfiguration cfg; + + /** Metadata cache. */ + private GridCache<Object, Object> metaCache; + + /** */ + private IgniteFuture<?> metaCacheStartFut; + + /** File ID to file info projection. */ + private GridCacheProjectionEx<IgniteUuid, GridGgfsFileInfo> id2InfoPrj; + + /** Predefined key for sampling mode value. */ + private GridCacheInternal sampling; + + /** Logger. */ + private IgniteLogger log; + + /** Delete worker. */ + private volatile GridGgfsDeleteWorker delWorker; + + /** Events manager. */ + private GridEventStorageManager evts; + + /** Local node. */ + private ClusterNode locNode; + + /** Busy lock. 
*/ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** + * + */ + void awaitInit() { + if (!metaCacheStartFut.isDone()) { + try { + metaCacheStartFut.get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + cfg = ggfsCtx.configuration(); + + metaCache = ggfsCtx.kernalContext().cache().cache(cfg.getMetaCacheName()); + + metaCacheStartFut = ggfsCtx.kernalContext().cache().internalCache(cfg.getMetaCacheName()).preloader() + .startFuture(); + + if (metaCache.configuration().getAtomicityMode() != TRANSACTIONAL) + throw new IgniteCheckedException("Meta cache should be transactional: " + cfg.getMetaCacheName()); + + evts = ggfsCtx.kernalContext().event(); + + sampling = new GridGgfsSamplingKey(cfg.getName()); + + assert metaCache != null; + + id2InfoPrj = (GridCacheProjectionEx<IgniteUuid, GridGgfsFileInfo>)metaCache.<IgniteUuid, GridGgfsFileInfo>cache(); + + log = ggfsCtx.kernalContext().log(GridGgfsMetaManager.class); + } + + /** {@inheritDoc} */ + @Override protected void onKernalStart0() throws IgniteCheckedException { + locNode = ggfsCtx.kernalContext().discovery().localNode(); + + // Start background delete worker. + delWorker = new GridGgfsDeleteWorker(ggfsCtx); + + delWorker.start(); + } + + /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + GridGgfsDeleteWorker delWorker0 = delWorker; + + if (delWorker0 != null) + delWorker0.cancel(); + + if (delWorker0 != null) { + try { + U.join(delWorker0); + } + catch (IgniteInterruptedException ignored) { + // No-op. + } + } + + busyLock.block(); + } + + /** + * Return nodes where meta cache is defined. + * + * @return Nodes where meta cache is defined. 
+ */ + Collection<ClusterNode> metaCacheNodes() { + if (busyLock.enterBusy()) { + try { + return ggfsCtx.kernalContext().discovery().cacheNodes(metaCache.name(), -1); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get meta cache nodes because Grid is stopping."); + } + + /** + * Gets file ID for specified path. + * + * @param path Path. + * @return File ID for specified path or {@code null} if such file doesn't exist. + * @throws IgniteCheckedException If failed. + */ + @Nullable public IgniteUuid fileId(IgniteFsPath path) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + + return fileId(path, false); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get file ID because Grid is stopping: " + path); + } + + /** + * Gets file ID for specified path possibly skipping existing transaction. + * + * @param path Path. + * @param skipTx Whether to skip existing transaction. + * @return File ID for specified path or {@code null} if such file doesn't exist. + * @throws IgniteCheckedException If failed. + */ + @Nullable private IgniteUuid fileId(IgniteFsPath path, boolean skipTx) throws IgniteCheckedException { + List<IgniteUuid> ids = fileIds(path, skipTx); + + assert ids != null && !ids.isEmpty() : "Invalid file IDs [path=" + path + ", ids=" + ids + ']'; + + return ids.get(ids.size() - 1); + } + + /** + * Gets file ID by its name from parent directory listing. + * + * @param parentId Parent directory ID to get child ID for. + * @param fileName File name in parent listing to get file ID for. + * @return File ID. + * @throws IgniteCheckedException If failed. 
+ */ + @Nullable public IgniteUuid fileId(IgniteUuid parentId, String fileName) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return fileId(parentId, fileName, false); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get file ID because Grid is stopping [parentId=" + parentId + + ", fileName=" + fileName + ']'); + } + + /** + * Gets file ID by its name from parent directory listing possibly skipping existing transaction. + * + * @param parentId Parent directory ID to get child ID for. + * @param fileName File name in parent listing to get file ID for. + * @param skipTx Whether to skip existing transaction. + * @return File ID. + * @throws IgniteCheckedException If failed. + */ + @Nullable private IgniteUuid fileId(IgniteUuid parentId, String fileName, boolean skipTx) throws IgniteCheckedException { + GridGgfsListingEntry entry = directoryListing(parentId, skipTx).get(fileName); + + if (entry == null) { + if (log.isDebugEnabled()) + log.debug("Missing file ID [parentId=" + parentId + ", fileName=" + fileName + ']'); + + return null; + } + + return entry.fileId(); + } + + /** + * Gets all file IDs for components of specified path. Result cannot be empty - there is at least root element. + * But each element (except the first) can be {@code null} if such files don't exist. + * + * @param path Path. + * @return Collection of file IDs for components of specified path. + * @throws IgniteCheckedException If failed. + */ + public List<IgniteUuid> fileIds(IgniteFsPath path) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + + return fileIds(path, false); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get file IDS because Grid is stopping: " + path); + } + + /** + * Gets all file IDs for components of specified path possibly skipping existing transaction. 
Result cannot + * be empty - there is at least root element. But each element (except the first) can be {@code null} if such + * files don't exist. + * + * @param path Path. + * @param skipTx Whether to skip existing transaction. + * @return Collection of file IDs for components of specified path. + * @throws IgniteCheckedException If failed. + */ + private List<IgniteUuid> fileIds(IgniteFsPath path, boolean skipTx) throws IgniteCheckedException { + assert path != null; + + // Path components. + Collection<String> components = path.components(); + + // Collection of file IDs for components of specified path. + List<IgniteUuid> ids = new ArrayList<>(components.size() + 1); + + ids.add(ROOT_ID); // Always add root ID. + + IgniteUuid fileId = ROOT_ID; + + for (String s : components) { + assert !s.isEmpty(); + + if (fileId != null) + fileId = fileId(fileId, s, skipTx); + + ids.add(fileId); + } + + return ids; + } + + /** + * Ensure that entry with the given ID exists in meta cache. + * + * @param fileId File id. + * @return {@code True} in case such entry exists. + * @throws IgniteCheckedException IF failed. + */ + public boolean exists(IgniteUuid fileId) throws IgniteCheckedException{ + if (busyLock.enterBusy()) { + try { + assert fileId != null; + + // containsKey() doesn't work here since meta cache can be PARTITIONED (we do not restrict if!). + return info(fileId) != null; + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to check file system entry existence because Grid is stopping: " + + fileId); + } + + /** + * Gets file info by its ID. + * + * @param fileId File ID to get details for. + * @return File info. + * @throws IgniteCheckedException If failed. 
+ */ + @Nullable public GridGgfsFileInfo info(@Nullable IgniteUuid fileId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + if (fileId == null) + return null; + + GridGgfsFileInfo info = id2InfoPrj.get(fileId); + + // Force root ID always exist in cache. + if (info == null && ROOT_ID.equals(fileId)) + id2InfoPrj.putxIfAbsent(ROOT_ID, info = new GridGgfsFileInfo()); + + return info; + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get file info because Grid is stopping: " + fileId); + } + + /** + * Gets files details by their IDs. + * + * @param fileIds file IDs to get details for. + * @return Files details. + * @throws IgniteCheckedException If failed. + */ + public Map<IgniteUuid, GridGgfsFileInfo> infos(Collection<IgniteUuid> fileIds) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + assert fileIds != null; + + if (F.isEmpty(fileIds)) + return Collections.emptyMap(); + + Map<IgniteUuid, GridGgfsFileInfo> map = id2InfoPrj.getAll(fileIds); + + // Force root ID always exist in cache. + if (fileIds.contains(ROOT_ID) && !map.containsKey(ROOT_ID)) { + GridGgfsFileInfo info = new GridGgfsFileInfo(); + + id2InfoPrj.putxIfAbsent(ROOT_ID, info); + + map = new GridLeanMap<>(map); + + map.put(ROOT_ID, info); + } + + return map; + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get file infos because Grid is stopping: " + fileIds); + } + + /** + * Lock the file explicitly outside of transaction. + * + * @param fileId File ID to lock. + * @return Locked file info or {@code null} if file cannot be locked or doesn't exist. + * @throws IgniteCheckedException If failed. 
+ */ + public GridGgfsFileInfo lock(IgniteUuid fileId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + assert fileId != null; + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + // Lock file ID for this transaction. + GridGgfsFileInfo oldInfo = info(fileId); + + if (oldInfo == null) + throw new IgniteCheckedException("Failed to lock file (file not found): " + fileId); + + GridGgfsFileInfo newInfo = lockInfo(oldInfo); + + boolean put = metaCache.putx(fileId, newInfo); + + assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']'; + + tx.commit(); + + return newInfo; + } + catch (GridClosureException e) { + throw U.cast(e); + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to obtain lock because Grid is stopping: " + fileId); + } + + /** + * Set lock on file info. + * + * @param info File info. + * @return New file info with lock set. + * @throws IgniteCheckedException In case lock is already set on that file. + */ + public GridGgfsFileInfo lockInfo(GridGgfsFileInfo info) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert info != null; + + if (info.lockId() != null) + throw new IgniteCheckedException("Failed to lock file (file is being concurrently written) [fileId=" + + info.id() + ", lockId=" + info.lockId() + ']'); + + return new GridGgfsFileInfo(info, IgniteUuid.randomUuid(), info.modificationTime()); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get lock info because Grid is stopping: " + info); + } + + /** + * Remove explicit lock on file held by the current thread. + * + * @param info File info to unlock. + * @param modificationTime Modification time to write to file info. + * @throws IgniteCheckedException If failed. 
+ */ + public void unlock(GridGgfsFileInfo info, long modificationTime) throws IgniteCheckedException { + assert validTxState(false); + assert info != null; + + if (busyLock.enterBusy()) { + try { + IgniteUuid lockId = info.lockId(); + + if (lockId == null) + return; + + // Temporary clear interrupted state for unlocking. + boolean interrupted = Thread.interrupted(); + + IgniteUuid fileId = info.id(); + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + // Lock file ID for this transaction. + GridGgfsFileInfo oldInfo = info(fileId); + + if (oldInfo == null) + throw new IgniteFsFileNotFoundException("Failed to unlock file (file not found): " + fileId); + + if (!info.lockId().equals(oldInfo.lockId())) + throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId + + ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']'); + + GridGgfsFileInfo newInfo = new GridGgfsFileInfo(oldInfo, null, modificationTime); + + boolean put = metaCache.putx(fileId, newInfo); + + assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']'; + + tx.commit(); + } + catch (GridClosureException e) { + throw U.cast(e); + } + finally { + tx.close(); + + assert validTxState(false); + + if (interrupted) + Thread.currentThread().interrupt(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to unlock file system entry because Grid is stopping: " + info); + } + + /** + * Lock file IDs participating in the transaction.<br/> + * + * @param fileIds file IDs to lock. + * @return Locked file details. Resulting map doesn't contain details for not-existent files. + * @throws IgniteCheckedException If failed. + */ + private Map<IgniteUuid, GridGgfsFileInfo> lockIds(IgniteUuid... 
fileIds) throws IgniteCheckedException { + assert validTxState(true); + assert fileIds != null && fileIds.length > 0; + + // Always sort file IDs participating in transaction to escape cache transaction deadlocks. + Arrays.sort(fileIds); + + // Wrap array as collection (1) to escape superfluous check in projection and (2) to check assertions. + Collection<IgniteUuid> keys = Arrays.asList(fileIds); + + if (log.isDebugEnabled()) + log.debug("Locking file ids: " + keys); + + // Lock files and get their infos. + Map<IgniteUuid, GridGgfsFileInfo> map = id2InfoPrj.getAll(keys); + + if (log.isDebugEnabled()) + log.debug("Locked file ids: " + keys); + + // Force root ID always exist in cache. + if (keys.contains(ROOT_ID) && !map.containsKey(ROOT_ID)) { + GridGgfsFileInfo info = new GridGgfsFileInfo(); + + id2InfoPrj.putxIfAbsent(ROOT_ID, info); + + map = new GridLeanMap<>(map); + + map.put(ROOT_ID, info); + } + + // Returns detail's map for locked IDs. + return map; + } + + /** + * List child files for specified file ID. + * + * @param fileId File to list child files for. + * @return Directory listing for the specified file. + * @throws IgniteCheckedException If failed. + */ + public Map<String, GridGgfsListingEntry> directoryListing(IgniteUuid fileId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return directoryListing(fileId, false); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get directory listing because Grid is stopping: " + fileId); + } + + /** + * Gets first available file info for fragmentizer. + * + * @param exclude File IDs to exclude from result. + * @return First qualified file info. + * @throws IgniteCheckedException If failed to get file for fragmentizer. 
+ */ + public GridGgfsFileInfo fileForFragmentizer(Collection<IgniteUuid> exclude) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return fileForFragmentizer0(ROOT_ID, exclude); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get file for framentizer because Grid is stopping."); + } + + /** + * Gets first available file info for fragmentizer. + * + * @param parentId Parent ID to scan. + * @param exclude File IDs to exclude from result. + * @return First qualified file info. + * @throws IgniteCheckedException If failed to get file for fragmentizer. + */ + private GridGgfsFileInfo fileForFragmentizer0(IgniteUuid parentId, Collection<IgniteUuid> exclude) + throws IgniteCheckedException { + GridGgfsFileInfo info = info(parentId); + + // Check if file was concurrently deleted. + if (info == null) + return null; + + assert info.isDirectory(); + + Map<String, GridGgfsListingEntry> listing = info.listing(); + + for (GridGgfsListingEntry entry : listing.values()) { + if (entry.isFile()) { + GridGgfsFileInfo fileInfo = info(entry.fileId()); + + if (fileInfo != null) { + if (!exclude.contains(fileInfo.id()) && + fileInfo.fileMap() != null && + !fileInfo.fileMap().ranges().isEmpty()) + return fileInfo; + } + } + else { + GridGgfsFileInfo fileInfo = fileForFragmentizer0(entry.fileId(), exclude); + + if (fileInfo != null) + return fileInfo; + } + } + + return null; + } + + /** + * List child files for specified file ID possibly skipping existing transaction. + * + * @param fileId File to list child files for. + * @param skipTx Whether to skip existing transaction. + * @return Directory listing for the specified file.* + * @throws IgniteCheckedException If failed. + */ + private Map<String, GridGgfsListingEntry> directoryListing(IgniteUuid fileId, boolean skipTx) throws IgniteCheckedException { + assert fileId != null; + + GridGgfsFileInfo info = skipTx ? 
id2InfoPrj.getAllOutTx(Collections.singletonList(fileId)).get(fileId) : + id2InfoPrj.get(fileId); + + return info == null ? Collections.<String, GridGgfsListingEntry>emptyMap() : info.listing(); + } + + /** + * Add file into file system structure. + * + * @param parentId Parent file ID. + * @param fileName File name in the parent's listing. + * @param newFileInfo File info to store in the parent's listing. + * @return File id already stored in meta cache or {@code null} if passed file info was stored. + * @throws IgniteCheckedException If failed. + */ + public IgniteUuid putIfAbsent(IgniteUuid parentId, String fileName, GridGgfsFileInfo newFileInfo) + throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + assert parentId != null; + assert fileName != null; + assert newFileInfo != null; + + IgniteUuid res = null; + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + res = putIfAbsentNonTx(parentId, fileName, newFileInfo); + + tx.commit(); + } + finally { + tx.close(); + } + + return res; + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to put file because Grid is stopping [parentId=" + parentId + + ", fileName=" + fileName + ", newFileInfo=" + newFileInfo + ']'); + } + + /** + * Add file into file system structure. Do not create new transaction expecting that the one already exists. + * + * @param parentId Parent file ID. + * @param fileName File name in the parent's listing. + * @param newFileInfo File info to store in the parent's listing. + * @return File id already stored in meta cache or {@code null} if passed file info was stored. + * @throws IgniteCheckedException If failed. 
+ */ + private IgniteUuid putIfAbsentNonTx(IgniteUuid parentId, String fileName, GridGgfsFileInfo newFileInfo) + throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Locking parent id [parentId=" + parentId + ", fileName=" + fileName + ", newFileInfo=" + + newFileInfo + ']'); + + validTxState(true); + + // Lock only parent file ID. + GridGgfsFileInfo parentInfo = info(parentId); + + assert validTxState(true); + + if (parentInfo == null) + throw new IgniteFsFileNotFoundException("Failed to lock parent directory (not found): " + parentId); + + if (!parentInfo.isDirectory()) + throw new IgniteFsInvalidPathException("Parent file is not a directory: " + parentInfo); + + Map<String, GridGgfsListingEntry> parentListing = parentInfo.listing(); + + assert parentListing != null; + + GridGgfsListingEntry entry = parentListing.get(fileName); + + assert validTxState(true); + + if (entry != null) + return entry.fileId(); + + IgniteUuid fileId = newFileInfo.id(); + + if (!id2InfoPrj.putxIfAbsent(fileId, newFileInfo)) + throw new IgniteFsException("Failed to add file details into cache: " + newFileInfo); + + assert metaCache.get(parentId) != null; + + id2InfoPrj.invoke(parentId, new UpdateListing(fileName, new GridGgfsListingEntry(newFileInfo), false)); + + return null; + } + + /** + * Move or rename file. + * + * @param fileId File ID to move or rename. + * @param srcFileName Original file name in the parent's listing. + * @param srcParentId Parent directory ID. + * @param destFileName New file name in the parent's listing after moving. + * @param destParentId New parent directory ID. + * @throws IgniteCheckedException If failed. 
+ */ + public void move(IgniteUuid fileId, String srcFileName, IgniteUuid srcParentId, String destFileName, + IgniteUuid destParentId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + moveNonTx(fileId, srcFileName, srcParentId, destFileName, destParentId); + + tx.commit(); + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to move file system entry because Grid is stopping [fileId=" + + fileId + ", srcFileName=" + srcFileName + ", srcParentId=" + srcParentId + ", destFileName=" + + destFileName + ", destParentId=" + destParentId + ']'); + } + + /** + * Move or rename file in existing transaction. + * + * @param fileId File ID to move or rename. + * @param srcFileName Original file name in the parent's listing. + * @param srcParentId Parent directory ID. + * @param destFileName New file name in the parent's listing after moving. + * @param destParentId New parent directory ID. + * @throws IgniteCheckedException If failed. + */ + private void moveNonTx(IgniteUuid fileId, @Nullable String srcFileName, IgniteUuid srcParentId, String destFileName, + IgniteUuid destParentId) throws IgniteCheckedException { + assert validTxState(true); + assert fileId != null; + assert srcFileName != null; + assert srcParentId != null; + assert destFileName != null; + assert destParentId != null; + + if (srcParentId.equals(destParentId) && srcFileName.equals(destFileName)) { + if (log.isDebugEnabled()) + log.debug("File is moved to itself [fileId=" + fileId + + ", fileName=" + srcFileName + ", parentId=" + srcParentId + ']'); + + return; // File is moved to itself. + } + + // Lock file ID and parent IDs for this transaction. 
+ Map<IgniteUuid, GridGgfsFileInfo> infoMap = lockIds(srcParentId, fileId, destParentId); + + validTxState(true); + + GridGgfsFileInfo srcInfo = infoMap.get(srcParentId); + + if (srcInfo == null) + throw new IgniteFsFileNotFoundException("Failed to lock source directory (not found?)" + + " [srcParentId=" + srcParentId + ']'); + + if (!srcInfo.isDirectory()) + throw new IgniteFsInvalidPathException("Source is not a directory: " + srcInfo); + + GridGgfsFileInfo destInfo = infoMap.get(destParentId); + + if (destInfo == null) + throw new IgniteFsFileNotFoundException("Failed to lock destination directory (not found?)" + + " [destParentId=" + destParentId + ']'); + + if (!destInfo.isDirectory()) + throw new IgniteFsInvalidPathException("Destination is not a directory: " + destInfo); + + GridGgfsFileInfo fileInfo = infoMap.get(fileId); + + if (fileInfo == null) + throw new IgniteFsFileNotFoundException("Failed to lock target file (not found?) [fileId=" + + fileId + ']'); + + GridGgfsListingEntry srcEntry = srcInfo.listing().get(srcFileName); + GridGgfsListingEntry destEntry = destInfo.listing().get(destFileName); + + // If source file does not exist or was re-created. + if (srcEntry == null || !srcEntry.fileId().equals(fileId)) + throw new IgniteFsFileNotFoundException("Failed to remove file name from the source directory" + + " (file not found) [fileId=" + fileId + ", srcFileName=" + srcFileName + + ", srcParentId=" + srcParentId + ", srcEntry=" + srcEntry + ']'); + + // If stored file already exist. + if (destEntry != null) + throw new IgniteFsInvalidPathException("Failed to add file name into the destination directory " + + "(file already exists) [fileId=" + fileId + ", destFileName=" + destFileName + + ", destParentId=" + destParentId + ", destEntry=" + destEntry + ']'); + + assert metaCache.get(srcParentId) != null; + assert metaCache.get(destParentId) != null; + + // Remove listing entry from the source parent listing. 
+ id2InfoPrj.invoke(srcParentId, new UpdateListing(srcFileName, srcEntry, true)); + + // Add listing entry into the destination parent listing. + id2InfoPrj.invoke(destParentId, new UpdateListing(destFileName, srcEntry, false)); + } + + /** + * Remove file from the file system structure. + * + * @param parentId Parent file ID. + * @param fileName New file name in the parent's listing. + * @param fileId File ID to remove. + * @param path Path of the deleted file. + * @param rmvLocked Whether to remove this entry in case it is has explicit lock. + * @return The last actual file info or {@code null} if such file no more exist. + * @throws IgniteCheckedException If failed. + */ + @Nullable public GridGgfsFileInfo removeIfEmpty(IgniteUuid parentId, String fileName, IgniteUuid fileId, + IgniteFsPath path, boolean rmvLocked) + throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + if (parentId != null) + lockIds(parentId, fileId, TRASH_ID); + else + lockIds(fileId, TRASH_ID); + + GridGgfsFileInfo fileInfo = removeIfEmptyNonTx(parentId, fileName, fileId, path, rmvLocked); + + tx.commit(); + + delWorker.signal(); + + return fileInfo; + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to remove file system entry because Grid is stopping [parentId=" + + parentId + ", fileName=" + fileName + ", fileId=" + fileId + ", path=" + path + ']'); + } + + /** + * Remove file from the file system structure in existing transaction. + * + * @param parentId Parent file ID. + * @param fileName New file name in the parent's listing. + * @param fileId File ID to remove. + * @param path Path of the deleted file. + * @param rmvLocked Whether to remove this entry in case it has explicit lock. + * @return The last actual file info or {@code null} if such file no more exist. 
+ * @throws IgniteCheckedException If failed. + */ + @Nullable private GridGgfsFileInfo removeIfEmptyNonTx(@Nullable IgniteUuid parentId, String fileName, IgniteUuid fileId, + IgniteFsPath path, boolean rmvLocked) + throws IgniteCheckedException { + assert validTxState(true); + assert parentId != null; + assert fileName != null; + assert fileId != null; + assert !ROOT_ID.equals(fileId); + + if (log.isDebugEnabled()) + log.debug("Remove file: [parentId=" + parentId + ", fileName= " + fileName + ", fileId=" + fileId + ']'); + + // Safe gets because locks are obtained in removeIfEmpty. + GridGgfsFileInfo fileInfo = id2InfoPrj.get(fileId); + GridGgfsFileInfo parentInfo = id2InfoPrj.get(parentId); + + if (fileInfo == null || parentInfo == null) { + if (parentInfo != null) { // fileInfo == null + GridGgfsListingEntry entry = parentInfo.listing().get(fileName); + + // If file info does not exists but listing entry exists, throw inconsistent exception. + if (entry != null && entry.fileId().equals(fileId)) + throw new IgniteCheckedException("Failed to remove file (file system is in inconsistent state) " + + "[fileInfo=" + fileInfo + ", fileName=" + fileName + ", fileId=" + fileId + ']'); + } + + return null; // Parent directory or removed file cannot be locked (not found?). + } + + assert parentInfo.isDirectory(); + + if (!rmvLocked && fileInfo.lockId() != null) + throw new IgniteFsException("Failed to remove file (file is opened for writing) [fileName=" + + fileName + ", fileId=" + fileId + ", lockId=" + fileInfo.lockId() + ']'); + + // Validate own directory listing. + if (fileInfo.isDirectory()) { + Map<String, GridGgfsListingEntry> listing = fileInfo.listing(); + + if (!F.isEmpty(listing)) + throw new GridGgfsDirectoryNotEmptyException("Failed to remove file (directory is not empty)" + + " [fileId=" + fileId + ", listing=" + listing + ']'); + } + + // Validate file in the parent listing. 
+ GridGgfsListingEntry listingEntry = parentInfo.listing().get(fileName); + + if (listingEntry == null || !listingEntry.fileId().equals(fileId)) + return null; + + // Actual remove. + softDeleteNonTx(parentId, fileName, fileId); + + // Update a file info of the removed file with a file path, + // which will be used by delete worker for event notifications. + id2InfoPrj.invoke(fileId, new UpdatePath(path)); + + return GridGgfsFileInfo.builder(fileInfo).path(path).build(); + } + + /** + * Move path to the trash directory. + * + * @param parentId Parent ID. + * @param pathName Path name. + * @param pathId Path ID. + * @return ID of an entry located directly under the trash directory. + * @throws IgniteCheckedException If failed. + */ + IgniteUuid softDelete(@Nullable IgniteUuid parentId, @Nullable String pathName, IgniteUuid pathId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + if (parentId == null) + lockIds(pathId, TRASH_ID); + else + lockIds(parentId, pathId, TRASH_ID); + + IgniteUuid resId = softDeleteNonTx(parentId, pathName, pathId); + + tx.commit(); + + delWorker.signal(); + + return resId; + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to perform soft delete because Grid is stopping [parentId=" + + parentId + ", pathName=" + pathName + ", pathId=" + pathId + ']'); + } + + /** + * Move path to the trash directory in existing transaction. + * + * @param parentId Parent ID. + * @param name Path name. + * @param id Path ID. + * @return ID of an entry located directly under the trash directory. + * @throws IgniteCheckedException If failed. 
+ */ + @Nullable private IgniteUuid softDeleteNonTx(@Nullable IgniteUuid parentId, @Nullable String name, IgniteUuid id) + throws IgniteCheckedException { + assert validTxState(true); + + IgniteUuid resId; + + if (parentId == null) { + // Handle special case when we deleting root directory. + assert ROOT_ID.equals(id); + + GridGgfsFileInfo rootInfo = id2InfoPrj.get(ROOT_ID); + + if (rootInfo == null) + return null; // Root was never created. + + // Ensure trash directory existence. + if (id2InfoPrj.get(TRASH_ID) == null) + id2InfoPrj.put(TRASH_ID, new GridGgfsFileInfo(TRASH_ID)); + + Map<String, GridGgfsListingEntry> rootListing = rootInfo.listing(); + + if (!rootListing.isEmpty()) { + IgniteUuid[] lockIds = new IgniteUuid[rootInfo.listing().size()]; + + int i = 0; + + for (GridGgfsListingEntry entry : rootInfo.listing().values()) + lockIds[i++] = entry.fileId(); + + // Lock children IDs in correct order. + lockIds(lockIds); + + // Construct new info and move locked entries from root to it. + Map<String, GridGgfsListingEntry> transferListing = new HashMap<>(); + + transferListing.putAll(rootListing); + + GridGgfsFileInfo newInfo = new GridGgfsFileInfo(transferListing); + + id2InfoPrj.put(newInfo.id(), newInfo); + + // Add new info to trash listing. + id2InfoPrj.invoke(TRASH_ID, new UpdateListing(newInfo.id().toString(), + new GridGgfsListingEntry(newInfo), false)); + + // Remove listing entries from root. + for (Map.Entry<String, GridGgfsListingEntry> entry : transferListing.entrySet()) + id2InfoPrj.invoke(ROOT_ID, new UpdateListing(entry.getKey(), entry.getValue(), true)); + + resId = newInfo.id(); + } + else + resId = null; + } + else { + // Ensure trash directory existence. + if (id2InfoPrj.get(TRASH_ID) == null) + id2InfoPrj.put(TRASH_ID, new GridGgfsFileInfo(TRASH_ID)); + + moveNonTx(id, name, parentId, id.toString(), TRASH_ID); + + resId = id; + } + + return resId; + } + + /** + * Remove listing entries of the given parent. + * + * @param parentId Parent ID. 
+ * @param listing Listing entries. + * @return Collection of really deleted entries. + * @throws IgniteCheckedException If failed. + */ + Collection<IgniteUuid> delete(IgniteUuid parentId, Map<String, GridGgfsListingEntry> listing) + throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert parentId != null; + assert listing != null; + assert validTxState(false); + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + Collection<IgniteUuid> res = new HashSet<>(); + + // Obtain all necessary locks in one hop. + IgniteUuid[] allIds = new IgniteUuid[listing.size() + 1]; + + allIds[0] = parentId; + + int i = 1; + + for (GridGgfsListingEntry entry : listing.values()) + allIds[i++] = entry.fileId(); + + Map<IgniteUuid, GridGgfsFileInfo> locks = lockIds(allIds); + + GridGgfsFileInfo parentInfo = locks.get(parentId); + + // Ensure parent is still in place. + if (parentInfo != null) { + Map<String, GridGgfsListingEntry> newListing = + new HashMap<>(parentInfo.listing().size(), 1.0f); + + newListing.putAll(parentInfo.listing()); + + // Remove child entries if possible. + for (Map.Entry<String, GridGgfsListingEntry> entry : listing.entrySet()) { + IgniteUuid entryId = entry.getValue().fileId(); + + GridGgfsFileInfo entryInfo = locks.get(entryId); + + if (entryInfo != null) { + // Delete only files or empty folders. + if (entryInfo.isFile() || entryInfo.isDirectory() && entryInfo.listing().isEmpty()) { + id2InfoPrj.remove(entryId); + + newListing.remove(entry.getKey()); + + res.add(entryId); + } + } + else { + // Entry was deleted concurrently. + newListing.remove(entry.getKey()); + + res.add(entryId); + } + } + + // Update parent listing. 
+ id2InfoPrj.putx(parentId, new GridGgfsFileInfo(newListing, parentInfo)); + } + + tx.commit(); + + return res; + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to perform delete because Grid is stopping [parentId=" + + parentId + ", listing=" + listing + ']'); + } + + /** + * Remove entry from the metadata listing. + * + * @param parentId Parent ID. + * @param name Name. + * @param id ID. + * @return {@code True} in case the entry really was removed from the cache by this call. + * @throws IgniteCheckedException If failed. + */ + boolean delete(IgniteUuid parentId, String name, IgniteUuid id) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + boolean res = false; + + Map<IgniteUuid, GridGgfsFileInfo> infos = lockIds(parentId, id); + + // Proceed only in case both parent and child exist. + if (infos.containsKey(parentId) && infos.containsKey(id)) { + GridGgfsFileInfo parentInfo = infos.get(parentId); + + assert parentInfo != null; + + GridGgfsListingEntry listingEntry = parentInfo.listing().get(name); + + if (listingEntry != null) + id2InfoPrj.invoke(parentId, new UpdateListing(name, listingEntry, true)); + + id2InfoPrj.remove(id); + + res = true; + } + + tx.commit(); + + return res; + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to perform delete because Grid is stopping [parentId=" + + parentId + ", name=" + name + ", id=" + id + ']'); + } + + /** + * Check whether there are any pending deletes and return collection of pending delete entry IDs. + * + * @return Collection of entry IDs to be deleted. + * @throws IgniteCheckedException If operation failed. 
+ */ + public Collection<IgniteUuid> pendingDeletes() throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + GridGgfsFileInfo trashInfo = id2InfoPrj.get(TRASH_ID); + + if (trashInfo != null) { + Map<String, GridGgfsListingEntry> listing = trashInfo.listing(); + + if (listing != null && !listing.isEmpty()) { + return F.viewReadOnly(listing.values(), new IgniteClosure<GridGgfsListingEntry, IgniteUuid>() { + @Override public IgniteUuid apply(GridGgfsListingEntry e) { + return e.fileId(); + } + }); + } + } + + return Collections.emptySet(); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get pending deletes because Grid is stopping."); + } + + /** + * Update file info (file properties) in cache in existing transaction. + * + * @param parentId Parent ID ({@code null} if file is root). + * @param fileName To update listing. + * @param fileId File ID to update information for. + * @param props Properties to set for the file. + * @return Updated file info or {@code null} if such file ID not found. + * @throws IgniteCheckedException If operation failed. + */ + @Nullable private GridGgfsFileInfo updatePropertiesNonTx(@Nullable IgniteUuid parentId, IgniteUuid fileId, + String fileName, Map<String, String> props) throws IgniteCheckedException { + assert fileId != null; + assert !F.isEmpty(props) : "Expects not-empty file's properties"; + assert validTxState(true); + + if (log.isDebugEnabled()) + log.debug("Update file properties [fileId=" + fileId + ", props=" + props + ']'); + + try { + GridGgfsFileInfo oldInfo; + GridGgfsFileInfo parentInfo; + + // Lock file ID for this transaction. + if (parentId == null) { + oldInfo = info(fileId); + parentInfo = null; + } + else { + Map<IgniteUuid, GridGgfsFileInfo> locked = lockIds(parentId, fileId); + + oldInfo = locked.get(fileId); + parentInfo = locked.get(parentId); + + if (parentInfo == null) + return null; // Parent not found. 
+ } + + assert validTxState(true); + + if (oldInfo == null) + return null; // File not found. + + if (parentInfo != null) { + Map<String, GridGgfsListingEntry> listing = parentInfo.listing(); + + GridGgfsListingEntry entry = listing.get(fileName); + + if (entry == null || !entry.fileId().equals(fileId)) // File was removed or recreated. + return null; + } + + Map<String, String> tmp = oldInfo.properties(); + + tmp = tmp == null ? new GridLeanMap<String, String>(props.size()) : new GridLeanMap<>(tmp); + + for (Map.Entry<String, String> e : props.entrySet()) { + if (e.getValue() == null) + // Remove properties with 'null' values. + tmp.remove(e.getKey()); + else + // Add/overwrite property. + tmp.put(e.getKey(), e.getValue()); + } + + GridGgfsFileInfo newInfo = new GridGgfsFileInfo(oldInfo, tmp); + + id2InfoPrj.putx(fileId, newInfo); + + if (parentId != null) { + GridGgfsListingEntry entry = new GridGgfsListingEntry(newInfo); + + assert metaCache.get(parentId) != null; + + id2InfoPrj.invoke(parentId, new UpdateListing(fileName, entry, false)); + } + + return newInfo; + } + catch (GridClosureException e) { + throw U.cast(e); + } + } + + /** + * Update file info (file properties) in cache. + * + * @param parentId Parent ID ({@code null} if file is root). + * @param fileName To update listing. + * @param fileId File ID to update information for. + * @param props Properties to set for the file. + * @return Updated file info or {@code null} if such file ID not found. + * @throws IgniteCheckedException If operation failed. 
+ */ + @Nullable public GridGgfsFileInfo updateProperties(@Nullable IgniteUuid parentId, IgniteUuid fileId, String fileName, + Map<String, String> props) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert validTxState(false); + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + GridGgfsFileInfo info = updatePropertiesNonTx(parentId, fileId, fileName, props); + + tx.commit(); + + return info; + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to update properties because Grid is stopping [parentId=" + + parentId + ", fileId=" + fileId + ", fileName=" + fileName + ", props=" + props + ']'); + } + + /** + * Asynchronously updates record in parent listing. + * + * @param parentId Parent ID. + * @param fileId File ID. + * @param fileName File name. + * @param lenDelta Length delta. + * @param modificationTime Last modification time. + */ + public void updateParentListingAsync(IgniteUuid parentId, IgniteUuid fileId, String fileName, long lenDelta, + long modificationTime) { + if (busyLock.enterBusy()) { + try { + assert parentId != null; + + assert validTxState(false); + + id2InfoPrj.invokeAsync(parentId, new UpdateListingEntry(fileId, fileName, lenDelta, 0, + modificationTime)); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to update parent listing because Grid is stopping [parentId=" + + parentId + ", fileId=" + fileId + ", fileName=" + fileName + ']'); + } + + /** + * Update file info in cache. + * + * @param fileId File ID to update information for. + * @param c Closure to update file's info inside transaction. + * @return Updated file info or {@code null} if such file ID not found. + * @throws IgniteCheckedException If failed. 
+ */ + @Nullable public GridGgfsFileInfo updateInfo(IgniteUuid fileId, IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo> c) + throws IgniteCheckedException { + assert validTxState(false); + assert fileId != null; + assert c != null; + + if (busyLock.enterBusy()) { + try { + if (log.isDebugEnabled()) + log.debug("Update file info [fileId=" + fileId + ", c=" + c + ']'); + + IgniteTx tx = metaCache.isLockedByThread(fileId) ? null : metaCache.txStart(PESSIMISTIC, + REPEATABLE_READ); + + try { + // Lock file ID for this transaction. + GridGgfsFileInfo oldInfo = info(fileId); + + if (oldInfo == null) + return null; // File not found. + + GridGgfsFileInfo newInfo = c.apply(oldInfo); + + if (newInfo == null) + throw new IgniteFsException("Failed to update file info with null value" + + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']'); + + if (!oldInfo.id().equals(newInfo.id())) + throw new IgniteFsException("Failed to update file info (file IDs differ)" + + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']'); + + if (oldInfo.isDirectory() != newInfo.isDirectory()) + throw new IgniteFsException("Failed to update file info (file types differ)" + + " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']'); + + boolean b = metaCache.replace(fileId, oldInfo, newInfo); + + assert b : "Inconsistent transaction state [oldInfo=" + oldInfo + ", newInfo=" + newInfo + + ", c=" + c + ']'; + + if (tx != null) + tx.commit(); + + return newInfo; + } + catch (GridClosureException e) { + throw U.cast(e); + } + finally { + if (tx != null) + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to update file system entry info because Grid is stopping: " + + fileId); + } + + /** + * Set sampling flag. + * + * @param val Sampling flag state or {@code null} to clear sampling state and mark it as "not set". + * @return {@code True} if sampling mode was actually changed by this call. 
+ * @throws IgniteCheckedException If failed. + */ + public boolean sampling(Boolean val) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + validTxState(false); + + IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + Object prev = val != null ? metaCache.put(sampling, val) : metaCache.remove(sampling); + + tx.commit(); + + return !F.eq(prev, val); + } + finally { + tx.close(); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to set sampling flag because Grid is stopping."); + } + + /** + * Get sampling flag state. + * + * @return {@code True} in case sampling is enabled, {@code false} otherwise or {@code null} in case sampling + * is not set. + * @throws IgniteCheckedException If failed. + */ + public Boolean sampling() throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + validTxState(false); + + Object val = metaCache.get(sampling); + + return (val == null || !(val instanceof Boolean)) ? null : (Boolean)val; + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get sampling flag because Grid is stopping."); + } + + /** + * Create the file in DUAL mode. + * + * @param fs File system. + * @param path Path. + * @param simpleCreate "Simple create" flag. + * @param props Properties.. + * @param overwrite Overwrite flag. + * @param bufSize Buffer size. + * @param replication Replication factor. + * @param blockSize Block size. + * @param affKey Affinity key. + * @return Output stream descriptor. + * @throws IgniteCheckedException If file creation failed. 
     */
    public GridGgfsSecondaryOutputStreamDescriptor createDual(final IgniteFsFileSystem fs, final IgniteFsPath path,
        final boolean simpleCreate, @Nullable final Map<String, String> props, final boolean overwrite, final int bufSize,
        final short replication, final long blockSize, final IgniteUuid affKey)
        throws IgniteCheckedException {
        if (busyLock.enterBusy()) {
            try {
                assert fs != null;
                assert path != null;

                // Events to fire (can be done outside of a transaction).
                final Deque<IgniteFsEvent> pendingEvts = new LinkedList<>();

                SynchronizationTask<GridGgfsSecondaryOutputStreamDescriptor> task =
                    new SynchronizationTask<GridGgfsSecondaryOutputStreamDescriptor>() {
                        /** Output stream to the secondary file system. */
                        private OutputStream out;

                        @Override public GridGgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgniteFsPath,
                            GridGgfsFileInfo> infos) throws Exception {
                            assert !infos.isEmpty();

                            // Determine the first existing parent.
                            // The pick starts with the first key and moves to any key that is a sub-directory
                            // of the current pick.
                            IgniteFsPath parentPath = null;

                            for (IgniteFsPath curPath : infos.keySet()) {
                                if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
                                    parentPath = curPath;
                            }

                            assert parentPath != null;

                            GridGgfsFileInfo parentInfo = infos.get(parentPath);

                            // Delegate to the secondary file system.
                            // Simple create uses the two-argument overload; otherwise all stream parameters
                            // are passed through.
                            out = simpleCreate ? fs.create(path, overwrite) :
                                fs.create(path, bufSize, overwrite, replication, blockSize, props);

                            IgniteFsPath parent0 = path.parent();

                            assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;

                            // If some of the parent directories were missing, synchronize again.
                            if (!parentPath.equals(parent0)) {
                                parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);

                                // Fire notification about missing directories creation.
                                if (evts.isRecordable(EVT_GGFS_DIR_CREATED)) {
                                    IgniteFsPath evtPath = parent0;

                                    // Walk upwards from the direct parent to the first existing one; 'addFirst'
                                    // puts the topmost directory first in the queue.
                                    while (!parentPath.equals(evtPath)) {
                                        pendingEvts.addFirst(new IgniteFsEvent(evtPath, locNode, EVT_GGFS_DIR_CREATED));

                                        evtPath = evtPath.parent();

                                        assert evtPath != null; // If this fails, then ROOT does not exist.
                                    }
                                }
                            }

                            // Get created file info.
                            IgniteFsFile status = fs.info(path);

                            if (status == null)
                                throw new IgniteFsException("Failed to open output stream to the file created in " +
                                    "the secondary file system because it no longer exists: " + path);
                            else if (status.isDirectory())
                                throw new IgniteFsException("Failed to open output stream to the file created in " +
                                    "the secondary file system because the path points to a directory: " + path);

                            // The local info takes block size, length and properties from the secondary file system
                            // status and gets a freshly generated random UUID.
                            GridGgfsFileInfo newInfo = new GridGgfsFileInfo(status.blockSize(), status.length(), affKey,
                                IgniteUuid.randomUuid(), ggfsCtx.ggfs().evictExclude(path, false), status.properties());

                            // Add new file info to the listing optionally removing the previous one.
                            // NOTE(review): a non-null result presumably is the ID already listed under this
                            // name - confirm against putIfAbsentNonTx.
                            IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);

                            if (oldId != null) {
                                // Read the old info before it is removed: it is needed to delete the old data below.
                                GridGgfsFileInfo oldInfo = info(oldId);

                                id2InfoPrj.removex(oldId); // Remove the old one.
                                id2InfoPrj.putx(newInfo.id(), newInfo); // Put the new one.

                                // Replace the parent listing record: first drop the old entry, then add the new one.
                                id2InfoPrj.invoke(parentInfo.id(),
                                    new UpdateListing(path.name(), parentInfo.listing().get(path.name()), true));
                                id2InfoPrj.invoke(parentInfo.id(),
                                    new UpdateListing(path.name(), new GridGgfsListingEntry(newInfo), false));

                                IgniteFuture<?> delFut = ggfsCtx.data().delete(oldInfo);

                                // Record PURGE event if needed.
                                if (evts.isRecordable(EVT_GGFS_FILE_PURGED)) {
                                    delFut.listenAsync(new CI1<IgniteFuture<?>>() {
                                        @Override public void apply(IgniteFuture<?> t) {
                                            try {
                                                t.get(); // Ensure delete succeeded.

                                                evts.record(new IgniteFsEvent(path, locNode, EVT_GGFS_FILE_PURGED));
                                            }
                                            catch (IgniteCheckedException e) {
                                                // A failed delete is only logged; no PURGE event is recorded.
                                                LT.warn(log, e, "Old file deletion failed in DUAL mode [path=" + path +
                                                    ", simpleCreate=" + simpleCreate + ", props=" + props +
                                                    ", overwrite=" + overwrite + ", bufferSize=" + bufSize +
                                                    ", replication=" + replication + ", blockSize=" + blockSize + ']');
                                            }
                                        }
                                    });
                                }

                                // Record DELETE event if needed.
                                if (evts.isRecordable(EVT_GGFS_FILE_DELETED))
                                    pendingEvts.add(new IgniteFsEvent(path, locNode, EVT_GGFS_FILE_DELETED));
                            }

                            // Record CREATE event if needed.
                            if (evts.isRecordable(EVT_GGFS_FILE_CREATED))
                                pendingEvts.add(new IgniteFsEvent(path, locNode, EVT_GGFS_FILE_CREATED));

                            return new GridGgfsSecondaryOutputStreamDescriptor(parentInfo.id(), newInfo, out);
                        }

                        @Override public GridGgfsSecondaryOutputStreamDescriptor onFailure(Exception err)
                            throws IgniteCheckedException {
                            // Close the secondary stream quietly before reporting the failure.
                            U.closeQuiet(out);

                            U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
                                simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
                                bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);

                            // File system exceptions are rethrown as is, everything else is wrapped.
                            if (err instanceof IgniteFsException)
                                throw (IgniteFsException)err;
                            else
                                throw new IgniteFsException("Failed to create the file due to secondary file system " +
                                    "exception: " + path, err);
                        }
                    };

                try {
                    return synchronizeAndExecute(task, fs, false, path.parent());
                }
                finally {
                    // Collected events are recorded whether or not the task succeeded.
                    for (IgniteFsEvent evt : pendingEvts)
                        evts.record(evt);
                }
            }
            finally {
                busyLock.leaveBusy();
            }
        }
        else
            throw new IllegalStateException("Failed to create file in DUAL mode because Grid is stopping: " + path);
    }

    /**
     * Append to a file in DUAL mode.
     *
     * @param fs File system.
     * @param path Path.
     * @param bufSize Buffer size.
     * @return Output stream descriptor.
     * @throws IgniteCheckedException If output stream open for append has failed.
+ */ + public GridGgfsSecondaryOutputStreamDescriptor appendDual(final IgniteFsFileSystem fs, final IgniteFsPath path, + final int bufSize) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert fs != null; + assert path != null; + + SynchronizationTask<GridGgfsSecondaryOutputStreamDescriptor> task = + new SynchronizationTask<GridGgfsSecondaryOutputStreamDescriptor>() { + /** Output stream to the secondary file system. */ + private OutputStream out; + + @Override public GridGgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgniteFsPath, + GridGgfsFileInfo> infos) throws Exception { + GridGgfsFileInfo info = infos.get(path); + + if (info.isDirectory()) + throw new IgniteFsException("Failed to open output stream to the file in the " + + "secondary file system because the path points to a directory: " + path); + + out = fs.append(path, bufSize, false, null); + + // Synchronize file ending. + long len = info.length(); + int blockSize = info.blockSize(); + + int remainder = (int)(len % blockSize); + + if (remainder > 0) { + int blockIdx = (int)(len / blockSize); + + IgniteFsReader reader = fs.open(path, bufSize); + + try { + ggfsCtx.data().dataBlock(info, path, blockIdx, reader).get(); + } + finally { + reader.close(); + } + } + + // Set lock and return. 
+ info = lockInfo(info); + + metaCache.putx(info.id(), info); + + return new GridGgfsSecondaryOutputStreamDescriptor(infos.get(path.parent()).id(), info, out); + } + + @Override public GridGgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err) + throws IgniteCheckedException { + U.closeQuiet(out); + + U.error(log, "File append in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize + + ']', err); + + if (err instanceof IgniteFsException) + throw (IgniteFsException)err; + else + throw new IgniteCheckedException("Failed to append to the file due to secondary file system " + + "exception: " + path, err); + } + }; + + return synchronizeAndExecute(task, fs, true, path); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to append to file in DUAL mode because Grid is stopping: " + path); + } + + /** + * Open file in DUAL mode. + * + * @param fs Secondary file system. + * @param path Path to open. + * @param bufSize Buffer size. + * @return Input stream descriptor. + * @throws IgniteCheckedException If input stream open has failed. + */ + public GridGgfsSecondaryInputStreamDescriptor openDual(final IgniteFsFileSystem fs, final IgniteFsPath path, + final int bufSize) + throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert fs != null; + assert path != null; + + // First, try getting file info without any transactions and synchronization. + GridGgfsFileInfo info = info(fileId(path)); + + if (info != null) { + if (!info.isFile()) + throw new IgniteFsInvalidPathException("Failed to open file (not a file): " + path); + + return new GridGgfsSecondaryInputStreamDescriptor(info, fs.open(path, bufSize)); + } + + // If failed, try synchronize. 
+ SynchronizationTask<GridGgfsSecondaryInputStreamDescriptor> task = + new SynchronizationTask<GridGgfsSecondaryInputStreamDescriptor>() { + @Override public GridGgfsSecondaryInputStreamDescriptor onSuccess( + Map<IgniteFsPath, GridGgfsFileInfo> infos) throws Exception { + GridGgfsFileInfo info = infos.get(path); + + if (info == null) + throw new IgniteFsFileNotFoundException("File not found: " + path); + if (!info.isFile()) + throw new IgniteFsInvalidPathException("Failed to open file (not a file): " + path); + + return new GridGgfsSecondaryInputStreamDescriptor(infos.get(path), fs.open(path, bufSize)); + } + + @Override public GridGgfsSecondaryInputStreamDescriptor onFailure(@Nullable Exception err) + throws IgniteCheckedException { + U.error(log, "File open in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize + + ']', err); + + if (err instanceof IgniteFsException) + throw (IgniteCheckedException)err; + else + throw new IgniteCheckedException("Failed to open the path due to secondary file system " + + "exception: " + path, err); + } + }; + + return synchronizeAndExecute(task, fs, false, path); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to open file in DUAL mode because Grid is stopping: " + path); + } + + /** + * Synchronizes with secondary file system. + * + * @param fs File system. + * @param path Path. + * @return File info or {@code null} if file not found. + * @throws IgniteCheckedException If sync task failed. + */ + @Nullable public GridGgfsFileInfo synchronizeFileDual(final IgniteFsFileSystem fs, final IgniteFsPath path) + throws IgniteCheckedException { + assert fs != null; + assert path != null; + + if (busyLock.enterBusy()) { + try { + // First, try getting file info without any transactions and synchronization. + GridGgfsFileInfo info = info(fileId(path)); + + if (info != null) + return info; + + // If failed, try synchronize. 
+ SynchronizationTask<GridGgfsFileInfo> task = + new SynchronizationTask<GridGgfsFileInfo>() { + @Override public GridGgfsFileInfo onSuccess(Map<IgniteFsPath, GridGgfsFileInfo> infos) + throws Exception { + return infos.get(path); + } + + @Override public GridGgfsFileInfo onFailure(@Nullable Exception err) throws IgniteCheckedException { + if (err instanceof IgniteFsException) + throw (IgniteCheckedException)err; + else + throw new IgniteCheckedException("Failed to synchronize path due to secondary file system " + + "exception: " + path, err); + } + }; + + return synchronizeAndExecute(task, fs, false, path); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to synchronize file because Grid is stopping: " + path); + + + } + + /** + * Create directory in DUAL mode. + * + * @param fs Secondary file system. + * @param path Path to create. + * @param props Properties to be applied. + * @return {@code True} in case rename was successful. + * @throws IgniteCheckedException If directory creation failed. + */ + public boolean mkdirsDual(final IgniteFsFileSystem fs, final IgniteFsPath path, final Map<String, String> props) + throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert fs != null; + assert path != null; + assert props != null; + + if (path.parent() == null) + return true; // No additional handling for root directory is needed. + + // Events to fire (can be done outside of a transaction). + final Deque<IgniteFsEvent> pendingEvts = new LinkedList<>(); + + SynchronizationTask<Boolean> task = new SynchronizationTask<Boolean>() { + @Override public Boolean onSuccess(Map<IgniteFsPath, GridGgfsFileInfo> infos) throws Exception { + fs.mkdirs(path, props); + + assert !infos.isEmpty(); + + // Now perform synchronization again starting with the last created parent. 
+ IgniteFsPath parentPath = null; + + for (IgniteFsPath curPath : infos.keySet()) { + if (parentPath == null || curPath.isSubDirectoryOf(parentPath)) + parentPath = curPath; + } + + assert parentPath != null; + + GridGgfsFileInfo parentPathInfo = infos.get(parentPath); + + synchronize(fs, parentPath, parentPathInfo, path, true, null); + + if (evts.isRecordable(EVT_GGFS_DIR_CREATED)) { + IgniteFsPath evtPath = path; + + while (!parentPath.equals(evtPath)) { + pendingEvts.addFirst(new IgniteFsEvent(evtPath, locNode, EVT_GGFS_DIR_CREATED)); + + evtPath = evtPath.parent(); + + assert evtPath != null; // If this fails, then ROOT does not exist. + } + } + + return true; + } + + @Override public Boolean onFailure(@Nullable Exception err) throws IgniteCheckedException { + U.error(log, "Directory creation in DUAL mode failed [path=" + path + ", properties=" + props + + ']', err); + + throw new IgniteCheckedException("Failed to create the path due to secondary file system exception: " + + path, err); + } + }; + + try { + return synchronizeAndExecute(task, fs, false, path.parent()); + } + finally { + for (IgniteFsEvent evt : pendingEvts) + evts.record(evt); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to create directory in DUAL mode because Grid is stopping: " + + path); + } + + /** + * Rename path in DUAL mode. + * + * @param fs Secondary file system. + * @param src Source path. + * @param dest Destination path. + * @return Operation result. + * @throws IgniteCheckedException If failed. + */ + public boolean renameDual(final IgniteFsFileSystem fs, final IgniteFsPath src, final IgniteFsPath dest) throws + IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert fs != null; + assert src != null; + assert dest != null; + + if (src.parent() == null) + return false; // Root directory cannot be renamed. + + // Events to fire (can be done outside of a transaction). 
+ final Collection<IgniteFsEvent> pendingEvts = new LinkedList<>(); + + SynchronizationTask<Boolean> task = new SynchronizationTask<Boolean>() { + @Override public Boolean onSuccess(Map<IgniteFsPath, GridGgfsFileInfo> infos) throws Exception { + GridGgfsFileInfo srcInfo = infos.get(src); + GridGgfsFileInfo srcParentInfo = infos.get(src.parent()); + GridGgfsFileInfo destInfo = infos.get(dest); + GridGgfsFileInfo destParentInfo = dest.parent() != null ? infos.get(dest.parent()) : null; + + // Source path and destination (or destination parent) must exist. + if (srcInfo == null) + throw new IgniteFsFileNotFoundException("Failed to rename (source path not found): " + src); + + if (destInfo == null && destParentInfo == null) + throw new IgniteFsFileNotFoundException("Failed to rename (destination path not found): " + + dest); + + // Delegate to the secondary file system. + fs.rename(src, dest); + + // Rename was successful, perform compensation in the local file system. + if (destInfo == null) { + // Move and rename. + assert destParentInfo != null; + + moveNonTx(srcInfo.id(), src.name(), srcParentInfo.id(), dest.name(), destParentInfo.id()); + } + else { + // Move. + if (destInfo.isFile()) + throw new IgniteFsException("Failed to rename the path in the local file system " + + "because destination path already exists and it is a file: " + dest); + else + moveNonTx(srcInfo.id(), src.name(), srcParentInfo.id(), src.name(), destInfo.id()); + } + + // Record event if needed. + if (srcInfo.isFile()) { + if (evts.isRecordable(EVT_GGFS_FILE_RENAMED)) + pendingEvts.add(new IgniteFsEvent( + src, + destInfo == null ? 
dest : new IgniteFsPath(dest, src.name()), + locNode, + EVT_GGFS_FILE_RENAMED)); + } + else if (evts.isRecordable(EVT_GGFS_DIR_RENAMED)) + pendingEvts.add(new IgniteFsEvent(src, dest, locNode, EVT_GGFS_DIR_RENAMED)); + + return true; + } + + @Override public Boolean onFailure(@Nullable Exception err) throws IgniteCheckedException { + U.error(log, "Path rename in DUAL mode failed [source=" + src + ", destination=" + dest + ']', + err); + + if (err instanceof IgniteFsException) + throw (IgniteCheckedException)err; + else + throw new IgniteCheckedException("Failed to rename the path due to secondary file system " + + "exception: " + src, err); + } + }; + + try { + return synchronizeAndExecute(task, fs, false, src, dest); + } + finally { + for (IgniteFsEvent evt : pendingEvts) + evts.record(evt); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to rename in DUAL mode because Grid is stopping [src=" + src + + ", dest=" + dest + ']'); + } + + /** + * Delete path in DUAL mode. + * + * @param fs Secondary file system. + * @param path Path to update. + * @param recursive Recursive flag. + * @return Operation result. + * @throws IgniteCheckedException If delete failed. + */ + public boolean deleteDual(final IgniteFsFileSystem fs, final IgniteFsPath path, final boolean recursive) + throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + assert fs != null; + assert path != null; + + SynchronizationTask<Boolean> task = new SynchronizationTask<Boolean>() { + @Override public Boolean onSuccess(Map<IgniteFsPath, GridGgfsFileInfo> infos) throws Exception { + GridGgfsFileInfo info = infos.get(path); + + if (info == null) + return false; // File doesn't exist in the secondary file system. + + if (!fs.delete(path, recursive)) + return false; // Delete failed remotely. 
+ + if (path.parent() != null) { + assert infos.containsKey(path.parent()); + + softDeleteNonTx(infos.get(path.parent()).id(), path.name(), info.id()); + } + else { + assert ROOT_ID.equals(info.id()); + + softDeleteNonTx(null, path.name(), info.id()); + } + + // Update the deleted file info with path information for delete worker. + id2InfoPrj.invoke(info.id(), new UpdatePath(path)); + + return true; // No additional handling is required. + } + + @Override public Boolean onFailure(@Nullable Exception err) throws IgniteCheckedException { + U.error(log, "Path delete in DUAL mode failed [path=" + path + ", recursive=" + recursive + ']', + err); + + throw new IgniteCheckedException("Failed to delete the path due to secondary file system exception: ", + err); + } + }; + + Boolean res = synchronizeAndExecute(task, fs, false, Collections.singleton(TRASH_ID), path); + + delWorker.signal(); + + return res; + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to delete in DUAL mode because Grid is stopping: " + path); + } + + /** + * Update path in DUAL mode. + * + * @param fs Secondary file system. + * @param path Path to update. + * @param props Properties to be applied. + * @return Update file info. + * @throws IgniteCheckedException If update failed. 
+ */ + public GridGgfsFileInfo updateDual(final IgniteFsFileSystem fs, final IgniteFsPath path, final Map<String, String> props) + throws IgniteCheckedException { + assert fs != null; + assert path != null; + assert props != null && !props.isEmpty(); + + if (busyLock.enterBusy()) { + try { + SynchronizationTask<GridGgfsFileInfo> task = new SynchronizationTask<GridGgfsFileInfo>() { + @Override public GridGgfsFileInfo onSuccess(Map<IgniteFsPath, GridGgfsFileInfo> infos) + throws Exception { + if (infos.get(path) == null) + return null; + + fs.update(path, props); + + assert path.parent() == null || infos.get(path.parent()) != null; + + return updatePropertiesNonTx(infos.get(path.parent()).id(), infos.get(path).id(), path.name(), + props); + } + + @Override public GridGgfsFileInfo onFailure(@Nullab
<TRUNCATED>
