http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java new file mode 100644 index 0000000..70a2e06 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -0,0 +1,2224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.cache.eviction.ignitefs.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.mapreduce.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.igfs.IgfsMode.*; +import static org.apache.ignite.internal.IgniteNodeAttributes.*; +import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*; + +/** + * Cache-based GGFS implementation. + */ +public final class IgfsImpl implements IgfsEx { + /** Default permissions for file system entry. */ + private static final String PERMISSION_DFLT_VAL = "0777"; + + /** Default directory metadata. */ + private static final Map<String, String> DFLT_DIR_META = F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL); + + /** Handshake message. */ + private final IgfsPaths secondaryPaths; + + /** Cache based structure (meta data) manager. */ + private IgfsMetaManager meta; + + /** Cache based file's data container. */ + private IgfsDataManager data; + + /** FS configuration. */ + private IgfsConfiguration cfg; + + /** Ggfs context. */ + private IgfsContext ggfsCtx; + + /** Event storage manager. 
*/ + private GridEventStorageManager evts; + + /** Local node. */ + private ClusterNode locNode; + + /** Logger. */ + private IgniteLogger log; + + /** Mode resolver. */ + private final IgfsModeResolver modeRslvr; + + /** Connection to the secondary file system. */ + private Igfs secondaryFs; + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** Writers map. */ + private final ConcurrentHashMap8<IgfsPath, IgfsFileWorker> workerMap = new ConcurrentHashMap8<>(); + + /** Delete futures. */ + private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts = new ConcurrentHashMap8<>(); + + /** Delete message listener. */ + private final GridMessageListener delMsgLsnr = new FormatMessageListener(); + + /** Format discovery listener. */ + private final GridLocalEventListener delDiscoLsnr = new FormatDiscoveryListener(); + + /** Local metrics holder. */ + private final IgfsLocalMetrics metrics = new IgfsLocalMetrics(); + + /** Client log directory. */ + private volatile String logDir; + + /** Message topic. */ + private Object topic; + + /** Eviction policy (if set). */ + private CacheIgfsPerBlockLruEvictionPolicy evictPlc; + + /** + * Creates GGFS instance with given context. + * + * @param ggfsCtx Context. + * @throws IgniteCheckedException In case of error. + */ + IgfsImpl(IgfsContext ggfsCtx) throws IgniteCheckedException { + assert ggfsCtx != null; + + this.ggfsCtx = ggfsCtx; + + cfg = ggfsCtx.configuration(); + log = ggfsCtx.kernalContext().log(IgfsImpl.class); + evts = ggfsCtx.kernalContext().event(); + meta = ggfsCtx.meta(); + data = ggfsCtx.data(); + secondaryFs = cfg.getSecondaryFileSystem(); + + /* Default GGFS mode. */ + IgfsMode dfltMode; + + if (secondaryFs == null) { + if (cfg.getDefaultMode() == PROXY) + throw new IgniteCheckedException("Mode cannot be PROXY if secondary file system hasn't been defined."); + + dfltMode = PRIMARY; + } + else + dfltMode = cfg.getDefaultMode(); + + Map<String, IgfsMode> cfgModes = new LinkedHashMap<>(); + Map<String, IgfsMode> dfltModes = new LinkedHashMap<>(4, 1.0f); + + dfltModes.put("/ignite/primary", PRIMARY); + + if (secondaryFs != null) { + dfltModes.put("/ignite/proxy", PROXY); + dfltModes.put("/ignite/sync", DUAL_SYNC); + dfltModes.put("/ignite/async", DUAL_ASYNC); + } + + cfgModes.putAll(dfltModes); + + if (ggfsCtx.configuration().getPathModes() != null) { + for (Map.Entry<String, IgfsMode> e : ggfsCtx.configuration().getPathModes().entrySet()) { + if (!dfltModes.containsKey(e.getKey())) + cfgModes.put(e.getKey(), e.getValue()); + else + U.warn(log, "Ignoring path mode because it conflicts with Ignite reserved path " + + "(use another path) [mode=" + e.getValue() + ", path=" + e.getKey() + ']'); + } + } + + ArrayList<T2<IgfsPath, IgfsMode>> modes = null; + + if (!cfgModes.isEmpty()) { + modes = new ArrayList<>(cfgModes.size()); + + for (Map.Entry<String, IgfsMode> mode : cfgModes.entrySet()) { + IgfsMode mode0 = secondaryFs == null ? mode.getValue() == PROXY ? PROXY : PRIMARY : mode.getValue(); + + try { + modes.add(new T2<>(new IgfsPath(mode.getKey()), mode0)); + } + catch (IllegalArgumentException e) { + throw new IgniteCheckedException("Invalid path found in mode pattern: " + mode.getKey(), e); + } + } + } + + modeRslvr = new IgfsModeResolver(dfltMode, modes); + + secondaryPaths = new IgfsPaths(secondaryFs == null ? null : secondaryFs.properties(), dfltMode, + modeRslvr.modesOrdered()); + + // Check whether GGFS LRU eviction policy is set on data cache. 
+ String dataCacheName = ggfsCtx.configuration().getDataCacheName(); + + for (CacheConfiguration cacheCfg : ggfsCtx.kernalContext().config().getCacheConfiguration()) { + if (F.eq(dataCacheName, cacheCfg.getName())) { + CacheEvictionPolicy evictPlc = cacheCfg.getEvictionPolicy(); + + if (evictPlc != null & evictPlc instanceof CacheIgfsPerBlockLruEvictionPolicy) + this.evictPlc = (CacheIgfsPerBlockLruEvictionPolicy)evictPlc; + + break; + } + } + + topic = F.isEmpty(name()) ? TOPIC_GGFS : TOPIC_GGFS.topic(name()); + + ggfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr); + ggfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); + } + + /** + * @return Local node. + */ + private ClusterNode localNode() { + if (locNode == null) + locNode = ggfsCtx.kernalContext().discovery().localNode(); + + return locNode; + } + + /** {@inheritDoc} */ + @Override public void stop() { + busyLock.block(); + + // Clear interrupted flag temporarily. + boolean interrupted = Thread.interrupted(); + + // Force all workers to finish their batches. + for (IgfsFileWorker w : workerMap.values()) + w.cancel(); + + // Wait for all writers to finish their execution. + for (IgfsFileWorker w : workerMap.values()) { + try { + w.join(); + } + catch (InterruptedException e) { + U.error(log, e.getMessage(), e); + } + } + + workerMap.clear(); + + if (secondaryFs instanceof AutoCloseable) + U.closeQuiet((AutoCloseable)secondaryFs); + + ggfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr); + ggfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr); + + if (interrupted) + Thread.currentThread().interrupt(); + } + + /** + * Create batch for the file. + * + * @param path File path in the secondary file system. + * @param out Output stream to that file. + * @return Created batch. + * @throws IgniteCheckedException In case new batch cannot be created. + */ + private IgfsFileWorkerBatch newBatch(final IgfsPath path, OutputStream out) throws IgniteCheckedException { + assert path != null; + assert out != null; + + if (enterBusy()) { + try { + IgfsFileWorkerBatch batch = new IgfsFileWorkerBatch(path, out); + + while (true) { + IgfsFileWorker worker = workerMap.get(path); + + if (worker != null) { + if (worker.addBatch(batch)) // Added batch to active worker. + break; + else + workerMap.remove(path, worker); // Worker is stopping. Remove it from map. + } + else { + worker = new IgfsFileWorker("ggfs-file-worker-" + path) { + @Override protected void onFinish() { + workerMap.remove(path, this); + } + }; + + boolean b = worker.addBatch(batch); + + assert b; + + if (workerMap.putIfAbsent(path, worker) == null) { + worker.start(); + + break; + } + } + } + + return batch; + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IgniteCheckedException("Cannot create new output stream to the secondary file system because GGFS is " + + "stopping: " + path); + } + + /** + * @return {@code True} if entered busy section. + * @throws IgniteException If failed to await caches start. + */ + private boolean enterBusy() { + meta.awaitInit(); + data.awaitInit(); + + return busyLock.enterBusy(); + } + + /** + * Await for any pending finished writes on the children paths. + * + * @param paths Paths to check. + */ + void await(IgfsPath... 
paths) { + assert paths != null; + + for (Map.Entry<IgfsPath, IgfsFileWorker> workerEntry : workerMap.entrySet()) { + IgfsPath workerPath = workerEntry.getKey(); + + boolean await = false; + + for (IgfsPath path : paths) { + if (workerPath.isSubDirectoryOf(path) || workerPath.isSame(path)) { + await = true; + + break; + } + } + + if (await) { + IgfsFileWorkerBatch batch = workerEntry.getValue().currentBatch(); + + if (batch != null) { + try { + batch.awaitIfFinished(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public IgfsContext context() { + return ggfsCtx; + } + + /** + * @return Mode resolver. + */ + IgfsModeResolver modeResolver() { + return modeRslvr; + } + + /** {@inheritDoc} */ + @Nullable @Override public String name() { + return cfg.getName(); + } + + /** {@inheritDoc} */ + @Override public IgfsConfiguration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @Override public IgfsPaths proxyPaths() { + return secondaryPaths; + } + + /** {@inheritDoc} */ + @Override public String clientLogDirectory() { + return logDir; + } + + /** {@inheritDoc} */ + @Override public void clientLogDirectory(String logDir) { + this.logDir = logDir; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public IgfsStatus globalSpace() throws IgniteCheckedException { + if (enterBusy()) { + try { + IgniteBiTuple<Long, Long> space = ggfsCtx.kernalContext().grid().compute().execute( + new IgfsGlobalSpaceTask(name()), null); + + return new IgfsStatus(space.get1(), space.get2()); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get global space because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException { + if (enterBusy()) { + try { + if (meta.sampling(val)) { + if (val == null) + log.info("Sampling flag has been cleared. All further file system connections will perform " + + "logging depending on their configuration."); + else if (val) + log.info("Sampling flag has been set to \"true\". All further file system connections will " + + "perform logging."); + else + log.info("Sampling flag has been set to \"false\". 
All further file system connections will " + + "not perform logging."); + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to set global sampling flag because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override @Nullable public Boolean globalSampling() { + if (enterBusy()) { + try { + try { + return meta.sampling(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get sampling state.", e); + + return false; + } + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get global sampling flag because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public IgfsLocalMetrics localMetrics() { + return metrics; + } + + /** {@inheritDoc} */ + @Override public long groupBlockSize() { + return data.groupBlockSize(); + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + try { + A.notNull(path, "path"); + + if (log.isDebugEnabled()) + log.debug("Check file exists: " + path); + + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + + boolean res = false; + + switch (mode) { + case PRIMARY: + res = meta.fileId(path) != null; + + break; + + case DUAL_SYNC: + case DUAL_ASYNC: + res = meta.fileId(path) != null; + + if (!res) + res = secondaryFs.exists(path); + + break; + + default: + assert false : "Unknown mode."; + } + + return res; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(IgfsPath path) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + + if (log.isDebugEnabled()) + log.debug("Get file info: " + path); + + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + + IgfsFileInfo info = resolveFileInfo(path, mode); + + if (info == null) + return null; + + return new IgfsFileImpl(path, info, data.groupBlockSize()); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get path info because grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary summary(IgfsPath path) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + + if (log.isDebugEnabled()) + log.debug("Calculating path summary: " + path); + + IgniteUuid fileId = meta.fileId(path); + + if (fileId == null) + throw new IgfsFileNotFoundException("Failed to get path summary (path not found): " + path); + + IgfsPathSummary sum = new IgfsPathSummary(path); + + summary0(fileId, sum); + + return sum; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get path summary because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + A.notNull(props, "props"); + A.ensure(!props.isEmpty(), "!props.isEmpty()"); + + if (log.isDebugEnabled()) + log.debug("Set file properties [path=" + path + ", props=" + props + ']'); + + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + else if (mode != PRIMARY) { + assert 
mode == DUAL_SYNC || mode == DUAL_ASYNC; + + await(path); + + IgfsFileInfo info = meta.updateDual(secondaryFs, path, props); + + if (info == null) + return null; + + return new IgfsFileImpl(path, info, data.groupBlockSize()); + } + + List<IgniteUuid> fileIds = meta.fileIds(path); + + IgniteUuid fileId = fileIds.get(fileIds.size() - 1); + + if (fileId == null) + return null; + + IgniteUuid parentId = fileIds.size() > 1 ? fileIds.get(fileIds.size() - 2) : null; + + IgfsFileInfo info = meta.updateProperties(parentId, fileId, path.name(), props); + + if (info != null) { + if (evts.isRecordable(EVT_GGFS_META_UPDATED)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_META_UPDATED, props)); + + return new IgfsFileImpl(path, info, data.groupBlockSize()); + } + else + return null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to update file because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + if (enterBusy()) { + try { + A.notNull(src, "src"); + A.notNull(dest, "dest"); + + if (log.isDebugEnabled()) + log.debug("Rename file [src=" + src + ", dest=" + dest + ']'); + + IgfsMode mode = modeRslvr.resolveMode(src); + Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(src); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + src); + + if (src.equals(dest)) + return; // Rename to itself is a no-op. + + // Cannot rename root directory. + if (src.parent() == null) + throw new IgfsInvalidPathException("Failed to rename root directory."); + + // Cannot move directory of upper level to self sub-dir. + if (dest.isSubDirectoryOf(src)) + throw new IgfsInvalidPathException("Failed to rename directory (cannot move directory of " + + "upper level to self sub-dir) [src=" + src + ", dest=" + dest + ']'); + + if (evictExclude(src, mode == PRIMARY) != evictExclude(dest, modeRslvr.resolveMode(dest) == PRIMARY)) + throw new IgfsInvalidPathException("Cannot move file to a path with different eviction " + + "exclude setting (need to copy and remove)"); + + if (!childrenModes.equals(Collections.singleton(PRIMARY))) { + assert mode == DUAL_SYNC || mode == DUAL_ASYNC; + + await(src, dest); + + meta.renameDual(secondaryFs, src, dest); + + return; + } + + IgfsPath destParent = dest.parent(); + + // Resolve source file info. + FileDescriptor srcDesc = getFileDescriptor(src); + + // File not found. + if (srcDesc == null || srcDesc.parentId == null) { + if (mode == PRIMARY) + checkConflictWithPrimary(src); + + throw new IgfsFileNotFoundException("Failed to rename (source path not found): " + src); + } + + String srcFileName = src.name(); + + // Resolve destination file info. + FileDescriptor destDesc = getFileDescriptor(dest); + + String destFileName; + + boolean newDest = destDesc == null; + + if (newDest) { + assert destParent != null; + + // Use parent directory for destination parent and destination path name as destination name. + destDesc = getFileDescriptor(destParent); + + // Destination directory doesn't exist. + if (destDesc == null) + throw new IgfsFileNotFoundException("Failed to rename (destination directory does not " + + "exist): " + dest); + + destFileName = dest.name(); + } + else + // Use destination directory for destination parent and source path name as destination name. + destFileName = srcFileName; + + // Can move only into directory, but not into file. 
+ if (destDesc.isFile) + throw new IgfsParentNotDirectoryException("Failed to rename (destination is not a directory): " + + dest); + + meta.move(srcDesc.fileId, srcFileName, srcDesc.parentId, destFileName, destDesc.fileId); + + if (srcDesc.isFile) { // Renamed a file. + if (evts.isRecordable(EVT_GGFS_FILE_RENAMED)) + evts.record(new IgfsEvent( + src, + newDest ? dest : new IgfsPath(dest, destFileName), + localNode(), + EVT_GGFS_FILE_RENAMED)); + } + else { // Renamed a directory. + if (evts.isRecordable(EVT_GGFS_DIR_RENAMED)) + evts.record(new IgfsEvent(src, dest, localNode(), EVT_GGFS_DIR_RENAMED)); + } + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to rename path because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + + if (log.isDebugEnabled()) + log.debug("Deleting file [path=" + path + ", recursive=" + recursive + ']'); + + IgfsMode mode = modeRslvr.resolveMode(path); + Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(path); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + + boolean res = false; + + FileDescriptor desc = getFileDescriptor(path); + + if (childrenModes.contains(PRIMARY)) { + if (desc != null) + res = delete0(desc, path.parent(), recursive); + else if (mode == PRIMARY) + checkConflictWithPrimary(path); + } + + if (childrenModes.contains(DUAL_SYNC) || childrenModes.contains(DUAL_ASYNC)) { + assert secondaryFs != null; + + await(path); + + res |= meta.deleteDual(secondaryFs, path, recursive); + } + + // Record event if needed. + if (res && desc != null) { + if (desc.isFile) { + if (evts.isRecordable(EVT_GGFS_FILE_DELETED)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_DELETED)); + } + else if (evts.isRecordable(EVT_GGFS_DIR_DELETED)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_DIR_DELETED)); + } + + return res; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to delete path because Grid is stopping."); + } + + /** + * Internal procedure for (optionally) recursive file and directory deletion. + * + * @param desc File descriptor of file or directory to delete. + * @param parentPath Parent path. If specified, events will be fired for each deleted file + * or directory. If not specified, events will not be fired. + * @param recursive Recursive deletion flag. + * @return {@code True} if the file or directory was successfully deleted. If the directory is not empty and + * the {@code recursive} flag is not set, {@code IgfsDirectoryNotEmptyException} is thrown. + * @throws IgniteCheckedException In case of error. + */ + private boolean delete0(FileDescriptor desc, @Nullable IgfsPath parentPath, boolean recursive) + throws IgniteCheckedException { + IgfsPath curPath = parentPath == null ? new IgfsPath() : new IgfsPath(parentPath, desc.fileName); + + if (desc.isFile) { + deleteFile(curPath, desc, true); + + return true; + } + else { + if (recursive) { + meta.softDelete(desc.parentId, desc.fileName, desc.fileId); + + return true; + } + else { + Map<String, IgfsListingEntry> infoMap = meta.directoryListing(desc.fileId); + + if (F.isEmpty(infoMap)) { + deleteFile(curPath, desc, true); + + return true; + } + else + // Throw exception if not empty and not recursive.
+ throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not empty " + + "and recursive flag is not set)"); + } + } + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) { + mkdirs(path, null); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + + if (log.isDebugEnabled()) + log.debug("Make directories: " + path); + + if (props == null) + props = DFLT_DIR_META; + + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + else if (mode != PRIMARY) { + assert mode == DUAL_SYNC || mode == DUAL_ASYNC; + + await(path); + + meta.mkdirsDual(secondaryFs, path, props); + + return; + } + + List<IgniteUuid> ids = meta.fileIds(path); + List<String> components = path.components(); + + assert ids.size() == components.size() + 1 : "Components doesn't contain ROOT element" + + " [ids=" + ids + ", components=" + components + ']'; + + IgniteUuid parentId = ROOT_ID; + + IgfsPath curPath = path.root(); + + for (int step = 0, size = components.size(); step < size; step++) { + IgniteUuid fileId = ids.get(step + 1); // Skip the first ROOT element. + + if (fileId == null) { + IgfsFileInfo fileInfo = new IgfsFileInfo(true, props); // Create new directory. + + String fileName = components.get(step); // Get current component name. + + curPath = new IgfsPath(curPath, fileName); + + try { + // Fails only if parent is not a directory or if modified concurrently. + IgniteUuid oldId = meta.putIfAbsent(parentId, fileName, fileInfo); + + fileId = oldId == null ? fileInfo.id() : oldId; // Update node ID. + + if (oldId == null && evts.isRecordable(EVT_GGFS_DIR_CREATED)) + evts.record(new IgfsEvent(curPath, localNode(), EVT_GGFS_DIR_CREATED)); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to create directory [path=" + path + ", parentId=" + parentId + + ", fileName=" + fileName + ", step=" + step + ", e=" + e.getMessage() + ']'); + + // Check directory with such name already exists. + IgfsFileInfo stored = meta.info(meta.fileId(parentId, fileName)); + + if (stored == null) + throw new IgfsException(e); + + if (!stored.isDirectory()) + throw new IgfsParentNotDirectoryException("Failed to create directory (parent " + + "element is not a directory)"); + + fileId = stored.id(); // Update node ID. 
+ } + } + + assert fileId != null; + + parentId = fileId; + } + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to create directory because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(final IgfsPath path) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + + if (log.isDebugEnabled()) + log.debug("List directory: " + path); + + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + + Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(path); + + Collection<String> files = new HashSet<>(); + + if (childrenModes.contains(DUAL_SYNC) || childrenModes.contains(DUAL_ASYNC)) { + assert secondaryFs != null; + + Collection<IgfsPath> children = secondaryFs.listPaths(path); + + for (IgfsPath child : children) + files.add(child.name()); + } + + IgniteUuid fileId = meta.fileId(path); + + if (fileId != null) + files.addAll(meta.directoryListing(fileId).keySet()); + else if (mode == PRIMARY) { + checkConflictWithPrimary(path); + + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + } + + return F.viewReadOnly(files, new C1<String, IgfsPath>() { + @Override public IgfsPath apply(String e) { + return new IgfsPath(path, e); + } + }); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to list paths because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(final IgfsPath path) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + + if (log.isDebugEnabled()) + log.debug("List directory details: " + path); + + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + + Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(path); + + Collection<IgfsFile> files = new HashSet<>(); + + if (childrenModes.contains(DUAL_SYNC) || childrenModes.contains(DUAL_ASYNC)) { + assert secondaryFs != null; + + Collection<IgfsFile> children = secondaryFs.listFiles(path); + + for (IgfsFile child : children) { + IgfsFileInfo fsInfo = new IgfsFileInfo(cfg.getBlockSize(), child.length(), + evictExclude(path, false), child.properties()); + + files.add(new IgfsFileImpl(child.path(), fsInfo, data.groupBlockSize())); + } + } + + IgniteUuid fileId = meta.fileId(path); + + if (fileId != null) { + IgfsFileInfo info = meta.info(fileId); + + // Handle concurrent deletion. + if (info != null) { + if (info.isFile()) + // If this is a file, return its description. + return Collections.<IgfsFile>singleton(new IgfsFileImpl(path, info, + data.groupBlockSize())); + + // Perform the listing.
+ for (Map.Entry<String, IgfsListingEntry> e : info.listing().entrySet()) { + IgfsPath p = new IgfsPath(path, e.getKey()); + + files.add(new IgfsFileImpl(p, e.getValue(), data.groupBlockSize())); + } + } + } + else if (mode == PRIMARY) { + checkConflictWithPrimary(path); + + throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); + } + + return files; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to set file times because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() { + return metrics().localSpaceSize(); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties() { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path) { + return open(path, cfg.getStreamBufferSize(), cfg.getSequentialReadsBeforePrefetch()); + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { + return open(path, bufSize, cfg.getSequentialReadsBeforePrefetch()); + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + A.ensure(bufSize >= 0, "bufSize >= 0"); + A.ensure(seqReadsBeforePrefetch >= 0, "seqReadsBeforePrefetch >= 0"); + + if (log.isDebugEnabled()) + log.debug("Open file for reading [path=" + path + ", bufSize=" + bufSize + ']'); + + if (bufSize == 0) + bufSize = cfg.getStreamBufferSize(); + + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + else if (mode != PRIMARY) { + assert mode == DUAL_SYNC || mode == DUAL_ASYNC; + + IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize); + + IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(ggfsCtx, path, desc.info(), + cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(), metrics); + + if (evts.isRecordable(EVT_GGFS_FILE_OPENED_READ)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_OPENED_READ)); + + return os; + } + + IgfsFileInfo info = meta.info(meta.fileId(path)); + + if (info == null) { + checkConflictWithPrimary(path); + + throw new IgfsFileNotFoundException("File not found: " + path); + } + + if (!info.isFile()) + throw new IgfsInvalidPathException("Failed to open file (not a file): " + path); + + // Input stream to read data from grid cache with separate blocks. 
+ IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(ggfsCtx, path, info, + cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null, metrics); + + if (evts.isRecordable(EVT_GGFS_FILE_OPENED_READ)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_OPENED_READ)); + + return os; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to open file because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) { + return create0(path, cfg.getStreamBufferSize(), overwrite, null, 0, null, true); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) { + return create0(path, bufSize, overwrite, null, replication, props, false); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, + @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) { + return create0(path, bufSize, overwrite, affKey, replication, props, false); + } + + /** + * Create new file. + * + * @param path Path. + * @param bufSize Buffer size. + * @param overwrite Overwrite flag. + * @param affKey Affinity key. + * @param replication Replication factor. + * @param props Properties. + * @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method. + * @return Output stream. + */ + private IgfsOutputStream create0( + final IgfsPath path, + final int bufSize, + final boolean overwrite, + @Nullable IgniteUuid affKey, + final int replication, + @Nullable Map<String, String> props, + final boolean simpleCreate + ) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + A.ensure(bufSize >= 0, "bufSize >= 0"); + + if (log.isDebugEnabled()) + log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize + ", overwrite=" + + overwrite + ", props=" + props + ']'); + + IgfsMode mode = modeRslvr.resolveMode(path); + + IgfsFileWorkerBatch batch = null; + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + else if (mode != PRIMARY) { + assert mode == DUAL_SYNC || mode == DUAL_ASYNC; + + await(path); + + IgfsSecondaryOutputStreamDescriptor desc = meta.createDual(secondaryFs, path, simpleCreate, + props, overwrite, bufSize, (short)replication, groupBlockSize(), affKey); + + batch = newBatch(path, desc.out()); + + IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, desc.info(), desc.parentId(), + bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, batch); + + if (evts.isRecordable(EVT_GGFS_FILE_OPENED_WRITE)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_OPENED_WRITE)); + + return os; + } + + // Re-create parents when working in PRIMARY mode. In DUAL mode this is done by MetaManager. + IgfsPath parent = path.parent(); + + // Create missing parent directories if necessary. + if (parent != null) + mkdirs(parent, props); + + List<IgniteUuid> ids = meta.fileIds(path); + + // Resolve parent ID for file. + IgniteUuid parentId = ids.size() >= 2 ? ids.get(ids.size() - 2) : null; + + if (parentId == null) + throw new IgfsInvalidPathException("Failed to resolve parent directory: " + path); + + String fileName = path.name(); + + // Constructs new file info. 
+ IgfsFileInfo info = new IgfsFileInfo(cfg.getBlockSize(), affKey, evictExclude(path, true), + props); + + // Add new file into tree structure. + while (true) { + IgniteUuid oldId = meta.putIfAbsent(parentId, fileName, info); + + if (oldId == null) + break; + + if (!overwrite) + throw new IgfsPathAlreadyExistsException("Failed to create file (file already exists): " + + path); + + IgfsFileInfo oldInfo = meta.info(oldId); + + if (oldInfo.isDirectory()) + throw new IgfsPathAlreadyExistsException("Failed to create file (path points to a " + + "directory): " + path); + + // Remove old file from the tree. + // Only one file is deleted, so we use internal data loader. + deleteFile(path, new FileDescriptor(parentId, fileName, oldId, oldInfo.isFile()), false); + + if (evts.isRecordable(EVT_GGFS_FILE_DELETED)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_DELETED)); + } + + if (evts.isRecordable(EVT_GGFS_FILE_CREATED)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_CREATED)); + + info = meta.lock(info.id()); + + IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, info, parentId, + bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, batch); + + if (evts.isRecordable(EVT_GGFS_FILE_OPENED_WRITE)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_OPENED_WRITE)); + + return os; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to create file times because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream append(IgfsPath path, boolean create) { + return append(path, cfg.getStreamBufferSize(), create, null); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream append(final IgfsPath path, final int bufSize, boolean create, + @Nullable Map<String, String> props) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + A.ensure(bufSize >= 0, "bufSize >= 0"); + + if (log.isDebugEnabled()) + log.debug("Open file for appending [path=" + path + ", bufSize=" + bufSize + ", create=" + create + + ", props=" + props + ']'); + + IgfsMode mode = modeRslvr.resolveMode(path); + + IgfsFileWorkerBatch batch = null; + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + else if (mode != PRIMARY) { + assert mode == DUAL_SYNC || mode == DUAL_ASYNC; + + await(path); + + IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize); + + batch = newBatch(path, desc.out()); + + return new IgfsEventAwareOutputStream(path, desc.info(), desc.parentId(), + bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, batch); + } + + List<IgniteUuid> ids = meta.fileIds(path); + + IgfsFileInfo info = meta.info(ids.get(ids.size() - 1)); + + // Resolve parent ID for the file. + IgniteUuid parentId = ids.size() >= 2 ? 
ids.get(ids.size() - 2) : null; + + if (info == null) { + if (!create) { + checkConflictWithPrimary(path); + + throw new IgfsFileNotFoundException("File not found: " + path); + } + + if (parentId == null) + throw new IgfsInvalidPathException("Failed to resolve parent directory: " + path); + + info = new IgfsFileInfo(cfg.getBlockSize(), /**affinity key*/null, evictExclude(path, + mode == PRIMARY), props); + + IgniteUuid oldId = meta.putIfAbsent(parentId, path.name(), info); + + if (oldId != null) + info = meta.info(oldId); + + if (evts.isRecordable(EVT_GGFS_FILE_CREATED)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_CREATED)); + } + + if (!info.isFile()) + throw new IgfsInvalidPathException("Failed to open file (not a file): " + path); + + info = meta.lock(info.id()); + + if (evts.isRecordable(EVT_GGFS_FILE_OPENED_WRITE)) + evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_OPENED_WRITE)); + + return new IgfsEventAwareOutputStream(path, info, parentId, bufSize == 0 ? + cfg.getStreamBufferSize() : bufSize, mode, batch); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to append file times because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + + if (accessTime == -1 && modificationTime == -1) + return; + + FileDescriptor desc = getFileDescriptor(path); + + if (desc == null) { + checkConflictWithPrimary(path); + + throw new IgfsFileNotFoundException("Failed to update times (path not found): " + path); + } + + // Cannot update times for root. + if (desc.parentId == null) + return; + + meta.updateTimes(desc.parentId, desc.fileId, desc.fileName, accessTime, modificationTime); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to set file times because Grid is stopping."); + } + + /** + * Checks if given path exists in secondary file system and throws exception if so. + * + * @param path Path to check. + * @throws IgniteCheckedException If path exists. + */ + private void checkConflictWithPrimary(IgfsPath path) throws IgniteCheckedException { + if (secondaryFs != null) { + if (secondaryFs.info(path) != null) { + throw new IgniteCheckedException("Path mapped to a PRIMARY mode found in secondary file " + + "system. Remove path from secondary file system or change path mapping: " + path); + } + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) { + return affinity(path, start, len, 0L); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + A.ensure(start >= 0, "start >= 0"); + A.ensure(len >= 0, "len >= 0"); + + if (log.isDebugEnabled()) + log.debug("Get affinity for file block [path=" + path + ", start=" + start + ", len=" + len + ']'); + + IgfsMode mode = modeRslvr.resolveMode(path); + + if (mode == PROXY) + throw new IgniteException("PROXY mode cannot be used in GGFS directly: " + path); + + // Check memory first. 
+ IgniteUuid fileId = meta.fileId(path); + IgfsFileInfo info = meta.info(fileId); + + if (info == null && mode != PRIMARY) { + assert mode == DUAL_SYNC || mode == DUAL_ASYNC; + assert secondaryFs != null; + + // Synchronize + info = meta.synchronizeFileDual(secondaryFs, path); + } + + if (info == null) + throw new IgfsFileNotFoundException("File not found: " + path); + + if (!info.isFile()) + throw new IgfsInvalidPathException("Failed to get affinity info for file (not a file): " + + path); + + return data.affinity(info, start, len, maxLen); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get affinity because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public IgfsMetrics metrics() { + if (enterBusy()) { + try { + IgfsPathSummary sum = new IgfsPathSummary(); + + summary0(ROOT_ID, sum); + + long secondarySpaceSize = 0; + + if (secondaryFs != null) { + try { + secondarySpaceSize = secondaryFs.usedSpaceSize(); + } + catch (IgniteException e) { + LT.warn(log, e, "Failed to get secondary file system consumed space size."); + + secondarySpaceSize = -1; + } + } + + return new IgfsMetricsAdapter( + ggfsCtx.data().spaceSize(), + ggfsCtx.data().maxSpaceSize(), + secondarySpaceSize, + sum.directoriesCount(), + sum.filesCount(), + metrics.filesOpenedForRead(), + metrics.filesOpenedForWrite(), + metrics.readBlocks(), + metrics.readBlocksSecondary(), + metrics.writeBlocks(), + metrics.writeBlocksSecondary(), + metrics.readBytes(), + metrics.readBytesTime(), + metrics.writeBytes(), + metrics.writeBytesTime()); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get metrics because Grid is stopping."); + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + metrics.reset(); + } + + /** {@inheritDoc} */ + @Override public long size(IgfsPath path) { + if (enterBusy()) { + try { + A.notNull(path, "path"); + + IgniteUuid nextId = meta.fileId(path); + + if (nextId == null) + return 0; + + IgfsPathSummary sum = new IgfsPathSummary(path); + + summary0(nextId, sum); + + return sum.totalLength(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get path size because Grid is stopping."); + } + + /** + * Calculates size of directory or file for given ID. + * + * @param fileId File ID. + * @param sum Summary object that will collect information. + * @throws IgniteCheckedException If failed. + */ + private void summary0(IgniteUuid fileId, IgfsPathSummary sum) throws IgniteCheckedException { + assert sum != null; + + IgfsFileInfo info = meta.info(fileId); + + if (info != null) { + if (info.isDirectory()) { + if (!ROOT_ID.equals(info.id())) + sum.directoriesCount(sum.directoriesCount() + 1); + + for (IgfsListingEntry entry : info.listing().values()) + summary0(entry.fileId(), sum); + } + else { + sum.filesCount(sum.filesCount() + 1); + sum.totalLength(sum.totalLength() + info.length()); + } + } + } + + /** {@inheritDoc} */ + @Override public void format() { + try { + formatAsync().get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * Formats the file system removing all existing entries from it. + * + * @return Future. 
+ */ + IgniteInternalFuture<?> formatAsync() { + try { + IgniteUuid id = meta.softDelete(null, null, ROOT_ID); + + if (id == null) + return new GridFinishedFuture<Object>(ggfsCtx.kernalContext()); + else { + GridFutureAdapter<Object> fut = new GridFutureAdapter<>(ggfsCtx.kernalContext()); + + GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut); + + if (oldFut != null) + return oldFut; + else { + if (!meta.exists(id)) { + // Safety in case response message was received before we put future into collection. + fut.onDone(); + + delFuts.remove(id, fut); + } + + return fut; + } + } + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<Object>(ggfsCtx.kernalContext(), e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException { + Collection<IgniteUuid> ids = meta.pendingDeletes(); + + if (!ids.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Constructing delete future for trash entries: " + ids); + + GridCompoundFuture<Object, Object> resFut = new GridCompoundFuture<>(ggfsCtx.kernalContext()); + + for (IgniteUuid id : ids) { + GridFutureAdapter<Object> fut = new GridFutureAdapter<>(ggfsCtx.kernalContext()); + + IgniteInternalFuture<Object> oldFut = delFuts.putIfAbsent(id, fut); + + if (oldFut != null) + resFut.add(oldFut); + else { + if (meta.exists(id)) + resFut.add(fut); + else { + fut.onDone(); + + delFuts.remove(id, fut); + } + } + } + + resFut.markInitialized(); + + return resFut; + } + else + return new GridFinishedFuture<>(ggfsCtx.kernalContext()); + } + + /** + * Get file descriptor for specified path. + * + * @param path Path to file. + * @return Detailed file descriptor or {@code null}, if file does not exist. + * @throws IgniteCheckedException If failed. + */ + @Nullable private FileDescriptor getFileDescriptor(IgfsPath path) throws IgniteCheckedException { + List<IgniteUuid> ids = meta.fileIds(path); + IgfsFileInfo fileInfo = meta.info(ids.get(ids.size() - 1)); + + if (fileInfo == null) + return null; // File does not exist. + + // Resolve parent ID for removed file. + IgniteUuid parentId = ids.size() >= 2 ? ids.get(ids.size() - 2) : null; + + return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile()); + } + + /** + * Remove file from the file system (structure and data). + * + * @param path Path of the deleted file. + * @param desc Detailed file descriptor to remove. + * @param rmvLocked Whether to remove this entry in case it is has explicit lock. + * @throws IgniteCheckedException If failed. + */ + private void deleteFile(IgfsPath path, FileDescriptor desc, boolean rmvLocked) throws IgniteCheckedException { + IgniteUuid parentId = desc.parentId; + IgniteUuid fileId = desc.fileId; + + if (parentId == null || ROOT_ID.equals(fileId)) { + assert parentId == null && ROOT_ID.equals(fileId) : "Invalid file descriptor: " + desc; + + return; // Never remove the root directory! + } + + if (TRASH_ID.equals(fileId)) + return; // Never remove trash directory. + + meta.removeIfEmpty(parentId, desc.fileName, fileId, path, rmvLocked); + } + + /** + * Check whether GGFS with the same name exists among provided attributes. + * + * @param attrs Attributes. 
+ * @return {@code True} in case GGFS with the same name exists among provided attributes + */ + private boolean sameGgfs(IgfsAttributes[] attrs) { + if (attrs != null) { + String ggfsName = name(); + + for (IgfsAttributes attr : attrs) { + if (F.eq(ggfsName, attr.ggfsName())) + return true; + } + } + return false; + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) { + try { + return executeAsync(task, rslvr, paths, arg).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { + try { + return executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { + try { + return executeAsync(taskCls, rslvr, paths, arg).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, + long maxRangeSize, @Nullable T arg) { + try { + return executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeSize, arg).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * Executes GGFS task asynchronously. + * + * @param task Task to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param arg Optional task argument. + * @return Execution future. + */ + <T, R> IgniteInternalFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) { + return executeAsync(task, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); + } + + /** + * Executes GGFS task with overridden maximum range length (see + * {@link org.apache.ignite.configuration.IgfsConfiguration#getMaximumTaskRangeLength()} for more information). + * + * @param task Task to execute. + * @param rslvr Optional resolver to control split boundaries. + * @param paths Collection of paths to be processed within this task. + * @param skipNonExistentFiles Whether to skip non existent files. If set to {@code true} non-existent files will + * be ignored. Otherwise an exception will be thrown. + * @param maxRangeLen Optional maximum range length. If {@code 0}, then by default all consecutive + * GGFS blocks will be included. + * @param arg Optional task argument. + * @return Execution future. + */ + <T, R> IgniteInternalFuture<R> executeAsync(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { + return ggfsCtx.kernalContext().task().execute(task, new IgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, + skipNonExistentFiles, maxRangeLen, arg)); + } + + /** + * Executes GGFS task asynchronously. + * + * @param taskCls Task class to execute. 
+     * @param rslvr Optional resolver to control split boundaries.
+     * @param paths Collection of paths to be processed within this task.
+     * @param arg Optional task argument.
+     * @return Execution future.
+     */
+    <T, R> IgniteInternalFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+        @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) {
+        return executeAsync(taskCls, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg);
+    }
+
+    /**
+     * Executes GGFS task asynchronously with overridden maximum range length (see
+     * {@link org.apache.ignite.configuration.IgfsConfiguration#getMaximumTaskRangeLength()} for more information).
+     *
+     * @param taskCls Task class to execute.
+     * @param rslvr Optional resolver to control split boundaries.
+     * @param paths Collection of paths to be processed within this task.
+     * @param skipNonExistentFiles Whether to skip non existent files. If set to {@code true} non-existent files will
+     *     be ignored. Otherwise an exception will be thrown.
+     * @param maxRangeLen Maximum range length.
+     * @param arg Optional task argument.
+     * @return Execution future.
+     */
+    <T, R> IgniteInternalFuture<R> executeAsync(Class<? extends IgfsTask<T, R>> taskCls,
+        @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles,
+        long maxRangeLen, @Nullable T arg) {
+        return ggfsCtx.kernalContext().task().execute((Class<IgfsTask<T, R>>)taskCls,
+            new IgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean evictExclude(IgfsPath path, boolean primary) {
+        assert path != null;
+
+        try {
+            // Exclude all PRIMARY files + the ones specified in eviction policy as exclusions.
+            return primary || evictPlc == null || evictPlc.exclude(path);
+        }
+        catch (IgniteCheckedException e) {
+            LT.error(log, e, "Failed to check whether the path must be excluded from evictions: " + path);
+
+            return false;
+        }
+    }
+
+    /**
+     * Resolve file info for the given path and the given mode.
+     *
+     * @param path Path.
+     * @param mode Mode.
+     * @return File info or {@code null} in case file is not found.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgfsFileInfo resolveFileInfo(IgfsPath path, IgfsMode mode) throws IgniteCheckedException {
+        assert path != null;
+        assert mode != null;
+
+        IgfsFileInfo info = null;
+
+        switch (mode) {
+            case PRIMARY:
+                info = meta.info(meta.fileId(path));
+
+                break;
+
+            case DUAL_SYNC:
+            case DUAL_ASYNC:
+                info = meta.info(meta.fileId(path));
+
+                if (info == null) {
+                    IgfsFile status = secondaryFs.info(path);
+
+                    if (status != null)
+                        info = status.isDirectory() ? new IgfsFileInfo(true, status.properties()) :
+                            new IgfsFileInfo(status.blockSize(), status.length(), null, null, false,
+                                status.properties());
+                }
+
+                break;
+
+            default:
+                assert false : "Unknown mode: " + mode;
+        }
+
+        return info;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFs withAsync() {
+        return new IgfsAsyncImpl(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> IgniteFuture<R> future() {
+        throw new IllegalStateException("Asynchronous mode is not enabled.");
+    }
+
+    /** Detailed file descriptor. */
+    private static final class FileDescriptor {
+        /** Parent file ID. */
+        @Nullable
+        private final IgniteUuid parentId;
+
+        /** File name. */
+        private final String fileName;
+
+        /** File ID. */
+        private final IgniteUuid fileId;
+
+        /** File is plain data file or directory. */
+        private final boolean isFile;
+
+        /**
+         * Constructs detailed file descriptor.
+         *
+         * @param parentId Parent file ID.
+         * @param fileName File name.
+         * @param fileId File ID.
+         * @param isFile {@code True} if file.
+         */
+        private FileDescriptor(@Nullable IgniteUuid parentId, String fileName, IgniteUuid fileId, boolean isFile) {
+            assert fileName != null;
+
+            this.parentId = parentId;
+            this.fileName = fileName;
+
+            this.fileId = fileId;
+            this.isFile = isFile;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = parentId != null ? parentId.hashCode() : 0;
+
+            res = 31 * res + fileName.hashCode();
+            res = 31 * res + fileId.hashCode();
+            res = 31 * res + (isFile ? 1231 : 1237);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            FileDescriptor that = (FileDescriptor)o;
+
+            return fileId.equals(that.fileId) && isFile == that.isFile && fileName.equals(that.fileName) &&
+                (parentId == null ? that.parentId == null : parentId.equals(that.parentId));
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(FileDescriptor.class, this);
+        }
+    }
+
+    /**
+     * GGFS output stream extension that fires events.
+     */
+    private class IgfsEventAwareOutputStream extends IgfsOutputStreamImpl {
+        /** Close guard. */
+        private final AtomicBoolean closeGuard = new AtomicBoolean(false);
+
+        /**
+         * Constructs file output stream.
+         *
+         * @param path Path to stored file.
+         * @param fileInfo File info.
+         * @param parentId Parent ID.
+         * @param bufSize The size of the buffer to be used.
+         * @param mode GGFS mode.
+         * @param batch Optional secondary file system batch.
+         * @throws IgniteCheckedException In case of error.
+         */
+        IgfsEventAwareOutputStream(IgfsPath path, IgfsFileInfo fileInfo,
+            IgniteUuid parentId, int bufSize, IgfsMode mode, @Nullable IgfsFileWorkerBatch batch)
+            throws IgniteCheckedException {
+            super(ggfsCtx, path, fileInfo, parentId, bufSize, mode, batch, metrics);
+
+            metrics.incrementFilesOpenedForWrite();
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
+        @Override protected void onClose() throws IOException {
+            if (closeGuard.compareAndSet(false, true)) {
+                super.onClose();
+
+                metrics.decrementFilesOpenedForWrite();
+
+                if (evts.isRecordable(EVT_GGFS_FILE_CLOSED_WRITE))
+                    evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_CLOSED_WRITE, bytes()));
+            }
+        }
+    }
+
+    /**
+     * GGFS input stream extension that fires events.
+     */
+    private class IgfsEventAwareInputStream extends IgfsInputStreamImpl {
+        /** Close guard. */
+        private final AtomicBoolean closeGuard = new AtomicBoolean(false);
+
+        /**
+         * Constructor.
+         *
+         * @param ggfsCtx GGFS context.
+         * @param path Path to stored file.
+         * @param fileInfo File info.
+         * @param prefetchBlocks Prefetch blocks.
+         * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
+         * @param secReader Optional secondary file system reader.
+         * @param metrics Metrics.
+         */
+        IgfsEventAwareInputStream(IgfsContext ggfsCtx, IgfsPath path, IgfsFileInfo fileInfo,
+            int prefetchBlocks, int seqReadsBeforePrefetch, @Nullable IgfsReader secReader,
+            IgfsLocalMetrics metrics) {
+            super(ggfsCtx, path, fileInfo, prefetchBlocks, seqReadsBeforePrefetch, secReader, metrics);
+
+            metrics.incrementFilesOpenedForRead();
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
+        @Override public void close() throws IOException {
+            if (closeGuard.compareAndSet(false, true)) {
+                super.close();
+
+                metrics.decrementFilesOpenedForRead();
+
+                if (evts.isRecordable(EVT_GGFS_FILE_CLOSED_READ))
+                    evts.record(new IgfsEvent(path, localNode(), EVT_GGFS_FILE_CLOSED_READ, bytes()));
+            }
+        }
+    }
+
+    /**
+     * Space calculation task.
+     */
+    @GridInternal
+    private static class IgfsGlobalSpaceTask extends ComputeTaskSplitAdapter<Object, IgniteBiTuple<Long, Long>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** GGFS name. */
+        private String ggfsName;
+
+        /**
+         * @param ggfsName GGFS name.
+         */
+        private IgfsGlobalSpaceTask(@Nullable String ggfsName) {
+            this.ggfsName = ggfsName;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
+            Collection<ComputeJob> res = new ArrayList<>(gridSize);
+
+            for (int i = 0; i < gridSize; i++) {
+                res.add(new ComputeJobAdapter() {
+                    /** Injected grid. */
+                    @IgniteInstanceResource
+                    private Ignite g;
+
+                    @Nullable @Override public IgniteBiTuple<Long, Long> execute() {
+                        IgniteFs ggfs = ((IgniteKernal)g).context().ggfs().ggfs(ggfsName);
+
+                        if (ggfs == null)
+                            return F.t(0L, 0L);
+
+                        IgfsMetrics metrics = ggfs.metrics();
+
+                        long loc = metrics.localSpaceSize();
+
+                        return F.t(loc, metrics.maxSpaceSize());
+                    }
+                });
+            }
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public IgniteBiTuple<Long, Long> reduce(List<ComputeJobResult> results) {
+            long used = 0;
+            long max = 0;
+
+            for (ComputeJobResult res : results) {
+                IgniteBiTuple<Long, Long> data = res.getData();
+
+                if (data != null) {
+                    used += data.get1();
+                    max += data.get2();
+                }
+            }
+
+            return F.t(used, max);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            // Never failover.
+            return ComputeJobResultPolicy.WAIT;
+        }
+    }
+
+    /**
+     * Format message listener required for format action completion.
+     */
+    private class FormatMessageListener implements GridMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            if (msg instanceof IgfsDeleteMessage) {
+                ClusterNode node = ggfsCtx.kernalContext().discovery().node(nodeId);
+
+                if (node != null) {
+                    if (sameGgfs((IgfsAttributes[])node.attribute(ATTR_GGFS))) {
+                        IgfsDeleteMessage msg0 = (IgfsDeleteMessage)msg;
+
+                        try {
+                            msg0.finishUnmarshal(ggfsCtx.kernalContext().config().getMarshaller(), null);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to unmarshal message (will ignore): " + msg0, e);
+
+                            return;
+                        }
+
+                        assert msg0.id() != null;
+
+                        GridFutureAdapter<?> fut = delFuts.remove(msg0.id());
+
+                        if (fut != null) {
+                            if (msg0.error() == null)
+                                fut.onDone();
+                            else
+                                fut.onDone(msg0.error());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Discovery listener required for format actions completion.
+     */
+    private class FormatDiscoveryListener implements GridLocalEventListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(Event evt) {
+            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
+            DiscoveryEvent evt0 = (DiscoveryEvent)evt;
+
+            if (evt0.eventNode() != null) {
+                if (sameGgfs((IgfsAttributes[])evt0.eventNode().attribute(ATTR_GGFS))) {
+                    Collection<IgniteUuid> rmv = new HashSet<>();
+
+                    for (Map.Entry<IgniteUuid, GridFutureAdapter<Object>> fut : delFuts.entrySet()) {
+                        IgniteUuid id = fut.getKey();
+
+                        try {
+                            if (!meta.exists(id)) {
+                                fut.getValue().onDone();
+
+                                rmv.add(id);
+                            }
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to check file existence: " + id, e);
+                        }
+                    }
+
+                    for (IgniteUuid id : rmv)
+                        delFuts.remove(id);
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid nextAffinityKey() {
+        if (enterBusy()) {
+            try {
+                return data.nextAffinityKey(null);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to get next affinity key because Grid is stopping.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isProxy(URI path) {
+        IgfsMode mode = F.isEmpty(cfg.getPathModes()) ? cfg.getDefaultMode() :
+            modeRslvr.resolveMode(new IgfsPath(path));
+
+        return mode == PROXY;
+    }
+}
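For reference, a minimal sketch of how the package-private executeAsync(...) overloads above could be driven from the same package. The helper class, the sample path and the blocking get() call are illustrative assumptions and are not part of this commit; only the executeAsync signatures come from the code above.

package org.apache.ignite.internal.processors.igfs;

import java.util.*;

import org.apache.ignite.igfs.*;
import org.apache.ignite.igfs.mapreduce.*;
import org.apache.ignite.internal.*;

/** Hypothetical caller illustrating the package-private executeAsync(...) entry points. */
class IgfsTaskInvocationSketch {
    <T, R> R runOverPaths(IgfsImpl igfs, Class<? extends IgfsTask<T, R>> taskCls, T arg) throws Exception {
        // Example path; any IGFS paths reachable through the configured modes would do.
        Collection<IgfsPath> paths = Arrays.asList(new IgfsPath("/ignite/primary/input"));

        // Short overload: null resolver, skip non-existent files, use the configured maximum range length.
        IgniteInternalFuture<R> fut = igfs.executeAsync(taskCls, null, paths, arg);

        // Block until the task completes; IgniteInternalFuture.get() may throw IgniteCheckedException.
        return fut.get();
    }
}

Note that the short overload simply delegates to the long one with skipNonExistentFiles set to true and the configured maximum task range length, so a resolver and an explicit range length only need to be supplied when the default splitting behaviour must be overridden.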
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java
new file mode 100644
index 0000000..51e57db
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.*;
+
+import java.io.*;
+
+/**
+ * Implementation adapter providing necessary methods.
+ */
+public abstract class IgfsInputStreamAdapter extends IgfsInputStream {
+    /** {@inheritDoc} */
+    @Override public long length() {
+        return fileInfo().length();
+    }
+
+    /**
+     * Gets file info for opened file.
+     *
+     * @return File info.
+     */
+    public abstract IgfsFileInfo fileInfo();
+
+    /**
+     * Reads bytes from given position.
+     *
+     * @param pos Position to read from.
+     * @param len Number of bytes to read.
+     * @return Array of chunks with respect to chunk file representation.
+     * @throws IOException If read failed.
+     */
+    public abstract byte[][] readChunks(long pos, int len) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamDescriptor.java
new file mode 100644
index 0000000..e64ed46
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import java.io.*;
+
+/**
+ * GGFS input stream descriptor - includes stream id and length.
+ */
+public class IgfsInputStreamDescriptor implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Stream id. */
+    private long streamId;
+
+    /** Available length. */
+    private long len;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsInputStreamDescriptor() {
+        // No-op.
+    }
+
+    /**
+     * Input stream descriptor constructor.
+     *
+     * @param streamId Stream id.
+     * @param len Available length.
+     */
+    public IgfsInputStreamDescriptor(long streamId, long len) {
+        this.streamId = streamId;
+        this.len = len;
+    }
+
+    /**
+     * @return Stream ID.
+     */
+    public long streamId() {
+        return streamId;
+    }
+
+    /**
+     * @return Available length.
+     */
+    public long length() {
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(streamId);
+        out.writeLong(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        streamId = in.readLong();
+        len = in.readLong();
+    }
+}
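For reference, a minimal round-trip sketch (assumed test code, not part of this commit) exercising the Externalizable contract of IgfsInputStreamDescriptor: writeExternal() stores the stream id followed by the available length, and readExternal() restores both fields after the public no-arg constructor has run.

package org.apache.ignite.internal.processors.igfs;

import java.io.*;

/** Hypothetical serialization round trip for IgfsInputStreamDescriptor. */
class IgfsInputStreamDescriptorSketch {
    public static void main(String[] args) throws Exception {
        IgfsInputStreamDescriptor desc = new IgfsInputStreamDescriptor(42L, 1024L);

        // Serialize through standard Java object streams; Externalizable delegates to writeExternal().
        ByteArrayOutputStream bout = new ByteArrayOutputStream();

        try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeObject(desc);
        }

        // Deserialize: the no-arg constructor is invoked first, then readExternal() restores the state.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bout.toByteArray()))) {
            IgfsInputStreamDescriptor copy = (IgfsInputStreamDescriptor)in.readObject();

            assert copy.streamId() == desc.streamId() && copy.length() == desc.length();
        }
    }
}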
