#[IGNITE-218]: save temporary state for review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/56cb251f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/56cb251f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/56cb251f

Branch: refs/heads/ignite-218
Commit: 56cb251fa11c23f4390e049c455d0f64388be8a0
Parents: c79d26a
Author: iveselovskiy <[email protected]>
Authored: Mon Jun 1 12:22:42 2015 +0300
Committer: iveselovskiy <[email protected]>
Committed: Mon Jun 1 12:22:42 2015 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopTaskContext.java    | 18 ++--
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  7 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    | 24 ++---
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |  2 +-
 .../internal/processors/hadoop/HadoopUtils.java |  1 +
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  6 +-
 .../hadoop/v2/HadoopV2TaskContext.java          | 24 +----
 .../hadoop/HadoopAbstractSelfTest.java          |  1 -
 .../processors/hadoop/HadoopMapReduceTest.java  | 94 +++++++++++++++++++-
 .../testsuites/IgniteHadoopTestSuite.java       |  2 +-
 10 files changed, 122 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 47c55bd..d0fafc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -190,12 +190,14 @@ public abstract class HadoopTaskContext {
     public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
 
     /**
-     *
-     * @param user
-     * @param callable
-     * @param <T>
-     * @return
-     * @throws IgniteCheckedException
-     */
-    public abstract <T> T runAs(String user, Callable<T> callable) throws IgniteCheckedException;
+     * Executes a callable on behalf of the specified user.
+     * In case of embedded task execution, the implementation of this method
+     * will use classes loaded by the ClassLoader this HadoopTaskContext was loaded with.
+     * @param user The user name.
+     * @param c The callable.
+     * @param <T> The return type of the Callable.
+     * @return The result of the callable.
+     * @throws IgniteCheckedException On any error in the callable.
+     */
+    public abstract <T> T runAs(String user, Callable<T> c) throws IgniteCheckedException;
 }
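Review note: the sketch below shows how the new runAs() contract is expected to be used by a caller. It is illustrative only and not part of this commit; the helper class, the file path and the assumption that the callable performs a Hadoop file-system operation are hypothetical.

    import java.util.concurrent.Callable;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;

    /** Hypothetical usage sketch for HadoopTaskContext.runAs(). */
    public class RunAsUsageSketch {
        /**
         * Reads a file length on behalf of the given user.
         *
         * @param ctx Task context implementation (e.g. HadoopV2TaskContext).
         * @param fs File system already configured for the task.
         * @param user User to impersonate, typically the job submitter.
         */
        public static long lengthAs(HadoopTaskContext ctx, final FileSystem fs, String user)
            throws IgniteCheckedException {
            return ctx.runAs(user, new Callable<Long>() {
                @Override public Long call() throws Exception {
                    // Everything inside the callable runs in the context of 'user', so
                    // permission and ownership checks are evaluated for that user.
                    return fs.getFileStatus(new Path("/tmp/sample")).getLen();
                }
            });
        }
    }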
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index cb4f19b..bbafcd7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 
 import java.io.*;
@@ -39,9 +40,6 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
 
     /** */
-    private static final String DEFAULT_USER_NAME = "anonymous";
-
-    /** */
     public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
 
     /** */
@@ -61,8 +59,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
 
         String user = jobInfo.user();
 
-        if (F.isEmpty(user))
-            user = DEFAULT_USER_NAME;
+        user = IgfsUtils.fixUserName(user);
 
         String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
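Review note: the local DEFAULT_USER_NAME fallback is replaced by IgfsUtils.fixUserName(). The sketch below is only an approximation of the expected normalization, inferred from the removed code; it is not the actual IgfsUtils source.

    /** Rough approximation of the user-name normalization assumed to live in IgfsUtils.fixUserName(). */
    public class FixUserNameSketch {
        /** Default user assumed when no user is provided (mirrors the removed DEFAULT_USER_NAME constant). */
        private static final String DFLT_USER_NAME = "anonymous";

        /**
         * @param user Raw user name, possibly null or empty.
         * @return Non-null, non-empty user name.
         */
        public static String fixUserName(String user) {
            return user == null || user.isEmpty() ? DFLT_USER_NAME : user;
        }
    }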
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index bbb8c5f..328120b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
@@ -173,27 +172,16 @@ public class IgniteHadoopFileSystem extends FileSystem {
     }
 
     /**
-     * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+     * Gets non-null user name as per the Hadoop file system viewpoint.
      * @return the user name, never null.
      */
-    public static String getFsHadoopUser(Configuration cfg) throws IOException {
+    public static String getFsHadoopUser() throws IOException {
         String user = null;
 
-//        // -------------------------------------------
-//        // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
-//        // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
-//        // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
-//        // ugi.doAs() closure.
-//        if (cfg != null)
-//            user = cfg.get(MRJobConfig.USER_NAME);
-//        // -------------------------------------------
+        UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
 
-        if (user == null) {
-            UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
-
-            if (currUgi != null)
-                user = currUgi.getShortUserName();
-        }
+        if (currUgi != null)
+            user = currUgi.getShortUserName();
 
         user = IgfsUtils.fixUserName(user);
 
@@ -242,7 +230,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
         uriAuthority = uri.getAuthority();
 
-        user = getFsHadoopUser(cfg);
+        user = getFsHadoopUser();
 
         // Override sequential reads before prefetch if needed.
         seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index f3fbe9c..8330143 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         uri = name;
 
-        user = getFsHadoopUser(cfg);
+        user = getFsHadoopUser();
 
         try {
             initialize(name, cfg);
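Review note: with the IGNITE-761 workaround removed, getFsHadoopUser() relies solely on UserGroupInformation, so a client that needs the file system bound to a particular user has to enter that user's UGI context before creating it. A minimal sketch of that pattern follows; the URI and the remote-user handling are illustrative assumptions, not code from this commit.

    import java.net.URI;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    /** Hypothetical sketch: open an IGFS file system so that getFsHadoopUser() resolves the intended user. */
    public class FsUserContextSketch {
        public static FileSystem openAs(String user, final Configuration cfg) throws Exception {
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);

            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                @Override public FileSystem run() throws Exception {
                    // Inside doAs() UserGroupInformation.getCurrentUser() returns 'user',
                    // which is exactly what getFsHadoopUser() now reads during initialization.
                    return FileSystem.get(URI.create("igfs://igfs@/"), cfg);
                }
            });
        }
    }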
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index ca3a6c5..12015af 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -303,6 +303,7 @@ public class HadoopUtils {
         return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
     }
 
+    // TODO: after diagnostics & testing leave only one method "safeCreateConfiguration()"
     /**
      *
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 7bc0fb0..8f1e1ab 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -95,7 +95,7 @@ public class HadoopV2JobResourceManager {
     }
 
     /**
-     * Gets non-null and interned user name as per the Hadoop viewpoint.
+     * Gets non-null user name as per the Hadoop viewpoint.
      * @param cfg the Hadoop job configuration, may be null.
      * @return the user name, never null.
      */
@@ -103,7 +103,9 @@ public class HadoopV2JobResourceManager {
         String user = cfg.get(MRJobConfig.USER_NAME);
 
         if (user == null)
-            user = IgniteHadoopFileSystem.getFsHadoopUser(cfg);
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        X.println("##### Mr user = [" + user + "]"); // TODO: remove
 
         return user;
     }
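Review note: spelled out as a standalone sketch, the user-resolution order after this change is the explicit MRJobConfig.USER_NAME from the job configuration first, then the current Hadoop (UGI) user. The class below is illustrative only; the real logic lives in HadoopV2JobResourceManager and the JobConf parameter type is an assumption.

    import java.io.IOException;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;

    /** Hypothetical sketch of the user-resolution order used when submitting job resources. */
    public class RequestedUserSketch {
        /**
         * @param cfg Hadoop job configuration.
         * @return User recorded in the job configuration, or the current Hadoop user if none is set.
         */
        public static String requestedUser(JobConf cfg) throws IOException {
            String user = cfg.get(MRJobConfig.USER_NAME);

            if (user == null)
                user = IgniteHadoopFileSystem.getFsHadoopUser();

            return user;
        }
    }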
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 0bbe1d7..ee10b01 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -452,7 +452,8 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         }
     }
 
-    @Override public <T> T runAs(final String user, final Callable<T> callable) throws IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Override public <T> T runAs(final String user, final Callable<T> c) throws IgniteCheckedException {
         String ugiUser;
 
         try {
             UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
@@ -466,16 +467,13 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         try {
             if (F.eq(user, ugiUser))
                 // if current UGI context user is the same, do direct call:
-                return callable.call();
+                return c.call();
             else {
-                // do the call in the context of 'user':
-//                final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
-//
                 UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
 
                 return ugi.doAs(new PrivilegedExceptionAction<T>() {
                     @Override public T run() throws Exception {
-                        return callable.call();
+                        return c.call();
                     }
                 });
             }
@@ -484,18 +482,4 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
             throw new IgniteCheckedException(e);
         }
     }
-
-//    /**
-//     * Gets the job property.
-//     */
-//    private String getJobProperty(String key) {
-//        if (job instanceof HadoopV2Job) {
-//            Configuration conf = ((HadoopV2Job)job).jobConf();
-//
-//            return conf.get(key);
-//        }
-//        else
-//            return job.info().property(key);
-//    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index af1a1e1..f41eb17 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
 import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index f96eb74..7d09433 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -24,13 +24,13 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.testframework.*;
@@ -215,4 +215,96 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
                 ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
         }
     }
+
+//    /**
+//     * Startup secondary file system.
+//     *
+//     * @throws Exception If failed.
+//     */
+//    private void startUpSecondary() throws Exception {
+//        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+//
+//        igfsCfg.setDataCacheName("partitioned");
+//        igfsCfg.setMetaCacheName("replicated");
+//        igfsCfg.setName("igfs-secondary");
+//        igfsCfg.setBlockSize(512 * 1024);
+//        igfsCfg.setDefaultMode(PRIMARY);
+//
+//        IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+//
+//        endpointCfg.setType(IgfsIpcEndpointType.TCP);
+//        endpointCfg.setPort(11500);
+//
+//        igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+//
+//        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+//
+//        cacheCfg.setName("partitioned");
+//        cacheCfg.setCacheMode(PARTITIONED);
+//        cacheCfg.setNearConfiguration(null);
+//        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+//        cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
+//        cacheCfg.setBackups(0);
+//        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+//
+//        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+//
+//        metaCacheCfg.setName("replicated1");
+//        metaCacheCfg.setCacheMode(REPLICATED);
+//        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+//        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+//
+//        IgniteConfiguration cfg = new IgniteConfiguration();
+//
+//        cfg.setGridName("igfs-grid-secondary");
+//
+//        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+//
+//        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+//
+//        cfg.setDiscoverySpi(discoSpi);
+//        cfg.setCacheConfiguration(metaCacheCfg, cacheCfg);
+//        cfg.setFileSystemConfiguration(igfsCfg);
+//
+//        cfg.setLocalHost("127.0.0.1");
+//
+//        G.start(cfg);
+//    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+//        startUpSecondary();
+
+        super.beforeTest();
+    }
+
+    /**
+     * @return IGFS configuration.
+     */
+    @Override public FileSystemConfiguration igfsConfiguration() {
+
+        FileSystemConfiguration fsCfg = super.igfsConfiguration();
+//
+//        fsCfg.setName("igfs-secondary");
+//        fsCfg.setDefaultMode(PRIMARY);
+//
+//        IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+//
+//        endpointCfg.setType(IgfsIpcEndpointType.TCP);
+//        endpointCfg.setPort(11500);
+//
+//        fsCfg.setIpcEndpointConfiguration(endpointCfg);
+//
+//        try {
+//
+//            fsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
+//                "igfs://igfs-secondary:[email protected]:11500/",
+//                "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
+//        }
+//        catch (Exception e) {
+//            throw new IgniteException(e);
+//        }
+
+        return fsCfg;
+    }
 }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 149decc..179f7f0 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -48,7 +48,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
         downloadHadoop();
         downloadHive();
 
-        ClassLoader ldr = new HadoopClassLoader(null, "test"); // TestSuite.class.getClassLoader();
+        HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
 
         TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
