IGNITE-6992: Hadoop: fixed MR problem with HDFS access when Kerberos is enabled. This closes #3097.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/438760ed Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/438760ed Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/438760ed Branch: refs/heads/ignite-7016 Commit: 438760ed7f9d37bb72de5e5a38d46ce2450544f8 Parents: 5fa5ae7 Author: Evgenii Zhuravlev <[email protected]> Authored: Wed Nov 29 13:50:13 2017 +0300 Committer: devozerov <[email protected]> Committed: Wed Nov 29 13:50:13 2017 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopDefaultJobInfo.java | 15 ++++- .../processors/hadoop/HadoopJobInfo.java | 7 +++ .../processors/hadoop/impl/HadoopUtils.java | 45 +++++++++++++- .../impl/fs/HadoopFileSystemCacheUtils.java | 34 ++++++----- .../hadoop/impl/proto/HadoopClientProtocol.java | 8 ++- .../processors/hadoop/impl/v2/HadoopV2Job.java | 32 ++++++++-- .../hadoop/impl/v2/HadoopV2TaskContext.java | 62 +++++++++++++------- .../impl/HadoopAbstractMapReduceTest.java | 2 +- .../hadoop/impl/HadoopGroupingTest.java | 2 +- .../hadoop/impl/HadoopJobTrackerSelfTest.java | 4 +- .../impl/HadoopMapReduceEmbeddedSelfTest.java | 2 +- .../hadoop/impl/HadoopPlannerMockJob.java | 6 ++ .../hadoop/impl/HadoopSortingTest.java | 4 +- .../impl/HadoopTaskExecutionSelfTest.java | 10 ++-- .../hadoop/impl/HadoopTasksV1Test.java | 2 +- .../hadoop/impl/HadoopTasksV2Test.java | 2 +- .../hadoop/impl/HadoopTeraSortTest.java | 2 +- .../hadoop/impl/HadoopV2JobSelfTest.java | 2 +- .../collections/HadoopAbstractMapTest.java | 6 ++ .../HadoopExternalTaskExecutionSelfTest.java | 4 +- 20 files changed, 189 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index d4a29b2..a66f501 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -52,6 +52,9 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { /** User name. */ private String user; + /** Credentials. */ + private byte[] credentials; + /** * Default constructor required by {@link Externalizable}. */ @@ -69,12 +72,13 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { * @param props All other properties of the job. */ public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces, - Map<String, String> props) { + Map<String, String> props, byte[] credentials) { this.jobName = jobName; this.user = user; this.hasCombiner = hasCombiner; this.numReduces = numReduces; this.props = props; + this.credentials = credentials; } /** {@inheritDoc} */ @@ -127,6 +131,11 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { } /** {@inheritDoc} */ + @Override public byte[] credentials() { + return credentials; + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, jobName); U.writeString(out, user); @@ -135,6 +144,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { out.writeInt(numReduces); IgfsUtils.writeStringMap(out, props); + + U.writeByteArray(out, credentials); } /** {@inheritDoc} */ @@ -146,6 +157,8 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { numReduces = in.readInt(); props = IgfsUtils.readStringMap(in); + + credentials = U.readByteArray(in); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java index 4cc8f80..3dffbc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -69,6 +69,13 @@ public interface HadoopJobInfo { String user(); /** + * Gets credentials. + * + * @return Credentials. + */ + byte[] credentials(); + + /** * Creates new job instance for the given ID. * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJobEx} is for one job execution. * This method will be called once for the same ID on one node, though it can be called on the same host http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java index 767e10a..89c60b9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.impl; import com.google.common.primitives.Longs; import com.google.common.primitives.UnsignedBytes; +import java.io.DataInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; @@ -27,6 +28,8 @@ import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; @@ -211,10 +214,12 @@ public class HadoopUtils { * Creates JobInfo from hadoop configuration. * * @param cfg Hadoop configuration. + * @param credentials Credentials. * @return Job info. * @throws IgniteCheckedException If failed. */ - public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException { + public static HadoopDefaultJobInfo createJobInfo(Configuration cfg, byte[] credentials) + throws IgniteCheckedException { JobConf jobConf = new JobConf(cfg); boolean hasCombiner = jobConf.get("mapred.combiner.class") != null @@ -269,7 +274,8 @@ public class HadoopUtils { for (Map.Entry<String, String> entry : jobConf) props.put(entry.getKey(), entry.getValue()); - return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props); + return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props, + credentials); } /** @@ -394,4 +400,39 @@ public class HadoopUtils { return len1 - len2; } + + /** + * Deserialization of Hadoop Writable object. + * + * @param writable Writable object to deserialize to. + * @param bytes byte array to deserialize. + */ + public static void deserialize(Writable writable, byte[] bytes) throws IOException { + DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes)); + + writable.readFields(dataIn); + + dataIn.close(); + } + + /** + * Create UserGroupInformation for specified user and credentials. + * + * @param user User. + * @param credentialsBytes Credentials byte array. + */ + public static UserGroupInformation createUGI(String user, byte[] credentialsBytes) throws IOException { + Credentials credentials = new Credentials(); + + HadoopUtils.deserialize(credentials, credentialsBytes); + + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + + ugi.addCredentials(credentials); + + if (credentials.numberOfTokens() > 0) + ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.TOKEN); + + return ugi; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java index 0b673e9..f48d21d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/fs/HadoopFileSystemCacheUtils.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.impl.fs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ignite.IgniteException; import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; import org.apache.ignite.internal.util.GridStringBuilder; @@ -41,27 +42,32 @@ public class HadoopFileSystemCacheUtils { return new HadoopLazyConcurrentMap<>( new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { @Override public FileSystem createValue(FsCacheKey key) throws IOException { - try { - assert key != null; + assert key != null; - // Explicitly disable FileSystem caching: - URI uri = key.uri(); + // Explicitly disable FileSystem caching: + URI uri = key.uri(); - String scheme = uri.getScheme(); + String scheme = uri.getScheme(); - // Copy the configuration to avoid altering the external object. - Configuration cfg = new Configuration(key.configuration()); + // Copy the configuration to avoid altering the external object. + Configuration cfg = new Configuration(key.configuration()); - String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme); + String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme); - cfg.setBoolean(prop, true); + cfg.setBoolean(prop, true); - return FileSystem.get(uri, cfg, key.user()); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() == + UserGroupInformation.AuthenticationMethod.TOKEN) + return FileSystem.get(uri, cfg); + else { + try { + return FileSystem.get(uri, cfg, key.user()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); - throw new IOException("Failed to create file system due to interrupt.", e); + throw new IOException("Failed to create file system due to interrupt.", e); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java index 7fc0e77..811b0c2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.ClusterMetrics; @@ -122,8 +123,13 @@ public class HadoopClientProtocol implements ClientProtocol { try { conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); + byte[] credentials = null; + + if (ts != null) + credentials = WritableUtils.toByteArray(ts); + HadoopJobStatus status = execute(HadoopProtocolSubmitJobTask.class, - jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)); + jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf, credentials)); if (status == null) throw new IOException("Failed to submit job (null status obtained): " + jobId); http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java index 2a85cb8..28b4d6b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.hadoop.impl.v2; +import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.split.JobSplit; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -46,6 +48,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils; import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap; import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Splitter; @@ -317,7 +320,7 @@ public class HadoopV2Job extends HadoopJobEx { } /** {@inheritDoc} */ - @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException { + @Override public void initialize(final boolean external, final UUID locNodeId) throws IgniteCheckedException { assert locNodeId != null; this.locNodeId = locNodeId; @@ -325,15 +328,36 @@ public class HadoopV2Job extends HadoopJobEx { ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader()); try { - rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId)); + if (jobInfo.credentials() == null) + rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId)); + else { + UserGroupInformation ugi = HadoopUtils.createUGI(jobInfo.user(), jobInfo.credentials()); + + try { + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override public Void run() throws Exception { + rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, + jobId)); + + return null; + } + }); + } + catch (IOException | InterruptedException e) { + throw new IgniteCheckedException(e); + } + } if (HadoopJobProperty.get(jobInfo, JOB_SHARED_CLASSLOADER, true)) { - U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() + " job property is set to true; please disable " + - "it if job tasks rely on mutable static state."); + U.warn(log, JOB_SHARED_CLASSLOADER.propertyName() + + " job property is set to true; please disable " + "it if job tasks rely on mutable static state."); sharedClsLdr = createClassLoader(HadoopClassLoader.nameForJob(jobId)); } } + catch (IOException e) { + throw new IgniteCheckedException(e); + } finally { HadoopCommonUtils.restoreContextClassLoader(oldLdr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java index 6127822..c362b0c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap; import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask; import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask; @@ -548,41 +549,58 @@ public class HadoopV2TaskContext extends HadoopTaskContext { /** {@inheritDoc} */ @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException { - String user = job.info().user(); + if (job.info().credentials() == null) { + String user = job.info().user(); - user = IgfsUtils.fixUserName(user); + user = IgfsUtils.fixUserName(user); - assert user != null; + assert user != null; - String ugiUser; + String ugiUser; - try { - UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + try { + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); - assert currUser != null; + assert currUser != null; - ugiUser = currUser.getShortUserName(); - } - catch (IOException ioe) { - throw new IgniteCheckedException(ioe); - } + ugiUser = currUser.getShortUserName(); + } + catch (IOException ioe) { + throw new IgniteCheckedException(ioe); + } - try { - if (F.eq(user, ugiUser)) - // if current UGI context user is the same, do direct call: - return c.call(); - else { - UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user); + try { + if (F.eq(user, ugiUser)) + // if current UGI context user is the same, do direct call: + return c.call(); + else { + UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user); + + return ugi.doAs(new PrivilegedExceptionAction<T>() { + @Override public T run() throws Exception { + return c.call(); + } + }); + } + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } + else { + try { + UserGroupInformation ugi = HadoopUtils.createUGI(job.info().user(), job.info().credentials()); return ugi.doAs(new PrivilegedExceptionAction<T>() { - @Override public T run() throws Exception { + @Override + public T run() throws Exception { return c.call(); } }); } - } - catch (Exception e) { - throw new IgniteCheckedException(e); + catch (Exception e) { + throw new IgniteCheckedException(e); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java index 4928e3d..fc6d7f8 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java @@ -207,7 +207,7 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest { HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null)); fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java index 2de2d19..d27a234 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java @@ -127,7 +127,7 @@ public class HadoopGroupingTest extends HadoopAbstractSelfTest { } grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2), - createJobInfo(job.getConfiguration())).get(30000); + createJobInfo(job.getConfiguration(), null)).get(30000); assertTrue(HadoopGroupingTestState.values().isEmpty()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java index 381652e..c3b3040 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java @@ -121,7 +121,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { HadoopJobId jobId = new HadoopJobId(globalId, 1); - grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null)); checkStatus(jobId, false); @@ -168,7 +168,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { HadoopJobId jobId = new HadoopJobId(globalId, 1); - grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null)); checkStatus(jobId, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java index 6eb16af..21b7ee2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java @@ -143,7 +143,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { job.setJarByClass(HadoopWordCount2.class); IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); + createJobInfo(job.getConfiguration(), null)); fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java index 28c8264..b3368bd 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java @@ -178,5 +178,11 @@ public class HadoopPlannerMockJob extends HadoopJobEx { return null; } + + @Override public byte[] credentials() { + throwUnsupported(); + + return null; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java index 2e85cce..bb11ccb 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java @@ -117,7 +117,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest { X.printerrln("Data generation started."); grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())).get(180000); + createJobInfo(job.getConfiguration(), null)).get(180000); X.printerrln("Data generation complete."); @@ -148,7 +148,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest { X.printerrln("Job started."); grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2), - createJobInfo(job.getConfiguration())).get(180000); + createJobInfo(job.getConfiguration(), null)).get(180000); X.printerrln("Job complete."); http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java index c27a67f..2394ada 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java @@ -143,7 +143,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { job.setJarByClass(getClass()); IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); + createJobInfo(job.getConfiguration(), null)); fut.get(); @@ -188,7 +188,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2); - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null)); fut.get(); @@ -226,7 +226,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { job.setJarByClass(getClass()); final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3), - createJobInfo(job.getConfiguration())); + createJobInfo(job.getConfiguration(), null)); GridTestUtils.assertThrows(log, new Callable<Object>() { @Override public Object call() throws Exception { @@ -313,7 +313,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); - final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg)); + final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg, null)); if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -364,7 +364,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { assertFalse(killRes); - final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg)); + final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg, null)); if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java index 1d7f3e4..ca96551 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java @@ -46,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { setupFileSystems(jobConf); - HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf); + HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf, null); UUID uuid = new UUID(0, 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java index 61e3e46..0fcd358 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java @@ -65,7 +65,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile); - HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration()); + HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration(), null); UUID uuid = new UUID(0, 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java index d8b74ce..46752a8 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java @@ -178,7 +178,7 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest { HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration(), null)); fut.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java index 2c2f049..041f0bc 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java @@ -80,7 +80,7 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { cfg.setMapOutputValueClass(Text.class); cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - HadoopDefaultJobInfo info = createJobInfo(cfg); + HadoopDefaultJobInfo info = createJobInfo(cfg, null); final UUID uuid = UUID.randomUUID(); http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java index b9dcae1..49be0a4 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java @@ -177,5 +177,11 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { return null; } + + @Override public byte[] credentials() { + assert false; + + return null; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/438760ed/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java index 0afd689..1246078 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java @@ -117,7 +117,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest job.setJarByClass(getClass()); IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); + createJobInfo(job.getConfiguration(), null)); fut.get(); } @@ -153,7 +153,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest job.setJarByClass(getClass()); IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); + createJobInfo(job.getConfiguration(), null)); try { fut.get();
