[IGNITE-218]: intermediate commit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dedf5385 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dedf5385 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dedf5385 Branch: refs/heads/ignite-218 Commit: dedf5385b6e8ceb5e4924f0dfb60e7e80454709f Parents: 7a659ce Author: iveselovskiy <[email protected]> Authored: Fri Apr 17 14:40:21 2015 +0300 Committer: iveselovskiy <[email protected]> Committed: Fri Apr 17 14:40:21 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/igfs/IgfsUserContext.java | 34 +--- .../internal/igfs/common/IgfsMarshaller.java | 32 ---- .../igfs/common/IgfsPathControlRequest.java | 14 +- .../internal/processors/igfs/IgfsUtils.java | 16 ++ .../fs/IgniteHadoopFileSystemCounterWriter.java | 4 +- .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 34 ++-- .../ignite/hadoop/fs/LazyConcurrentMap.java | 177 ------------------ .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 4 +- .../internal/processors/hadoop/HadoopUtils.java | 3 +- .../hadoop/SecondaryFileSystemProvider.java | 10 +- .../hadoop/fs/HadoopFileSystemsUtils.java | 19 -- .../hadoop/fs/HadoopLazyConcurrentMap.java | 179 +++++++++++++++++++ .../hadoop/igfs/HadoopIgfsInProc.java | 2 +- .../hadoop/igfs/HadoopIgfsOutProc.java | 2 +- .../hadoop/igfs/HadoopIgfsWrapper.java | 7 +- .../hadoop/taskexecutor/HadoopRunnableTask.java | 23 ++- .../processors/hadoop/v2/HadoopV2Job.java | 3 +- .../hadoop/v2/HadoopV2JobResourceManager.java | 9 +- .../processors/hadoop/HadoopStartup.java | 3 +- parent/pom.xml | 2 +- 20 files changed, 270 insertions(+), 307 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java index 926e84d..e48507f 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -18,7 +18,6 @@ package org.apache.ignite.igfs; import org.apache.ignite.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -47,18 +46,12 @@ public abstract class IgfsUserContext { * must return exactly the Exception thrown from the callable. */ public static <T> T doAs(String user, final Callable<T> cllbl) { - // TODO: Use A.ensure(); - if (F.isEmpty(user)) - throw new IllegalArgumentException("Failed to use null or empty user name."); - - // TODO: Remove. - user = user.intern(); + A.ensure(!F.isEmpty(user), "Failed to use null or empty user name."); final String ctxUser = userStackThreadLocal.get(); try { - // TODO: Equals: F.eq - if (ctxUser == user) + if (F.eq(ctxUser, user)) return cllbl.call(); // correct context is already there userStackThreadLocal.set(user); @@ -77,27 +70,12 @@ public abstract class IgfsUserContext { /** * Gets the current context user. - * If this method is invoked outside of any doAs() on call stack, it will return null. - * Note that the returned user name is always interned, so - * you may compare the names using '==' reference equality. - * @return the current user, never null. + * If this method is invoked outside of any {@link #doAs(String, Callable)} on the call stack, it will return null. + * Otherwise it will return the user name set in the most lower {@link #doAs(String, Callable)} call + * on the call stack. + * @return the current user, may be null. */ @Nullable public static String currentUser() { return userStackThreadLocal.get(); } - - /** - * Provides non-null interned user name. - * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME}, - * which is the current process owner user. - * @param user a user name to be fixed. - * @return non-null interned user name. - */ - // TODO: Move to IgfsUtils. - public static String fixUserName(@Nullable String user) { - if (F.isEmpty(user)) - user = FileSystemConfiguration.DFLT_USER_NAME; - - return user.intern(); - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java index a4c7830..6a6f22a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java @@ -301,8 +301,6 @@ public class IgfsMarshaller { } } - assert msg != null; - msg.command(cmd); return msg; @@ -344,34 +342,4 @@ public class IgfsMarshaller { return null; } - - /** - * Writes string to output. - * - * @param out Data output. - * @param str String. - * @throws IOException If write failed. - */ - private void writeString(DataOutput out, @Nullable String str) throws IOException { - out.writeBoolean(str != null); - - if (str != null) - out.writeUTF(str); - } - - /** - * Reads string from input. - * - * @param in Data input. - * @return Read string. - * @throws IOException If read failed. - */ - @Nullable private String readString(DataInput in) throws IOException { - boolean hasStr = in.readBoolean(); - - if (hasStr) - return in.readUTF(); - - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java index cfc8f16..2f6e6e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.igfs.common; import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -63,7 +64,7 @@ public class IgfsPathControlRequest extends IgfsMessage { /** Last modification time. */ private long modificationTime; - // TODO: COmments. + /** The user name this control request is made on behalf of. */ private String userName; /** @@ -239,14 +240,21 @@ public class IgfsPathControlRequest extends IgfsMessage { return S.toString(IgfsPathControlRequest.class, this, "cmd", command()); } - // TODO: COmments. + /** + * Getter for the user name. + * @return user name. + */ public final String userName() { assert userName != null; return userName; } + /** + * Setter for the user name. + * @param userName the user name. + */ public final void userName(String userName) { - this.userName = IgfsUserContext.fixUserName(userName); + this.userName = IgfsUtils.fixUserName(userName); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 2a915ec..558ef8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; import java.lang.reflect.*; @@ -83,4 +85,18 @@ public class IgfsUtils { private IgfsUtils() { // No-op. } + + /** + * Provides non-null interned user name. + * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME}, + * which is the current process owner user. + * @param user a user name to be fixed. + * @return non-null interned user name. + */ + public static String fixUserName(@Nullable String user) { + if (F.isEmpty(user)) + user = FileSystemConfiguration.DFLT_USER_NAME; + + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 3b8c28e..821acdb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -79,8 +79,8 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg)) { fs.mkdirs(jobStatPath); - // TODO: OUt-of-bound - try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { + try (PrintStream out = new PrintStream(fs.create( + new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { for (T2<String, Long> evt : perfCntr.evts()) { out.print(evt.get1()); out.print(':'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index a6db645..024cc68 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -24,12 +24,12 @@ import org.apache.ignite.*; import org.apache.ignite.igfs.*; import org.apache.ignite.igfs.secondary.*; import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; -import org.apache.ignite.hadoop.fs.LazyConcurrentMap.*; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.*; import java.io.*; import java.net.*; @@ -57,8 +57,12 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** The default user name. It is used if no user context is set. */ private final String dfltUserName; + /** FileSystem instance created for the default user. + * Stored outside the fileSysLazyMap due to performance reasons. */ + private final FileSystem dfltFs; + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ - private final LazyConcurrentMap<String, FileSystem> fileSysLazyMap = new LazyConcurrentMap<>( + private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( new ValueFactory<String, FileSystem>() { @Override public FileSystem createValue(String key) { try { @@ -116,20 +120,20 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys if (F.isEmpty(userName)) userName = null; - this.dfltUserName = IgfsUserContext.fixUserName(userName); + this.dfltUserName = IgfsUtils.fixUserName(userName); try { this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath); + + // File system creation for the default user name. + // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field: + this.dfltFs = secProvider.createFileSystem(dfltUserName); } catch (IOException e) { throw new IgniteCheckedException(e); } - // Test filesystem creation for the default user name. - // The value is stored in the 'fileSysLazyMap' cache. - FileSystem fileSys = fileSysLazyMap.getOrCreate(dfltUserName); - - assert fileSys != null; + assert dfltFs != null; uri = secProvider.uri().toString(); @@ -351,9 +355,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); try { - // TODO: Out of bounds. - return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, - null); + return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize, + (short)replication, blockSize, null); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + @@ -465,13 +468,11 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - final LazyConcurrentMap<String,FileSystem> map = fileSysLazyMap; + final HadoopLazyConcurrentMap<String,FileSystem> map = fileSysLazyMap; if (map == null) return; // already cleared. - fileSysLazyMap = null; // 'this' will be unusable after #close(). - List<IOException> ioExs = new LinkedList<>(); Set<String> keySet = map.keySet(); @@ -516,6 +517,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys assert !F.isEmpty(user); + if (F.eq(user, dfltUserName)) + return dfltFs; // optimization + return fileSysLazyMap.getOrCreate(user); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java deleted file mode 100644 index 4c592af..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.fs; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; -import org.jsr166.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Maps values by keys. - * Values are created lazily using {@link ValueFactory}. - */ -// TODO: Remove from public. -// TODO: Consistent naming (Hadoop prefix if in Hadoop module). -public class LazyConcurrentMap<K, V> { - /** The map storing the actual values. */ - private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); - - /** The factory passed in by the client. Will be used for lazy value creation. */ - private final ValueFactory<K, V> factory; - - /** - * Constructor. - * @param factory the factory to create new values lazily. - */ - public LazyConcurrentMap(ValueFactory<K, V> factory) { - this.factory = factory; - } - - /** - * Gets cached or creates a new value of V. - * Never returns null. - * @param k the key to associate the value with. - * @return the cached or newly created value, never null. - * @throws IgniteException on error - */ - public V getOrCreate(K k) { - final ValueWrapper wNew = new ValueWrapper(k); - - ValueWrapper w = map.putIfAbsent(k, wNew); - - if (w == null) { - // new wrapper 'w' has been put, so init the value: - wNew.init(); - - w = wNew; - } - - try { - V v = w.getValue(); - - assert v != null; - - return v; - } - catch (InterruptedException ie) { - throw new IgniteException(ie); - } - } - - /** - * Gets the value without any attempt to create a new one. - * @param k the key - * @return the value, or null if there is no value for this key. - */ - public @Nullable V get(K k) { - ValueWrapper w = map.get(k); - - if (w == null) - return null; - - try { - return w.getValue(); - } - catch (InterruptedException ie) { - throw new IgniteException(ie); - } - } - - /** - * Gets the keySet of this map, - * the contract is as per {@link ConcurrentMap#keySet()} - * @return the set of keys, never null. - */ - public Set<K> keySet() { - return map.keySet(); - } - - /** - * Clears the map. - * Follows the contract of {@link ConcurrentMap#clear()} - */ - public void clear() { - map.clear(); - } - - - /** - * Helper class that drives the lazy value creation. - */ - private class ValueWrapper { - /** Value creation latch */ - private final CountDownLatch vlueCrtLatch = new CountDownLatch(1); - - /** the key */ - private final K key; - - /** the value */ - private V v; - - /** - * Creates new wrapper. - */ - private ValueWrapper(K key) { - this.key = key; - } - - /** - * Initializes the value using the factory. - */ - private void init() { - final V v0 = factory.createValue(key); - - if (v0 == null) - throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); - - v = v0; - - vlueCrtLatch.countDown(); - } - - /** - * Blocks until the value is initialized. - * @return the value - * @throws InterruptedException - */ - @Nullable V getValue() throws InterruptedException { - // TODO: Use U.await(vlueCrtLatch) instead. - vlueCrtLatch.await(); - - return v; - } - } - - /** - * Interface representing the factory that creates map values. - * @param <K> the type of the key. - * @param <V> the type of the value. - */ - public interface ValueFactory <K, V> { - /** - * Creates the new value. Must never return null. - * @param key the key to create value for - * @return the value. - * @throws IgniteException on failure. - */ - public V createValue(K key); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 6fd6d1b..46c9ba4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -180,7 +180,7 @@ public class IgniteHadoopFileSystem extends FileSystem { String user = null; // ------------------------------------------- - // TODO: Temporary workaround. + // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761 // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct // ugi.doAs() closure. @@ -194,7 +194,7 @@ public class IgniteHadoopFileSystem extends FileSystem { user = currUgi.getShortUserName(); } - user = IgfsUserContext.fixUserName(user); + user = IgfsUtils.fixUserName(user); assert user != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 04c5ec2..d493bd4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -126,7 +126,8 @@ public class HadoopUtils { break; case PHASE_REDUCE: - // TODO: Create ticket: why PHASE_REDUCE could have 0 reducers. + // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers? + // See https://issues.apache.org/jira/browse/IGNITE-764 setupProgress = 1; mapProgress = 1; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index e49da8e..26fead9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.security.*; -import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -54,9 +54,7 @@ public class SecondaryFileSystemProvider { * @throws IOException */ public SecondaryFileSystemProvider(final @Nullable String secUri, - final @Nullable String secConfPath/*, @Nullable String userName*/) throws IOException { - //this.userName = userName; - + final @Nullable String secConfPath) throws IOException { if (secConfPath != null) { URL url = U.resolveIgniteUrl(secConfPath); @@ -92,7 +90,7 @@ public class SecondaryFileSystemProvider { * @throws IOException */ public FileSystem createFileSystem(String userName) throws IOException { - userName = IgfsUserContext.fixUserName(userName); + userName = IgfsUtils.fixUserName(userName); final FileSystem fileSys; @@ -117,7 +115,7 @@ public class SecondaryFileSystemProvider { * @throws IOException in case of error. */ public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException { - userName = IgfsUserContext.fixUserName(userName); + userName = IgfsUtils.fixUserName(userName); // if (userName == null) // return AbstractFileSystem.get(uri, cfg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java index 7631ae9..d90bc28 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.ignite.hadoop.fs.v1.*; /** * Utilities for configuring file systems to support the separate working directory per each thread. @@ -29,20 +27,6 @@ public class HadoopFileSystemsUtils { /** Name of the property for setting working directory on create new local FS instance. */ public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; -// /** -// * Set user name and default working directory for current thread if it's supported by file system. -// * -// * @param fs File system. -// * @param userName User name. -// */ -// @Deprecated // TODO: remove this method. -// public static void setUser(FileSystem fs, String userName) { -// if (fs instanceof IgniteHadoopFileSystem) -// ((IgniteHadoopFileSystem)fs).setUser(userName); -//// else if (fs instanceof HadoopDistributedFileSystem) -//// ((HadoopDistributedFileSystem)fs).setUser(userName); -// } - /** * Setup wrappers of filesystems to support the separate working directory. * @@ -52,8 +36,5 @@ public class HadoopFileSystemsUtils { cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV2.class.getName()); - -// // TODO: this should be removed: -// cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java new file mode 100644 index 0000000..cdafdde --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Maps values by keys. + * Values are created lazily using {@link ValueFactory}. + * Currently only {@link #clear()} method can remove a value. + * + * Despite of the name, does not depend on any Hadoop classes. + */ +public class HadoopLazyConcurrentMap<K, V> { + /** The map storing the actual values. */ + private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); + + /** The factory passed in by the client. Will be used for lazy value creation. */ + private final ValueFactory<K, V> factory; + + /** + * Constructor. + * @param factory the factory to create new values lazily. + */ + public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) { + this.factory = factory; + } + + /** + * Gets cached or creates a new value of V. + * Never returns null. + * @param k the key to associate the value with. + * @return the cached or newly created value, never null. + * @throws IgniteException on error + */ + public V getOrCreate(K k) { + final ValueWrapper wNew = new ValueWrapper(k); + + ValueWrapper w = map.putIfAbsent(k, wNew); + + if (w == null) { + // new wrapper 'w' has been put, so init the value: + wNew.init(); + + w = wNew; + } + + try { + V v = w.getValue(); + + assert v != null; + + return v; + } + catch (IgniteInterruptedCheckedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Gets the value without any attempt to create a new one. + * @param k the key + * @return the value, or null if there is no value for this key. + */ + public @Nullable V get(K k) { + ValueWrapper w = map.get(k); + + if (w == null) + return null; + + try { + return w.getValue(); + } + catch (IgniteInterruptedCheckedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Gets the keySet of this map, + * the contract is as per {@link ConcurrentMap#keySet()} + * @return the set of keys, never null. + */ + public Set<K> keySet() { + return map.keySet(); + } + + /** + * Clears the map. + * Follows the contract of {@link ConcurrentMap#clear()} + */ + public void clear() { + map.clear(); + } + + + /** + * Helper class that drives the lazy value creation. + */ + private class ValueWrapper { + /** Value creation latch */ + private final CountDownLatch vlueCrtLatch = new CountDownLatch(1); + + /** the key */ + private final K key; + + /** the value */ + private V v; + + /** + * Creates new wrapper. + */ + private ValueWrapper(K key) { + this.key = key; + } + + /** + * Initializes the value using the factory. + */ + private void init() { + final V v0 = factory.createValue(key); + + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + + v = v0; + + vlueCrtLatch.countDown(); + } + + /** + * Blocks until the value is initialized. + * @return the value + * @throws IgniteInterruptedCheckedException if interrupted during wait. + */ + @Nullable V getValue() throws IgniteInterruptedCheckedException { + U.await(vlueCrtLatch); + + return v; + } + } + + /** + * Interface representing the factory that creates map values. + * @param <K> the type of the key. + * @param <V> the type of the value. + */ + public interface ValueFactory <K, V> { + /** + * Creates the new value. Must never return null. + * @param key the key to create value for + * @return the value. + * @throws IgniteException on failure. + */ + public V createValue(K key); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index 771388a..ed7f296 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -56,7 +56,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { * @param log Log. */ public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException { - this.user = IgfsUserContext.fixUserName(userName); + this.user = IgfsUtils.fixUserName(userName); this.igfs = igfs; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java index 639f2eb..f23c62c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -141,7 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener this.grid = grid; this.igfs = igfs; this.log = log; - this.userName = IgfsUserContext.fixUserName(user); + this.userName = IgfsUtils.fixUserName(user); io = HadoopIgfsIpcIo.get(log, endpoint); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java index b650318..7d0db49 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -66,8 +66,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs { * @param conf Configuration. * @param log Current logger. */ - // TODO: Out of bounds. - public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) throws IOException { + public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) + throws IOException { try { this.authority = authority; this.endpoint = new HadoopIgfsEndpoint(authority); @@ -373,7 +373,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs { } catch (IOException | IgniteCheckedException e) { if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); + if (hadoop != null) + hadoop.close(true); if (log.isDebugEnabled()) log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index b47bedd..131c870 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -126,15 +126,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { else { // do the call in the context of 'user': try { - final String ticketCachePath; - - if (job instanceof HadoopV2Job) { - Configuration conf = ((HadoopV2Job)job).jobConf(); - - ticketCachePath = conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); - } - else - ticketCachePath = job.info().property(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); + final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user); @@ -150,6 +142,19 @@ public abstract class HadoopRunnableTask implements Callable<Void> { } /** + * Gets the job property. + */ + private String getJobProperty(String key) { + if (job instanceof HadoopV2Job) { + Configuration conf = ((HadoopV2Job)job).jobConf(); + + return conf.get(key); + } + else + return job.info().property(key); + } + + /** * Runnable task call implementation * @return null. * @throws IgniteCheckedException http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 0d40f34..8ffab14 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.hadoop.v2; import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.JobID; @@ -323,7 +322,7 @@ public class HadoopV2Job implements HadoopJob { /** * Getter for job configuration. - * @return + * @return the job configuration */ public JobConf jobConf() { return jobConf; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index f75425e..7dcd10b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -166,12 +166,15 @@ public class HadoopV2JobResourceManager { // TODO: Out of bounds. try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg)) { if (!fs.exists(stagingDir)) - throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + + throw new IgniteCheckedException("Failed to find map-reduce " + + "submission directory (does not exist): " + stagingDir); if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg)) - throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " + - "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']'); + throw new IgniteCheckedException("Failed to copy job submission " + + "directory contents to local file system " + + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + + ", jobId=" + jobId + ']'); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java index 1d398b5..1a93223 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java @@ -39,8 +39,7 @@ public class HadoopStartup { public static Configuration configuration() { Configuration cfg = new Configuration(); - // TODO: Remove. - cfg.set("fs.defaultFS", "igfs://igfs@localhost:10500"); + cfg.set("fs.defaultFS", "igfs://igfs@localhost"); cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dedf5385/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index cb84f7f..661b310 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -35,7 +35,7 @@ <properties> <ignite.edition>fabric</ignite.edition> - <hadoop.version>2.6.0</hadoop.version> + <hadoop.version>2.4.1</hadoop.version> <spring.version>4.1.0.RELEASE</spring.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.build.timestamp.format>MMMM d yyyy</maven.build.timestamp.format>
