http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java index 7dca049..f23c62c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener /** IGFS name. */ private final String igfs; + /** The user this out proc is performing on behalf of. */ + private final String userName; + /** Client log. */ private final Log log; @@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException { - this(host, port, grid, igfs, false, log); + public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException { + this(host, port, grid, igfs, false, log, user); } /** @@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. 
*/ - public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException { - this(null, port, grid, igfs, true, log); + public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException { + this(null, port, grid, igfs, true, log, user); } /** @@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log) + private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user) throws IOException { assert host != null && !shmem || host == null && shmem : "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; @@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener this.grid = grid; this.igfs = igfs; this.log = log; + this.userName = IgfsUtils.fixUserName(user); io = HadoopIgfsIpcIo.get(log, endpoint); @@ -173,6 +177,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(INFO); msg.path(path); + msg.userName(userName); return io.send(msg).chain(FILE_RES).get(); } @@ -184,6 +189,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(UPDATE); msg.path(path); msg.properties(props); + msg.userName(userName); return io.send(msg).chain(FILE_RES).get(); } @@ -196,6 +202,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.accessTime(accessTime); msg.modificationTime(modificationTime); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -207,6 +214,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(RENAME); msg.path(src); msg.destinationPath(dest); + msg.userName(userName); return 
io.send(msg).chain(BOOL_RES).get(); } @@ -218,6 +226,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(DELETE); msg.path(path); msg.flag(recursive); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -231,6 +240,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.start(start); msg.length(len); + msg.userName(userName); return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); } @@ -241,6 +251,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(PATH_SUMMARY); msg.path(path); + msg.userName(userName); return io.send(msg).chain(SUMMARY_RES).get(); } @@ -252,6 +263,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(MAKE_DIRECTORIES); msg.path(path); msg.properties(props); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -262,6 +274,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(LIST_FILES); msg.path(path); + msg.userName(userName); return io.send(msg).chain(FILE_COL_RES).get(); } @@ -272,6 +285,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(LIST_PATHS); msg.path(path); + msg.userName(userName); return io.send(msg).chain(PATH_COL_RES).get(); } @@ -288,6 +302,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(OPEN_READ); msg.path(path); msg.flag(false); + msg.userName(userName); IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); @@ -303,6 +318,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.flag(true); msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); + msg.userName(userName); IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); @@ -321,6 +337,7 @@ public class 
HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.properties(props); msg.replication(replication); msg.blockSize(blockSize); + msg.userName(userName); Long streamId = io.send(msg).chain(LONG_RES).get(); @@ -336,6 +353,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.flag(create); msg.properties(props); + msg.userName(userName); Long streamId = io.send(msg).chain(LONG_RES).get(); @@ -471,4 +489,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener } }; } + + /** {@inheritDoc} */ + @Override public String user() { + return userName; + } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java index 1dada21..7d0db49 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -55,6 +55,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs { /** Logger. */ private final Log log; + /** The user name this wrapper works on behalf of. */ + private final String userName; + /** * Constructor. * @@ -63,13 +66,15 @@ public class HadoopIgfsWrapper implements HadoopIgfs { * @param conf Configuration. * @param log Current logger. 
*/ - public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException { + public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) + throws IOException { try { this.authority = authority; this.endpoint = new HadoopIgfsEndpoint(authority); this.logDir = logDir; this.conf = conf; this.log = log; + this.userName = user; } catch (IgniteCheckedException e) { throw new IOException("Failed to parse endpoint: " + authority, e); @@ -362,13 +367,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsInProc(igfs, log); + hadoop = new HadoopIgfsInProc(igfs, log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } catch (IOException | IgniteCheckedException e) { if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); + if (hadoop != null) + hadoop.close(true); if (log.isDebugEnabled()) log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e); @@ -384,7 +390,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -409,7 +415,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { try { hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), - log); + log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -430,7 +436,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, 
userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index 2e04ac1..b170125 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -99,6 +99,22 @@ public abstract class HadoopRunnableTask implements Callable<Void> { /** {@inheritDoc} */ @Override public Void call() throws IgniteCheckedException { + ctx = job.getTaskContext(info); + + return ctx.runAsJobOwner(new Callable<Void>() { + @Override public Void call() throws Exception { + call0(); + + return null; + } + }); + } + + /** + * Implements actual task running. 
+ * @throws IgniteCheckedException + */ + void call0() throws IgniteCheckedException { execStartTs = U.currentTimeMillis(); Throwable err = null; @@ -108,8 +124,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> { HadoopPerformanceCounter perfCntr = null; try { - ctx = job.getTaskContext(info); - perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); perfCntr.onTaskSubmit(info, submitTs); @@ -156,8 +170,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> { if (ctx != null) ctx.cleanupTaskEnvironment(); } - - return null; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index d265ca8..d754039 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.hadoop.v2; import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.JobID; @@ -68,7 +67,7 @@ public class HadoopV2Job implements HadoopJob { new ConcurrentHashMap8<>(); /** Pooling task context class and thus class loading environment. */ - private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); + private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); /** All created contexts. 
*/ private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); @@ -93,12 +92,7 @@ public class HadoopV2Job implements HadoopJob { hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); - HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader(); - - // Before create JobConf instance we should set new context class loader. - Thread.currentThread().setContextClassLoader(clsLdr); - - jobConf = new JobConf(); + jobConf = HadoopUtils.safeCreateJobConf(); HadoopFileSystemsUtils.setupFileSystems(jobConf); @@ -139,7 +133,9 @@ public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) { + try { + FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true); + JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); @@ -197,7 +193,7 @@ public class HadoopV2Job implements HadoopJob { if (old != null) return old.get(); - Class<?> cls = taskCtxClsPool.poll(); + Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll(); try { if (cls == null) { @@ -205,9 +201,9 @@ public class HadoopV2Job implements HadoopJob { // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); + "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); - cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); + cls = (Class<? 
extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); fullCtxClsQueue.add(cls); } @@ -325,7 +321,14 @@ public class HadoopV2Job implements HadoopJob { /** {@inheritDoc} */ @Override public void cleanupStagingDirectory() { - if (rsrcMgr != null) - rsrcMgr.cleanupStagingDirectory(); + rsrcMgr.cleanupStagingDirectory(); + } + + /** + * Getter for job configuration. + * @return The job configuration. + */ + public JobConf jobConf() { + return jobConf; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 6f6bfa1..2f64e77 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -40,6 +40,9 @@ import java.util.*; * files are needed to be placed on local files system. */ public class HadoopV2JobResourceManager { + /** File type Fs disable caching property name. */ + private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file"); + /** Hadoop job context. 
*/ private final JobContextImpl ctx; @@ -84,7 +87,7 @@ public class HadoopV2JobResourceManager { try { cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath()); - if(!cfg.getBoolean("fs.file.impl.disable.cache", false)) + if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false)) FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath())); } finally { @@ -112,15 +115,17 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg); + FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true); if (!fs.exists(stagingDir)) - throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + - stagingDir); + throw new IgniteCheckedException("Failed to find map-reduce submission " + + "directory (does not exist): " + stagingDir); if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg)) - throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " + - "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']'); + throw new IgniteCheckedException("Failed to copy job submission directory " + + "contents to local file system " + + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + + ", jobId=" + jobId + ']'); } File jarJobFile = new File(jobLocDir, "job.jar"); @@ -144,7 +149,8 @@ public class HadoopV2JobResourceManager { } } else if (!jobLocDir.mkdirs()) - throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath()); + throw new IgniteCheckedException("Failed to create local job directory: " + + jobLocDir.getAbsolutePath()); setLocalFSWorkingDirectory(jobLocDir); } @@ -204,14 +210,14 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); - FileSystem srcFs = srcPath.getFileSystem(cfg); + FileSystem srcFs = 
HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true); if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); if (!archivesPath.exists() && !archivesPath.mkdir()) throw new IOException("Failed to create directory " + - "[path=" + archivesPath + ", jobId=" + jobId + ']'); + "[path=" + archivesPath + ", jobId=" + jobId + ']'); File archiveFile = new File(archivesPath, locName); @@ -287,7 +293,7 @@ public class HadoopV2JobResourceManager { public void cleanupStagingDirectory() { try { if (stagingDir != null) - stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true); + HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true); } catch (Exception e) { log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index e9c859bd..e89feba 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -28,17 +28,21 @@ import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import 
org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v1.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; +import java.security.*; import java.util.*; +import java.util.concurrent.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; @@ -239,9 +243,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); try { - FileSystem fs = FileSystem.get(jobConf()); - - HadoopFileSystemsUtils.setUser(fs, jobConf().getUser()); + FileSystem.get(jobConf()); LocalFileSystem locFs = FileSystem.getLocal(jobConf()); @@ -421,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf()); + try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false); FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { in.seek(split.offset()); @@ -450,4 +452,44 @@ public class HadoopV2TaskContext extends HadoopTaskContext { throw new IgniteCheckedException(e); } } + + /** {@inheritDoc} */ + @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException { + String user = job.info().user(); + + user = IgfsUtils.fixUserName(user); + + assert user != null; + + String ugiUser; + + try { + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + + assert currUser != null; + + ugiUser = currUser.getShortUserName(); + } + 
catch (IOException ioe) { + throw new IgniteCheckedException(ioe); + } + + try { + if (F.eq(user, ugiUser)) + // if current UGI context user is the same, do direct call: + return c.call(); + else { + UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user); + + return ugi.doAs(new PrivilegedExceptionAction<T>() { + @Override public T run() throws Exception { + return c.call(); + } + }); + } + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java index b94d9d1..b9f8179 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java @@ -28,7 +28,6 @@ import org.apache.ignite.*; import org.apache.ignite.hadoop.mapreduce.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.proto.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { * @return Configuration. 
*/ private Configuration config(int port) { - Configuration conf = new Configuration(); + Configuration conf = HadoopUtils.safeCreateConfiguration(); setupFileSystems(conf); @@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { ctx.getCounter(TestCounter.COUNTER2).increment(1); int sum = 0; - for (IntWritable value : values) { + for (IntWritable value : values) sum += value.get(); - } ctx.write(key, new IntWritable(sum)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java index d11cabb..9bcd5de 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; @@ -39,6 +40,7 @@ import org.jsr166.*; import java.io.*; import java.net.*; +import java.security.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -58,6 +60,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA /** Thread count for multithreaded tests. */ private static final int THREAD_CNT = 8; + /** Secondary file system user. */ + private static final String SECONDARY_FS_USER = "secondary-default"; + /** IP finder. 
*/ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -255,7 +260,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA if (mode != PRIMARY) cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(secondaryFileSystemUriPath(), - secondaryFileSystemConfigPath())); + secondaryFileSystemConfigPath(), SECONDARY_FS_USER)); cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); cfg.setManagementPort(-1); @@ -278,11 +283,28 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA primaryFsCfg.addResource(U.resolveIgniteUrl(primaryFileSystemConfigPath())); - fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg); + UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, getClientFsUser()); + + // Create Fs on behalf of the client user: + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override public Object run() throws Exception { + fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg); + + return null; + } + }); barrier = new CyclicBarrier(THREAD_CNT); } + /** + * Gets the user the Fs client operates on behalf of. + * @return The user the Fs client operates on behalf of. + */ + protected String getClientFsUser() { + return "foo"; + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { try { @@ -297,14 +319,17 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA /** @throws Exception If failed. 
*/ public void testStatus() throws Exception { + Path file1 = new Path("/file1"); - try (FSDataOutputStream file = fs.create(new Path("/file1"), EnumSet.noneOf(CreateFlag.class), + try (FSDataOutputStream file = fs.create(file1, EnumSet.noneOf(CreateFlag.class), Options.CreateOpts.perms(FsPermission.getDefault()))) { file.write(new byte[1024 * 1024]); } FsStatus status = fs.getFsStatus(); + assertEquals(getClientFsUser(), fs.getFileStatus(file1).getOwner()); + assertEquals(4, grid(0).cluster().nodes().size()); long used = 0, max = 0; @@ -707,6 +732,8 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA os.close(); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); + fs.setOwner(file, "aUser", "aGroup"); assertEquals("aUser", fs.getFileStatus(file).getOwner()); @@ -796,20 +823,20 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA int cnt = 2 * 1024; - FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class), - Options.CreateOpts.perms(FsPermission.getDefault())); + try (FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class), + Options.CreateOpts.perms(FsPermission.getDefault()))) { - for (long i = 0; i < cnt; i++) - out.writeLong(i); + for (long i = 0; i < cnt; i++) + out.writeLong(i); + } - out.close(); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); - FSDataInputStream in = fs.open(file, 1024); + try (FSDataInputStream in = fs.open(file, 1024)) { - for (long i = 0; i < cnt; i++) - assertEquals(i, in.readLong()); - - in.close(); + for (long i = 0; i < cnt; i++) + assertEquals(i, in.readLong()); + } } /** @throws Exception If failed. 
*/ @@ -1191,6 +1218,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA assertEquals(dirPerm, fs.getFileStatus(dir).getPermission()); assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission()); + + assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner()); + assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner()); } /** @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index 9e84c51..b089995 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra primaryConfFullPath = null; SecondaryFileSystemProvider provider = - new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath, null); + new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath); - primaryFs = provider.createFileSystem(); + primaryFs = provider.createFileSystem(null); primaryFsUri = provider.uri(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java index 
d3440fc..c0f73af 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java @@ -73,10 +73,7 @@ public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true)); FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index e8ef414..f215efb 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; @@ -43,6 +44,7 @@ import org.jsr166.*; import java.io.*; import java.lang.reflect.*; import java.net.*; +import java.security.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -72,6 +74,9 @@ public abstract class 
IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA /** Secondary file system configuration path. */ private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; + /** Secondary file system user. */ + private static final String SECONDARY_FS_USER = "secondary-default"; + /** Secondary endpoint configuration. */ protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG; @@ -145,6 +150,14 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA endpoint = skipLocShmem ? "127.0.0.1:10500" : "shmem:10500"; } + /** + * Gets the user the Fs client operates on bahalf of. + * @return The user the Fs client operates on bahalf of. + */ + protected String getClientFsUser() { + return "foo"; + } + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { Configuration secondaryConf = configuration(SECONDARY_AUTHORITY, true, true); @@ -235,7 +248,17 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA primaryFsCfg = configuration(PRIMARY_AUTHORITY, skipEmbed, skipLocShmem); - fs = FileSystem.get(primaryFsUri, primaryFsCfg); + UserGroupInformation clientUgi = UserGroupInformation.getBestUGI(null, getClientFsUser()); + assertNotNull(clientUgi); + + // Create the Fs on behalf of the specific user: + clientUgi.doAs(new PrivilegedExceptionAction<Object>() { + @Override public Object run() throws Exception { + fs = FileSystem.get(primaryFsUri, primaryFsCfg); + + return null; + } + }); barrier = new CyclicBarrier(THREAD_CNT); } @@ -324,7 +347,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA cfg.setDefaultMode(mode); if (mode != PRIMARY) - cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG_PATH)); + cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( + SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER)); 
cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); @@ -870,6 +894,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA os.close(); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); + fs.setOwner(file, "aUser", "aGroup"); assertEquals("aUser", fs.getFileStatus(file).getOwner()); @@ -1001,19 +1027,19 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA int cnt = 2 * 1024; - FSDataOutputStream out = fs.create(file, true, 1024); - - for (long i = 0; i < cnt; i++) - out.writeLong(i); + try (FSDataOutputStream out = fs.create(file, true, 1024)) { - out.close(); + for (long i = 0; i < cnt; i++) + out.writeLong(i); + } - FSDataInputStream in = fs.open(file, 1024); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); - for (long i = 0; i < cnt; i++) - assertEquals(i, in.readLong()); + try (FSDataInputStream in = fs.open(file, 1024)) { - in.close(); + for (long i = 0; i < cnt; i++) + assertEquals(i, in.readLong()); + } } /** @throws Exception If failed. */ @@ -1344,7 +1370,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA String path = fs.getFileStatus(file).getPath().toString(); - assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous") + "/file")); + assertTrue(path.endsWith("/user/" + getClientFsUser() + "/file")); } /** @throws Exception If failed. */ @@ -1374,7 +1400,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA public void testGetWorkingDirectoryIfDefault() throws Exception { String path = fs.getWorkingDirectory().toString(); - assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous"))); + assertTrue(path.endsWith("/user/" + getClientFsUser())); } /** @throws Exception If failed. 
*/ @@ -1412,17 +1438,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA @SuppressWarnings("OctalInteger") public void testMkdirs() throws Exception { Path fsHome = new Path(PRIMARY_URI); - Path dir = new Path(fsHome, "/tmp/staging"); - Path nestedDir = new Path(dir, "nested"); + final Path dir = new Path(fsHome, "/tmp/staging"); + final Path nestedDir = new Path(dir, "nested"); - FsPermission dirPerm = FsPermission.createImmutable((short)0700); - FsPermission nestedDirPerm = FsPermission.createImmutable((short)111); + final FsPermission dirPerm = FsPermission.createImmutable((short)0700); + final FsPermission nestedDirPerm = FsPermission.createImmutable((short)111); assertTrue(fs.mkdirs(dir, dirPerm)); assertTrue(fs.mkdirs(nestedDir, nestedDirPerm)); assertEquals(dirPerm, fs.getFileStatus(dir).getPermission()); assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission()); + + assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner()); + assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner()); } /** @throws Exception If failed. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java index b92b213..fcfd587 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java @@ -125,7 +125,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest try { switchHandlerErrorFlag(true); - HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG); + HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG, null); client.handshake(null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java index e103c5f..2c17ba9 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java @@ -144,6 +144,8 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe Map<String, HadoopIgfsIpcIo> cache = (Map<String, HadoopIgfsIpcIo>)cacheField.get(null); + cache.clear(); // avoid influence of previous tests in the same process. 
+ String name = "igfs:" + getTestGridName(0) + "@"; Configuration cfg = new Configuration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java index af1a1e1..e8a0a6f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -62,6 +61,17 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** Initial REST port. */ private int restPort = REST_PORT; + /** Secondary file system REST endpoint configuration. */ + protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; + + static { + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + + /** Initial classpath. */ private static String initCp; @@ -133,7 +143,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** * @return IGFS configuration. 
*/ - public FileSystemConfiguration igfsConfiguration() { + public FileSystemConfiguration igfsConfiguration() throws Exception { FileSystemConfiguration cfg = new FileSystemConfiguration(); cfg.setName(igfsName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java index d10ee5c..c66cdf3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java @@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.hadoop; import com.google.common.base.*; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.processors.resource.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.testframework.junits.common.*; import org.jsr166.*; @@ -205,7 +209,15 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName); + String cfgPath = "config/hadoop/default-config.xml"; + + IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = 
IgnitionEx.loadConfiguration(cfgPath); + + IgniteConfiguration cfg = tup.get1(); + + cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes. + + igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java index 8cf31a2..5f90bd4 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.*; * Test file systems for the working directory multi-threading support. */ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { + /** the number of threads */ private static final int THREAD_COUNT = 3; /** {@inheritDoc} */ @@ -87,10 +88,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { try { int curThreadNum = threadNum.getAndIncrement(); - FileSystem fs = FileSystem.get(uri, cfg); - - HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum); - if ("file".equals(uri.getScheme())) FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); @@ -149,24 +146,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { } /** - * Test IGFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testIgfs() throws Exception { - testFileSystem(URI.create(igfsScheme())); - } - - /** - * Test HDFS multi-thread working directory. - * - * @throws Exception If fails. 
- */ - public void testHdfs() throws Exception { - testFileSystem(URI.create("hdfs://localhost/")); - } - - /** * Test LocalFS multi-thread working directory. * * @throws Exception If fails. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index 8a3a0ac..66c14b5 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -24,31 +24,104 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.secondary.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; import java.io.*; import java.util.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; 
+import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; /** * Test of whole cycle of map-reduce processing via Job tracker. */ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { + /** IGFS block size. */ + protected static final int IGFS_BLOCK_SIZE = 512 * 1024; + + /** Amount of blocks to prefetch. */ + protected static final int PREFETCH_BLOCKS = 1; + + /** Amount of sequential block reads before prefetch is triggered. */ + protected static final int SEQ_READS_BEFORE_PREFETCH = 2; + + /** Secondary file system URI. */ + protected static final String SECONDARY_URI = "igfs://igfs-secondary:[email protected]:11500/"; + + /** Secondary file system configuration path. */ + protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; + + /** The user to run Hadoop job on behalf of. */ + protected static final String USER = "vasya"; + + /** Secondary IGFS name. */ + protected static final String SECONDARY_IGFS_NAME = "igfs-secondary"; + + /** The secondary Ignite node. */ + protected Ignite igniteSecondary; + + /** The secondary Fs. */ + protected IgfsSecondaryFileSystem secondaryFs; + /** {@inheritDoc} */ @Override protected int gridCount() { return 3; } /** + * Gets owner of a IgfsEx path. + * @param p The path. + * @return The owner. + */ + private static String getOwner(IgfsEx i, IgfsPath p) { + return i.info(p).property(IgfsEx.PROP_USER_NAME); + } + + /** + * Gets owner of a secondary Fs path. + * @param secFs The sec Fs. + * @param p The path. + * @return The owner. 
+ */ + private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) { + return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() { + @Override public String apply() { + return secFs.info(p).property(IgfsEx.PROP_USER_NAME); + } + }); + } + + /** + * Checks owner of the path. + * @param p The path. + */ + private void checkOwner(IgfsPath p) { + String ownerPrim = getOwner(igfs, p); + assertEquals(USER, ownerPrim); + + String ownerSec = getOwnerSecondary(secondaryFs, p); + assertEquals(USER, ownerSec); + } + + /** * Tests whole job execution with all phases in all combination of new and old versions of API. * @throws Exception If fails. */ @@ -59,9 +132,14 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); - generateTestFile(inFile.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000 ); + final int red = 10_000; + final int blue = 20_000; + final int green = 15_000; + final int yellow = 7_000; - for (int i = 0; i < 8; i++) { + generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow ); + + for (int i = 0; i < 3; i++) { igfs.delete(new IgfsPath(PATH_OUTPUT), true); boolean useNewMapper = (i & 1) == 0; @@ -71,7 +149,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { JobConf jobConf = new JobConf(); jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); - jobConf.setUser("yyy"); + jobConf.setUser(USER); jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); //To split into about 40 items for v2 @@ -105,13 +183,19 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { checkJobStatistics(jobId); + final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? 
"part-r-" : "part-") + "00000"; + + checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS")); + + checkOwner(new IgfsPath(outFile)); + assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + useNewReducer, - "blue\t200000\n" + - "green\t150000\n" + - "red\t100000\n" + - "yellow\t70000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000") + "blue\t" + blue + "\n" + + "green\t" + green + "\n" + + "red\t" + red + "\n" + + "yellow\t" + yellow + "\n", + readAndSortFile(outFile) ); } } @@ -182,7 +266,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { } } - final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance"); + final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance"); assert GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -212,4 +296,85 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']'; } } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); + + super.beforeTest(); + } + + /** + * Start grid with IGFS. + * + * @param gridName Grid name. + * @param igfsName IGFS name + * @param mode IGFS mode. + * @param secondaryFs Secondary file system (optional). + * @param restCfg Rest configuration string (optional). + * @return Started grid instance. + * @throws Exception If failed. 
+ */ + protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(igfsName); + igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); + igfsCfg.setDefaultMode(mode); + igfsCfg.setIpcEndpointConfiguration(restCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); + igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setNearConfiguration(null); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + return G.start(cfg); + } + + /** + * @return IGFS configuration. 
+ */ + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { + FileSystemConfiguration fsCfg = super.igfsConfiguration(); + + secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); + + fsCfg.setSecondaryFileSystem(secondaryFs); + + return fsCfg; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java index 8dc9830..eee5c8b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java @@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { /** {@inheritDoc} */ - @Override public FileSystemConfiguration igfsConfiguration() { + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { FileSystemConfiguration cfg = super.igfsConfiguration(); cfg.setFragmentizerEnabled(false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java index aaf0f92..6930020 100644 --- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java @@ -22,7 +22,6 @@ import org.apache.hadoop.io.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.net.*; @@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Hadoop job. * @throws IOException If fails. */ - public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; + public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception; /** * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API @@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(), igfs.info(inFile).length() - fileBlock1.length()); - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); @@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Context with mock output. * @throws IgniteCheckedException If fails. */ - private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType, + private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType, int taskNum, String... 
words) throws IgniteCheckedException { HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); @@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @throws Exception If fails. */ public void testReduceTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10"); runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15"); @@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @throws Exception If fails. */ public void testCombinerTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob("/", "/"); + HadoopJob gridJob = getHadoopJob("/", "/"); HadoopTestTaskContext ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10"); @@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Context of combine task with mock output. * @throws IgniteCheckedException If fails. 
*/ - private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob) + private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob) throws IgniteCheckedException { HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); @@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l); HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l); - HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index b41a260..48e83cc 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.mapred.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.util.*; @@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { * @return Hadoop job. * @throws IOException If fails. 
*/ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception { JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile); setupFileSystems(jobConf); @@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); - return new HadoopV2Job(jobId, jobInfo, log); + return jobInfo.createJob(jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index b677c63..e73fae3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.util.*; @@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { * @return Hadoop job. * @throws Exception if fails. 
*/ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception { Job job = Job.getInstance(); job.setOutputKeyClass(Text.class); @@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); - return new HadoopV2Job(jobId, jobInfo, log); + return jobInfo.createJob(jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index ebc89f4..f3b9307 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { cfg.setMapOutputValueClass(Text.class); cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); + HadoopDefaultJobInfo info = createJobInfo(cfg); + + HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1); + + HadoopJob job = info.createJob(id, log); HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java 
---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index b4ed5e1..9395c5e 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*; import org.jetbrains.annotations.*; import java.util.*; +import java.util.concurrent.*; /** * Abstract class for maps test. @@ -95,9 +96,20 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { assert false; } + /** {@inheritDoc} */ @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { assert false; } + + /** {@inheritDoc} */ + @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java index 8a046e0..89bf830 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java +++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java @@ -61,10 +61,10 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest { int sigma = max((int)ceil(precission * exp), 5); - X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission + + X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precission + " sigma: " + sigma); - assertTrue(abs(exp - levelsCnts[level]) <= sigma); + assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails. } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java new file mode 100644 index 0000000..cfad322 --- /dev/null +++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.hibernate; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; +import org.hibernate.*; +import org.hibernate.cfg.*; + +import javax.cache.integration.*; +import java.io.*; +import java.net.*; + +/** + * Hibernate-based cache store session listener. + * <p> + * This listener creates a new Hibernate session for each store + * session. If there is an ongoing cache transaction, a corresponding + * Hibernate transaction is created as well. + * <p> + * The Hibernate session is saved as a store session + * {@link CacheStoreSession#attachment() attachment}. + * The listener guarantees that the session will be + * available for any store operation. If there is an + * ongoing cache transaction, all operations within this + * transaction will share a DB transaction. + * <p> + * As an example, here is how the {@link CacheStore#write(javax.cache.Cache.Entry)} + * method can be implemented if {@link CacheHibernateStoreSessionListener} + * is configured: + * <pre name="code" class="java"> + * private static class Store extends CacheStoreAdapter<Integer, Integer> { + * @CacheStoreSessionResource + * private CacheStoreSession ses; + * + * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException { + * // Get Hibernate session from the current store session. 
+ * Session hibSes = ses.attachment(); + * + * // Persist the value. + * hibSes.persist(entry.getValue()); + * } + * } + * </pre> + * Hibernate session will be automatically created by the listener + * at the start of the session and closed when it ends. + * <p> + * {@link CacheHibernateStoreSessionListener} requires that either + * {@link #setSessionFactory(SessionFactory) session factory} + * or {@link #setHibernateConfigurationPath(String) Hibernate configuration file} + * is provided. If none of them is set, exception is thrown. If both are provided, + * session factory will be used. + */ +public class CacheHibernateStoreSessionListener implements CacheStoreSessionListener, LifecycleAware { + /** Hibernate session factory. */ + private SessionFactory sesFactory; + + /** Hibernate configuration file path. */ + private String hibernateCfgPath; + + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** Whether to close session on stop. */ + private boolean closeSesOnStop; + + /** + * Sets Hibernate session factory. + * <p> + * Either session factory or configuration file is required. + * If none is provided, exception will be thrown on startup. + * + * @param sesFactory Session factory. + */ + public void setSessionFactory(SessionFactory sesFactory) { + this.sesFactory = sesFactory; + } + + /** + * Gets Hibernate session factory. + * + * @return Session factory. + */ + public SessionFactory getSessionFactory() { + return sesFactory; + } + + /** + * Sets hibernate configuration path. + * <p> + * Either session factory or configuration file is required. + * If none is provided, exception will be thrown on startup. + * + * @param hibernateCfgPath Hibernate configuration path. + */ + public void setHibernateConfigurationPath(String hibernateCfgPath) { + this.hibernateCfgPath = hibernateCfgPath; + } + + /** + * Gets hibernate configuration path. + * + * @return Hibernate configuration path. 
+ */ + public String getHibernateConfigurationPath() { + return hibernateCfgPath; + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public void start() throws IgniteException { + if (sesFactory == null && F.isEmpty(hibernateCfgPath)) + throw new IgniteException("Either session factory or Hibernate configuration file is required by " + + getClass().getSimpleName() + '.'); + + if (!F.isEmpty(hibernateCfgPath)) { + if (sesFactory == null) { + try { + URL url = new URL(hibernateCfgPath); + + sesFactory = new Configuration().configure(url).buildSessionFactory(); + } + catch (MalformedURLException ignored) { + // No-op. + } + + if (sesFactory == null) { + File cfgFile = new File(hibernateCfgPath); + + if (cfgFile.exists()) + sesFactory = new Configuration().configure(cfgFile).buildSessionFactory(); + } + + if (sesFactory == null) + sesFactory = new Configuration().configure(hibernateCfgPath).buildSessionFactory(); + + if (sesFactory == null) + throw new IgniteException("Failed to resolve Hibernate configuration file: " + hibernateCfgPath); + + closeSesOnStop = true; + } + else + U.warn(log, "Hibernate configuration file configured in " + getClass().getSimpleName() + + " will be ignored (session factory is already set)."); + } + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (closeSesOnStop && sesFactory != null && !sesFactory.isClosed()) + sesFactory.close(); + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + if (ses.attachment() == null) { + try { + Session hibSes = sesFactory.openSession(); + + ses.attach(hibSes); + + if (ses.isWithinTransaction()) + hibSes.beginTransaction(); + } + catch (HibernateException e) { + throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + Session hibSes = 
ses.attach(null); + + if (hibSes != null) { + try { + Transaction tx = hibSes.getTransaction(); + + if (commit) { + hibSes.flush(); + + if (tx.isActive()) + tx.commit(); + } + else if (tx.isActive()) + tx.rollback(); + } + catch (HibernateException e) { + throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e); + } + finally { + hibSes.close(); + } + } + } +}
