http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java deleted file mode 100644 index 4b1121c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ /dev/null @@ -1,560 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.io.DataInput; -import java.io.File; -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.Comparator; -import java.util.UUID; -import java.util.concurrent.Callable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serialization; -import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.io.serializer.WritableSerialization; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobContextImpl; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.mapred.TaskID; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobSubmissionFiles; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; -import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; -import org.apache.ignite.internal.processors.hadoop.HadoopTask; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; -import 
org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; -import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; -import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1CleanupTask; -import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1MapTask; -import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Partitioner; -import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1ReduceTask; -import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1SetupTask; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.unwrapSplit; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES; - -/** - * Context for task execution. 
- */ -public class HadoopV2TaskContext extends HadoopTaskContext { - /** */ - private static final boolean COMBINE_KEY_GROUPING_SUPPORTED; - - /** Lazy per-user file system cache used by the Hadoop task. */ - private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap - = createHadoopLazyConcurrentMap(); - - /** - * This method is called with reflection upon Job finish with class loader of each task. - * This will clean up all the Fs created for specific task. - * Each class loader sees uses its own instance of <code>fsMap<code/> since the class loaders - * are different. - * - * @throws IgniteCheckedException On error. - */ - public static void close() throws IgniteCheckedException { - fsMap.close(); - } - - /** - * Check for combiner grouping support (available since Hadoop 2.3). - */ - static { - boolean ok; - - try { - JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator"); - - ok = true; - } - catch (NoSuchMethodException ignore) { - ok = false; - } - - COMBINE_KEY_GROUPING_SUPPORTED = ok; - } - - /** Flag is set if new context-object code is used for running the mapper. */ - private final boolean useNewMapper; - - /** Flag is set if new context-object code is used for running the reducer. */ - private final boolean useNewReducer; - - /** Flag is set if new context-object code is used for running the combiner. */ - private final boolean useNewCombiner; - - /** */ - private final JobContextImpl jobCtx; - - /** Set if task is to cancelling. */ - private volatile boolean cancelled; - - /** Current task. */ - private volatile HadoopTask task; - - /** Local node ID */ - private final UUID locNodeId; - - /** Counters for task. */ - private final HadoopCounters cntrs = new HadoopCountersImpl(); - - /** - * @param taskInfo Task info. - * @param job Job. - * @param jobId Job ID. - * @param locNodeId Local node ID. - * @param jobConfDataInput DataInput for read JobConf. 
- */ - public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId, - @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException { - super(taskInfo, job); - this.locNodeId = locNodeId; - - // Before create JobConf instance we should set new context class loader. - ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); - - try { - JobConf jobConf = new JobConf(); - - try { - jobConf.readFields(jobConfDataInput); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - // For map-reduce jobs prefer local writes. - jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true); - - jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId())); - - useNewMapper = jobConf.getUseNewMapper(); - useNewReducer = jobConf.getUseNewReducer(); - useNewCombiner = jobConf.getCombinerClass() == null; - } - finally { - HadoopUtils.restoreContextClassLoader(oldLdr); - } - } - - /** {@inheritDoc} */ - @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) { - return cntrs.counter(grp, name, cls); - } - - /** {@inheritDoc} */ - @Override public HadoopCounters counters() { - return cntrs; - } - - /** - * Creates appropriate task from current task info. - * - * @return Task. - */ - private HadoopTask createTask() { - boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT; - - switch (taskInfo().type()) { - case SETUP: - return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo()); - - case MAP: - return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo()); - - case REDUCE: - return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) : - new HadoopV1ReduceTask(taskInfo(), true); - - case COMBINE: - return useNewCombiner ? 
new HadoopV2ReduceTask(taskInfo(), false) : - new HadoopV1ReduceTask(taskInfo(), false); - - case COMMIT: - case ABORT: - return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), isAbort) : - new HadoopV1CleanupTask(taskInfo(), isAbort); - - default: - return null; - } - } - - /** {@inheritDoc} */ - @Override public void run() throws IgniteCheckedException { - ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader()); - - try { - try { - task = createTask(); - } - catch (Throwable e) { - if (e instanceof Error) - throw e; - - throw transformException(e); - } - - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - try { - task.run(this); - } - catch (Throwable e) { - if (e instanceof Error) - throw e; - - throw transformException(e); - } - } - finally { - task = null; - - HadoopUtils.restoreContextClassLoader(oldLdr); - } - } - - /** {@inheritDoc} */ - @Override public void cancel() { - cancelled = true; - - HadoopTask t = task; - - if (t != null) - t.cancel(); - } - - /** {@inheritDoc} */ - @Override public void prepareTaskEnvironment() throws IgniteCheckedException { - File locDir; - - switch(taskInfo().type()) { - case MAP: - case REDUCE: - job().prepareTaskEnvironment(taskInfo()); - - locDir = taskLocalDir(locNodeId, taskInfo()); - - break; - - default: - locDir = jobLocalDir(locNodeId, taskInfo().jobId()); - } - - ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader()); - - try { - FileSystem.get(jobConf()); - - LocalFileSystem locFs = FileSystem.getLocal(jobConf()); - - locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath())); - } - catch (Throwable e) { - if (e instanceof Error) - throw (Error)e; - - throw transformException(e); - } - finally { - HadoopUtils.restoreContextClassLoader(oldLdr); - } - } - - /** {@inheritDoc} */ - @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { - job().cleanupTaskEnvironment(taskInfo()); - } - - /** - * Creates 
Hadoop attempt ID. - * - * @return Attempt ID. - */ - public TaskAttemptID attemptId() { - TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber()); - - return new TaskAttemptID(tid, taskInfo().attempt()); - } - - /** - * @param type Task type. - * @return Hadoop task type. - */ - private TaskType taskType(HadoopTaskType type) { - switch (type) { - case SETUP: - return TaskType.JOB_SETUP; - case MAP: - case COMBINE: - return TaskType.MAP; - - case REDUCE: - return TaskType.REDUCE; - - case COMMIT: - case ABORT: - return TaskType.JOB_CLEANUP; - - default: - return null; - } - } - - /** - * Gets job configuration of the task. - * - * @return Job configuration. - */ - public JobConf jobConf() { - return jobCtx.getJobConf(); - } - - /** - * Gets job context of the task. - * - * @return Job context. - */ - public JobContextImpl jobContext() { - return jobCtx; - } - - /** {@inheritDoc} */ - @Override public HadoopPartitioner partitioner() throws IgniteCheckedException { - Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null); - - if (partClsOld != null) - return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf()); - - try { - return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf()); - } - catch (ClassNotFoundException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * Gets serializer for specified class. - * - * @param cls Class. - * @param jobConf Job configuration. - * @return Appropriate serializer. 
- */ - @SuppressWarnings("unchecked") - private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException { - A.notNull(cls, "cls"); - - SerializationFactory factory = new SerializationFactory(jobConf); - - Serialization<?> serialization = factory.getSerialization(cls); - - if (serialization == null) - throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName()); - - if (serialization.getClass() == WritableSerialization.class) - return new HadoopWritableSerialization((Class<? extends Writable>)cls); - - return new HadoopSerializationWrapper(serialization, cls); - } - - /** {@inheritDoc} */ - @Override public HadoopSerialization keySerialization() throws IgniteCheckedException { - return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf()); - } - - /** {@inheritDoc} */ - @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException { - return getSerialization(jobCtx.getMapOutputValueClass(), jobConf()); - } - - /** {@inheritDoc} */ - @Override public Comparator<Object> sortComparator() { - return (Comparator<Object>)jobCtx.getSortComparator(); - } - - /** {@inheritDoc} */ - @Override public Comparator<Object> groupComparator() { - Comparator<?> res; - - switch (taskInfo().type()) { - case COMBINE: - res = COMBINE_KEY_GROUPING_SUPPORTED ? - jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator(); - - break; - - case REDUCE: - res = jobContext().getGroupingComparator(); - - break; - - default: - return null; - } - - if (res != null && res.getClass() != sortComparator().getClass()) - return (Comparator<Object>)res; - - return null; - } - - /** - * @param split Split. - * @return Native Hadoop split. - * @throws IgniteCheckedException if failed. 
- */ - @SuppressWarnings("unchecked") - public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException { - if (split instanceof HadoopExternalSplit) - return readExternalSplit((HadoopExternalSplit)split); - - if (split instanceof HadoopSplitWrapper) - return unwrapSplit((HadoopSplitWrapper)split); - - throw new IllegalStateException("Unknown split: " + split); - } - - /** - * @param split External split. - * @return Native input split. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { - Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); - - FileSystem fs; - - try { - // This assertion uses .startsWith() instead of .equals() because task class loaders may - // be reused between tasks of the same job. - assert ((HadoopClassLoader)getClass().getClassLoader()).name() - .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true)); - - // We also cache Fs there, all them will be cleared explicitly upon the Job end. 
- fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - try ( - FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { - - in.seek(split.offset()); - - String clsName = Text.readString(in); - - Class<?> cls = jobConf().getClassByName(clsName); - - assert cls != null; - - Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls); - - Deserializer deserializer = serialization.getDeserializer(cls); - - deserializer.open(in); - - Object res = deserializer.deserialize(null); - - deserializer.close(); - - assert res != null; - - return res; - } - catch (IOException | ClassNotFoundException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException { - String user = job.info().user(); - - user = IgfsUtils.fixUserName(user); - - assert user != null; - - String ugiUser; - - try { - UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); - - assert currUser != null; - - ugiUser = currUser.getShortUserName(); - } - catch (IOException ioe) { - throw new IgniteCheckedException(ioe); - } - - try { - if (F.eq(user, ugiUser)) - // if current UGI context user is the same, do direct call: - return c.call(); - else { - UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user); - - return ugi.doAs(new PrivilegedExceptionAction<T>() { - @Override public T run() throws Exception { - return c.call(); - } - }); - } - } - catch (Exception e) { - throw new IgniteCheckedException(e); - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java deleted file mode 100644 index f46f068..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import org.apache.hadoop.io.Writable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -/** - * Optimized serialization for Hadoop {@link Writable} types. 
- */ -public class HadoopWritableSerialization implements HadoopSerialization { - /** */ - private final Class<? extends Writable> cls; - - /** - * @param cls Class. - */ - public HadoopWritableSerialization(Class<? extends Writable> cls) { - assert cls != null; - - this.cls = cls; - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { - assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass(); - - try { - ((Writable)obj).write(out); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { - Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj); - - try { - w.readFields(in); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - return w; - } - - /** {@inheritDoc} */ - @Override public void close() { - // No-op. - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java deleted file mode 100644 index 5a20a75..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.hadoop; - -import org.apache.ignite.configuration.HadoopConfiguration; - -/** - * Hadoop client protocol tests in embedded process mode. - */ -public class HadoopClientProtocolEmbeddedSelfTest extends HadoopClientProtocolSelfTest { - /** {@inheritDoc} */ - @Override public HadoopConfiguration hadoopConfiguration(String gridName) { - HadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - // TODO: IGNITE-404: Uncomment when fixed. - //cfg.setExternalExecution(false); - - return cfg; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java deleted file mode 100644 index 1344e26..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java +++ /dev/null @@ -1,654 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.hadoop; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.util.StringTokenizer; -import java.util.UUID; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.mapreduce.protocol.ClientProtocol; -import org.apache.ignite.IgniteFileSystem; -import 
org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.processors.hadoop.HadoopAbstractSelfTest; -import org.apache.ignite.internal.processors.hadoop.HadoopUtils; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.GridTestUtils; - -/** - * Hadoop client protocol tests in external process mode. - */ -@SuppressWarnings("ResultOfMethodCallIgnored") -public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { - /** Input path. */ - private static final String PATH_INPUT = "/input"; - - /** Output path. */ - private static final String PATH_OUTPUT = "/output"; - - /** Job name. */ - private static final String JOB_NAME = "myJob"; - - /** Setup lock file. */ - private static File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", - "ignite-lock-setup.file"); - - /** Map lock file. */ - private static File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", - "ignite-lock-map.file"); - - /** Reduce lock file. */ - private static File reduceLockFile = new File(U.isWindows() ? 
System.getProperty("java.io.tmpdir") : "/tmp", - "ignite-lock-reduce.file"); - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected boolean restEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGrids(gridCount()); - - setupLockFile.delete(); - mapLockFile.delete(); - reduceLockFile.delete(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - -// IgniteHadoopClientProtocolProvider.cliMap.clear(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - setupLockFile.createNewFile(); - mapLockFile.createNewFile(); - reduceLockFile.createNewFile(); - - setupLockFile.deleteOnExit(); - mapLockFile.deleteOnExit(); - reduceLockFile.deleteOnExit(); - - super.beforeTest(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - grid(0).fileSystem(HadoopAbstractSelfTest.igfsName).format(); - - setupLockFile.delete(); - mapLockFile.delete(); - reduceLockFile.delete(); - - super.afterTest(); - } - - /** - * Test next job ID generation. - * - * @throws Exception If failed. - */ - @SuppressWarnings("ConstantConditions") - private void tstNextJobId() throws Exception { - IgniteHadoopClientProtocolProvider provider = provider(); - - ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT)); - - JobID jobId = proto.getNewJobID(); - - assert jobId != null; - assert jobId.getJtIdentifier() != null; - - JobID nextJobId = proto.getNewJobID(); - - assert nextJobId != null; - assert nextJobId.getJtIdentifier() != null; - - assert !F.eq(jobId, nextJobId); - } - - /** - * Tests job counters retrieval. - * - * @throws Exception If failed. 
- */ - public void testJobCounters() throws Exception { - IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName); - - igfs.mkdirs(new IgfsPath(PATH_INPUT)); - - try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create( - new IgfsPath(PATH_INPUT + "/test.file"), true)))) { - - bw.write( - "alpha\n" + - "beta\n" + - "gamma\n" + - "alpha\n" + - "beta\n" + - "gamma\n" + - "alpha\n" + - "beta\n" + - "gamma\n" - ); - } - - Configuration conf = config(HadoopAbstractSelfTest.REST_PORT); - - final Job job = Job.getInstance(conf); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(TestCountingMapper.class); - job.setReducerClass(TestCountingReducer.class); - job.setCombinerClass(TestCountingCombiner.class); - - FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); - FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); - - job.submit(); - - final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1); - - assertEquals(0, cntr.getValue()); - - cntr.increment(10); - - assertEquals(10, cntr.getValue()); - - // Transferring to map phase. - setupLockFile.delete(); - - // Transferring to reduce phase. - mapLockFile.delete(); - - job.waitForCompletion(false); - - assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState()); - - final Counters counters = job.getCounters(); - - assertNotNull("counters cannot be null", counters); - assertEquals("wrong counters count", 3, counters.countCounters()); - assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue()); - assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue()); - assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue()); - } - - /** - * Tests job counters retrieval for unknown job id. - * - * @throws Exception If failed. 
- */ - private void tstUnknownJobCounters() throws Exception { - IgniteHadoopClientProtocolProvider provider = provider(); - - ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT)); - - try { - proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1)); - fail("exception must be thrown"); - } - catch (Exception e) { - assert e instanceof IOException : "wrong error has been thrown"; - } - } - - /** - * @throws Exception If failed. - */ - private void tstJobSubmitMap() throws Exception { - checkJobSubmit(true, true); - } - - /** - * @throws Exception If failed. - */ - private void tstJobSubmitMapCombine() throws Exception { - checkJobSubmit(false, true); - } - - /** - * @throws Exception If failed. - */ - private void tstJobSubmitMapReduce() throws Exception { - checkJobSubmit(true, false); - } - - /** - * @throws Exception If failed. - */ - private void tstJobSubmitMapCombineReduce() throws Exception { - checkJobSubmit(false, false); - } - - /** - * Test job submission. - * - * @param noCombiners Whether there are no combiners. - * @param noReducers Whether there are no reducers. - * @throws Exception If failed. 
- */ - public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception { - IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName); - - igfs.mkdirs(new IgfsPath(PATH_INPUT)); - - try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create( - new IgfsPath(PATH_INPUT + "/test.file"), true)))) { - - bw.write("word"); - } - - Configuration conf = config(HadoopAbstractSelfTest.REST_PORT); - - final Job job = Job.getInstance(conf); - - job.setJobName(JOB_NAME); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); - - if (!noCombiners) - job.setCombinerClass(TestCombiner.class); - - if (noReducers) - job.setNumReduceTasks(0); - - job.setInputFormatClass(TextInputFormat.class); - job.setOutputFormatClass(TestOutputFormat.class); - - FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); - FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); - - job.submit(); - - JobID jobId = job.getJobID(); - - // Setup phase. - JobStatus jobStatus = job.getStatus(); - checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); - assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f; - assert jobStatus.getMapProgress() == 0.0f; - assert jobStatus.getReduceProgress() == 0.0f; - - U.sleep(2100); - - JobStatus recentJobStatus = job.getStatus(); - - assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() : - "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress(); - - // Transferring to map phase. - setupLockFile.delete(); - - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - return F.eq(1.0f, job.getStatus().getSetupProgress()); - } - catch (Exception e) { - throw new RuntimeException("Unexpected exception.", e); - } - } - }, 5000L); - - // Map phase. 
- jobStatus = job.getStatus(); - checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); - assert jobStatus.getSetupProgress() == 1.0f; - assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f; - assert jobStatus.getReduceProgress() == 0.0f; - - U.sleep(2100); - - recentJobStatus = job.getStatus(); - - assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() : - "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress(); - - // Transferring to reduce phase. - mapLockFile.delete(); - - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - return F.eq(1.0f, job.getStatus().getMapProgress()); - } - catch (Exception e) { - throw new RuntimeException("Unexpected exception.", e); - } - } - }, 5000L); - - if (!noReducers) { - // Reduce phase. - jobStatus = job.getStatus(); - checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); - assert jobStatus.getSetupProgress() == 1.0f; - assert jobStatus.getMapProgress() == 1.0f; - assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f; - - // Ensure that reduces progress increases. - U.sleep(2100); - - recentJobStatus = job.getStatus(); - - assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() : - "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress(); - - reduceLockFile.delete(); - } - - job.waitForCompletion(false); - - jobStatus = job.getStatus(); - checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f); - assert jobStatus.getSetupProgress() == 1.0f; - assert jobStatus.getMapProgress() == 1.0f; - assert jobStatus.getReduceProgress() == 1.0f; - - dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT)); - } - - /** - * Dump IGFS content. - * - * @param igfs IGFS. - * @param path Path. - * @throws Exception If failed. 
- */ - @SuppressWarnings("ConstantConditions") - private static void dumpIgfs(IgniteFileSystem igfs, IgfsPath path) throws Exception { - IgfsFile file = igfs.info(path); - - assert file != null; - - System.out.println(file.path()); - - if (file.isDirectory()) { - for (IgfsPath child : igfs.listPaths(path)) - dumpIgfs(igfs, child); - } - else { - try (BufferedReader br = new BufferedReader(new InputStreamReader(igfs.open(path)))) { - String line = br.readLine(); - - while (line != null) { - System.out.println(line); - - line = br.readLine(); - } - } - } - } - - /** - * Check job status. - * - * @param status Job status. - * @param expJobId Expected job ID. - * @param expJobName Expected job name. - * @param expState Expected state. - * @param expCleanupProgress Expected cleanup progress. - * @throws Exception If failed. - */ - private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName, - JobStatus.State expState, float expCleanupProgress) throws Exception { - assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", actual=" + status.getJobID(); - assert F.eq(status.getJobName(), expJobName) : "Expected=" + expJobName + ", actual=" + status.getJobName(); - assert F.eq(status.getState(), expState) : "Expected=" + expState + ", actual=" + status.getState(); - assert F.eq(status.getCleanupProgress(), expCleanupProgress) : - "Expected=" + expCleanupProgress + ", actual=" + status.getCleanupProgress(); - } - - /** - * @return Configuration. - */ - private Configuration config(int port) { - Configuration conf = HadoopUtils.safeCreateConfiguration(); - - setupFileSystems(conf); - - conf.set(MRConfig.FRAMEWORK_NAME, IgniteHadoopClientProtocolProvider.FRAMEWORK_NAME); - conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + port); - - conf.set("fs.defaultFS", "igfs://:" + getTestGridName(0) + "@/"); - - return conf; - } - - /** - * @return Protocol provider. 
- */ - private IgniteHadoopClientProtocolProvider provider() { - return new IgniteHadoopClientProtocolProvider(); - } - - /** - * Test mapper. - */ - public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { - /** Writable container for writing word. */ - private Text word = new Text(); - - /** Writable integer constant of '1' is writing as count of found words. */ - private static final IntWritable one = new IntWritable(1); - - /** {@inheritDoc} */ - @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - while (mapLockFile.exists()) - Thread.sleep(50); - - StringTokenizer wordList = new StringTokenizer(val.toString()); - - while (wordList.hasMoreTokens()) { - word.set(wordList.nextToken()); - - ctx.write(word, one); - } - } - } - - /** - * Test Hadoop counters. - */ - public enum TestCounter { - COUNTER1, COUNTER2, COUNTER3 - } - - /** - * Test mapper that uses counters. - */ - public static class TestCountingMapper extends TestMapper { - /** {@inheritDoc} */ - @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - super.map(key, val, ctx); - ctx.getCounter(TestCounter.COUNTER1).increment(1); - } - } - - /** - * Test combiner that counts invocations. - */ - public static class TestCountingCombiner extends TestReducer { - @Override public void reduce(Text key, Iterable<IntWritable> values, - Context ctx) throws IOException, InterruptedException { - ctx.getCounter(TestCounter.COUNTER1).increment(1); - ctx.getCounter(TestCounter.COUNTER2).increment(1); - - int sum = 0; - for (IntWritable value : values) - sum += value.get(); - - ctx.write(key, new IntWritable(sum)); - } - } - - /** - * Test reducer that counts invocations. 
- */ - public static class TestCountingReducer extends TestReducer { - @Override public void reduce(Text key, Iterable<IntWritable> values, - Context ctx) throws IOException, InterruptedException { - ctx.getCounter(TestCounter.COUNTER1).increment(1); - ctx.getCounter(TestCounter.COUNTER3).increment(1); - } - } - - /** - * Test combiner. - */ - public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { - // No-op. - } - - public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> { - /** {@inheritDoc} */ - @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx) - throws IOException { - return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx)); - } - } - - /** - * Test output committer. - */ - private static class TestOutputCommitter extends FileOutputCommitter { - /** Delegate. */ - private final FileOutputCommitter delegate; - - /** - * Constructor. - * - * @param ctx Task attempt context. - * @param delegate Delegate. - * @throws IOException If failed. 
- */ - private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException { - super(FileOutputFormat.getOutputPath(ctx), ctx); - - this.delegate = delegate; - } - - /** {@inheritDoc} */ - @Override public void setupJob(JobContext jobCtx) throws IOException { - try { - while (setupLockFile.exists()) - Thread.sleep(50); - } - catch (InterruptedException ignored) { - throw new IOException("Interrupted."); - } - - delegate.setupJob(jobCtx); - } - - /** {@inheritDoc} */ - @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException { - delegate.setupTask(taskCtx); - } - - /** {@inheritDoc} */ - @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException { - return delegate.needsTaskCommit(taskCtx); - } - - /** {@inheritDoc} */ - @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException { - delegate.commitTask(taskCtx); - } - - /** {@inheritDoc} */ - @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException { - delegate.abortTask(taskCtx); - } - } - - /** - * Test reducer. - */ - public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { - /** Writable container for writing sum of word counts. 
*/ - private IntWritable totalWordCnt = new IntWritable(); - - /** {@inheritDoc} */ - @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, - InterruptedException { - while (reduceLockFile.exists()) - Thread.sleep(50); - - int wordCnt = 0; - - for (IntWritable value : values) - wordCnt += value.get(); - - totalWordCnt.set(wordCnt); - - ctx.write(key, totalWordCnt); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java deleted file mode 100644 index 6f910f1..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/cache/HadoopTxConfigCacheTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.hadoop.cache; - -import org.apache.ignite.Ignite; -import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.IgniteTxConfigCacheSelfTest; -import org.apache.ignite.internal.util.typedef.internal.CU; - -/** - * Test checks whether hadoop system cache doesn't use user defined TX config. - */ -public class HadoopTxConfigCacheTest extends IgniteTxConfigCacheSelfTest { - /** - * Success if system caches weren't timed out. - * - * @throws Exception - */ - public void testSystemCacheTx() throws Exception { - final Ignite ignite = grid(0); - - final IgniteInternalCache<Object, Object> hadoopCache = getSystemCache(ignite, CU.SYS_CACHE_HADOOP_MR); - - checkImplicitTxSuccess(hadoopCache); - checkStartTxSuccess(hadoopCache); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java deleted file mode 100644 index ea7fa99..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactorySelfTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.fs; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.ObjectInput; -import java.io.ObjectInputStream; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.util.concurrent.Callable; - -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.junit.Assert; - -/** - * Tests KerberosHadoopFileSystemFactory. - */ -public class KerberosHadoopFileSystemFactorySelfTest extends GridCommonAbstractTest { - /** - * Test parameters validation. - * - * @throws Exception If failed. - */ - public void testParameters() throws Exception { - checkParameters(null, null, -1); - - checkParameters(null, null, 100); - checkParameters(null, "b", -1); - checkParameters("a", null, -1); - - checkParameters(null, "b", 100); - checkParameters("a", null, 100); - checkParameters("a", "b", -1); - } - - /** - * Check parameters. - * - * @param keyTab Key tab. - * @param keyTabPrincipal Key tab principal. - * @param reloginInterval Re-login interval. 
- */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - private void checkParameters(String keyTab, String keyTabPrincipal, long reloginInterval) { - final KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory(); - - fac.setKeyTab(keyTab); - fac.setKeyTabPrincipal(keyTabPrincipal); - fac.setReloginInterval(reloginInterval); - - GridTestUtils.assertThrows(null, new Callable<Object>() { - @Override public Object call() throws Exception { - fac.start(); - - return null; - } - }, IllegalArgumentException.class, null); - } - - /** - * Checks serializatuion and deserialization of the secure factory. - * - * @throws Exception If failed. - */ - public void testSerialization() throws Exception { - KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory(); - - checkSerialization(fac); - - fac = new KerberosHadoopFileSystemFactory(); - - fac.setUri("igfs://igfs@localhost:10500/"); - fac.setConfigPaths("/a/core-sute.xml", "/b/mapred-site.xml"); - fac.setKeyTabPrincipal("foo"); - fac.setKeyTab("/etc/krb5.keytab"); - fac.setReloginInterval(30 * 60 * 1000L); - - checkSerialization(fac); - } - - /** - * Serializes the factory, - * - * @param fac The facory to check. - * @throws Exception If failed. 
- */ - private void checkSerialization(KerberosHadoopFileSystemFactory fac) throws Exception { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - ObjectOutput oo = new ObjectOutputStream(baos); - - oo.writeObject(fac); - - ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray())); - - KerberosHadoopFileSystemFactory fac2 = (KerberosHadoopFileSystemFactory)in.readObject(); - - assertEquals(fac.getUri(), fac2.getUri()); - Assert.assertArrayEquals(fac.getConfigPaths(), fac2.getConfigPaths()); - assertEquals(fac.getKeyTab(), fac2.getKeyTab()); - assertEquals(fac.getKeyTabPrincipal(), fac2.getKeyTabPrincipal()); - assertEquals(fac.getReloginInterval(), fac2.getReloginInterval()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java deleted file mode 100644 index fd8fdef..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.util; - -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; - -import java.util.HashMap; -import java.util.Map; - -/** - * Test for basic user name mapper. - */ -public class BasicUserNameMapperSelfTest extends GridCommonAbstractTest { - /** - * Test null mappings. - * - * @throws Exception If failed. - */ - public void testNullMappings() throws Exception { - checkNullOrEmptyMappings(null); - } - - /** - * Test empty mappings. - * - * @throws Exception If failed. - */ - public void testEmptyMappings() throws Exception { - checkNullOrEmptyMappings(new HashMap<String, String>()); - } - - /** - * Check null or empty mappings. - * - * @param map Mappings. - * @throws Exception If failed. - */ - private void checkNullOrEmptyMappings(@Nullable Map<String, String> map) throws Exception { - BasicUserNameMapper mapper = create(map, false, null); - - assertNull(mapper.map(null)); - assertEquals("1", mapper.map("1")); - assertEquals("2", mapper.map("2")); - - mapper = create(map, true, null); - - assertNull(mapper.map(null)); - assertNull(mapper.map("1")); - assertNull(mapper.map("2")); - - mapper = create(map, false, "A"); - - assertNull(mapper.map(null)); - assertEquals("1", mapper.map("1")); - assertEquals("2", mapper.map("2")); - - mapper = create(map, true, "A"); - - assertEquals("A", mapper.map(null)); - assertEquals("A", mapper.map("1")); - assertEquals("A", mapper.map("2")); - } - - /** - * Test regular mappings. 
- * - * @throws Exception If failed. - */ - public void testMappings() throws Exception { - Map<String, String> map = new HashMap<>(); - - map.put("1", "101"); - - BasicUserNameMapper mapper = create(map, false, null); - - assertNull(mapper.map(null)); - assertEquals("101", mapper.map("1")); - assertEquals("2", mapper.map("2")); - - mapper = create(map, true, null); - - assertNull(mapper.map(null)); - assertEquals("101", mapper.map("1")); - assertNull(mapper.map("2")); - - mapper = create(map, false, "A"); - - assertNull(mapper.map(null)); - assertEquals("101", mapper.map("1")); - assertEquals("2", mapper.map("2")); - - mapper = create(map, true, "A"); - - assertEquals("A", mapper.map(null)); - assertEquals("101", mapper.map("1")); - assertEquals("A", mapper.map("2")); - } - - /** - * Create mapper. - * - * @param dictionary Dictionary. - * @param useDfltUsrName Whether to use default user name. - * @param dfltUsrName Default user name. - * @return Mapper. - */ - private BasicUserNameMapper create(@Nullable Map<String, String> dictionary, boolean useDfltUsrName, - @Nullable String dfltUsrName) { - BasicUserNameMapper mapper = new BasicUserNameMapper(); - - mapper.setMappings(dictionary); - mapper.setUseDefaultUserName(useDfltUsrName); - mapper.setDefaultUserName(dfltUsrName); - - return mapper; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/ChainedUserNameMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/ChainedUserNameMapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/ChainedUserNameMapperSelfTest.java deleted file mode 100644 index bfac49c..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/ChainedUserNameMapperSelfTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one 
or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.util; - -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import java.util.Collections; -import java.util.concurrent.Callable; - -/** - * Tests for chained user name mapper. - */ -public class ChainedUserNameMapperSelfTest extends GridCommonAbstractTest { - /** Test instance. */ - private static final String INSTANCE = "test_instance"; - - /** Test realm. */ - private static final String REALM = "test_realm"; - - /** - * Test case when mappers are null. - * - * @throws Exception If failed. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void testNullMappers() throws Exception { - GridTestUtils.assertThrows(null, new Callable<Void>() { - @Override public Void call() throws Exception { - create((UserNameMapper[])null); - - return null; - } - }, IgniteException.class, null); - } - - /** - * Test case when one of mappers is null. - * - * @throws Exception If failed. 
- */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void testNullMapperElement() throws Exception { - GridTestUtils.assertThrows(null, new Callable<Void>() { - @Override public Void call() throws Exception { - create(new BasicUserNameMapper(), null); - - return null; - } - }, IgniteException.class, null); - } - - /** - * Test actual chaining logic. - * - * @throws Exception If failed. - */ - public void testChaining() throws Exception { - BasicUserNameMapper mapper1 = new BasicUserNameMapper(); - - mapper1.setMappings(Collections.singletonMap("1", "101")); - - KerberosUserNameMapper mapper2 = new KerberosUserNameMapper(); - - mapper2.setInstance(INSTANCE); - mapper2.setRealm(REALM); - - ChainedUserNameMapper mapper = create(mapper1, mapper2); - - assertEquals("101" + "/" + INSTANCE + "@" + REALM, mapper.map("1")); - assertEquals("2" + "/" + INSTANCE + "@" + REALM, mapper.map("2")); - assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE + "@" + REALM, mapper.map(null)); - } - - /** - * Create chained mapper. - * - * @param mappers Child mappers. - * @return Chained mapper. - */ - private ChainedUserNameMapper create(UserNameMapper... 
mappers) { - ChainedUserNameMapper mapper = new ChainedUserNameMapper(); - - mapper.setMappers(mappers); - - mapper.start(); - - return mapper; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/KerberosUserNameMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/KerberosUserNameMapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/KerberosUserNameMapperSelfTest.java deleted file mode 100644 index cc685bb..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/KerberosUserNameMapperSelfTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.hadoop.util; - -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; - -/** - * Tests for Kerberos name mapper. - */ -public class KerberosUserNameMapperSelfTest extends GridCommonAbstractTest { - /** Test instance. 
*/ - private static final String INSTANCE = "test_instance"; - - /** Test realm. */ - private static final String REALM = "test_realm"; - - /** - * Test mapper without instance and realm components. - * - * @throws Exception If failed. - */ - public void testMapper() throws Exception { - KerberosUserNameMapper mapper = create(null, null); - - assertEquals(IgfsUtils.fixUserName(null), mapper.map(null)); - assertEquals("test", mapper.map("test")); - } - - /** - * Test mapper with instance component. - * - * @throws Exception If failed. - */ - public void testMapperInstance() throws Exception { - KerberosUserNameMapper mapper = create(INSTANCE, null); - - assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE, mapper.map(null)); - assertEquals("test" + "/" + INSTANCE, mapper.map("test")); - } - - /** - * Test mapper with realm. - * - * @throws Exception If failed. - */ - public void testMapperRealm() throws Exception { - KerberosUserNameMapper mapper = create(null, REALM); - - assertEquals(IgfsUtils.fixUserName(null) + "@" + REALM, mapper.map(null)); - assertEquals("test" + "@" + REALM, mapper.map("test")); - } - - /** - * Test mapper with instance and realm components. - * - * @throws Exception If failed. - */ - public void testMapperInstanceAndRealm() throws Exception { - KerberosUserNameMapper mapper = create(INSTANCE, REALM); - - assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE + "@" + REALM, mapper.map(null)); - assertEquals("test" + "/" + INSTANCE + "@" + REALM, mapper.map("test")); - } - - /** - * Create mapper. - * - * @param instance Instance. - * @param realm Realm. - * @return Mapper. 
- */ - private KerberosUserNameMapper create(@Nullable String instance, @Nullable String realm) { - KerberosUserNameMapper mapper = new KerberosUserNameMapper(); - - mapper.setInstance(instance); - mapper.setRealm(realm); - - mapper.start(); - - return mapper; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java deleted file mode 100644 index 2c25a06..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.igfs; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.ignite.IgniteException; -import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.hadoop.util.ChainedUserNameMapper; -import org.apache.ignite.hadoop.util.KerberosUserNameMapper; -import org.apache.ignite.hadoop.util.UserNameMapper; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; -import org.apache.ignite.internal.processors.igfs.IgfsDualAbstractSelfTest; -import org.apache.ignite.lifecycle.LifecycleAware; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME; -import static org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest.SECONDARY_CFG_PATH; -import static org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest.configuration; -import static org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest.mkUri; -import static org.apache.ignite.igfs.HadoopSecondaryFileSystemConfigurationTest.writeConfiguration; -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; - -/** - * Abstract test for Hadoop 1.0 file system stack. - * <p> - * The secondary file system is an {@link IgniteHadoopIgfsSecondaryFileSystem} created through a - * {@link CachingHadoopFileSystemFactory}; see {@link #createSecondaryFileSystemStack()}. - */ -public abstract class Hadoop1DualAbstractTest extends IgfsDualAbstractSelfTest { - /** Secondary grid name. */ - private static final String GRID_NAME = "grid_secondary"; - - /** Secondary file system name. */ - private static final String IGFS_NAME = "igfs_secondary"; - - /** Secondary file system endpoint port. */ - private static final int PORT = 11500; - - /** Secondary file system endpoint configuration: TCP endpoint on {@link #PORT}. */ - private static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration() {{ - setType(IgfsIpcEndpointType.TCP); - setPort(PORT); - }}; - - /** Secondary file system authority in the form {@code igfs_name:grid_name@127.0.0.1:port}.
*/ - private static final String SECONDARY_AUTHORITY = IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + PORT; - - /** Full path to the secondary file system configuration file; set by {@link #prepareConfiguration()}. */ - protected String secondaryConfFullPath; - - /** Secondary file system URI; set by {@link #prepareConfiguration()}. */ - protected String secondaryUri; - - /** - * Constructor. - * - * @param mode IGFS mode passed to the parent test. - */ - public Hadoop1DualAbstractTest(IgfsMode mode) { - super(mode); - } - - /** - * Creates the secondary file system stack: starts the underlying grid, prepares the secondary configuration - * and builds an {@link IgniteHadoopIgfsSecondaryFileSystem} whose factory uses a chained user name mapper - * made of a Kerberos mapper (realm {@code TEST.COM}) and the test mapper, in that order. - * Also initializes {@code igfsSecondary} with a test adapter over the same factory. - * - * @return Configured secondary file system. - * @throws Exception On failure. - */ - @Override protected IgfsSecondaryFileSystem createSecondaryFileSystemStack() throws Exception { - startUnderlying(); - - prepareConfiguration(); - - KerberosUserNameMapper mapper1 = new KerberosUserNameMapper(); - - mapper1.setRealm("TEST.COM"); - - TestUserNameMapper mapper2 = new TestUserNameMapper(); - - ChainedUserNameMapper mapper = new ChainedUserNameMapper(); - - mapper.setMappers(mapper1, mapper2); - - CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory(); - - factory.setUri(secondaryUri); - factory.setConfigPaths(secondaryConfFullPath); - factory.setUserNameMapper(mapper); - - IgniteHadoopIgfsSecondaryFileSystem second = new IgniteHadoopIgfsSecondaryFileSystem(); - - second.setFileSystemFactory(factory); - - igfsSecondary = new HadoopIgfsSecondaryFileSystemTestAdapter(factory); - - return second; - } - - /** - * Starts underlying Ignite process via {@code startGridWithIgfs}, passing the secondary grid and IGFS names, - * {@code PRIMARY} mode and the secondary endpoint configuration. - * - * @throws Exception On failure. - */ - protected void startUnderlying() throws Exception { - startGridWithIgfs(GRID_NAME, IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG, secondaryIpFinder); - } - - /** - * Prepares secondary file system configuration: sets {@code fs.igfs.block.size} to 1024, writes the - * configuration with {@code writeConfiguration} and initializes {@link #secondaryConfFullPath} and - * {@link #secondaryUri}. - * - * @throws IOException On failure.
- */ - protected void prepareConfiguration() throws IOException { - Configuration secondaryConf = configuration(IGFS_SCHEME, SECONDARY_AUTHORITY, true, true); - - secondaryConf.setInt("fs.igfs.block.size", 1024); - - secondaryConfFullPath = writeConfiguration(secondaryConf, SECONDARY_CFG_PATH); - - secondaryUri = mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY); - } - - /** - * Test user name mapper: asserts that it has been started and that the name contains {@code @}, then returns - * the part of the name preceding the first {@code @}. - */ - private static class TestUserNameMapper implements UserNameMapper, LifecycleAware { - /** */ - private static final long serialVersionUID = 0L; - - /** Started flag; set by {@link #start()} and asserted in {@link #map(String)}. */ - private boolean started; - - /** {@inheritDoc} */ - @Nullable @Override public String map(String name) { - assert started; - assert name != null && name.contains("@"); - - return name.substring(0, name.indexOf("@")); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - started = true; - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - // No-op. - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java deleted file mode 100644 index bbf1223..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualAsyncTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -/** - * Hadoop 1.0 file system stack test for {@code DUAL_ASYNC} mode. - */ -public class Hadoop1OverIgfsDualAsyncTest extends Hadoop1DualAbstractTest { - /** - * Constructor. Passes {@link IgfsMode#DUAL_ASYNC} to the parent test. - */ - public Hadoop1OverIgfsDualAsyncTest() { - super(IgfsMode.DUAL_ASYNC); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java deleted file mode 100644 index c57415c..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1OverIgfsDualSyncTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs; - -/** - * Hadoop 1.0 file system stack test for {@code DUAL_SYNC} mode. - */ -public class Hadoop1OverIgfsDualSyncTest extends Hadoop1DualAbstractTest { - /** - * Constructor. Passes {@link IgfsMode#DUAL_SYNC} to the parent test. - */ - public Hadoop1OverIgfsDualSyncTest() { - super(IgfsMode.DUAL_SYNC); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java deleted file mode 100644 index 5be3a64..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.ignite.igfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; -import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.jetbrains.annotations.Nullable; -import java.io.Externalizable; - -import java.io.File; -import java.io.FileOutputStream; -import java.net.URI; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; - -/** - * Tests for Hadoop file system factory: checks that the lifecycle callbacks of a custom - * {@link CachingHadoopFileSystemFactory} are invoked the expected number of times. - */ -public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest { - /** Number of {@code start()} invocations on the test factory. */ - private static final AtomicInteger START_CNT = new AtomicInteger(); - - /** Number of {@code stop()} invocations on the test factory. */ - private static final AtomicInteger STOP_CNT = new AtomicInteger(); - - /** Path to secondary file system configuration, relative to Ignite home.
*/ - private static final String SECONDARY_CFG_PATH = "/work/core-site-HadoopFIleSystemFactorySelfTest.xml"; - - /** Hadoop path for DUAL mode. */ - private static final Path PATH_DUAL = new Path("/ignite/sync/test_dir"); - - /** Hadoop path for PROXY mode. */ - private static final Path PATH_PROXY = new Path("/ignite/proxy/test_dir"); - - /** IGFS path for DUAL mode. */ - private static final IgfsPath IGFS_PATH_DUAL = new IgfsPath("/ignite/sync/test_dir"); - - /** IGFS path for PROXY mode. */ - private static final IgfsPath IGFS_PATH_PROXY = new IgfsPath("/ignite/proxy/test_dir"); - - /** Secondary IGFS. */ - private IgfsEx secondary; - - /** Primary IGFS. */ - private IgfsEx primary; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - START_CNT.set(0); - STOP_CNT.set(0); - - secondary = startSecondary(); - primary = startPrimary(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - secondary = null; - primary = null; - - stopAllGrids(); - } - - /** - * Test custom factory. - * <p> - * Checks the start/stop counters after the nodes are started, after a remote {@link FileSystem} is created, - * after it is closed and after the primary node is stopped, and verifies that directory operations are - * visible through the primary IGFS, the secondary IGFS and the remote file system. - * - * @throws Exception If failed. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void testCustomFactory() throws Exception { - assert START_CNT.get() == 1; - assert STOP_CNT.get() == 0; - - // Use IGFS directly. - primary.mkdirs(IGFS_PATH_DUAL); - - assert primary.exists(IGFS_PATH_DUAL); - assert secondary.exists(IGFS_PATH_DUAL); - - // Create remote instance. - FileSystem fs = FileSystem.get(URI.create("igfs://primary:primary@127.0.0.1:10500/"), baseConfiguration()); - - // Ensure lifecycle callback was invoked. - assert START_CNT.get() == 2; - assert STOP_CNT.get() == 0; - - // Check file system operations.
- assert fs.exists(PATH_DUAL); - - assert fs.delete(PATH_DUAL, true); - assert !primary.exists(IGFS_PATH_DUAL); - assert !secondary.exists(IGFS_PATH_DUAL); - assert !fs.exists(PATH_DUAL); - - assert fs.mkdirs(PATH_DUAL); - assert primary.exists(IGFS_PATH_DUAL); - assert secondary.exists(IGFS_PATH_DUAL); - assert fs.exists(PATH_DUAL); - - assert fs.mkdirs(PATH_PROXY); - assert secondary.exists(IGFS_PATH_PROXY); - assert fs.exists(PATH_PROXY); - - // Close file system and ensure that associated factory was notified. - fs.close(); - - assert START_CNT.get() == 2; - assert STOP_CNT.get() == 1; - - // Stop primary node and ensure that base factory was notified. - G.stop(primary.context().kernalContext().grid().name(), true); - - assert START_CNT.get() == 2; - assert STOP_CNT.get() == 2; - } - - /** - * Start secondary IGFS: node and IGFS named {@code "secondary"}, endpoint port 11500, {@code PRIMARY} mode, - * no secondary file system. - * - * @return IGFS. - * @throws Exception If failed. - */ - private static IgfsEx startSecondary() throws Exception { - return start("secondary", 11500, IgfsMode.PRIMARY, null); - } - - /** - * Start primary IGFS: node and IGFS named {@code "primary"}, endpoint port 10500, {@code DUAL_ASYNC} mode, - * with a secondary file system whose test factory points at the secondary IGFS URI and at the - * configuration file written by this method. - * - * @return IGFS. - * @throws Exception If failed. - */ - private static IgfsEx startPrimary() throws Exception { - // Prepare configuration. - Configuration conf = baseConfiguration(); - - conf.set("fs.defaultFS", "igfs://secondary:secondary@127.0.0.1:11500/"); - - writeConfigurationToFile(conf); - - // Configure factory. - TestFactory factory = new TestFactory(); - - factory.setUri("igfs://secondary:secondary@127.0.0.1:11500/"); - factory.setConfigPaths(SECONDARY_CFG_PATH); - - // Configure file system. - IgniteHadoopIgfsSecondaryFileSystem fs = new IgniteHadoopIgfsSecondaryFileSystem(); - - fs.setFileSystemFactory(factory); - - // Start. - return start("primary", 10500, IgfsMode.DUAL_ASYNC, fs); - } - - /** - * Start Ignite node with IGFS instance. - * <p> - * The node is configured with a PARTITIONED {@code dataCache}, a REPLICATED {@code metaCache} and a TCP - * IPC endpoint on 127.0.0.1. - * - * @param name Node and IGFS name. - * @param endpointPort Endpoint port. - * @param dfltMode Default path mode. - * @param secondaryFs Secondary file system (may be {@code null}).
- * @return IGFS instance obtained from the started node. - */ - private static IgfsEx start(String name, int endpointPort, IgfsMode dfltMode, - @Nullable IgfsSecondaryFileSystem secondaryFs) { - IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); - - endpointCfg.setType(IgfsIpcEndpointType.TCP); - endpointCfg.setHost("127.0.0.1"); - endpointCfg.setPort(endpointPort); - - FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); - - igfsCfg.setDataCacheName("dataCache"); - igfsCfg.setMetaCacheName("metaCache"); - igfsCfg.setName(name); - igfsCfg.setDefaultMode(dfltMode); - igfsCfg.setIpcEndpointConfiguration(endpointCfg); - igfsCfg.setSecondaryFileSystem(secondaryFs); - igfsCfg.setInitializeDefaultPathModes(true); - - CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); - - dataCacheCfg.setName("dataCache"); - dataCacheCfg.setCacheMode(PARTITIONED); - dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); - dataCacheCfg.setBackups(0); - dataCacheCfg.setAtomicityMode(TRANSACTIONAL); - dataCacheCfg.setOffHeapMaxMemory(0); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setName("metaCache"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setGridName(name); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); - cfg.setFileSystemConfiguration(igfsCfg); - - cfg.setLocalHost("127.0.0.1"); - cfg.setConnectorConfiguration(null); - - return (IgfsEx)G.start(cfg).fileSystem(name); - } - - /** - * Create base FileSystem configuration. - * - * @return Configuration with {@code fs.igfs.impl} set to {@link IgniteHadoopFileSystem}.
- */ - private static Configuration baseConfiguration() { - Configuration conf = new Configuration(); - - conf.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); - - return conf; - } - - /** - * Write configuration as XML to {@code SECONDARY_CFG_PATH} under Ignite home. Any existing file at that - * location is deleted first. - * - * @param conf Configuration. - * @throws Exception If failed. - */ - @SuppressWarnings("ResultOfMethodCallIgnored") - private static void writeConfigurationToFile(Configuration conf) throws Exception { - final String path = U.getIgniteHome() + SECONDARY_CFG_PATH; - - File file = new File(path); - - file.delete(); - - assertFalse(file.exists()); - - try (FileOutputStream fos = new FileOutputStream(file)) { - conf.writeXml(fos); - } - - assertTrue(file.exists()); - } - - /** - * Test factory that counts {@code start()} and {@code stop()} invocations before delegating to the parent. - */ - private static class TestFactory extends CachingHadoopFileSystemFactory { - /** - * {@link Externalizable} support. - */ - public TestFactory() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteException { - START_CNT.incrementAndGet(); - - super.start(); - } - - /** {@inheritDoc} */ - @Override public void stop() throws IgniteException { - STOP_CNT.incrementAndGet(); - - super.stop(); - } - } -}