http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamSelfTestSuite.java new file mode 100644 index 0000000..a277fc8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamSelfTestSuite.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import org.apache.ignite.stream.socket.*; + +import junit.framework.*; + +/** + * Stream test suite. + */ +public class IgniteStreamSelfTestSuite extends TestSuite { + /** + * @return Stream tests suite. + * @throws Exception If failed. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Ignite Stream Test Suite"); + + suite.addTest(new TestSuite(SocketStreamerSelfTest.class)); + + return suite; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java index 32cd038..1c75a7f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java @@ -67,7 +67,7 @@ public class IgniteUtilSelfTestSuite extends TestSuite { suite.addTestSuite(GridNioSelfTest.class); suite.addTestSuite(GridNioFilterChainSelfTest.class); suite.addTestSuite(GridNioSslSelfTest.class); - suite.addTestSuite(GridNioDelimitedBufferTest.class); + suite.addTestSuite(GridNioDelimitedBufferSelfTest.class); return suite; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java index b496f60..48991e8 100644 --- a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java +++ b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/gce/TcpDiscoveryGoogleStorageIpFinder.java @@ -68,34 +68,37 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.*; * Note that this finder is shared by default (see {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder#isShared()}. */ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapter { - /* Default object's content. */ + /** Default object's content. */ private final static ByteArrayInputStream OBJECT_CONTENT = new ByteArrayInputStream(new byte[0]); /** Grid logger. */ @LoggerResource private IgniteLogger log; - /* Google Cloud Platform's project name.*/ + /** Google Cloud Platform's project name.*/ private String projectName; - /* Google Storage bucket name. */ + /** Google Storage bucket name. */ private String bucketName; - /* Service account p12 private key file name. */ - private String serviceAccountP12FilePath; + /** Service account p12 private key file name. */ + private String srvcAccountP12FilePath; - /* Service account id. */ - private String serviceAccountId; + /** Service account id. */ + private String srvcAccountId; - /* Google storage. */ + /** Google storage. */ private Storage storage; - /* Init routine guard. */ + /** Init routine guard. */ private final AtomicBoolean initGuard = new AtomicBoolean(); - /* Init routine latch. */ + /** Init routine latch. */ private final CountDownLatch initLatch = new CountDownLatch(1); + /** + * + */ public TcpDiscoveryGoogleStorageIpFinder() { setShared(true); } @@ -221,7 +224,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt */ @IgniteSpiConfiguration(optional = false) public void setServiceAccountP12FilePath(String p12FileName) { - this.serviceAccountP12FilePath = p12FileName; + this.srvcAccountP12FilePath = p12FileName; } /** @@ -235,7 +238,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt */ @IgniteSpiConfiguration(optional = false) public void setServiceAccountId(String id) { - this.serviceAccountId = id; + this.srvcAccountId = id; } /** @@ -245,13 +248,13 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt */ private void init() throws IgniteSpiException { if (initGuard.compareAndSet(false, true)) { - if (serviceAccountId == null || - serviceAccountP12FilePath == null || + if (srvcAccountId == null || + srvcAccountP12FilePath == null || projectName == null || bucketName == null) { throw new IgniteSpiException( "One or more of the required parameters is not set [serviceAccountId=" + - serviceAccountId + ", serviceAccountP12FilePath=" + serviceAccountP12FilePath + ", projectName=" + + srvcAccountId + ", serviceAccountP12FilePath=" + srvcAccountP12FilePath + ", projectName=" + projectName + ", bucketName=" + bucketName + "]"); } @@ -265,12 +268,12 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt throw new IgniteSpiException(e); } - GoogleCredential credential; + GoogleCredential cred; try { - credential = new GoogleCredential.Builder().setTransport(httpTransport) - .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(serviceAccountId) - .setServiceAccountPrivateKeyFromP12File(new File(serviceAccountP12FilePath)) + cred = new GoogleCredential.Builder().setTransport(httpTransport) + .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(srvcAccountId) + .setServiceAccountPrivateKeyFromP12File(new File(srvcAccountP12FilePath)) .setServiceAccountScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL)).build(); } @@ -279,7 +282,7 @@ public class TcpDiscoveryGoogleStorageIpFinder extends TcpDiscoveryIpFinderAdapt } try { - storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), credential) + storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), cred) .setApplicationName(projectName).build(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 66e9761..d910507 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -20,10 +20,12 @@ package org.apache.ignite.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import java.io.*; @@ -37,9 +39,6 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; /** */ - private static final String DEFAULT_USER_NAME = "anonymous"; - - /** */ public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory"; /** */ @@ -52,15 +51,14 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) throws IgniteCheckedException { - Configuration hadoopCfg = new Configuration(); + Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) hadoopCfg.set(e.getKey(), e.getValue()); String user = jobInfo.user(); - if (F.isEmpty(user)) - user = DEFAULT_USER_NAME; + user = IgfsUtils.fixUserName(user); String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY); @@ -72,7 +70,9 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); try { - FileSystem fs = jobStatPath.getFileSystem(hadoopCfg); + hadoopCfg.set(MRJobConfig.USER_NAME, user); + + FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true); fs.mkdirs(jobStatPath); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index ba891f8..6a630fb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -20,15 +20,16 @@ package org.apache.ignite.hadoop.fs; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.ipc.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; import org.apache.ignite.igfs.secondary.*; import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.jetbrains.annotations.*; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.*; import java.io.*; import java.net.*; @@ -37,15 +38,45 @@ import java.util.*; import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; /** - * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}. + * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}. + * In fact, this class deals with different FileSystems depending on the user context, + * see {@link IgfsUserContext#currentUser()}. */ -public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, AutoCloseable { - /** Hadoop file system. */ - private final FileSystem fileSys; - - /** Properties of file system */ +public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem { + /** Properties of file system, see {@link #properties()} + * + * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH} + * See {@link IgfsEx#SECONDARY_FS_URI} + * See {@link IgfsEx#SECONDARY_FS_USER_NAME} + * */ private final Map<String, String> props = new HashMap<>(); + /** Secondary file system provider. */ + private final SecondaryFileSystemProvider secProvider; + + /** The default user name. It is used if no user context is set. */ + private final String dfltUserName; + + /** FileSystem instance created for the default user. + * Stored outside the fileSysLazyMap due to performance reasons. */ + private final FileSystem dfltFs; + + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( + new ValueFactory<String, FileSystem>() { + @Override public FileSystem createValue(String key) { + try { + assert !F.isEmpty(key); + + return secProvider.createFileSystem(key); + } + catch (IOException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + /** * Simple constructor that is to be used by default. * @@ -77,8 +108,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @throws IgniteCheckedException In case of error. */ public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath, - @Nullable String userName) - throws IgniteCheckedException { + @Nullable String userName) throws IgniteCheckedException { // Treat empty uri and userName arguments as nulls to improve configuration usability: if (F.isEmpty(uri)) uri = null; @@ -89,27 +119,31 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys if (F.isEmpty(userName)) userName = null; + this.dfltUserName = IgfsUtils.fixUserName(userName); + try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath, userName); + this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath); - fileSys = secProvider.createFileSystem(); + // File system creation for the default user name. + // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field: + this.dfltFs = secProvider.createFileSystem(dfltUserName); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } - uri = secProvider.uri().toString(); + assert dfltFs != null; - if (!uri.endsWith("/")) - uri += "/"; + uri = secProvider.uri().toString(); - if (cfgPath != null) - props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); + if (!uri.endsWith("/")) + uri += "/"; - if (userName != null) - props.put(SECONDARY_FS_USER_NAME, userName); + if (cfgPath != null) + props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); - props.put(SECONDARY_FS_URI, uri); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } + props.put(SECONDARY_FS_URI, uri); + props.put(SECONDARY_FS_USER_NAME, dfltUserName); } /** @@ -119,7 +153,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @return Hadoop path. */ private Path convert(IgfsPath path) { - URI uri = fileSys.getUri(); + URI uri = fileSysForUser().getUri(); return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); } @@ -131,14 +165,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @param detailMsg Detailed error message. * @return Appropriate exception. */ - @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"}) private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { - boolean wrongVer = X.hasCause(e, RemoteException.class) || - (e.getMessage() != null && e.getMessage().contains("Failed on local")); - - return !wrongVer ? cast(detailMsg, e) : - new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " + - "version.", e); } + return cast(detailMsg, e); + } /** * Cast IO exception to IGFS exception. @@ -178,7 +207,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public boolean exists(IgfsPath path) { try { - return fileSys.exists(convert(path)); + return fileSysForUser().exists(convert(path)); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); @@ -189,6 +218,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); + final FileSystem fileSys = fileSysForUser(); + try { if (props0.userName() != null || props0.groupName() != null) fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); @@ -208,7 +239,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Override public void rename(IgfsPath src, IgfsPath dest) { // Delegate to the secondary file system. try { - if (!fileSys.rename(convert(src), convert(dest))) + if (!fileSysForUser().rename(convert(src), convert(dest))) throw new IgfsException("Failed to rename (secondary file system returned false) " + "[src=" + src + ", dest=" + dest + ']'); } @@ -220,7 +251,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public boolean delete(IgfsPath path, boolean recursive) { try { - return fileSys.delete(convert(path), recursive); + return fileSysForUser().delete(convert(path), recursive); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); @@ -230,7 +261,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void mkdirs(IgfsPath path) { try { - if (!fileSys.mkdirs(convert(path))) + if (!fileSysForUser().mkdirs(convert(path))) throw new IgniteException("Failed to make directories [path=" + path + "]"); } catch (IOException e) { @@ -241,7 +272,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { try { - if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) + if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); } catch (IOException e) { @@ -252,7 +283,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public Collection<IgfsPath> listPaths(IgfsPath path) { try { - FileStatus[] statuses = fileSys.listStatus(convert(path)); + FileStatus[] statuses = fileSysForUser().listStatus(convert(path)); if (statuses == null) throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); @@ -275,7 +306,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public Collection<IgfsFile> listFiles(IgfsPath path) { try { - FileStatus[] statuses = fileSys.listStatus(convert(path)); + FileStatus[] statuses = fileSysForUser().listStatus(convert(path)); if (statuses == null) throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); @@ -302,13 +333,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { - return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, convert(path), bufSize); + return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize); } /** {@inheritDoc} */ @Override public OutputStream create(IgfsPath path, boolean overwrite) { try { - return fileSys.create(convert(path), overwrite); + return fileSysForUser().create(convert(path), overwrite); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); @@ -322,8 +353,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); try { - return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, - null); + return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize, + (short)replication, blockSize, null); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + @@ -336,7 +367,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, @Nullable Map<String, String> props) { try { - return fileSys.append(convert(path), bufSize); + return fileSysForUser().append(convert(path), bufSize); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); @@ -346,7 +377,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public IgfsFile info(final IgfsPath path) { try { - final FileStatus status = fileSys.getFileStatus(convert(path)); + final FileStatus status = fileSysForUser().getFileStatus(convert(path)); if (status == null) return null; @@ -421,7 +452,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys try { // We don't use FileSystem#getUsed() since it counts only the files // in the filesystem root, not all the files recursively. - return fileSys.getContentSummary(new Path("/")).getSpaceConsumed(); + return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed(); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to get used space size of file system."); @@ -429,25 +460,57 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys } /** {@inheritDoc} */ - @Nullable @Override public Map<String, String> properties() { + @Override public Map<String, String> properties() { return props; } /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { + @Override public void close() throws IgniteException { + Exception e = null; + try { - fileSys.close(); + dfltFs.close(); } - catch (IOException e) { - throw new IgniteCheckedException(e); + catch (Exception e0) { + e = e0; + } + + try { + fileSysLazyMap.close(); + } + catch (IgniteCheckedException ice) { + if (e == null) + e = ice; } + + if (e != null) + throw new IgniteException(e); } /** * Gets the underlying {@link FileSystem}. + * This method is used solely for testing. * @return the underlying Hadoop {@link FileSystem}. */ public FileSystem fileSystem() { - return fileSys; + return fileSysForUser(); + } + + /** + * Gets the FileSystem for the current context user. + * @return the FileSystem instance, never null. + */ + private FileSystem fileSysForUser() { + String user = IgfsUserContext.currentUser(); + + if (F.isEmpty(user)) + user = dfltUserName; // default is never empty. + + assert !F.isEmpty(user); + + if (F.eq(user, dfltUserName)) + return dfltFs; // optimization + + return fileSysLazyMap.getOrCreate(user); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 1f53a06..9d94e5b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.security.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; @@ -97,21 +97,8 @@ public class IgniteHadoopFileSystem extends FileSystem { /** Grid remote client. */ private HadoopIgfsWrapper rmtClient; - /** User name for each thread. */ - private final ThreadLocal<String> userName = new ThreadLocal<String>(){ - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){ - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; + /** working directory. */ + private Path workingDir; /** Default replication factor. */ private short dfltReplication; @@ -129,6 +116,9 @@ public class IgniteHadoopFileSystem extends FileSystem { /** Secondary URI string. */ private URI secondaryUri; + /** The user name this file system was created on behalf of. */ + private String user; + /** IGFS mode resolver. */ private IgfsModeResolver modeRslvr; @@ -153,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem { /** Custom-provided sequential reads before prefetch. */ private int seqReadsBeforePrefetch; - /** The cache was disabled when the instance was creating. */ - private boolean cacheEnabled; - /** {@inheritDoc} */ @Override public URI getUri() { if (uri == null) @@ -182,6 +169,22 @@ public class IgniteHadoopFileSystem extends FileSystem { } /** + * Gets non-null user name as per the Hadoop file system viewpoint. + * @return the user name, never null. + */ + public static String getFsHadoopUser() throws IOException { + UserGroupInformation currUgi = UserGroupInformation.getCurrentUser(); + + String user = currUgi.getShortUserName(); + + user = IgfsUtils.fixUserName(user); + + assert user != null; + + return user; + } + + /** * Public setter that can be used by direct users of FS or Visor. * * @param colocateFileWrites Whether all ongoing file writes should be colocated. @@ -207,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem { setConf(cfg); - String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme()); - - cacheEnabled = !cfg.getBoolean(disableCacheName, false); - mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false); if (!IGFS_SCHEME.equals(name.getScheme())) @@ -221,7 +220,7 @@ public class IgniteHadoopFileSystem extends FileSystem { uriAuthority = uri.getAuthority(); - setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); + user = getFsHadoopUser(); // Override sequential reads before prefetch if needed. seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); @@ -244,7 +243,7 @@ public class IgniteHadoopFileSystem extends FileSystem { String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG); + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); // Handshake. IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); @@ -289,13 +288,12 @@ public class IgniteHadoopFileSystem extends FileSystem { String secUri = props.get(SECONDARY_FS_URI); String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - String secUserName = props.get(SECONDARY_FS_USER_NAME); try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath, - secUserName); + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + + secondaryFs = secProvider.createFileSystem(user); - secondaryFs = secProvider.createFileSystem(); secondaryUri = secProvider.uri(); } catch (IOException e) { @@ -306,6 +304,9 @@ public class IgniteHadoopFileSystem extends FileSystem { "will have no effect): " + e.getMessage()); } } + + // set working directory to the home directory of the current Fs user: + setWorkingDirectory(null); } finally { leaveBusy(); @@ -337,15 +338,13 @@ public class IgniteHadoopFileSystem extends FileSystem { @Override protected void finalize() throws Throwable { super.finalize(); - close0(); + close(); } /** {@inheritDoc} */ @Override public void close() throws IOException { - if (cacheEnabled && get(getUri(), getConf()) == this) - return; - - close0(); + if (closeGuard.compareAndSet(false, true)) + close0(); } /** @@ -354,27 +353,25 @@ public class IgniteHadoopFileSystem extends FileSystem { * @throws IOException If failed. */ private void close0() throws IOException { - if (closeGuard.compareAndSet(false, true)) { - if (LOG.isDebugEnabled()) - LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']'); + if (LOG.isDebugEnabled()) + LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']'); - if (rmtClient == null) - return; + if (rmtClient == null) + return; - super.close(); + super.close(); - rmtClient.close(false); + rmtClient.close(false); - if (clientLog.isLogEnabled()) - clientLog.close(); + if (clientLog.isLogEnabled()) + clientLog.close(); - if (secondaryFs != null) - U.closeQuiet(secondaryFs); + if (secondaryFs != null) + U.closeQuiet(secondaryFs); - // Reset initialized resources. - uri = null; - rmtClient = null; - } + // Reset initialized resources. + uri = null; + rmtClient = null; } /** {@inheritDoc} */ @@ -849,22 +846,11 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); + Path path = new Path("/user/" + user); return path.makeQualified(getUri(), null); } - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. - */ - public void setUser(String userName) { - this.userName.set(userName); - - setWorkingDirectory(null); - } - /** {@inheritDoc} */ @Override public void setWorkingDirectory(Path newPath) { if (newPath == null) { @@ -873,7 +859,7 @@ public class IgniteHadoopFileSystem extends FileSystem { if (secondaryFs != null) secondaryFs.setWorkingDirectory(toSecondary(homeDir)); - workingDir.set(homeDir); + workingDir = homeDir; } else { Path fixedNewPath = fixRelativePart(newPath); @@ -886,13 +872,13 @@ public class IgniteHadoopFileSystem extends FileSystem { if (secondaryFs != null) secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath)); - workingDir.set(fixedNewPath); + workingDir = fixedNewPath; } } /** {@inheritDoc} */ @Override public Path getWorkingDirectory() { - return workingDir.get(); + return workingDir; } /** {@inheritDoc} */ @@ -1153,7 +1139,7 @@ public class IgniteHadoopFileSystem extends FileSystem { return null; return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) : - new IgfsPath(convert(workingDir.get()), path.toUri().getPath()); + new IgfsPath(convert(workingDir), path.toUri().getPath()); } /** @@ -1191,9 +1177,16 @@ public class IgniteHadoopFileSystem extends FileSystem { */ @SuppressWarnings("deprecation") private FileStatus convert(IgfsFile file) { - return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(), - file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file), - file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"), + return new FileStatus( + file.length(), + file.isDirectory(), + getDefaultReplication(), + file.groupBlockSize(), + file.modificationTime(), + file.accessTime(), + permission(file), + file.property(PROP_USER_NAME, user), + file.property(PROP_GROUP_NAME, "users"), convert(file.path())) { @Override public String toString() { return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() + @@ -1247,4 +1240,12 @@ public class IgniteHadoopFileSystem extends FileSystem { @Override public String toString() { return S.toString(IgniteHadoopFileSystem.class, this); } + + /** + * Returns the user name this File System is created on behalf of. + * @return the user name + */ + public String user() { + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 9cfb79b..8330143 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -22,7 +22,6 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; @@ -40,6 +39,7 @@ import java.util.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.configuration.FileSystemConfiguration.*; +import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*; import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*; @@ -91,11 +91,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** Grid remote client. */ private HadoopIgfsWrapper rmtClient; + /** The name of the user this File System created on behalf of. */ + private final String user; + /** Working directory. */ private IgfsPath workingDir; /** URI. */ - private URI uri; + private final URI uri; /** Authority. */ private String uriAuthority; @@ -141,6 +144,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea uri = name; + user = getFsHadoopUser(); + try { initialize(name, cfg); } @@ -152,7 +157,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea throw e; } - workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); + workingDir = new IgfsPath("/user/" + user); } /** {@inheritDoc} */ @@ -240,7 +245,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG); + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); // Handshake. IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); @@ -284,13 +289,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea String secUri = props.get(SECONDARY_FS_URI); String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - String secUserName = props.get(SECONDARY_FS_USER_NAME); try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath, - secUserName); + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + + secondaryFs = secProvider.createAbstractFileSystem(user); - secondaryFs = secProvider.createAbstractFileSystem(); secondaryUri = secProvider.uri(); } catch (IOException e) { @@ -929,7 +933,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea file.modificationTime(), file.accessTime(), permission(file), - file.property(PROP_USER_NAME, DFLT_USER_NAME), + file.property(PROP_USER_NAME, user), file.property(PROP_GROUP_NAME, "users"), convert(file.path())) { @Override public String toString() { @@ -983,4 +987,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea @Override public String toString() { return S.toString(IgniteHadoopFileSystem.class, this); } -} + + /** + * Returns the user name this File System is created on behalf of. + * @return the user name + */ + public String user() { + return user; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index d0a327e..2e855d0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. synchronized (HadoopDefaultJobInfo.class) { if ((jobCls0 = jobCls) == null) { - HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main"); + HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job"); jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 00be422..68a9ef6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -26,10 +26,16 @@ import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; +import org.apache.ignite.hadoop.fs.v1.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import java.io.*; +import java.net.*; import java.util.*; /** @@ -57,6 +63,41 @@ public class HadoopUtils { /** Old reducer class attribute. */ private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) { + try { + assert key != null; + + // Explicitly disable FileSystem caching: + URI uri = key.uri(); + + String scheme = uri.getScheme(); + + // Copy the configuration to avoid altering the external object. + Configuration cfg = new Configuration(key.configuration()); + + String prop = HadoopUtils.disableFsCachePropertyName(scheme); + + cfg.setBoolean(prop, true); + + return FileSystem.get(uri, cfg, key.user()); + } + catch (IOException | InterruptedException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + + /** + * Constructor. + */ + private HadoopUtils() { + // No-op. + } + /** * Wraps native split. * @@ -126,11 +167,13 @@ public class HadoopUtils { break; case PHASE_REDUCE: - assert status.totalReducerCnt() > 0; - setupProgress = 1; mapProgress = 1; - reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + + if (status.totalReducerCnt() > 0) + reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + else + reduceProgress = 1f; break; @@ -300,9 +343,242 @@ public class HadoopUtils { } /** - * Constructor. + * Creates {@link Configuration} in a correct class loader context to avoid caching + * of inappropriate class loader in the Configuration object. + * @return New instance of {@link Configuration}. */ - private HadoopUtils() { - // No-op. + public static Configuration safeCreateConfiguration() { + final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader()); + + try { + return new Configuration(); + } + finally { + Thread.currentThread().setContextClassLoader(cl0); + } + } + + /** + * Creates {@link JobConf} in a correct class loader context to avoid caching + * of inappropriate class loader in the Configuration object. + * @return New instance of {@link JobConf}. + */ + public static JobConf safeCreateJobConf() { + final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader()); + + try { + return new JobConf(); + } + finally { + Thread.currentThread().setContextClassLoader(cl0); + } + } + + /** + * Gets non-null user name as per the Hadoop viewpoint. + * @param cfg the Hadoop job configuration, may be null. + * @return the user name, never null. + */ + private static String getMrHadoopUser(Configuration cfg) throws IOException { + String user = cfg.get(MRJobConfig.USER_NAME); + + if (user == null) + user = IgniteHadoopFileSystem.getFsHadoopUser(); + + return user; + } + + /** + * Common method to get the V1 file system in MapRed engine. + * It creates the filesystem for the user specified in the + * configuration with {@link MRJobConfig#USER_NAME} property. + * @param uri the file system uri. + * @param cfg the configuration. + * @return the file system + * @throws IOException + */ + public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException { + final String usr = getMrHadoopUser(cfg); + + assert usr != null; + + if (uri == null) + uri = FileSystem.getDefaultUri(cfg); + + final FileSystem fs; + + if (doCacheFs) { + try { + fs = getWithCaching(uri, cfg, usr); + } + catch (IgniteException ie) { + throw new IOException(ie); + } + } + else { + try { + fs = FileSystem.get(uri, cfg, usr); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + + throw new IOException(ie); + } + } + + assert fs != null; + assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); + + return fs; + } + + /** + * Note that configuration is not a part of the key. + * It is used solely to initialize the first instance + * that is created for the key. + */ + public static final class FsCacheKey { + /** */ + private final URI uri; + + /** */ + private final String usr; + + /** */ + private final String equalityKey; + + /** */ + private final Configuration cfg; + + /** + * Constructor + */ + public FsCacheKey(URI uri, String usr, Configuration cfg) { + assert uri != null; + assert usr != null; + assert cfg != null; + + this.uri = fixUri(uri, cfg); + this.usr = usr; + this.cfg = cfg; + + this.equalityKey = createEqualityKey(); + } + + /** + * Creates String key used for equality and hashing. + */ + private String createEqualityKey() { + GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); + + if (uri.getScheme() != null) + sb.a(uri.getScheme().toLowerCase()); + + sb.a("://"); + + if (uri.getAuthority() != null) + sb.a(uri.getAuthority().toLowerCase()); + + return sb.toString(); + } + + /** + * The URI. + */ + public URI uri() { + return uri; + } + + /** + * The User. + */ + public String user() { + return usr; + } + + /** + * The Configuration. + */ + public Configuration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + return equalityKey.equals(((FsCacheKey)obj).equalityKey); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return equalityKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return equalityKey; + } + } + + /** + * Gets FileSystem caching it in static Ignite cache. The cache is a singleton + * for each class loader. + * + * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}. + * The Configuration is not a part of the key. This means that for the given key file system is + * initialized only once with the Configuration passed in upon the file system creation. + * + * @param uri The file system URI. + * @param cfg The configuration. + * @param usr The user to create file system for. + * @return The file system: either created, or taken from the cache. + */ + private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) { + FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + return fileSysLazyMap.getOrCreate(key); + } + + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". + */ + public static String disableFsCachePropertyName(@Nullable String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } + + /** + * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). + * @param uri0 The uri. + * @param cfg The cfg. + * @return Correct URI. + */ + public static URI fixUri(URI uri0, Configuration cfg) { + if (uri0 == null) + return FileSystem.getDefaultUri(cfg); + + String scheme = uri0.getScheme(); + String authority = uri0.getAuthority(); + + if (authority == null) { + URI dfltUri = FileSystem.getDefaultUri(cfg); + + if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) + return dfltUri; + } + + return uri0; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index 27805f8..dd679de 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -19,26 +19,26 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; +import org.apache.hadoop.security.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; import java.net.*; +import java.security.*; /** * Encapsulates logic of secondary filesystem creation. */ public class SecondaryFileSystemProvider { /** Configuration of the secondary filesystem, never null. */ - private final Configuration cfg = new Configuration(); + private final Configuration cfg = HadoopUtils.safeCreateConfiguration(); /** The secondary filesystem URI, never null. */ private final URI uri; - /** Optional user name to log into secondary filesystem with. */ - private @Nullable final String userName; - /** * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be * specified either explicitly or in the configuration provided. @@ -47,13 +47,10 @@ public class SecondaryFileSystemProvider { * property in the provided configuration. * @param secConfPath the secondary Fs path (file path on the local file system, optional). * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved. - * @param userName User name. * @throws IOException */ public SecondaryFileSystemProvider(final @Nullable String secUri, - final @Nullable String secConfPath, @Nullable String userName) throws IOException { - this.userName = userName; - + final @Nullable String secConfPath) throws IOException { if (secConfPath != null) { URL url = U.resolveIgniteUrl(secConfPath); @@ -79,7 +76,7 @@ public class SecondaryFileSystemProvider { } // Disable caching: - String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme()); + String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme()); cfg.setBoolean(prop, true); } @@ -88,20 +85,18 @@ public class SecondaryFileSystemProvider { * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. * @throws IOException */ - public FileSystem createFileSystem() throws IOException { + public FileSystem createFileSystem(String userName) throws IOException { + userName = IgfsUtils.fixUserName(userName); + final FileSystem fileSys; - if (userName == null) - fileSys = FileSystem.get(uri, cfg); - else { - try { - fileSys = FileSystem.get(uri, cfg, userName); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + try { + fileSys = FileSystem.get(uri, cfg, userName); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); - throw new IOException("Failed to create file system due to interrupt.", e); - } + throw new IOException("Failed to create file system due to interrupt.", e); } return fileSys; @@ -109,10 +104,26 @@ public class SecondaryFileSystemProvider { /** * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs. - * @throws IOException + * @throws IOException in case of error. */ - public AbstractFileSystem createAbstractFileSystem() throws IOException { - return AbstractFileSystem.get(uri, cfg); + public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException { + userName = IgfsUtils.fixUserName(userName); + + String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); + + UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName); + + try { + return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() { + @Override public AbstractFileSystem run() throws IOException { + return AbstractFileSystem.get(uri, cfg); + } + }); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", ie); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java deleted file mode 100644 index 509f443..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.FileSystemConfiguration.*; - -/** - * Wrapper of HDFS for support of separated working directory. - */ -public class HadoopDistributedFileSystem extends DistributedFileSystem { - /** User name for each thread. */ - private final ThreadLocal<String> userName = new ThreadLocal<String>() { - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() { - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; - - /** {@inheritDoc} */ - @Override public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - - setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); - } - - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. - */ - public void setUser(String userName) { - this.userName.set(userName); - - setWorkingDirectory(getHomeDirectory()); - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); - - return path.makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path dir) { - Path fixedDir = fixRelativePart(dir); - - String res = fixedDir.toUri().getPath(); - - if (!DFSUtil.isValidName(res)) - throw new IllegalArgumentException("Invalid DFS directory name " + res); - - workingDir.set(fixedDir); - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workingDir.get(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java index f3f51d4..d90bc28 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.ignite.hadoop.fs.v1.*; /** * Utilities for configuring file systems to support the separate working directory per each thread. @@ -30,19 +28,6 @@ public class HadoopFileSystemsUtils { public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; /** - * Set user name and default working directory for current thread if it's supported by file system. - * - * @param fs File system. - * @param userName User name. - */ - public static void setUser(FileSystem fs, String userName) { - if (fs instanceof IgniteHadoopFileSystem) - ((IgniteHadoopFileSystem)fs).setUser(userName); - else if (fs instanceof HadoopDistributedFileSystem) - ((HadoopDistributedFileSystem)fs).setUser(userName); - } - - /** * Setup wrappers of filesystems to support the separate working directory. * * @param cfg Config for setup. @@ -51,7 +36,5 @@ public class HadoopFileSystemsUtils { cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV2.class.getName()); - - cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java new file mode 100644 index 0000000..71b38c4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; +import org.jsr166.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +/** + * Maps values by keys. + * Values are created lazily using {@link ValueFactory}. + * + * Despite of the name, does not depend on any Hadoop classes. + */ +public class HadoopLazyConcurrentMap<K, V extends Closeable> { + /** The map storing the actual values. */ + private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); + + /** The factory passed in by the client. Will be used for lazy value creation. */ + private final ValueFactory<K, V> factory; + + /** Lock used to close the objects. */ + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + + /** Flag indicating that this map is closed and cleared. */ + private boolean closed; + + /** + * Constructor. + * @param factory the factory to create new values lazily. + */ + public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) { + this.factory = factory; + } + + /** + * Gets cached or creates a new value of V. + * Never returns null. + * @param k the key to associate the value with. + * @return the cached or newly created value, never null. + * @throws IgniteException on error + */ + public V getOrCreate(K k) { + ValueWrapper w = map.get(k); + + if (w == null) { + closeLock.readLock().lock(); + + try { + if (closed) + throw new IllegalStateException("Failed to create value for key [" + k + + "]: the map is already closed."); + + final ValueWrapper wNew = new ValueWrapper(k); + + w = map.putIfAbsent(k, wNew); + + if (w == null) { + wNew.init(); + + w = wNew; + } + } + finally { + closeLock.readLock().unlock(); + } + } + + try { + V v = w.getValue(); + + assert v != null; + + return v; + } + catch (IgniteCheckedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Clears the map and closes all the values. + */ + public void close() throws IgniteCheckedException { + closeLock.writeLock().lock(); + + try { + closed = true; + + Exception err = null; + + Set<K> keySet = map.keySet(); + + for (K key : keySet) { + V v = null; + + try { + v = map.get(key).getValue(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + if (v != null) { + try { + v.close(); + } + catch (Exception err0) { + if (err == null) + err = err0; + } + } + } + + map.clear(); + + if (err != null) + throw new IgniteCheckedException(err); + } + finally { + closeLock.writeLock().unlock(); + } + } + + /** + * Helper class that drives the lazy value creation. + */ + private class ValueWrapper { + /** Future. */ + private final GridFutureAdapter<V> fut = new GridFutureAdapter<>(); + + /** the key */ + private final K key; + + /** + * Creates new wrapper. + */ + private ValueWrapper(K key) { + this.key = key; + } + + /** + * Initializes the value using the factory. + */ + private void init() { + try { + final V v0 = factory.createValue(key); + + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + + fut.onDone(v0); + } + catch (Throwable e) { + fut.onDone(e); + } + } + + /** + * Gets the available value or blocks until the value is initialized. + * @return the value, never null. + * @throws IgniteCheckedException on error. + */ + V getValue() throws IgniteCheckedException { + return fut.get(); + } + } + + /** + * Interface representing the factory that creates map values. + * @param <K> the type of the key. + * @param <V> the type of the value. + */ + public interface ValueFactory <K, V> { + /** + * Creates the new value. Should never return null. + * + * @param key the key to create value for + * @return the value. + * @throws IgniteException on failure. + */ + public V createValue(K key); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java index 2f19226..b9c5113 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java @@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs { * @throws IOException If failed. */ public void flush(HadoopIgfsStreamDelegate delegate) throws IOException; + + /** + * The user this Igfs instance works on behalf of. + * @return the user name. + */ + public String user(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index 44e531e..47ba0e8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.io.*; @@ -46,25 +47,35 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** Logger. */ private final Log log; + /** The user this Igfs works on behalf of. */ + private final String user; + /** * Constructor. * * @param igfs Target IGFS. * @param log Log. */ - public HadoopIgfsInProc(IgfsEx igfs, Log log) { + public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException { + this.user = IgfsUtils.fixUserName(userName); + this.igfs = igfs; + this.log = log; bufSize = igfs.configuration().getBlockSize() * 2; } /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) { - igfs.clientLogDirectory(logDir); + @Override public IgfsHandshakeResponse handshake(final String logDir) { + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() { + @Override public IgfsHandshakeResponse apply() { + igfs.clientLogDirectory(logDir); - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); + return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), + igfs.globalSampling()); + } + }); } /** {@inheritDoc} */ @@ -82,9 +93,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.info(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() { + @Override public IgfsFile apply() { + return igfs.info(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -95,9 +110,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - return igfs.update(path, props); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() { + @Override public IgfsFile apply() { + return igfs.update(path, props); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -108,9 +127,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException { try { - igfs.setTimes(path, accessTime, modificationTime); + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { + igfs.setTimes(path, accessTime, modificationTime); + + return null; + } + }); return true; } @@ -124,9 +149,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException { try { - igfs.rename(src, dest); + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { + igfs.rename(src, dest); + + return null; + } + }); return true; } @@ -139,9 +170,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException { try { - return igfs.delete(path, recursive); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() { + @Override public Boolean apply() { + return igfs.delete(path, recursive); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -154,18 +189,32 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public IgfsStatus fsStatus() throws IgniteCheckedException { try { - return igfs.globalSpace(); + return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() { + @Override public IgfsStatus call() throws IgniteCheckedException { + return igfs.globalSpace(); + } + }); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " + "stopping."); } + catch (IgniteCheckedException | RuntimeException | Error e) { + throw e; + } + catch (Exception e) { + throw new AssertionError("Must never go there."); + } } /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { + @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.listPaths(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() { + @Override public Collection<IgfsPath> apply() { + return igfs.listPaths(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -176,9 +225,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { + @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.listFiles(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() { + @Override public Collection<IgfsFile> apply() { + return igfs.listFiles(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -189,9 +242,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - igfs.mkdirs(path, props); + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { + igfs.mkdirs(path, props); + + return null; + } + }); return true; } @@ -205,9 +264,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.summary(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() { + @Override public IgfsPathSummary apply() { + return igfs.summary(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -219,10 +282,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len) throws IgniteCheckedException { try { - return igfs.affinity(path, start, len); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() { + @Override public Collection<IgfsBlockLocation> apply() { + return igfs.affinity(path, start, len); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -233,11 +300,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException { try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize); - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -248,12 +319,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) throws IgniteCheckedException { try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -264,13 +339,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate, + final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, - colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, + colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); - return new HadoopIgfsStreamDelegate(this, stream); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -281,12 +360,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, + final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - IgfsOutputStream stream = igfs.append(path, bufSize, create, props); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsOutputStream stream = igfs.append(path, bufSize, create, props); - return new HadoopIgfsStreamDelegate(this, stream); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -407,4 +490,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { if (lsnr0 != null && log.isDebugEnabled()) log.debug("Removed stream event listener [delegate=" + delegate + ']'); } + + /** {@inheritDoc} */ + @Override public String user() { + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java index 0264e7b..3561e95 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java @@ -41,7 +41,7 @@ import java.util.concurrent.locks.*; @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") public class HadoopIgfsIpcIo implements HadoopIgfsIo { /** Logger. */ - private Log log; + private final Log log; /** Request futures map. */ private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =
