http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java index ffa6f7d..a69b72a 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -17,47 +17,27 @@ package org.apache.ignite.internal.processors.hadoop; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteFileSystem; -import org.apache.ignite.IgniteSpringBean; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner; import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsMetrics; -import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathSummary; -import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver; -import org.apache.ignite.igfs.mapreduce.IgfsTask; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.cluster.IgniteClusterEx; -import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey; -import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl; -import org.apache.ignite.internal.processors.igfs.IgfsContext; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.processors.igfs.IgfsInputStreamAdapter; -import org.apache.ignite.internal.processors.igfs.IgfsLocalMetrics; -import org.apache.ignite.internal.processors.igfs.IgfsPaths; -import org.apache.ignite.internal.processors.igfs.IgfsStatus; +import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock; +import org.apache.ignite.internal.processors.igfs.IgfsMock; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; -import org.jetbrains.annotations.Nullable; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; /** * @@ -90,12 +70,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes /** */ private static final String INVALID_HOST_3 = "invalid_host3"; - /** Mocked Grid. 
*/ - private static final MockIgnite GRID = new MockIgnite(); - /** Mocked IGFS. */ private static final IgniteFileSystem IGFS = new MockIgfs(); + /** Mocked Grid. */ + private static final IgfsIgniteMock GRID = new IgfsIgniteMock(null, IGFS); + /** Planner. */ private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner(); @@ -109,15 +89,15 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes private static final ThreadLocal<HadoopMapReducePlan> PLAN = new ThreadLocal<>(); /** - * + * Static initializer. */ static { - GridTestUtils.setFieldValue(PLANNER, "ignite", GRID); + GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "ignite", GRID); } /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - GridTestUtils.setFieldValue(PLANNER, "log", log()); + GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "log", log()); BLOCK_MAP.clear(); PROXY_MAP.clear(); @@ -445,7 +425,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes top.add(node2); top.add(node3); - HadoopMapReducePlan plan = PLANNER.preparePlan(new MockJob(reducers, splitList), top, null); + HadoopMapReducePlan plan = PLANNER.preparePlan(new HadoopPlannerMockJob(splitList, reducers), top, null); PLAN.set(plan); @@ -607,81 +587,17 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes } /** - * Mocked job. + * Mocked IGFS. */ - private static class MockJob implements HadoopJob { - /** Reducers count. */ - private final int reducers; - - /** */ - private Collection<HadoopInputSplit> splitList; - + private static class MockIgfs extends IgfsMock { /** * Constructor. - * - * @param reducers Reducers count. - * @param splitList Splits. */ - private MockJob(int reducers, Collection<HadoopInputSplit> splitList) { - this.reducers = reducers; - this.splitList = splitList; - } - - /** {@inheritDoc} */ - @Override public HadoopJobId id() { - return null; - } - - /** {@inheritDoc} */ - @Override public HadoopJobInfo info() { - return new HadoopDefaultJobInfo() { - @Override public int reducers() { - return reducers; - } - }; + public MockIgfs() { + super("igfs"); } /** {@inheritDoc} */ - @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException { - return splitList; - } - - /** {@inheritDoc} */ - @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void dispose(boolean external) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void cleanupStagingDirectory() { - // No-op. - } - } - - /** - * Mocked IGFS. 
- */ - private static class MockIgfs implements IgfsEx { - /** {@inheritDoc} */ @Override public boolean isProxy(URI path) { return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path); } @@ -692,331 +608,8 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes } /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, - long maxLen) { - return null; - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgfsContext context() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsPaths proxyPaths() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsStatus globalSpace() throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Nullable @Override public Boolean globalSampling() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsLocalMetrics localMetrics() { - return null; - } - - /** {@inheritDoc} */ - @Override public long groupBlockSize() { - return 0; - } - - /** {@inheritDoc} */ - @Nullable @Override public String clientLogDirectory() { - return null; - } - - /** {@inheritDoc} */ - @Override public void clientLogDirectory(String logDir) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean evictExclude(IgfsPath path, boolean primary) { - return false; - } - - /** {@inheritDoc} */ - @Nullable @Override public String name() { - return null; - } - - /** {@inheritDoc} */ - @Override public FileSystemConfiguration configuration() { - return null; - } - - /** {@inheritDoc} */ @Override public boolean exists(IgfsPath path) { return true; } - - /** {@inheritDoc} */ - @Nullable @Override public IgfsFile info(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary summary(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { - return null; - } - - /** {@inheritDoc} */ - @Override public void rename(IgfsPath src, IgfsPath dest) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean delete(IgfsPath path, boolean recursive) { - return false; - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Override public long usedSpaceSize() { - return 0; - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, - long blockSize, @Nullable Map<String, String> props) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, - @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream append(IgfsPath path, boolean create) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create, - @Nullable Map<String, String> props) { - return null; - } - - /** {@inheritDoc} */ - @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgfsMetrics metrics() { - return null; - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public long size(IgfsPath path) { - return 0; - } - - /** {@inheritDoc} */ - @Override public void format() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, - Collection<IgfsPath> paths, @Nullable T arg) { - return null; - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, - Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { - return null; - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, - @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { - return null; - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, - @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, - long maxRangeLen, @Nullable T arg) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid nextAffinityKey() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFileSystem withAsync() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isAsync() { - return false; - } - - /** {@inheritDoc} */ - @Override public <R> IgniteFuture<R> future() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsSecondaryFileSystem asSecondary() { - return null; - } - } - - /** - * Mocked Grid. 
- */ - @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - private static class MockIgnite extends IgniteSpringBean implements IgniteEx { - /** {@inheritDoc} */ - @Override public IgniteClusterEx cluster() { - return (IgniteClusterEx)super.cluster(); - } - - /** {@inheritDoc} */ - @Override public IgniteFileSystem igfsx(String name) { - assert F.eq("igfs", name); - - return IGFS; - } - - /** {@inheritDoc} */ - @Override public Hadoop hadoop() { - return null; - } - - /** {@inheritDoc} */ - @Override public String name() { - return null; - } - - /** {@inheritDoc} */ - @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex(@Nullable String name) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex() { - return null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public Collection<IgniteInternalCache<?, ?>> cachesx(@Nullable IgnitePredicate<? super IgniteInternalCache<?, ?>>... p) { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean eventUserRecordable(int type) { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean allEventsUserRecordable(int[] types) { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isJmxRemoteEnabled() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isRestartEnabled() { - return false; - } - - /** {@inheritDoc} */ - @Override public ClusterNode localNode() { - return null; - } - - /** {@inheritDoc} */ - @Override public String latestVersion() { - return null; - } - - /** {@inheritDoc} */ - @Override public GridKernalContext context() { - return null; - } } } \ No newline at end of file
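A note on the test wiring above: both GridTestUtils.setFieldValue calls now pass HadoopAbstractMapReducePlanner.class explicitly because the "ignite" and "log" fields were pulled up into that superclass, and a lookup via getDeclaredField() only sees fields declared on the class it is invoked on, not private fields inherited from parents. A minimal sketch of that mechanic, assuming the helper resolves the field on the supplied declaring class (the class and field names below are illustrative, not the Ignite test API):

import java.lang.reflect.Field;

public class FieldInjectionSketch {
    static class ParentPlanner { private Object ignite; }   // Field is declared on the parent.
    static class ChildPlanner extends ParentPlanner { }     // Concrete planner under test.

    /** Sets a (possibly private) field declared on the given class. */
    static void setFieldValue(Object target, Class<?> declaring, String name, Object val)
        throws ReflectiveOperationException {
        Field f = declaring.getDeclaredField(name); // Sees only fields declared on 'declaring'.
        f.setAccessible(true);
        f.set(target, val);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        ChildPlanner planner = new ChildPlanner();

        // ChildPlanner.class.getDeclaredField("ignite") would throw NoSuchFieldException,
        // so the caller must name the superclass that actually declares the field:
        setFieldValue(planner, ParentPlanner.class, "ignite", new Object());
    }
}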
http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index b703896..5d1de38 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -17,13 +17,130 @@ package org.apache.ignite.internal.processors.hadoop; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; +import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; +import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsUserContext; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; +import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1; import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY; +import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; /** * Test of the whole cycle of map-reduce processing via the job tracker. */ -public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest { +public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { + /** IGFS block size. */ + protected static final int IGFS_BLOCK_SIZE = 512 * 1024; + + /** Amount of blocks to prefetch. */ + protected static final int PREFETCH_BLOCKS = 1; + + /** Amount of sequential block reads before prefetch is triggered. */ + protected static final int SEQ_READS_BEFORE_PREFETCH = 2; + + /** Secondary file system URI. */ + protected static final String SECONDARY_URI = "igfs://igfs-secondary:[email protected]:11500/"; + + /** Secondary file system configuration path. */ + protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; + + /** The user to run the Hadoop job on behalf of. */ + protected static final String USER = "vasya"; + + /** Secondary IGFS name. */ + protected static final String SECONDARY_IGFS_NAME = "igfs-secondary"; + + /** The secondary Ignite node. */ + protected Ignite igniteSecondary; + + /** The secondary file system. */ + protected IgfsSecondaryFileSystem secondaryFs; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** + * Gets the owner of an IgfsEx path. + * @param i The IGFS instance. + * @param p The path. + * @return The owner. + */ + private static String getOwner(IgfsEx i, IgfsPath p) { + return i.info(p).property(IgfsUtils.PROP_USER_NAME); + } + + /** + * Gets the owner of a secondary file system path. + * @param secFs The secondary file system. + * @param p The path. + * @return The owner. + */ + private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) { + return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() { + @Override public String apply() { + return secFs.info(p).property(IgfsUtils.PROP_USER_NAME); + } + }); + } + + /** + * Checks the owner of the path. + * @param p The path. + */ + private void checkOwner(IgfsPath p) { + String ownerPrim = getOwner(igfs, p); + assertEquals(USER, ownerPrim); + + String ownerSec = getOwnerSecondary(secondaryFs, p); + assertEquals(USER, ownerSec); + } + /** * Tests whole job execution with all phases in all combinations of the new and old versions of the API. * @throws Exception If failed. 
@@ -35,32 +152,274 @@ public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest { IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); + final int red = 10_000; + final int blue = 20_000; + final int green = 15_000; + final int yellow = 7_000; + generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow ); - for (boolean[] apiMode: getApiModes()) { - assert apiMode.length == 3; + for (int i = 0; i < 3; i++) { + igfs.delete(new IgfsPath(PATH_OUTPUT), true); + + boolean useNewMapper = (i & 1) == 0; + boolean useNewCombiner = (i & 2) == 0; + boolean useNewReducer = (i & 4) == 0; + + JobConf jobConf = new JobConf(); - boolean useNewMapper = apiMode[0]; - boolean useNewCombiner = apiMode[1]; - boolean useNewReducer = apiMode[2]; + jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); + jobConf.setUser(USER); + jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); - doTest(inFile, useNewMapper, useNewCombiner, useNewReducer); + // To split into about 40 items for v2. + jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); + + // For v1. + jobConf.setInt("fs.local.block.size", 65000); + + // File system coordinates. + setupFileSystems(jobConf); + + HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); + + Job job = Job.getInstance(jobConf); + + HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy()); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); + + job.setJarByClass(HadoopWordCount2.class); + + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + fut.get(); + + checkJobStatistics(jobId); + + final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000"; + + checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS")); + + checkOwner(new IgfsPath(outFile)); + + String actual = readAndSortFile(outFile, job.getConfiguration()); + + assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + + useNewReducer, + "blue\t" + blue + "\n" + + "green\t" + green + "\n" + + "red\t" + red + "\n" + + "yellow\t" + yellow + "\n", + actual + ); } } /** - * Gets API mode combinations to be tested. - * Each boolean[] is { newMapper, newCombiner, newReducer } flag triplet. + * Gets whether to compress output data with Snappy. + * + * @return Whether to compress output data with Snappy. + */ + protected boolean compressOutputSnappy() { + return false; + } + + /** + * Simple check of job statistics. * - * @return Arrays of booleans indicating API combinations to test. + * @param jobId Job id. 
+ * @throws IgniteCheckedException If failed. */ - protected boolean[][] getApiModes() { - return new boolean[][] { - { false, false, false }, - { false, false, true }, - { false, true, false }, - { true, false, false }, - { true, true, true }, - }; + private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException { + HadoopCounters cntrs = grid(0).hadoop().counters(jobId); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); + + Map<String, SortedMap<Integer, Long>> tasks = new TreeMap<>(); + + Map<String, Integer> phaseOrders = new HashMap<>(); + phaseOrders.put("submit", 0); + phaseOrders.put("prepare", 1); + phaseOrders.put("start", 2); + phaseOrders.put("Cstart", 3); + phaseOrders.put("finish", 4); + + String prevTaskId = null; + + long apiEvtCnt = 0; + + for (T2<String, Long> evt : perfCntr.evts()) { + // We expect the string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706 + String[] parsedEvt = evt.get1().split(" "); + + String taskId; + String taskPhase; + + if ("JOB".equals(parsedEvt[0])) { + taskId = parsedEvt[0]; + taskPhase = parsedEvt[1]; + } + else { + taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1]; + taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2]; + } + + if (!taskId.equals(prevTaskId)) + tasks.put(taskId, new TreeMap<Integer, Long>()); + + Integer pos = phaseOrders.get(taskPhase); + + assertNotNull("Invalid phase " + taskPhase, pos); + + tasks.get(taskId).put(pos, evt.get2()); + + prevTaskId = taskId; + + apiEvtCnt++; + } + + for (Map.Entry<String, SortedMap<Integer, Long>> task : tasks.entrySet()) { + Map<Integer, Long> order = task.getValue(); + + long prev = 0; + + for (Map.Entry<Integer, Long> phase : order.entrySet()) { + assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev); + + prev = phase.getValue(); + } + } + + final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance"); + + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return igfs.exists(statPath); + } + }, 20_000); + + final long apiEvtCnt0 = apiEvtCnt; + + boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) { + return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }, 10_000); + + if (!res) { + BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath))); + + assert false : "Invalid API events count [exp=" + apiEvtCnt0 + + ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']'; + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); + + super.beforeTest(); + } + + /** + * Start grid with IGFS. + * + * @param gridName Grid name. + * @param igfsName IGFS name. + * @param mode IGFS mode. + * @param secondaryFs Secondary file system (optional). + * @param restCfg REST endpoint configuration (optional). + * @return Started grid instance. + * @throws Exception If failed. 
+ */ + protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(igfsName); + igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); + igfsCfg.setDefaultMode(mode); + igfsCfg.setIpcEndpointConfiguration(restCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); + igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setNearConfiguration(null); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + HadoopConfiguration hadoopCfg = createHadoopConfiguration(); + + if (hadoopCfg != null) + cfg.setHadoopConfiguration(hadoopCfg); + + return G.start(cfg); + } + + /** + * Creates custom Hadoop configuration. + * + * @return The Hadoop configuration. + */ + protected HadoopConfiguration createHadoopConfiguration() { + return null; + } + + /** + * @return IGFS configuration. + */ + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { + FileSystemConfiguration fsCfg = super.igfsConfiguration(); + + secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); + + fsCfg.setSecondaryFileSystem(secondaryFs); + + return fsCfg; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java new file mode 100644 index 0000000..88d0f80 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.jetbrains.annotations.Nullable; + +import java.util.Collection; +import java.util.UUID; + +/** + * Mock job for planner tests. + */ +public class HadoopPlannerMockJob implements HadoopJob { + /** Input splits. */ + private final Collection<HadoopInputSplit> splits; + + /** Reducers count. */ + private final int reducers; + + /** + * Constructor. + * + * @param splits Input splits. + * @param reducers Reducers. + */ + public HadoopPlannerMockJob(Collection<HadoopInputSplit> splits, int reducers) { + this.splits = splits; + this.reducers = reducers; + } + + /** {@inheritDoc} */ + @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException { + return splits; + } + + /** {@inheritDoc} */ + @Override public HadoopJobInfo info() { + return new JobInfo(reducers); + } + + /** {@inheritDoc} */ + @Override public HadoopJobId id() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public void dispose(boolean external) throws IgniteCheckedException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + throwUnsupported(); + } + + /** {@inheritDoc} */ + @Override public void cleanupStagingDirectory() { + throwUnsupported(); + } + + /** + * Throw {@link UnsupportedOperationException}. + */ + private static void throwUnsupported() { + throw new UnsupportedOperationException("Should not be called!"); + } + + /** + * Mocked job info. + */ + private static class JobInfo implements HadoopJobInfo { + /** Reducers. */ + private final int reducers; + + /** + * Constructor. + * + * @param reducers Reducers. + */ + public JobInfo(int reducers) { + this.reducers = reducers; + } + + /** {@inheritDoc} */ + @Override public int reducers() { + return reducers; + } + + /** {@inheritDoc} */ + @Nullable @Override public String property(String name) { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean hasCombiner() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean hasReducer() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public HadoopJob createJob(Class<? 
extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, + @Nullable String[] libNames) throws IgniteCheckedException { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public String jobName() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public String user() { + throwUnsupported(); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java new file mode 100644 index 0000000..4e7cc50 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedMapReducePlannerTest.java @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; +import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock; +import org.apache.ignite.internal.processors.igfs.IgfsMock; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; + +/** + * Tests for weighted map-reduce planner. + */ +public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest { + /** ID 1. */ + private static final UUID ID_1 = new UUID(0, 1); + + /** ID 2. */ + private static final UUID ID_2 = new UUID(0, 2); + + /** ID 3. */ + private static final UUID ID_3 = new UUID(0, 3); + + /** MAC 1. */ + private static final String MAC_1 = "mac1"; + + /** MAC 2. */ + private static final String MAC_2 = "mac2"; + + /** MAC 3. 
*/ + private static final String MAC_3 = "mac3"; + + /** Host 1. */ + private static final String HOST_1 = "host1"; + + /** Host 2. */ + private static final String HOST_2 = "host2"; + + /** Host 3. */ + private static final String HOST_3 = "host3"; + + /** Host 4. */ + private static final String HOST_4 = "host4"; + + /** Host 5. */ + private static final String HOST_5 = "host5"; + + /** Standard node 1. */ + private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1); + + /** Standard node 2. */ + private static final MockNode NODE_2 = new MockNode(ID_2, MAC_2, HOST_2); + + /** Standard node 3. */ + private static final MockNode NODE_3 = new MockNode(ID_3, MAC_3, HOST_3); + + /** Standard nodes. */ + private static final Collection<ClusterNode> NODES; + + /** + * Static initializer. + */ + static { + NODES = new ArrayList<>(); + + NODES.add(NODE_1); + NODES.add(NODE_2); + NODES.add(NODE_3); + } + + /** + * Test one IGFS split being assigned to an affinity node. + * + * @throws Exception If failed. + */ + public void testOneIgfsSplitAffinity() throws Exception { + IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); + + List<HadoopInputSplit> splits = new ArrayList<>(); + + splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("igfs://igfs@/file"), 0, 50)); + + final int expReducers = 4; + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); + + IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); + + HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); + + assert plan.mappers() == 1; + assert plan.mapperNodeIds().size() == 1; + assert plan.mapperNodeIds().contains(ID_1); + + checkPlanMappers(plan, splits, NODES, false/*only 1 split*/); + checkPlanReducers(plan, NODES, expReducers, false/* because of threshold behavior.*/); + } + + /** + * Test plain HDFS splits. + * + * @throws Exception If failed. + */ + public void testHdfsSplitsAffinity() throws Exception { + IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); + + final List<HadoopInputSplit> splits = new ArrayList<>(); + + splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hdfs://" + HOST_1 + "/x"), 0, 50)); + splits.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hdfs://" + HOST_2 + "/x"), 50, 100)); + splits.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hdfs://" + HOST_3 + "/x"), 100, 37)); + + // The following splits belong to hosts that are outside of the Ignite topology. + // This means that these splits should be assigned to the least loaded nodes: + splits.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hdfs://" + HOST_4 + "/x"), 138, 2)); + splits.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hdfs://" + HOST_5 + "/x"), 140, 3)); + + final int expReducers = 7; + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); + + IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); + + final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); + + checkPlanMappers(plan, splits, NODES, true); + + checkPlanReducers(plan, NODES, expReducers, true); + } + + /** + * Test HDFS splits with replication factor 3. + * + * @throws Exception If failed. 
+ */ + public void testHdfsSplitsReplication() throws Exception { + IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs(); + + final List<HadoopInputSplit> splits = new ArrayList<>(); + + splits.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 }, URI.create("hdfs://" + HOST_1 + "/x"), 0, 50)); + splits.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 }, URI.create("hdfs://" + HOST_2 + "/x"), 50, 100)); + splits.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 }, URI.create("hdfs://" + HOST_3 + "/x"), 100, 37)); + // The following splits belong to hosts that are outside of the Ignite topology. + // This means that these splits should be assigned to the least loaded nodes: + splits.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 }, URI.create("hdfs://" + HOST_4 + "/x"), 138, 2)); + splits.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 }, URI.create("hdfs://" + HOST_5 + "/x"), 140, 3)); + + final int expReducers = 8; + + HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers); + + IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs); + + final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null); + + checkPlanMappers(plan, splits, NODES, true); + + checkPlanReducers(plan, NODES, expReducers, true); + } + + /** + * Get all IDs. + * + * @param nodes Nodes. + * @return IDs. + */ + private static Set<UUID> allIds(Collection<ClusterNode> nodes) { + Set<UUID> allIds = new HashSet<>(); + + for (ClusterNode n : nodes) + allIds.add(n.id()); + + return allIds; + } + + /** + * Check mappers for the plan. + * + * @param plan Plan. + * @param splits Splits. + * @param nodes Nodes. + * @param expectUniformity Whether uniformity is expected. + */ + private static void checkPlanMappers(HadoopMapReducePlan plan, List<HadoopInputSplit> splits, + Collection<ClusterNode> nodes, boolean expectUniformity) { + // Number of mappers should correspond to the number of input splits: + assertEquals(splits.size(), plan.mappers()); + + if (expectUniformity) { + // Mappers are assigned to all available nodes: + assertEquals(nodes.size(), plan.mapperNodeIds().size()); + + assertEquals(allIds(nodes), plan.mapperNodeIds()); + } + + // Check all splits are covered by mappers: + Set<HadoopInputSplit> set = new HashSet<>(); + + for (UUID id: plan.mapperNodeIds()) { + Collection<HadoopInputSplit> sp = plan.mappers(id); + + assert sp != null; + + for (HadoopInputSplit s: sp) + assertTrue(set.add(s)); + } + + // Must be of the same size and contain the same elements: + assertEquals(set, new HashSet<>(splits)); + } + + /** + * Check plan reducers. + * + * @param plan Plan. + * @param nodes Nodes. + * @param expReducers Expected reducers. + * @param expectUniformity Expected uniformity. 
+ */ + private static void checkPlanReducers(HadoopMapReducePlan plan, + Collection<ClusterNode> nodes, int expReducers, boolean expectUniformity) { + + assertEquals(expReducers, plan.reducers()); + + if (expectUniformity) + assertEquals(allIds(nodes), plan.reducerNodeIds()); + + int sum = 0; + int lenSum = 0; + + for (UUID uuid: plan.reducerNodeIds()) { + int[] rr = plan.reducers(uuid); + + assert rr != null; + + lenSum += rr.length; + + for (int i: rr) + sum += i; + } + + assertEquals(expReducers, lenSum); + + // Numbers in the arrays must be consecutive integers starting from 0; + // check that by simply calculating their total sum: + assertEquals((lenSum * (lenSum - 1) / 2), sum); + } + + /** + * Create planner for IGFS. + * + * @param igfs IGFS. + * @return Planner. + */ + private static IgniteHadoopWeightedMapReducePlanner createPlanner(IgfsMock igfs) { + IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner(); + + IgfsIgniteMock ignite = new IgfsIgniteMock(null, igfs); + + GridTestUtils.setFieldValue(planner, HadoopAbstractMapReducePlanner.class, "ignite", ignite); + + return planner; + } + + /** + * Throw {@link UnsupportedOperationException}. + */ + private static void throwUnsupported() { + throw new UnsupportedOperationException("Should not be called!"); + } + + /** + * Mocked node. + */ + private static class MockNode implements ClusterNode { + /** ID. */ + private final UUID id; + + /** MAC addresses. */ + private final String macs; + + /** Addresses. */ + private final List<String> addrs; + + /** + * Constructor. + * + * @param id Node ID. + * @param macs MAC addresses. + * @param addrs Addresses. + */ + public MockNode(UUID id, String macs, String... addrs) { + assert addrs != null; + + this.id = id; + this.macs = macs; + + this.addrs = Arrays.asList(addrs); + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return id; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <T> T attribute(String name) { + if (F.eq(name, IgniteNodeAttributes.ATTR_MACS)) + return (T)macs; + + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<String> addresses() { + return addrs; + } + + /** {@inheritDoc} */ + @Override public Object consistentId() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics metrics() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Map<String, Object> attributes() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<String> hostNames() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public long order() { + throwUnsupported(); + + return 0; + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isDaemon() { + throwUnsupported(); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isClient() { + throwUnsupported(); + + return false; + } + } + + /** + * Locations builder. + */ + private static class LocationsBuilder { + /** Locations. */ + private final TreeMap<Long, Collection<MockNode>> locs = new TreeMap<>(); + + /** + * Create new locations builder. + * + * @return Locations builder. 
+ */ + public static LocationsBuilder create() { + return new LocationsBuilder(); + } + + /** + * Add locations. + * + * @param start Start. + * @param nodes Nodes. + * @return This builder for chaining. + */ + public LocationsBuilder add(long start, MockNode... nodes) { + locs.put(start, Arrays.asList(nodes)); + + return this; + } + + /** + * Build locations. + * + * @return Locations. + */ + public TreeMap<Long, Collection<MockNode>> build() { + return locs; + } + + /** + * Build IGFS. + * + * @return IGFS. + */ + public MockIgfs buildIgfs() { + return new MockIgfs(build()); + } + } + + /** + * Mocked IGFS. + */ + private static class MockIgfs extends IgfsMock { + /** Block locations. */ + private final TreeMap<Long, Collection<MockNode>> locs; + + /** + * Constructor. + * + * @param locs Block locations. + */ + public MockIgfs(TreeMap<Long, Collection<MockNode>> locs) { + super("igfs"); + + this.locs = locs; + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) { + Collection<IgfsBlockLocation> res = new ArrayList<>(); + + long cur = start; + long remaining = len; + + long prevLocStart = -1; + Collection<MockNode> prevLocNodes = null; + + for (Map.Entry<Long, Collection<MockNode>> locEntry : locs.entrySet()) { + long locStart = locEntry.getKey(); + Collection<MockNode> locNodes = locEntry.getValue(); + + if (prevLocNodes != null) { + if (cur < locStart) { + // Add part from previous block. + long prevLen = locStart - prevLocStart; + + res.add(new IgfsBlockLocationMock(cur, prevLen, prevLocNodes)); + + cur = locStart; + remaining -= prevLen; + } + } + + prevLocStart = locStart; + prevLocNodes = locNodes; + + if (remaining == 0) + break; + } + + // Add remainder. + if (remaining != 0) + res.add(new IgfsBlockLocationMock(cur, remaining, prevLocNodes)); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean isProxy(URI path) { + return false; + } + } + + /** + * Mocked block location. + */ + private static class IgfsBlockLocationMock implements IgfsBlockLocation { + /** Start. */ + private final long start; + + /** Length. */ + private final long len; + + /** Node IDs. */ + private final List<UUID> nodeIds; + + /** + * Constructor. + * + * @param start Start. + * @param len Length. + * @param nodes Nodes. 
+ */ + public IgfsBlockLocationMock(long start, long len, Collection<MockNode> nodes) { + this.start = start; + this.len = len; + + this.nodeIds = new ArrayList<>(nodes.size()); + + for (MockNode node : nodes) + nodeIds.add(node.id); + } + + /** {@inheritDoc} */ + @Override public long start() { + return start; + } + + /** {@inheritDoc} */ + @Override public long length() { + return len; + } + + /** {@inheritDoc} */ + @Override public Collection<UUID> nodeIds() { + return nodeIds; + } + + /** {@inheritDoc} */ + @Override public Collection<String> names() { + throwUnsupported(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<String> hosts() { + throwUnsupported(); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java new file mode 100644 index 0000000..e0403c2 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopWeightedPlannerMapReduceTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner; + +/** + * Tests whole map-reduce execution with the weighted planner. 
+ */ +public class HadoopWeightedPlannerMapReduceTest extends HadoopMapReduceTest { + /** {@inheritDoc} */ + @Override protected HadoopConfiguration createHadoopConfiguration() { + HadoopConfiguration hadoopCfg = new HadoopConfiguration(); + + // Use weighted planner with default settings: + IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner(); + + hadoopCfg.setMapReducePlanner(planner); + + return hadoopCfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1dc8bcc2/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 7d877ab..6900425 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -28,7 +28,6 @@ import org.apache.ignite.hadoop.cache.HadoopTxConfigCacheTest; import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactorySelfTest; import org.apache.ignite.hadoop.util.BasicUserNameMapperSelfTest; import org.apache.ignite.hadoop.util.ChainedUserNameMapperSelfTest; -import org.apache.ignite.hadoop.util.KerberosUserNameMapper; import org.apache.ignite.hadoop.util.KerberosUserNameMapperSelfTest; import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest; import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest; @@ -72,6 +71,8 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTasksV1Test; import org.apache.ignite.internal.processors.hadoop.HadoopTasksV2Test; import org.apache.ignite.internal.processors.hadoop.HadoopV2JobSelfTest; import org.apache.ignite.internal.processors.hadoop.HadoopValidationSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopWeightedMapReducePlannerTest; +import org.apache.ignite.internal.processors.hadoop.HadoopWeightedPlannerMapReduceTest; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest; @@ -110,6 +111,9 @@ public class IgniteHadoopTestSuite extends TestSuite { TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite"); + suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopWeightedMapReducePlannerTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(BasicUserNameMapperSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(KerberosUserNameMapperSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(ChainedUserNameMapperSelfTest.class.getName()))); @@ -156,7 +160,6 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(HadoopValidationSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopJobTrackerSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopHashMapSelfTest.class.getName()))); @@ -176,6 +179,7 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new 
TestSuite(ldr.loadClass(HadoopTasksV2Test.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopWeightedPlannerMapReduceTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopNoHadoopMapReduceTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceErrorResilienceTest.class.getName())));
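As with the other entries in this suite, the new planner tests above are registered by loading the class by name through the suite's classloader rather than referencing it directly, so the class is defined by the loader that carries the Hadoop dependencies before JUnit reflects over it. A minimal JUnit 3 sketch of that registration pattern (the suite class, suite name, and the use of the context classloader are illustrative, not the Ignite test suite itself):

import junit.framework.TestSuite;

public class MiniHadoopMrSuite {
    /**
     * @return Suite with one planner test registered by class name.
     * @throws Exception If the test class cannot be loaded.
     */
    public static TestSuite suite() throws Exception {
        // The real suite obtains a dedicated Hadoop-aware loader; the context loader stands in here.
        ClassLoader ldr = Thread.currentThread().getContextClassLoader();

        TestSuite suite = new TestSuite("Mini Hadoop MR Test Suite");

        // Loading by name defers class initialization to the chosen loader.
        suite.addTest(new TestSuite(ldr.loadClass(
            "org.apache.ignite.internal.processors.hadoop.HadoopWeightedMapReducePlannerTest")));

        return suite;
    }
}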
