http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java deleted file mode 100644 index 079f92e..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDefaultFileSystemFactoryDelegate.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.delegate; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; -import org.apache.ignite.internal.processors.hadoop.common.delegate.HadoopFileSystemFactoryDelegate; -import org.apache.ignite.lifecycle.LifecycleAware; - -import java.io.IOException; - -/** - * Hadoop file system factory delegate for non-standard factories. - */ -public class HadoopDefaultFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate { - /** Factory. */ - private final HadoopFileSystemFactory factory; - - /** - * Constructor. - * - * @param factory Factory. - */ - public HadoopDefaultFileSystemFactoryDelegate(HadoopFileSystemFactory factory) { - assert factory != null; - - this.factory = factory; - } - - /** {@inheritDoc} */ - @Override public FileSystem get(String usrName) throws IOException { - return (FileSystem)factory.get(usrName); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - if (factory instanceof LifecycleAware) - ((LifecycleAware)factory).start(); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - if (factory instanceof LifecycleAware) - ((LifecycleAware)factory).stop(); - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java deleted file mode 100644 index 7559836..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegateImpl.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.delegate; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; -import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.hadoop.common.delegate.HadoopFileSystemCounterWriterDelegate; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; -import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.T2; - -import java.io.IOException; -import java.io.PrintStream; -import java.util.Map; - -/** - * Counter writer delegate implementation. - */ -@SuppressWarnings("unused") -public class HadoopFileSystemCounterWriterDelegateImpl implements HadoopFileSystemCounterWriterDelegate { - /** */ - private static final String USER_MACRO = "${USER}"; - - /** */ - private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; - - /** - * Constructor. - * - * @param proxy Proxy (not used). - */ - public HadoopFileSystemCounterWriterDelegateImpl(IgniteHadoopFileSystemCounterWriter proxy) { - // No-op. - } - - /** {@inheritDoc} */ - public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException { - Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); - - final HadoopJobInfo jobInfo = job.info(); - - final HadoopJobId jobId = job.id(); - - for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) - hadoopCfg.set(e.getKey(), e.getValue()); - - String user = jobInfo.user(); - - user = IgfsUtils.fixUserName(user); - - String dir = jobInfo.property(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY); - - if (dir == null) - dir = DEFAULT_COUNTER_WRITER_DIR; - - Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString()); - - HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); - - try { - hadoopCfg.set(MRJobConfig.USER_NAME, user); - - FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg); - - fs.mkdirs(jobStatPath); - - try (PrintStream out = new PrintStream(fs.create( - new Path(jobStatPath, IgniteHadoopFileSystemCounterWriter.PERFORMANCE_COUNTER_FILE_NAME)))) { - for (T2<String, Long> evt : perfCntr.evts()) { - out.print(evt.get1()); - out.print(':'); - out.println(evt.get2().toString()); - } - - out.flush(); - } - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java deleted file mode 100644 index 3ab4c56..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java +++ /dev/null @@ -1,472 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.delegate; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathExistsException; -import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; -import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; -import org.apache.ignite.igfs.IgfsException; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsParentNotDirectoryException; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; -import org.apache.ignite.igfs.IgfsPathNotFoundException; -import org.apache.ignite.igfs.IgfsUserContext; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.hadoop.common.delegate.HadoopDelegateUtils; -import org.apache.ignite.internal.processors.hadoop.common.delegate.HadoopFileSystemFactoryDelegate; -import org.apache.ignite.internal.processors.hadoop.common.delegate.HadoopIgfsSecondaryFileSystemDelegate; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties; -import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable; -import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo; -import org.apache.ignite.internal.processors.igfs.IgfsFileImpl; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteUuid; -import org.jetbrains.annotations.Nullable; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * Secondary file system implementation. - */ -@SuppressWarnings("unused") -public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSecondaryFileSystemDelegate { - /** The default user name. It is used if no user context is set. */ - private final String dfltUsrName; - - /** Factory. */ - private final HadoopFileSystemFactoryDelegate factory; - - /** - * Constructor. - * - * @param proxy Proxy. - */ - public HadoopIgfsSecondaryFileSystemDelegateImpl(IgniteHadoopIgfsSecondaryFileSystem proxy) { - assert proxy.getFileSystemFactory() != null; - - dfltUsrName = IgfsUtils.fixUserName(proxy.getDefaultUserName()); - - HadoopFileSystemFactory factory0 = proxy.getFileSystemFactory(); - - if (factory0 == null) - factory0 = new CachingHadoopFileSystemFactory(); - - factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0); - } - - /** {@inheritDoc} */ - @Override public boolean exists(IgfsPath path) { - try { - return fileSystemForUser().exists(convert(path)); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { - HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); - - final FileSystem fileSys = fileSystemForUser(); - - try { - if (props0.userName() != null || props0.groupName() != null) - fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); - - if (props0.permission() != null) - fileSys.setPermission(convert(path), props0.permission()); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); - } - - //Result is not used in case of secondary FS. - return null; - } - - /** {@inheritDoc} */ - @Override public void rename(IgfsPath src, IgfsPath dest) { - // Delegate to the secondary file system. - try { - if (!fileSystemForUser().rename(convert(src), convert(dest))) - throw new IgfsException("Failed to rename (secondary file system returned false) " + - "[src=" + src + ", dest=" + dest + ']'); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']'); - } - } - - /** {@inheritDoc} */ - @Override public boolean delete(IgfsPath path, boolean recursive) { - try { - return fileSystemForUser().delete(convert(path), recursive); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); - } - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path) { - try { - if (!fileSystemForUser().mkdirs(convert(path))) - throw new IgniteException("Failed to make directories [path=" + path + "]"); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { - try { - if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) - throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]"); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) { - try { - FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); - - if (statuses == null) - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - - Collection<IgfsPath> res = new ArrayList<>(statuses.length); - - for (FileStatus status : statuses) - res.add(new IgfsPath(path, status.getPath().getName())); - - return res; - } - catch (FileNotFoundException ignored) { - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) { - try { - FileStatus[] statuses = fileSystemForUser().listStatus(convert(path)); - - if (statuses == null) - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - - Collection<IgfsFile> res = new ArrayList<>(statuses.length); - - for (FileStatus s : statuses) { - IgfsEntryInfo fsInfo = s.isDirectory() ? - IgfsUtils.createDirectory( - IgniteUuid.randomUuid(), - null, - properties(s), - s.getAccessTime(), - s.getModificationTime() - ) : - IgfsUtils.createFile( - IgniteUuid.randomUuid(), - (int)s.getBlockSize(), - s.getLen(), - null, - null, - false, - properties(s), - s.getAccessTime(), - s.getModificationTime() - ); - - res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1)); - } - - return res; - } - catch (FileNotFoundException ignored) { - throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { - return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize); - } - - /** {@inheritDoc} */ - @Override public OutputStream create(IgfsPath path, boolean overwrite) { - try { - return fileSystemForUser().create(convert(path), overwrite); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); - } - } - - /** {@inheritDoc} */ - @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, - long blockSize, @Nullable Map<String, String> props) { - HadoopIgfsProperties props0 = - new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); - - try { - return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize, - (short) replication, blockSize, null); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + - ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication + - ", blockSize=" + blockSize + "]"); - } - } - - /** {@inheritDoc} */ - @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, - @Nullable Map<String, String> props) { - try { - return fileSystemForUser().append(convert(path), bufSize); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(final IgfsPath path) { - try { - final FileStatus status = fileSystemForUser().getFileStatus(convert(path)); - - if (status == null) - return null; - - final Map<String, String> props = properties(status); - - return new IgfsFile() { - @Override public IgfsPath path() { - return path; - } - - @Override public boolean isFile() { - return status.isFile(); - } - - @Override public boolean isDirectory() { - return status.isDirectory(); - } - - @Override public int blockSize() { - // By convention directory has blockSize == 0, while file has blockSize > 0: - return isDirectory() ? 0 : (int)status.getBlockSize(); - } - - @Override public long groupBlockSize() { - return status.getBlockSize(); - } - - @Override public long accessTime() { - return status.getAccessTime(); - } - - @Override public long modificationTime() { - return status.getModificationTime(); - } - - @Override public String property(String name) throws IllegalArgumentException { - String val = props.get(name); - - if (val == null) - throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); - - return val; - } - - @Nullable @Override public String property(String name, @Nullable String dfltVal) { - String val = props.get(name); - - return val == null ? dfltVal : val; - } - - @Override public long length() { - return status.getLen(); - } - - /** {@inheritDoc} */ - @Override public Map<String, String> properties() { - return props; - } - }; - } - catch (FileNotFoundException ignore) { - return null; - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Override public long usedSpaceSize() { - try { - // We don't use FileSystem#getUsed() since it counts only the files - // in the filesystem root, not all the files recursively. - return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed(); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to get used space size of file system."); - } - } - - /** {@inheritDoc} */ - @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException { - try { - // We don't use FileSystem#getUsed() since it counts only the files - // in the filesystem root, not all the files recursively. - fileSystemForUser().setTimes(convert(path), modificationTime, accessTime); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed set times for path: " + path); - } - } - - /** {@inheritDoc} */ - public void start() { - factory.start(); - } - - /** {@inheritDoc} */ - public void stop() { - factory.stop(); - } - - /** - * Convert IGFS path into Hadoop path. - * - * @param path IGFS path. - * @return Hadoop path. - */ - private Path convert(IgfsPath path) { - URI uri = fileSystemForUser().getUri(); - - return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); - } - - /** - * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception. - * - * @param e Exception to check. - * @param detailMsg Detailed error message. - * @return Appropriate exception. - */ - private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { - return cast(detailMsg, e); - } - - /** - * Cast IO exception to IGFS exception. - * - * @param e IO exception. - * @return IGFS exception. - */ - public static IgfsException cast(String msg, IOException e) { - if (e instanceof FileNotFoundException) - return new IgfsPathNotFoundException(e); - else if (e instanceof ParentNotDirectoryException) - return new IgfsParentNotDirectoryException(msg, e); - else if (e instanceof PathIsNotEmptyDirectoryException) - return new IgfsDirectoryNotEmptyException(e); - else if (e instanceof PathExistsException) - return new IgfsPathAlreadyExistsException(msg, e); - else - return new IgfsException(msg, e); - } - - /** - * Convert Hadoop FileStatus properties to map. - * - * @param status File status. - * @return IGFS attributes. - */ - private static Map<String, String> properties(FileStatus status) { - FsPermission perm = status.getPermission(); - - if (perm == null) - perm = FsPermission.getDefault(); - - HashMap<String, String> res = new HashMap<>(3); - - res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort())); - res.put(IgfsUtils.PROP_USER_NAME, status.getOwner()); - res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup()); - - return res; - } - - /** - * Gets the FileSystem for the current context user. - * @return the FileSystem instance, never null. - */ - private FileSystem fileSystemForUser() { - String user = IgfsUserContext.currentUser(); - - if (F.isEmpty(user)) - user = IgfsUtils.fixUserName(dfltUsrName); - - assert !F.isEmpty(user); - - try { - return (FileSystem)factory.get(user); - } - catch (IOException ioe) { - throw new IgniteException(ioe); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopIgfsSecondaryFileSystemDelegateImpl.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopKerberosFileSystemFactoryDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopKerberosFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopKerberosFileSystemFactoryDelegate.java deleted file mode 100644 index 19c470e..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopKerberosFileSystemFactoryDelegate.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.delegate; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.A; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; - -/** - * Kerberos Hadoop file system factory delegate. - */ -public class HadoopKerberosFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate { - /** The re-login interval. */ - private long reloginInterval; - - /** Time of last re-login attempt, in system milliseconds. */ - private volatile long lastReloginTime; - - /** - * Constructor. - * - * @param proxy Proxy. - */ - public HadoopKerberosFileSystemFactoryDelegate(KerberosHadoopFileSystemFactory proxy) { - super(proxy); - } - - /** {@inheritDoc} */ - @Override public FileSystem getWithMappedName(String name) throws IOException { - reloginIfNeeded(); - - return super.getWithMappedName(name); - } - - /** {@inheritDoc} */ - @Override protected FileSystem create(String usrName) throws IOException, InterruptedException { - UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName, - UserGroupInformation.getLoginUser()); - - return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() { - @Override public FileSystem run() throws Exception { - return FileSystem.get(fullUri, cfg); - } - }); - } - - @Override public void start() throws IgniteException { - super.start(); - - KerberosHadoopFileSystemFactory proxy0 = (KerberosHadoopFileSystemFactory)proxy; - - A.ensure(!F.isEmpty(proxy0.getKeyTab()), "keyTab cannot not be empty."); - A.ensure(!F.isEmpty(proxy0.getKeyTabPrincipal()), "keyTabPrincipal cannot not be empty."); - A.ensure(proxy0.getReloginInterval() >= 0, "reloginInterval cannot not be negative."); - - reloginInterval = proxy0.getReloginInterval(); - - try { - UserGroupInformation.setConfiguration(cfg); - UserGroupInformation.loginUserFromKeytab(proxy0.getKeyTabPrincipal(), proxy0.getKeyTab()); - } - catch (IOException ioe) { - throw new IgniteException("Failed login from keytab [keyTab=" + proxy0.getKeyTab() + - ", keyTabPrincipal=" + proxy0.getKeyTabPrincipal() + ']', ioe); - } - } - - /** - * Re-logins the user if needed. - * First, the re-login interval defined in factory is checked. The re-login attempts will be not more - * frequent than one attempt per {@code reloginInterval}. - * Second, {@code UserGroupInformation.checkTGTAndReloginFromKeytab()} method invoked that gets existing - * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login. - * - * <p>This operation expected to be called upon each operation with the file system created with the factory. - * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there - * is no need to invoke it otherwise specially. - * - * @throws IOException If login fails. - */ - private void reloginIfNeeded() throws IOException { - long now = System.currentTimeMillis(); - - if (now >= lastReloginTime + reloginInterval) { - UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); - - lastReloginTime = now; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java deleted file mode 100644 index 1ecbee5..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import java.io.IOException; -import java.net.URI; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.internal.util.GridStringBuilder; -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.Nullable; - -/** - * File system cache utility methods used by Map-Reduce tasks and jobs. - */ -public class HadoopFileSystemCacheUtils { - /** - * A common static factory method. Creates new HadoopLazyConcurrentMap. - * @return a new HadoopLazyConcurrentMap. - */ - public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { - return new HadoopLazyConcurrentMap<>( - new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { - @Override public FileSystem createValue(FsCacheKey key) throws IOException { - try { - assert key != null; - - // Explicitly disable FileSystem caching: - URI uri = key.uri(); - - String scheme = uri.getScheme(); - - // Copy the configuration to avoid altering the external object. - Configuration cfg = new Configuration(key.configuration()); - - String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme); - - cfg.setBoolean(prop, true); - - return FileSystem.get(uri, cfg, key.user()); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IOException("Failed to create file system due to interrupt.", e); - } - } - } - ); - } - - /** - * Gets non-null user name as per the Hadoop viewpoint. - * @param cfg the Hadoop job configuration, may be null. - * @return the user name, never null. - */ - private static String getMrHadoopUser(Configuration cfg) throws IOException { - String user = cfg.get(MRJobConfig.USER_NAME); - - if (user == null) - user = IgniteHadoopFileSystem.getFsHadoopUser(); - - return user; - } - - /** - * Common method to get the V1 file system in MapRed engine. - * It gets the filesystem for the user specified in the - * configuration with {@link MRJobConfig#USER_NAME} property. - * The file systems are created and cached in the given map upon first request. - * - * @param uri The file system uri. - * @param cfg The configuration. - * @param map The caching map. - * @return The file system. - * @throws IOException On error. - */ - public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg, - HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map) - throws IOException { - assert map != null; - assert cfg != null; - - final String usr = getMrHadoopUser(cfg); - - assert usr != null; - - if (uri == null) - uri = FileSystem.getDefaultUri(cfg); - - final FileSystem fs; - - try { - final FsCacheKey key = new FsCacheKey(uri, usr, cfg); - - fs = map.getOrCreate(key); - } - catch (IgniteException ie) { - throw new IOException(ie); - } - - assert fs != null; - assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); - - return fs; - } - - /** - * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). - * @param uri0 The uri. - * @param cfg The cfg. - * @return Correct URI. - */ - private static URI fixUri(URI uri0, Configuration cfg) { - if (uri0 == null) - return FileSystem.getDefaultUri(cfg); - - String scheme = uri0.getScheme(); - String authority = uri0.getAuthority(); - - if (authority == null) { - URI dfltUri = FileSystem.getDefaultUri(cfg); - - if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) - return dfltUri; - } - - return uri0; - } - - /** - * Note that configuration is not a part of the key. - * It is used solely to initialize the first instance - * that is created for the key. - */ - public static final class FsCacheKey { - /** */ - private final URI uri; - - /** */ - private final String usr; - - /** */ - private final String equalityKey; - - /** */ - private final Configuration cfg; - - /** - * Constructor - */ - public FsCacheKey(URI uri, String usr, Configuration cfg) { - assert uri != null; - assert usr != null; - assert cfg != null; - - this.uri = fixUri(uri, cfg); - this.usr = usr; - this.cfg = cfg; - - this.equalityKey = createEqualityKey(); - } - - /** - * Creates String key used for equality and hashing. - */ - private String createEqualityKey() { - GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); - - if (uri.getScheme() != null) - sb.a(uri.getScheme().toLowerCase()); - - sb.a("://"); - - if (uri.getAuthority() != null) - sb.a(uri.getAuthority().toLowerCase()); - - return sb.toString(); - } - - /** - * The URI. - */ - public URI uri() { - return uri; - } - - /** - * The User. - */ - public String user() { - return usr; - } - - /** - * The Configuration. - */ - public Configuration configuration() { - return cfg; - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean equals(Object obj) { - if (obj == this) - return true; - - if (obj == null || getClass() != obj.getClass()) - return false; - - return equalityKey.equals(((FsCacheKey)obj).equalityKey); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return equalityKey.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return equalityKey; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java deleted file mode 100644 index 68c0dc4..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FsConstants; -import org.jetbrains.annotations.Nullable; - -/** - * Utilities for configuring file systems to support the separate working directory per each thread. - */ -public class HadoopFileSystemsUtils { - /** Name of the property for setting working directory on create new local FS instance. */ - public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; - - /** - * Setup wrappers of filesystems to support the separate working directory. - * - * @param cfg Config for setup. - */ - public static void setupFileSystems(Configuration cfg) { - cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); - cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", - HadoopLocalFileSystemV2.class.getName()); - } - - /** - * Gets the property name to disable file system cache. - * @param scheme The file system URI scheme. - * @return The property name. If scheme is null, - * returns "fs.null.impl.disable.cache". - */ - public static String disableFsCachePropertyName(@Nullable String scheme) { - return String.format("fs.%s.impl.disable.cache", scheme); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java deleted file mode 100644 index 681cddb..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.jsr166.ConcurrentHashMap8; - -/** - * Maps values by keys. - * Values are created lazily using {@link ValueFactory}. - * - * Despite of the name, does not depend on any Hadoop classes. - */ -public class HadoopLazyConcurrentMap<K, V extends Closeable> { - /** The map storing the actual values. */ - private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); - - /** The factory passed in by the client. Will be used for lazy value creation. */ - private final ValueFactory<K, V> factory; - - /** Lock used to close the objects. */ - private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); - - /** Flag indicating that this map is closed and cleared. */ - private boolean closed; - - /** - * Constructor. - * @param factory the factory to create new values lazily. - */ - public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) { - this.factory = factory; - - assert getClass().getClassLoader() == Ignite.class.getClassLoader(); - } - - /** - * Gets cached or creates a new value of V. - * Never returns null. - * @param k the key to associate the value with. - * @return the cached or newly created value, never null. - * @throws IgniteException on error - */ - public V getOrCreate(K k) { - ValueWrapper w = map.get(k); - - if (w == null) { - closeLock.readLock().lock(); - - try { - if (closed) - throw new IllegalStateException("Failed to create value for key [" + k - + "]: the map is already closed."); - - final ValueWrapper wNew = new ValueWrapper(k); - - w = map.putIfAbsent(k, wNew); - - if (w == null) { - wNew.init(); - - w = wNew; - } - } - finally { - closeLock.readLock().unlock(); - } - } - - try { - V v = w.getValue(); - - assert v != null; - - return v; - } - catch (IgniteCheckedException ie) { - throw new IgniteException(ie); - } - } - - /** - * Clears the map and closes all the values. - */ - public void close() throws IgniteCheckedException { - closeLock.writeLock().lock(); - - try { - if (closed) - return; - - closed = true; - - Exception err = null; - - Set<K> keySet = map.keySet(); - - for (K key : keySet) { - V v = null; - - try { - v = map.get(key).getValue(); - } - catch (IgniteCheckedException ignore) { - // No-op. - } - - if (v != null) { - try { - v.close(); - } - catch (Exception err0) { - if (err == null) - err = err0; - } - } - } - - map.clear(); - - if (err != null) - throw new IgniteCheckedException(err); - } - finally { - closeLock.writeLock().unlock(); - } - } - - /** - * Helper class that drives the lazy value creation. - */ - private class ValueWrapper { - /** Future. */ - private final GridFutureAdapter<V> fut = new GridFutureAdapter<>(); - - /** the key */ - private final K key; - - /** - * Creates new wrapper. - */ - private ValueWrapper(K key) { - this.key = key; - } - - /** - * Initializes the value using the factory. - */ - private void init() { - try { - final V v0 = factory.createValue(key); - - if (v0 == null) - throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); - - fut.onDone(v0); - } - catch (Throwable e) { - fut.onDone(e); - } - } - - /** - * Gets the available value or blocks until the value is initialized. - * @return the value, never null. - * @throws IgniteCheckedException on error. - */ - V getValue() throws IgniteCheckedException { - return fut.get(); - } - } - - /** - * Interface representing the factory that creates map values. - * @param <K> the type of the key. - * @param <V> the type of the value. - */ - public interface ValueFactory <K, V> { - /** - * Creates the new value. Should never return null. - * - * @param key the key to create value for - * @return the value. - * @throws IOException On failure. - */ - public V createValue(K key) throws IOException; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java deleted file mode 100644 index cbb007f..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import java.io.File; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; - -/** - * Local file system replacement for Hadoop jobs. - */ -public class HadoopLocalFileSystemV1 extends LocalFileSystem { - /** - * Creates new local file system. - */ - public HadoopLocalFileSystemV1() { - super(new HadoopRawLocalFileSystem()); - } - - /** {@inheritDoc} */ - @Override public File pathToFile(Path path) { - return ((HadoopRawLocalFileSystem)getRaw()).convert(path); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java deleted file mode 100644 index 2484492..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ChecksumFs; -import org.apache.hadoop.fs.DelegateToFileSystem; -import org.apache.hadoop.fs.FsServerDefaults; -import org.apache.hadoop.fs.local.LocalConfigKeys; - -import static org.apache.hadoop.fs.FsConstants.LOCAL_FS_URI; - -/** - * Local file system replacement for Hadoop jobs. - */ -public class HadoopLocalFileSystemV2 extends ChecksumFs { - /** - * Creates new local file system. - * - * @param cfg Configuration. - * @throws IOException If failed. - * @throws URISyntaxException If failed. - */ - public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException { - super(new DelegateFS(cfg)); - } - - /** - * Creates new local file system. - * - * @param uri URI. - * @param cfg Configuration. - * @throws IOException If failed. - * @throws URISyntaxException If failed. - */ - public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException { - this(cfg); - } - - /** - * Delegate file system. - */ - private static class DelegateFS extends DelegateToFileSystem { - /** - * Creates new local file system. - * - * @param cfg Configuration. - * @throws IOException If failed. - * @throws URISyntaxException If failed. - */ - public DelegateFS(Configuration cfg) throws IOException, URISyntaxException { - super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false); - } - - /** {@inheritDoc} */ - @Override public int getUriDefaultPort() { - return -1; - } - - /** {@inheritDoc} */ - @Override public FsServerDefaults getServerDefaults() throws IOException { - return LocalConfigKeys.getServerDefaults(); - } - - /** {@inheritDoc} */ - @Override public boolean isValidName(String src) { - return true; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java deleted file mode 100644 index 0aac4a3..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -/** - * This class lists parameters that can be specified in Hadoop configuration. - * Hadoop configuration can be specified in {@code core-site.xml} file - * or passed to map-reduce task directly when using Hadoop driver for IGFS file system: - * <ul> - * <li> - * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides - * the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()} - * IGFS data node configuration property. - * </li> - * <li> - * {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If - * {@code true}, then all file system operations will be logged to a file. - * </li> - * <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li> - * <li> - * {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before - * it gets flushed to log file. Higher values will imply greater performance, but will increase delay - * before record appears in the log file. - * </li> - * <li> - * {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data - * node to which client is connected. If {@code true}, file will not be distributed and will be written - * to a single data node. Default value is {@code true}. - * </li> - * <li> - * {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to - * local data node if it has enough free space. After some time it can be redistributed across nodes though. - * </li> - * </ul> - * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in - * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}. - * <p> - * Sample configuration that can be placed to {@code core-site.xml} file: - * <pre name="code" class="xml"> - * <property> - * <name>fs.igfs.127.0.0.1:10500.log.enabled</name> - * <value>true</value> - * </property> - * <property> - * <name>fs.igfs.127.0.0.1:10500.log.dir</name> - * <value>/home/apache/ignite/log/sampling</value> - * </property> - * <property> - * <name>fs.igfs.127.0.0.1:10500.log.batch_size</name> - * <value>16</value> - * </property> - * </pre> - * Parameters could also be specified per mapreduce job, e.g. - * <pre name="code" class="bash"> - * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4 - * </pre> - * If you want to use these parameters in code, then you have to substitute you file system name in it. The easiest - * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}. - */ -public class HadoopParameters { - /** Parameter name for control over file colocation write mode. */ - public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes"; - - /** Parameter name for custom sequential reads before prefetch value. */ - public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH = - "fs.igfs.%s.open.sequential_reads_before_prefetch"; - - /** Parameter name for client logger directory. */ - public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir"; - - /** Parameter name for log batch size. */ - public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size"; - - /** Parameter name for log enabled flag. */ - public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled"; - - /** Parameter name for prefer local writes flag. */ - public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes"; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java deleted file mode 100644 index b8fc8e7..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.RandomAccessFile; -import java.net.URI; -import java.nio.file.Files; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsConstants; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.hadoop.fs.Seekable; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Progressable; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Local file system implementation for Hadoop. - */ -public class HadoopRawLocalFileSystem extends FileSystem { - /** Working directory for each thread. */ - private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() { - @Override protected Path initialValue() { - return getInitialWorkingDirectory(); - } - }; - - /** - * Converts Hadoop path to local path. - * - * @param path Hadoop path. - * @return Local path. - */ - File convert(Path path) { - checkPath(path); - - if (path.isAbsolute()) - return new File(path.toUri().getPath()); - - return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath()); - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - return makeQualified(new Path(System.getProperty("user.home"))); - } - - /** {@inheritDoc} */ - @Override public Path getInitialWorkingDirectory() { - File f = new File(System.getProperty("user.dir")); - - return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - - setConf(conf); - - String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP); - - if (initWorkDir != null) - setWorkingDirectory(new Path(initWorkDir)); - } - - /** {@inheritDoc} */ - @Override public URI getUri() { - return FsConstants.LOCAL_FS_URI; - } - - /** {@inheritDoc} */ - @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return new FSDataInputStream(new InStream(checkExists(convert(f)))); - } - - /** {@inheritDoc} */ - @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize, - short replication, long blockSize, Progressable progress) throws IOException { - File file = convert(f); - - if (!overwrite && !file.createNewFile()) - throw new IOException("Failed to create new file: " + f.toUri()); - - return out(file, false, bufSize); - } - - /** - * @param file File. - * @param append Append flag. - * @return Output stream. - * @throws IOException If failed. - */ - private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException { - return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append), - bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme())); - } - - /** {@inheritDoc} */ - @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { - return out(convert(f), true, bufSize); - } - - /** {@inheritDoc} */ - @Override public boolean rename(Path src, Path dst) throws IOException { - return convert(src).renameTo(convert(dst)); - } - - /** {@inheritDoc} */ - @Override public boolean delete(Path f, boolean recursive) throws IOException { - File file = convert(f); - - if (file.isDirectory() && !recursive) - throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri()); - - return U.delete(file); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path dir) { - workDir.set(fixRelativePart(dir)); - - checkPath(dir); - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workDir.get(); - } - - /** {@inheritDoc} */ - @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { - if(f == null) - throw new IllegalArgumentException("mkdirs path arg is null"); - - Path parent = f.getParent(); - - File p2f = convert(f); - - if(parent != null) { - File parent2f = convert(parent); - - if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) - throw new FileAlreadyExistsException("Parent path is not a directory: " + parent); - - } - - return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory()); - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileStatus(Path f) throws IOException { - return fileStatus(checkExists(convert(f))); - } - - /** - * @return File status. - */ - private FileStatus fileStatus(File file) throws IOException { - boolean dir = file.isDirectory(); - - java.nio.file.Path path = dir ? null : file.toPath(); - - return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(), - /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ? - new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI())); - } - - /** - * @param file File. - * @return Same file. - * @throws FileNotFoundException If does not exist. - */ - private static File checkExists(File file) throws FileNotFoundException { - if (!file.exists()) - throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist."); - - return file; - } - - /** {@inheritDoc} */ - @Override public FileStatus[] listStatus(Path f) throws IOException { - File file = convert(f); - - if (checkExists(file).isFile()) - return new FileStatus[] {fileStatus(file)}; - - File[] files = file.listFiles(); - - FileStatus[] res = new FileStatus[files.length]; - - for (int i = 0; i < res.length; i++) - res[i] = fileStatus(files[i]); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean supportsSymlinks() { - return true; - } - - /** {@inheritDoc} */ - @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException { - Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath()); - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileLinkStatus(Path f) throws IOException { - return getFileStatus(getLinkTarget(f)); - } - - /** {@inheritDoc} */ - @Override public Path getLinkTarget(Path f) throws IOException { - File file = Files.readSymbolicLink(convert(f).toPath()).toFile(); - - return new Path(file.toURI()); - } - - /** - * Input stream. - */ - private static class InStream extends InputStream implements Seekable, PositionedReadable { - /** */ - private final RandomAccessFile file; - - /** - * @param f File. - * @throws IOException If failed. - */ - public InStream(File f) throws IOException { - file = new RandomAccessFile(f, "r"); - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - return file.read(); - } - - /** {@inheritDoc} */ - @Override public synchronized int read(byte[] b, int off, int len) throws IOException { - return file.read(b, off, len); - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - file.close(); - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { - long pos0 = file.getFilePointer(); - - file.seek(pos); - int res = file.read(buf, off, len); - - file.seek(pos0); - - return res; - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException { - if (read(pos, buf, off, len) != len) - throw new IOException(); - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf) throws IOException { - readFully(pos, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - file.seek(pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() throws IOException { - return file.getFilePointer(); - } - - /** {@inheritDoc} */ - @Override public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java deleted file mode 100644 index fe43596..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathSummary; -import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; -import org.apache.ignite.internal.processors.igfs.IgfsStatus; -import org.jetbrains.annotations.Nullable; - -/** - * Facade for communication with grid. - */ -public interface HadoopIgfs { - /** - * Perform handshake. - * - * @param logDir Log directory. - * @return Future with handshake result. - * @throws IgniteCheckedException If failed. - */ - public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException; - - /** - * Close connection. - * - * @param force Force flag. - */ - public void close(boolean force); - - /** - * Command to retrieve file info for some IGFS path. - * - * @param path Path to get file info for. - * @return Future for info operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to update file properties. - * - * @param path IGFS path to update properties. - * @param props Properties to update. - * @return Future for update operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; - - /** - * Sets last access time and last modification time for a file. - * - * @param path Path to update times. - * @param accessTime Last access time to set. - * @param modificationTime Last modification time to set. - * @throws IgniteCheckedException If failed. - */ - public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException, - IOException; - - /** - * Command to rename given path. - * - * @param src Source path. - * @param dest Destination path. - * @return Future for rename operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException; - - /** - * Command to delete given path. - * - * @param path Path to delete. - * @param recursive {@code True} if deletion is recursive. - * @return Future for delete operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException; - - /** - * Command to get affinity for given path, offset and length. - * - * @param path Path to get affinity for. - * @param start Start position (offset). - * @param len Data length. - * @return Future for affinity command. - * @throws IgniteCheckedException If failed. - */ - public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException, - IOException; - - /** - * Gets path summary. - * - * @param path Path to get summary for. - * @return Future that will be completed when summary is received. - * @throws IgniteCheckedException If failed. - */ - public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to create directories. - * - * @param path Path to create. - * @return Future for mkdirs operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException; - - /** - * Command to get list of files in directory. - * - * @param path Path to list. - * @return Future for listFiles operation. - * @throws IgniteCheckedException If failed. - */ - public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to get directory listing. - * - * @param path Path to list. - * @return Future for listPaths operation. - * @throws IgniteCheckedException If failed. - */ - public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Performs status request. - * - * @return Status response. - * @throws IgniteCheckedException If failed. - */ - public IgfsStatus fsStatus() throws IgniteCheckedException, IOException; - - /** - * Command to open file for reading. - * - * @param path File path to open. - * @return Future for open operation. - * @throws IgniteCheckedException If failed. - */ - public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to open file for reading. - * - * @param path File path to open. - * @return Future for open operation. - * @throws IgniteCheckedException If failed. - */ - public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException, - IOException; - - /** - * Command to create file and open it for output. - * - * @param path Path to file. - * @param overwrite If {@code true} then old file contents will be lost. - * @param colocate If {@code true} and called on data node, file will be written on that node. - * @param replication Replication factor. - * @param props File properties for creation. - * @return Stream descriptor. - * @throws IgniteCheckedException If failed. - */ - public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; - - /** - * Open file for output appending data to the end of a file. - * - * @param path Path to file. - * @param create If {@code true}, file will be created if does not exist. - * @param props File properties. - * @return Stream descriptor. - * @throws IgniteCheckedException If failed. - */ - public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException, IOException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java deleted file mode 100644 index d610091..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import org.apache.ignite.IgniteCheckedException; - -/** - * Communication exception indicating a problem between file system and IGFS instance. - */ -public class HadoopIgfsCommunicationException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Creates new exception with given throwable as a nested cause and - * source of error message. - * - * @param cause Non-null throwable cause. - */ - public HadoopIgfsCommunicationException(Exception cause) { - super(cause); - } - - /** - * Creates a new exception with given error message and optional nested cause exception. - * - * @param msg Error message. - */ - public HadoopIgfsCommunicationException(String msg) { - super(msg); - } - - /** - * Creates a new exception with given error message and optional nested cause exception. - * - * @param msg Error message. - * @param cause Cause. - */ - public HadoopIgfsCommunicationException(String msg, Exception cause) { - super(msg, cause); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java deleted file mode 100644 index 014e2a1..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.IOException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.jetbrains.annotations.Nullable; - -/** - * Extended IGFS server interface. - */ -public interface HadoopIgfsEx extends HadoopIgfs { - /** - * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. - * If connection is closed already, callback will be invoked synchronously inside this method. - * - * @param delegate Stream delegate. - * @param lsnr Event listener. - */ - public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr); - - /** - * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. - * - * @param delegate Stream delegate. - */ - public void removeEventListener(HadoopIgfsStreamDelegate delegate); - - /** - * Asynchronously reads specified amount of bytes from opened input stream. - * - * @param delegate Stream delegate. - * @param pos Position to read from. - * @param len Data length to read. - * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining - * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will - * be the result of read future. - * @param outOff Output offset. - * @param outLen Output length. - * @return Read data. - */ - public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len, - @Nullable final byte[] outBuf, final int outOff, final int outLen); - - /** - * Writes data to the stream with given streamId. This method does not return any future since - * no response to write request is sent. - * - * @param delegate Stream delegate. - * @param data Data to write. - * @param off Offset. - * @param len Length. - * @throws IOException If failed. - */ - public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException; - - /** - * Close server stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. - */ - public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException; - - /** - * Flush output stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. - */ - public void flush(HadoopIgfsStreamDelegate delegate) throws IOException; - - /** - * The user this Igfs instance works on behalf of. - * @return the user name. - */ - public String user(); -} \ No newline at end of file