IGNITE-1926: IGFS: Implemented local secondary file system.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5cf3bea3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5cf3bea3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5cf3bea3 Branch: refs/heads/ignite-3220-1 Commit: 5cf3bea32a25ccc78641f083aa7f1ac81b4187ba Parents: f5a040a Author: vozerov-gridgain <[email protected]> Authored: Mon Aug 15 13:40:41 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Mon Aug 15 13:40:41 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteFileSystem.java | 27 +- .../igfs/secondary/IgfsSecondaryFileSystem.java | 2 +- .../local/LocalIgfsSecondaryFileSystem.java | 396 ++++++++ .../igfs/secondary/local/package-info.java | 22 + .../ignite/igfs/secondary/package-info.java | 2 +- .../internal/processors/igfs/IgfsFileImpl.java | 20 +- .../processors/igfs/IgfsMetaManager.java | 2 +- .../internal/processors/igfs/IgfsUtils.java | 53 + .../local/LocalFileSystemIgfsFile.java | 134 +++ ...fsSecondaryFileSystemPositionedReadable.java | 65 ++ ...faultIgfsSecondaryFileSystemTestAdapter.java | 117 +++ .../processors/igfs/IgfsAbstractSelfTest.java | 995 +++++++++++-------- .../igfs/IgfsDualAbstractSelfTest.java | 198 ++-- .../igfs/IgfsExUniversalFileSystemAdapter.java | 116 --- ...SecondaryFileSystemDualAbstractSelfTest.java | 76 ++ ...ondaryFileSystemDualAsyncClientSelfTest.java | 28 + ...calSecondaryFileSystemDualAsyncSelfTest.java | 32 + ...condaryFileSystemDualSyncClientSelfTest.java | 28 + ...ocalSecondaryFileSystemDualSyncSelfTest.java | 32 + ...IgfsLocalSecondaryFileSystemTestAdapter.java | 141 +++ .../IgfsSecondaryFileSystemTestAdapter.java | 118 +++ .../igfs/UniversalFileSystemAdapter.java | 109 -- .../ignite/testsuites/IgniteIgfsTestSuite.java | 9 + .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 2 +- .../ignite/igfs/Hadoop1DualAbstractTest.java | 3 +- ...oopFileSystemUniversalFileSystemAdapter.java | 139 --- ...adoopIgfsSecondaryFileSystemTestAdapter.java | 149 +++ 27 files changed, 2066 insertions(+), 949 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java b/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java index f9aeb8d..8fb4fcd 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java @@ -34,6 +34,7 @@ import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import org.apache.ignite.igfs.IgfsPathNotFoundException; /** * <b>IG</b>nite <b>F</b>ile <b>S</b>ystem API. It provides a typical file system "view" on a particular cache: @@ -90,7 +91,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * * @param path Path to get information for. * @return Summary object. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path is not found. + * @throws IgfsPathNotFoundException If path is not found. * @throws IgniteException If failed. */ public IgfsPathSummary summary(IgfsPath path) throws IgniteException; @@ -101,7 +102,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param path File path to read. * @return File input stream to read data from. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist. + * @throws IgfsPathNotFoundException If path doesn't exist. */ public IgfsInputStream open(IgfsPath path) throws IgniteException; @@ -112,7 +113,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param bufSize Read buffer size (bytes) or {@code zero} to use default value. * @return File input stream to read data from. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist. + * @throws IgfsPathNotFoundException If path doesn't exist. */ public IgfsInputStream open(IgfsPath path, int bufSize) throws IgniteException; @@ -124,7 +125,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is started. * @return File input stream to read data from. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist. + * @throws IgfsPathNotFoundException If path doesn't exist. */ public IgfsInputStream open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) throws IgniteException; @@ -178,7 +179,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param create Create file if it doesn't exist yet. * @return File output stream to append data to. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist and create flag is {@code false}. + * @throws IgfsPathNotFoundException If path doesn't exist and create flag is {@code false}. */ public IgfsOutputStream append(IgfsPath path, boolean create) throws IgniteException; @@ -191,7 +192,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param props File properties to set only in case it file was just created. * @return File output stream to append data to. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist and create flag is {@code false}. + * @throws IgfsPathNotFoundException If path doesn't exist and create flag is {@code false}. */ public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create, @Nullable Map<String, String> props) throws IgniteException; @@ -204,7 +205,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param accessTime Optional last access time to set. Value {@code -1} does not update access time. * @param modificationTime Optional last modification time to set. Value {@code -1} does not update * modification time. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If target was not found. + * @throws IgfsPathNotFoundException If target was not found. * @throws IgniteException If error occurred. */ public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException; @@ -218,7 +219,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param len Size of data in the file to resolve affinity for. * @return Affinity block locations. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist. + * @throws IgfsPathNotFoundException If path doesn't exist. */ public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteException; @@ -233,7 +234,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param maxLen Maximum length of a single returned block location length. * @return Affinity block locations. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist. + * @throws IgfsPathNotFoundException If path doesn't exist. */ public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen) throws IgniteException; @@ -393,7 +394,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param dest Destination file path. If destination path is a directory, then source file will be placed * into destination directory with original name. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If source file doesn't exist. + * @throws IgfsPathNotFoundException If source file doesn't exist. */ public void rename(IgfsPath src, IgfsPath dest) throws IgniteException; @@ -430,9 +431,9 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * Lists file paths under the specified path. * * @param path Path to list files under. - * @return List of files under the specified path. + * @return List of paths under the specified path. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist. + * @throws IgfsPathNotFoundException If path doesn't exist. */ public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteException; @@ -442,7 +443,7 @@ public interface IgniteFileSystem extends IgniteAsyncSupport { * @param path Path to list files under. * @return List of files under the specified path. * @throws IgniteException In case of error. - * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist. + * @throws IgfsPathNotFoundException If path doesn't exist. */ public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteException; http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java index 4d9d255..47a0dbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java @@ -110,7 +110,7 @@ public interface IgfsSecondaryFileSystem { * Lists file paths under the specified path. * * @param path Path to list files under. - * @return List of files under the specified path. + * @return List of paths under the specified path. * @throws IgniteException In case of error. * @throws org.apache.ignite.igfs.IgfsPathNotFoundException If path doesn't exist. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java new file mode 100644 index 0000000..3d3a350 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs.secondary.local; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.IgfsException; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; +import org.apache.ignite.igfs.IgfsPathIsNotDirectoryException; +import org.apache.ignite.igfs.IgfsPathNotFoundException; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemIgfsFile; +import org.apache.ignite.internal.processors.igfs.secondary.local.LocalIgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.jetbrains.annotations.Nullable; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +/** + * Secondary file system which delegates to local file system. + */ +public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware { + /** Default buffer size. */ + private static final int DFLT_BUF_SIZE = 8 * 1024; + + /** Path that will be added to each passed path. */ + private String workDir; + + /** + * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception. + * + * @param e Exception to check. + * @param msg Detailed error message. + * @return Appropriate exception. + */ + private IgfsException handleSecondaryFsError(IOException e, String msg) { + if (e instanceof FileNotFoundException) + return new IgfsPathNotFoundException(e); + else + return new IgfsException(msg, e); + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + return fileForPath(path).exists(); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { + throw new UnsupportedOperationException("Update operation is not yet supported."); + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + File srcFile = fileForPath(src); + File destFile = fileForPath(dest); + + if (!srcFile.exists()) + throw new IgfsPathNotFoundException("Failed to perform rename because source path not found: " + src); + + if (srcFile.isDirectory() && destFile.isFile()) + throw new IgfsPathIsNotDirectoryException("Failed to perform rename because destination path is " + + "directory and source path is file [src=" + src + ", dest=" + dest + ']'); + + try { + if (destFile.isDirectory()) + Files.move(srcFile.toPath(), destFile.toPath().resolve(srcFile.getName())); + else if(!srcFile.renameTo(destFile)) + throw new IgfsException("Failed to perform rename (underlying file system returned false) " + + "[src=" + src + ", dest=" + dest + ']'); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to rename [src=" + src + ", dest=" + dest + ']'); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public boolean delete(IgfsPath path, boolean recursive) { + File f = fileForPath(path); + + if (!recursive || !f.isDirectory()) + return f.delete(); + else + return deleteDirectory(f); + } + + /** + * Delete directory recursively. + * + * @param dir Directory. + * @return {@code true} if successful. + */ + private boolean deleteDirectory(File dir) { + File[] entries = dir.listFiles(); + + if (entries != null) { + for (File entry : entries) { + if (entry.isDirectory()) + deleteDirectory(entry); + else if (entry.isFile()) { + if (!entry.delete()) + return false; + } + else + throw new UnsupportedOperationException("Symlink deletion is not yet supported: " + entry); + } + } + + return dir.delete(); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) { + if (!mkdirs0(fileForPath(path))) + throw new IgniteException("Failed to make directories (underlying file system returned false): " + path); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + mkdirs(path); + } + + /** + * Create directories. + * + * @param dir Directory. + * @return Result. + */ + private boolean mkdirs0(@Nullable File dir) { + if (dir == null) + return true; // Nothing to create. + + if (dir.exists()) + // Already exists, so no-op. + return dir.isDirectory(); + else { + File parentDir = dir.getParentFile(); + + if (!mkdirs0(parentDir)) // Create parent first. + return false; + + boolean res = dir.mkdir(); + + if (!res) + res = dir.exists(); // Tolerate concurrent creation. + + return res; + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) { + File[] entries = listFiles0(path); + + if (F.isEmpty(entries)) + return Collections.emptySet(); + else { + Collection<IgfsPath> res = U.newHashSet(entries.length); + + for (File entry : entries) + res.add(igfsPath(entry)); + + return res; + } + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) { + File[] entries = listFiles0(path); + + if (F.isEmpty(entries)) + return Collections.emptySet(); + else { + Collection<IgfsFile> res = U.newHashSet(entries.length); + + for (File entry : entries) { + IgfsFile info = info(igfsPath(entry)); + + if (info != null) + res.add(info); + } + + return res; + } + } + + /** + * Returns an array of File object. Under the specific path. + * + * @param path IGFS path. + * @return Array of File objects. + */ + @Nullable private File[] listFiles0(IgfsPath path) { + File f = fileForPath(path); + + if (!f.exists()) + throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); + else + return f.listFiles(); + } + + /** {@inheritDoc} */ + @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { + try { + FileInputStream in = new FileInputStream(fileForPath(path)); + + return new LocalIgfsSecondaryFileSystemPositionedReadable(in, bufSize); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to open file for read: " + path); + } + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, boolean overwrite) { + return create0(path, overwrite, DFLT_BUF_SIZE); + } + + /** {@inheritDoc} */ + @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) { + return create0(path, overwrite, bufSize); + } + + /** {@inheritDoc} */ + @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) { + try { + File file = fileForPath(path); + + boolean exists = file.exists(); + + if (exists) + return new BufferedOutputStream(new FileOutputStream(file, true), bufSize); + else { + if (create) + return create0(path, false, bufSize); + else + throw new IgfsPathNotFoundException("Failed to append to file because it doesn't exist: " + path); + } + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to append to file because it doesn't exist: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) { + File f = fileForPath(path); + + if (!f.exists()) + return null; + + boolean isDir = f.isDirectory(); + + if (isDir) + return new LocalFileSystemIgfsFile(path, false, true, 0, f.lastModified(), 0, null); + else + return new LocalFileSystemIgfsFile(path, f.isFile(), false, 0, f.lastModified(), f.length(), null); + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() { + throw new UnsupportedOperationException("usedSpaceSize operation is not yet supported."); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + if (workDir != null) + workDir = new File(workDir).getAbsolutePath(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + // No-op. + } + + /** + * Get work directory. + * + * @return Work directory. + */ + @Nullable public String getWorkDirectory() { + return workDir; + } + + /** + * Set work directory. + * + * @param workDir Work directory. + */ + public void setWorkDirectory(@Nullable String workDir) { + this.workDir = workDir; + } + + /** + * Create file for IGFS path. + * + * @param path IGFS path. + * @return File object. + */ + private File fileForPath(IgfsPath path) { + if (workDir == null) + return new File(path.toString()); + else { + if ("/".equals(path.toString())) + return new File(workDir); + else + return new File(workDir, path.toString()); + } + } + + /** + * Create IGFS path for file. + * + * @param f File object. + * @return IFGS path. + * @throws IgfsException If failed. + */ + private IgfsPath igfsPath(File f) throws IgfsException { + String path = f.getAbsolutePath(); + + if (workDir != null) { + if (!path.startsWith(workDir)) + throw new IgfsException("Path is not located in the work directory [workDir=" + workDir + + ", path=" + path + ']'); + + path = path.substring(workDir.length(), path.length()); + } + + return new IgfsPath(path); + } + + /** + * Internal create routine. + * + * @param path Path. + * @param overwrite Overwirte flag. + * @param bufSize Buffer size. + * @return Output stream. + */ + private OutputStream create0(IgfsPath path, boolean overwrite, int bufSize) { + File file = fileForPath(path); + + boolean exists = file.exists(); + + if (exists) { + if (!overwrite) + throw new IgfsPathAlreadyExistsException("Failed to create a file because it already exists: " + path); + } + else { + File parent = file.getParentFile(); + + if (!mkdirs0(parent)) + throw new IgfsException("Failed to create parent directory for file (underlying file system " + + "returned false): " + path); + } + + try { + return new BufferedOutputStream(new FileOutputStream(file), bufSize); + } + catch (IOException e) { + throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + ']'); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/package-info.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/package-info.java new file mode 100644 index 0000000..80bdce2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains APIs for IGFS secondary file system. + */ +package org.apache.ignite.igfs.secondary.local; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/igfs/secondary/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/package-info.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/package-info.java index 4914c47..471651f 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/package-info.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/package-info.java @@ -17,6 +17,6 @@ /** * <!-- Package description. --> - * Contains APIs for IGFS secondary file system. + * Contains APIs for IGFS secondary file system base on local file system. */ package org.apache.ignite.igfs.secondary; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java index 9f79f42..984c8f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileImpl.java @@ -66,6 +66,9 @@ public final class IgfsFileImpl implements IgfsFile, Externalizable, Binarylizab /** Last modification time. */ private long modificationTime; + /** Flags. */ + private byte flags; + /** Properties. */ private Map<String, String> props; @@ -81,6 +84,7 @@ public final class IgfsFileImpl implements IgfsFile, Externalizable, Binarylizab * which is specified separately. * * @param igfsFile The file to copy. + * @param grpBlockSize Group block size. */ public IgfsFileImpl(IgfsFile igfsFile, long grpBlockSize) { A.notNull(igfsFile, "igfsFile"); @@ -97,25 +101,29 @@ public final class IgfsFileImpl implements IgfsFile, Externalizable, Binarylizab this.accessTime = igfsFile.accessTime(); this.modificationTime = igfsFile.modificationTime(); + this.flags = IgfsUtils.flags(igfsFile.isDirectory(), igfsFile.isFile()); } /** * Constructs directory info. * * @param path Path. + * @param info Entry info. + * @param globalGrpBlockSize Global group block size. */ public IgfsFileImpl(IgfsPath path, IgfsEntryInfo info, long globalGrpBlockSize) { A.notNull(path, "path"); A.notNull(info, "info"); this.path = path; + fileId = info.id(); + flags = IgfsUtils.flags(info.isDirectory(), info.isFile()); + if (info.isFile()) { blockSize = info.blockSize(); - assert blockSize > 0; // By contract file must have blockSize > 0, while directory's blockSize == 0. - len = info.length(); grpBlockSize = info.affinityKey() == null ? globalGrpBlockSize : @@ -145,12 +153,12 @@ public final class IgfsFileImpl implements IgfsFile, Externalizable, Binarylizab /** {@inheritDoc} */ @Override public boolean isFile() { - return blockSize > 0; + return IgfsUtils.isFile(flags); } /** {@inheritDoc} */ @Override public boolean isDirectory() { - return blockSize == 0; + return IgfsUtils.isDirectory(flags); } /** {@inheritDoc} */ @@ -214,6 +222,7 @@ public final class IgfsFileImpl implements IgfsFile, Externalizable, Binarylizab U.writeStringMap(out, props); out.writeLong(accessTime); out.writeLong(modificationTime); + out.writeByte(flags); } /** @@ -232,6 +241,7 @@ public final class IgfsFileImpl implements IgfsFile, Externalizable, Binarylizab props = U.readStringMap(in); accessTime = in.readLong(); modificationTime = in.readLong(); + flags = in.readByte(); } /** {@inheritDoc} */ @@ -245,6 +255,7 @@ public final class IgfsFileImpl implements IgfsFile, Externalizable, Binarylizab IgfsUtils.writeProperties(rawWriter, props); rawWriter.writeLong(accessTime); rawWriter.writeLong(modificationTime); + rawWriter.writeByte(flags); } /** {@inheritDoc} */ @@ -258,6 +269,7 @@ public final class IgfsFileImpl implements IgfsFile, Externalizable, Binarylizab props = IgfsUtils.readProperties(rawReader); accessTime = rawReader.readLong(); modificationTime = rawReader.readLong(); + flags = rawReader.readByte(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java index 1364491..89cadce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java @@ -1919,7 +1919,7 @@ public class IgfsMetaManager extends IgfsManager { IgfsEntryInfo newInfo = IgfsUtils.createFile( IgniteUuid.randomUuid(), - status.blockSize(), + igfsCtx.configuration().getBlockSize(), status.length(), affKey, createFileLockId(false), http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index a79d965..2e79a98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -120,6 +120,12 @@ public class IgfsUtils { /** Separator between id and name parts in the trash name. */ private static final char TRASH_NAME_SEPARATOR = '|'; + /** Flag: this is a directory. */ + private static final byte FLAG_DIR = 0x1; + + /** Flag: this is a file. */ + private static final byte FLAG_FILE = 0x2; + /** * Static initializer. */ @@ -907,4 +913,51 @@ public class IgfsUtils { return resModes; } + + /** + * Create flags value. + * + * @param isDir Directory flag. + * @param isFile File flag. + * @return Result. + */ + public static byte flags(boolean isDir, boolean isFile) { + byte res = isDir ? FLAG_DIR : 0; + + if (isFile) + res |= FLAG_FILE; + + return res; + } + + /** + * Check whether passed flags represent directory. + * + * @param flags Flags. + * @return {@code True} if this is directory. + */ + public static boolean isDirectory(byte flags) { + return hasFlag(flags, FLAG_DIR); + } + + /** + * Check whether passed flags represent file. + * + * @param flags Flags. + * @return {@code True} if this is file. + */ + public static boolean isFile(byte flags) { + return hasFlag(flags, FLAG_FILE); + } + + /** + * Check whether certain flag is set. + * + * @param flags Flags. + * @param flag Flag to check. + * @return {@code True} if flag is set. + */ + private static boolean hasFlag(byte flags, byte flag) { + return (flags & flag) == flag; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemIgfsFile.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemIgfsFile.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemIgfsFile.java new file mode 100644 index 0000000..5abe4eb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemIgfsFile.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.secondary.local; + +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.jetbrains.annotations.Nullable; + +import java.util.Collections; +import java.util.Map; + +/** + * Implementation of the IgfsFile interface for the local filesystem. + */ +public class LocalFileSystemIgfsFile implements IgfsFile { + /** Path. */ + private final IgfsPath path; + + /** Flags. */ + private final byte flags; + + /** Block size. */ + private final int blockSize; + + /** Modification time. */ + private final long modTime; + + /** Length. */ + private final long len; + + /** Properties. */ + private final Map<String, String> props; + + /** + * @param path IGFS path. + * @param isFile Path is a file. + * @param isDir Path is a directory. + * @param blockSize Block size in bytes. + * @param modTime Modification time in millis. + * @param len File length in bytes. + * @param props Properties. + */ + public LocalFileSystemIgfsFile(IgfsPath path, boolean isFile, boolean isDir, int blockSize, + long modTime, long len, Map<String, String> props) { + + assert !isDir || blockSize == 0 : "blockSize must be 0 for dirs. [blockSize=" + blockSize + ']'; + assert !isDir || len == 0 : "length must be 0 for dirs. [length=" + len + ']'; + + this.path = path; + this.flags = IgfsUtils.flags(isDir, isFile); + this.blockSize = blockSize; + this.modTime = modTime; + this.len = len; + this.props = props; + } + + /** {@inheritDoc} */ + @Override public IgfsPath path() { + return path; + } + + /** {@inheritDoc} */ + @Override public boolean isFile() { + return IgfsUtils.isFile(flags); + } + + /** {@inheritDoc} */ + @Override public boolean isDirectory() { + return IgfsUtils.isDirectory(flags); + } + + /** {@inheritDoc} */ + @Override public int blockSize() { + return blockSize; + } + + /** {@inheritDoc} */ + @Override public long groupBlockSize() { + return blockSize(); + } + + /** {@inheritDoc} */ + @Override public long accessTime() { + return 0; + } + + /** {@inheritDoc} */ + @Override public long modificationTime() { + return modTime; + } + + /** {@inheritDoc} */ + @Override public String property(String name) throws IllegalArgumentException { + return property(name, null); + } + + /** {@inheritDoc} */ + @Nullable @Override public String property(String name, @Nullable String dfltVal) { + if (props != null) { + String res = props.get(name); + + if (res != null) + return res; + } + + return dfltVal; + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties() { + return props != null ? props : Collections.<String, String>emptyMap(); + } + + /** {@inheritDoc} */ + @Override public long length() { + return len; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java new file mode 100644 index 0000000..ebf56ad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.secondary.local; + +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; + +/** + * Positioned readable interface for local secondary file system. + */ +public class LocalIgfsSecondaryFileSystemPositionedReadable extends BufferedInputStream + implements IgfsSecondaryFileSystemPositionedReadable { + /** Last read position. */ + private long lastReadPos; + + /** + * Constructor. + * + * @param in Input stream. + * @param bufSize Buffer size. + */ + public LocalIgfsSecondaryFileSystemPositionedReadable(FileInputStream in, int bufSize) { + super(in, bufSize); + } + + /** {@inheritDoc} */ + @Override public int read(long readPos, byte[] buf, int off, int len) throws IOException { + if (in == null) + throw new IOException("Stream is closed."); + + if (readPos < lastReadPos || readPos + len > lastReadPos + this.buf.length) { + ((FileInputStream)in).getChannel().position(readPos); + + pos = 0; + count = 0; + } + + int bytesRead = read(buf, off, len); + + if (bytesRead != -1) { + // Advance last read position only if we really read some bytes from the stream. + lastReadPos = readPos + bytesRead; + } + + return bytesRead; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf3bea3/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/DefaultIgfsSecondaryFileSystemTestAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/DefaultIgfsSecondaryFileSystemTestAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/DefaultIgfsSecondaryFileSystemTestAdapter.java new file mode 100644 index 0000000..7fe587d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/DefaultIgfsSecondaryFileSystemTestAdapter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsOutputStream; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.util.typedef.T2; + +/** + * Adapter over {@link IgfsEx} filesystem. + */ +public class DefaultIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryFileSystemTestAdapter { + /** The wrapped igfs. */ + private final IgfsEx igfsEx; + + /** + * Constructor. + * @param igfsEx the igfs to be wrapped. + */ + public DefaultIgfsSecondaryFileSystemTestAdapter(IgfsEx igfsEx) { + this.igfsEx = igfsEx; + } + + /** {@inheritDoc} */ + @Override public String name() { + return igfsEx.name(); + } + + /** {@inheritDoc} */ + @Override public boolean exists(String path) { + return igfsEx.exists(new IgfsPath(path)); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(String path) throws IOException { + igfsEx.mkdirs(new IgfsPath(path)); + } + + /** {@inheritDoc} */ + @Override public void format() throws IOException { + igfsEx.format(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public Map<String, String> properties(String path) { + return igfsEx.info(new IgfsPath(path)).properties(); + } + + /** {@inheritDoc} */ + @Override public String permissions(String path) throws IOException { + return properties(path).get(IgfsUtils.PROP_PERMISSION); + } + + /** {@inheritDoc} */ + @Override public boolean delete(String path, boolean recursive) throws IOException { + IgfsPath igfsPath = new IgfsPath(path); + + return igfsEx.delete(igfsPath, recursive); + } + + /** {@inheritDoc} */ + @Override public InputStream openInputStream(String path) throws IOException { + IgfsPath igfsPath = new IgfsPath(path); + + return igfsEx.open(igfsPath); + } + + /** {@inheritDoc} */ + @Override public OutputStream openOutputStream(String path, boolean append) throws IOException { + IgfsPath igfsPath = new IgfsPath(path); + + final IgfsOutputStream igfsOutputStream; + if (append) + igfsOutputStream = igfsEx.append(igfsPath, true/*create*/); + else + igfsOutputStream = igfsEx.create(igfsPath, true/*overwrite*/); + + return igfsOutputStream; + } + + /** {@inheritDoc} */ + @Override public T2<Long, Long> times(String path) throws IOException { + IgfsFile info = igfsEx.info(new IgfsPath(path)); + + if (info == null) + throw new IOException("Path not found: " + path); + + return new T2<>(info.accessTime(), info.modificationTime()); + } + + /** {@inheritDoc} */ + @Override public IgfsEx igfs() { + return igfsEx; + } +} \ No newline at end of file
