IGNITE-2354: Hadoop: Added tests for errors thrown on different MR stages. This closes #622.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d9f4f6e8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d9f4f6e8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d9f4f6e8 Branch: refs/heads/ignite-db-x-10884 Commit: d9f4f6e8dfdab37a730651b8850ebb976d4b1d72 Parents: 34fc271 Author: iveselovskiy <[email protected]> Authored: Tue Apr 12 16:15:37 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Apr 12 16:15:37 2016 +0300 ---------------------------------------------------------------------- .../hadoop/HadoopAbstractMapReduceTest.java | 405 +++++++++++++++++++ .../processors/hadoop/HadoopErrorSimulator.java | 326 +++++++++++++++ .../HadoopMapReduceErrorResilienceTest.java | 154 +++++++ .../processors/hadoop/HadoopMapReduceTest.java | 380 +---------------- .../hadoop/HadoopSnappyFullMapReduceTest.java | 8 + .../hadoop/examples/HadoopWordCount1Map.java | 12 + .../hadoop/examples/HadoopWordCount1Reduce.java | 5 + .../hadoop/examples/HadoopWordCount2.java | 2 +- .../examples/HadoopWordCount2Combiner.java | 45 +++ .../hadoop/examples/HadoopWordCount2Mapper.java | 19 +- .../examples/HadoopWordCount2Reducer.java | 43 +- .../testsuites/IgniteHadoopTestSuite.java | 8 +- 12 files changed, 1038 insertions(+), 369 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java new file mode 100644 index 0000000..d09ec61 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.FileSystemConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; +import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; +import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; +import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; +import org.apache.ignite.igfs.IgfsMode; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsUserContext; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; +import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1; +import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.igfs.IgfsMode.PRIMARY; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; + +/** + * Abstract test of whole cycle of map-reduce processing via Job tracker. + */ +public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest { + /** IGFS block size. */ + protected static final int IGFS_BLOCK_SIZE = 512 * 1024; + + /** Amount of blocks to prefetch. */ + protected static final int PREFETCH_BLOCKS = 1; + + /** Amount of sequential block reads before prefetch is triggered. */ + protected static final int SEQ_READS_BEFORE_PREFETCH = 2; + + /** Secondary file system URI. */ + protected static final String SECONDARY_URI = "igfs://igfs-secondary:[email protected]:11500/"; + + /** Secondary file system configuration path. */ + protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; + + /** The user to run Hadoop job on behalf of. */ + protected static final String USER = "vasya"; + + /** Secondary IGFS name. */ + protected static final String SECONDARY_IGFS_NAME = "igfs-secondary"; + + /** Red constant. */ + protected static final int red = 10_000; + + /** Blue constant. */ + protected static final int blue = 20_000; + + /** Green constant. */ + protected static final int green = 15_000; + + /** Yellow constant. */ + protected static final int yellow = 7_000; + + /** The secondary Ignite node. */ + protected Ignite igniteSecondary; + + /** The secondary Fs. */ + protected IgfsSecondaryFileSystem secondaryFs; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** + * Gets owner of a IgfsEx path. + * @param p The path. + * @return The owner. + */ + private static String getOwner(IgfsEx i, IgfsPath p) { + return i.info(p).property(IgfsUtils.PROP_USER_NAME); + } + + /** + * Gets owner of a secondary Fs path. + * @param secFs The sec Fs. + * @param p The path. + * @return The owner. + */ + private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) { + return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() { + @Override public String apply() { + return secFs.info(p).property(IgfsUtils.PROP_USER_NAME); + } + }); + } + + /** + * Checks owner of the path. + * @param p The path. + */ + private void checkOwner(IgfsPath p) { + String ownerPrim = getOwner(igfs, p); + assertEquals(USER, ownerPrim); + + String ownerSec = getOwnerSecondary(secondaryFs, p); + assertEquals(USER, ownerSec); + } + + /** + * Does actual test job + * + * @param useNewMapper flag to use new mapper API. + * @param useNewCombiner flag to use new combiner API. + * @param useNewReducer flag to use new reducer API. + */ + protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer) + throws Exception { + igfs.delete(new IgfsPath(PATH_OUTPUT), true); + + JobConf jobConf = new JobConf(); + + jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); + jobConf.setUser(USER); + jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); + + //To split into about 40 items for v2 + jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); + + //For v1 + jobConf.setInt("fs.local.block.size", 65000); + + // File system coordinates. + setupFileSystems(jobConf); + + HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); + + Job job = Job.getInstance(jobConf); + + HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy()); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); + + job.setJarByClass(HadoopWordCount2.class); + + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + fut.get(); + + checkJobStatistics(jobId); + + final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000"; + + checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS")); + + checkOwner(new IgfsPath(outFile)); + + String actual = readAndSortFile(outFile, job.getConfiguration()); + + assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + + useNewReducer, + "blue\t" + blue + "\n" + + "green\t" + green + "\n" + + "red\t" + red + "\n" + + "yellow\t" + yellow + "\n", + actual + ); + } + + /** + * Gets if to compress output data with Snappy. + * + * @return If to compress output data with Snappy. + */ + protected boolean compressOutputSnappy() { + return false; + } + + /** + * Simple test job statistics. + * + * @param jobId Job id. + * @throws IgniteCheckedException + */ + private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException { + HadoopCounters cntrs = grid(0).hadoop().counters(jobId); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); + + Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>(); + + Map<String, Integer> phaseOrders = new HashMap<>(); + phaseOrders.put("submit", 0); + phaseOrders.put("prepare", 1); + phaseOrders.put("start", 2); + phaseOrders.put("Cstart", 3); + phaseOrders.put("finish", 4); + + String prevTaskId = null; + + long apiEvtCnt = 0; + + for (T2<String, Long> evt : perfCntr.evts()) { + //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706 + String[] parsedEvt = evt.get1().split(" "); + + String taskId; + String taskPhase; + + if ("JOB".equals(parsedEvt[0])) { + taskId = parsedEvt[0]; + taskPhase = parsedEvt[1]; + } + else { + taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1]; + taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2]; + } + + if (!taskId.equals(prevTaskId)) + tasks.put(taskId, new TreeMap<Integer,Long>()); + + Integer pos = phaseOrders.get(taskPhase); + + assertNotNull("Invalid phase " + taskPhase, pos); + + tasks.get(taskId).put(pos, evt.get2()); + + prevTaskId = taskId; + + apiEvtCnt++; + } + + for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) { + Map<Integer, Long> order = task.getValue(); + + long prev = 0; + + for (Map.Entry<Integer, Long> phase : order.entrySet()) { + assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev); + + prev = phase.getValue(); + } + } + + final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance"); + + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return igfs.exists(statPath); + } + }, 20_000); + + final long apiEvtCnt0 = apiEvtCnt; + + boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) { + return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }, 10000); + + if (!res) { + BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath))); + + assert false : "Invalid API events count [exp=" + apiEvtCnt0 + + ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']'; + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); + + super.beforeTest(); + } + + /** + * Start grid with IGFS. + * + * @param gridName Grid name. + * @param igfsName IGFS name + * @param mode IGFS mode. + * @param secondaryFs Secondary file system (optional). + * @param restCfg Rest configuration string (optional). + * @return Started grid instance. + * @throws Exception If failed. + */ + protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(igfsName); + igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); + igfsCfg.setDefaultMode(mode); + igfsCfg.setIpcEndpointConfiguration(restCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); + igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setNearConfiguration(null); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + return G.start(cfg); + } + + /** {@inheritDoc} */ + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { + FileSystemConfiguration fsCfg = super.igfsConfiguration(); + + secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); + + fsCfg.setSecondaryFileSystem(secondaryFs); + + return fsCfg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java new file mode 100644 index 0000000..843b42b --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Error simulator. + */ +public class HadoopErrorSimulator { + /** No-op singleton instance. */ + public static final HadoopErrorSimulator noopInstance = new HadoopErrorSimulator(); + + /** Instance ref. */ + private static final AtomicReference<HadoopErrorSimulator> ref = new AtomicReference<>(noopInstance); + + /** + * Creates simulator of given kind with given stage bits. + * + * @param kind The kind. + * @param bits The stage bits. + * @return The simulator. + */ + public static HadoopErrorSimulator create(Kind kind, int bits) { + switch (kind) { + case Noop: + return noopInstance; + case Runtime: + return new RuntimeExceptionBitHadoopErrorSimulator(bits); + case IOException: + return new IOExceptionBitHadoopErrorSimulator(bits); + case Error: + return new ErrorBitHadoopErrorSimulator(bits); + default: + throw new IllegalStateException("Unknown kind: " + kind); + } + } + + /** + * Gets the error simulator instance. + */ + public static HadoopErrorSimulator instance() { + return ref.get(); + } + + /** + * Sets instance. + */ + public static boolean setInstance(HadoopErrorSimulator expect, HadoopErrorSimulator update) { + return ref.compareAndSet(expect, update); + } + + /** + * Constructor. + */ + private HadoopErrorSimulator() { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onMapConfigure() { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onMapSetup() throws IOException, InterruptedException { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onMap() throws IOException { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onMapCleanup() throws IOException, InterruptedException { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onMapClose() throws IOException { + // no-op + } + + /** + * setConf() does not declare IOException to be thrown. + */ + public void onCombineConfigure() { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onCombineSetup() throws IOException, InterruptedException { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onCombine() throws IOException { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onCombineCleanup() throws IOException, InterruptedException { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onReduceConfigure() { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onReduceSetup() throws IOException, InterruptedException { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onReduce() throws IOException { + // no-op + } + + /** + * Invoked on the named stage. + */ + public void onReduceCleanup() throws IOException, InterruptedException { + // no-op + } + + /** + * Error kind. + */ + public enum Kind { + /** No error. */ + Noop, + + /** Runtime. */ + Runtime, + + /** IOException. */ + IOException, + + /** java.lang.Error. */ + Error + } + + /** + * Runtime error simulator. + */ + public static class RuntimeExceptionBitHadoopErrorSimulator extends HadoopErrorSimulator { + /** Stage bits: defines what map-reduce stages will cause errors. */ + private final int bits; + + /** + * Constructor. + */ + protected RuntimeExceptionBitHadoopErrorSimulator(int b) { + bits = b; + } + + /** + * Simulates an error. + */ + protected void simulateError() throws IOException { + throw new RuntimeException("An error simulated by " + getClass().getSimpleName()); + } + + /** {@inheritDoc} */ + @Override public final void onMapConfigure() { + try { + if ((bits & 1) != 0) + simulateError(); + } + catch (IOException e) { + // ignore + } + } + + /** {@inheritDoc} */ + @Override public final void onMapSetup() throws IOException, InterruptedException { + if ((bits & 2) != 0) + simulateError(); + } + + /** {@inheritDoc} */ + @Override public final void onMap() throws IOException { + if ((bits & 4) != 0) + simulateError(); + } + + /** {@inheritDoc} */ + @Override public final void onMapCleanup() throws IOException, InterruptedException { + if ((bits & 8) != 0) + simulateError(); + } + + /** {@inheritDoc} */ + @Override public final void onCombineConfigure() { + try { + if ((bits & 16) != 0) + simulateError(); + } + catch (IOException e) { + // ignore + } + } + + /** {@inheritDoc} */ + @Override public final void onCombineSetup() throws IOException, InterruptedException { + if ((bits & 32) != 0) + simulateError(); + } + + /** {@inheritDoc} */ + @Override public final void onCombine() throws IOException { + if ((bits & 64) != 0) + simulateError(); + } + + /** {@inheritDoc} */ + @Override public final void onCombineCleanup() throws IOException, InterruptedException { + if ((bits & 128) != 0) + simulateError(); + } + + /** {@inheritDoc} */ + @Override public final void onReduceConfigure() { + try { + if ((bits & 256) != 0) + simulateError(); + } + catch (IOException e) { + // ignore + } + } + + /** {@inheritDoc} */ + @Override public final void onReduceSetup() throws IOException, InterruptedException { + if ((bits & 512) != 0) + simulateError(); + } + + /** {@inheritDoc} */ + @Override public final void onReduce() throws IOException { + if ((bits & 1024) != 0) + simulateError(); + } + + /** {@inheritDoc} */ + @Override public final void onReduceCleanup() throws IOException, InterruptedException { + if ((bits & 2048) != 0) + simulateError(); + } + } + + /** + * java.lang.Error simulator. + */ + public static class ErrorBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator { + /** + * Constructor. + */ + public ErrorBitHadoopErrorSimulator(int bits) { + super(bits); + } + + /** {@inheritDoc} */ + @Override protected void simulateError() { + throw new Error("An error simulated by " + getClass().getSimpleName()); + } + } + + /** + * IOException simulator. + */ + public static class IOExceptionBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator { + /** + * Constructor. + */ + public IOExceptionBitHadoopErrorSimulator(int bits) { + super(bits); + } + + /** {@inheritDoc} */ + @Override protected void simulateError() throws IOException { + throw new IOException("An IOException simulated by " + getClass().getSimpleName()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java new file mode 100644 index 0000000..dd12935 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2; + +/** + * Test of error resiliency after an error in a map-reduce job execution. + * Combinations tested: + * { new ALI, old API } + * x { unchecked exception, checked exception, error } + * x { phase where the error happens }. + */ +public class HadoopMapReduceErrorResilienceTest extends HadoopAbstractMapReduceTest { + /** + * Tests recovery. + * + * @throws Exception If failed. + */ + public void testRecoveryAfterAnError0_Runtime() throws Exception { + doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Runtime); + } + + /** + * Tests recovery. + * + * @throws Exception If failed. + */ + public void testRecoveryAfterAnError0_IOException() throws Exception { + doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.IOException); + } + + /** + * Tests recovery. + * + * @throws Exception If failed. + */ + public void testRecoveryAfterAnError0_Error() throws Exception { + doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Error); + } + + /** + * Tests recovery. + * + * @throws Exception If failed. + */ + public void testRecoveryAfterAnError7_Runtime() throws Exception { + doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Runtime); + } + /** + * Tests recovery. + * + * @throws Exception If failed. + */ + public void testRecoveryAfterAnError7_IOException() throws Exception { + doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.IOException); + } + /** + * Tests recovery. + * + * @throws Exception If failed. + */ + public void testRecoveryAfterAnError7_Error() throws Exception { + doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Error); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60 * 1000L; + } + + /** + * Tests correct work after an error. + * + * @throws Exception On error. + */ + private void doTestRecoveryAfterAnError(int useNewBits, HadoopErrorSimulator.Kind simulatorKind) throws Exception { + try { + IgfsPath inDir = new IgfsPath(PATH_INPUT); + + igfs.mkdirs(inDir); + + IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); + + generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow); + + boolean useNewMapper = (useNewBits & 1) == 0; + boolean useNewCombiner = (useNewBits & 2) == 0; + boolean useNewReducer = (useNewBits & 4) == 0; + + for (int i = 0; i < 12; i++) { + int bits = 1 << i; + + System.out.println("############################ Simulator kind = " + simulatorKind + + ", Stage bits = " + bits); + + HadoopErrorSimulator sim = HadoopErrorSimulator.create(simulatorKind, bits); + + doTestWithErrorSimulator(sim, inFile, useNewMapper, useNewCombiner, useNewReducer); + } + } catch (Throwable t) { + t.printStackTrace(); + + fail("Unexpected throwable: " + t); + } + } + + /** + * Performs test with given error simulator. + * + * @param sim The simulator. + * @param inFile Input file. + * @param useNewMapper If the use new mapper API. + * @param useNewCombiner If to use new combiner. + * @param useNewReducer If to use new reducer API. + * @throws Exception If failed. + */ + private void doTestWithErrorSimulator(HadoopErrorSimulator sim, IgfsPath inFile, boolean useNewMapper, + boolean useNewCombiner, boolean useNewReducer) throws Exception { + // Set real simulating error simulator: + assertTrue(HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim)); + + try { + // Expect failure there: + doTest(inFile, useNewMapper, useNewCombiner, useNewReducer); + } + catch (Throwable t) { // This may be an Error. + // Expected: + System.out.println(t.toString()); // Ignore, continue the test. + } + + // Set no-op error simulator: + assertTrue(HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance)); + + // Expect success there: + doTest(inFile, useNewMapper, useNewCombiner, useNewReducer); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index 4426847..b703896 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -17,129 +17,13 @@ package org.apache.ignite.internal.processors.hadoop; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.UUID; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; -import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem; -import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; -import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; -import org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsUserContext; -import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; -import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1; import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.lang.IgniteOutClosure; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.igfs.IgfsMode.PRIMARY; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY; -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo; /** * Test of whole cycle of map-reduce processing via Job tracker. */ -public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { - /** IGFS block size. */ - protected static final int IGFS_BLOCK_SIZE = 512 * 1024; - - /** Amount of blocks to prefetch. */ - protected static final int PREFETCH_BLOCKS = 1; - - /** Amount of sequential block reads before prefetch is triggered. */ - protected static final int SEQ_READS_BEFORE_PREFETCH = 2; - - /** Secondary file system URI. */ - protected static final String SECONDARY_URI = "igfs://igfs-secondary:[email protected]:11500/"; - - /** Secondary file system configuration path. */ - protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; - - /** The user to run Hadoop job on behalf of. */ - protected static final String USER = "vasya"; - - /** Secondary IGFS name. */ - protected static final String SECONDARY_IGFS_NAME = "igfs-secondary"; - - /** The secondary Ignite node. */ - protected Ignite igniteSecondary; - - /** The secondary Fs. */ - protected IgfsSecondaryFileSystem secondaryFs; - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** - * Gets owner of a IgfsEx path. - * @param p The path. - * @return The owner. - */ - private static String getOwner(IgfsEx i, IgfsPath p) { - return i.info(p).property(IgfsUtils.PROP_USER_NAME); - } - - /** - * Gets owner of a secondary Fs path. - * @param secFs The sec Fs. - * @param p The path. - * @return The owner. - */ - private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) { - return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() { - @Override public String apply() { - return secFs.info(p).property(IgfsUtils.PROP_USER_NAME); - } - }); - } - - /** - * Checks owner of the path. - * @param p The path. - */ - private void checkOwner(IgfsPath p) { - String ownerPrim = getOwner(igfs, p); - assertEquals(USER, ownerPrim); - - String ownerSec = getOwnerSecondary(secondaryFs, p); - assertEquals(USER, ownerSec); - } - +public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest { /** * Tests whole job execution with all phases in all combination of new and old versions of API. * @throws Exception If fails. @@ -151,260 +35,32 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); - final int red = 10_000; - final int blue = 20_000; - final int green = 15_000; - final int yellow = 7_000; - generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow ); - for (int i = 0; i < 3; i++) { - igfs.delete(new IgfsPath(PATH_OUTPUT), true); - - boolean useNewMapper = (i & 1) == 0; - boolean useNewCombiner = (i & 2) == 0; - boolean useNewReducer = (i & 4) == 0; - - JobConf jobConf = new JobConf(); - - jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); - jobConf.setUser(USER); - jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); - - //To split into about 40 items for v2 - jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); - - //For v1 - jobConf.setInt("fs.local.block.size", 65000); - - // File system coordinates. - setupFileSystems(jobConf); - - HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); - - Job job = Job.getInstance(jobConf); - - HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy()); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - - job.setJarByClass(HadoopWordCount2.class); - - HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - fut.get(); - - checkJobStatistics(jobId); - - final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000"; - - checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS")); - - checkOwner(new IgfsPath(outFile)); - - String actual = readAndSortFile(outFile, job.getConfiguration()); - - assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + - useNewReducer, - "blue\t" + blue + "\n" + - "green\t" + green + "\n" + - "red\t" + red + "\n" + - "yellow\t" + yellow + "\n", - actual - ); - } - } - - /** - * Gets if to compress output data with Snappy. - * - * @return If to compress output data with Snappy. - */ - protected boolean compressOutputSnappy() { - return false; - } - - /** - * Simple test job statistics. - * - * @param jobId Job id. - * @throws IgniteCheckedException - */ - private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException { - HadoopCounters cntrs = grid(0).hadoop().counters(jobId); - - HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); - - Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>(); - - Map<String, Integer> phaseOrders = new HashMap<>(); - phaseOrders.put("submit", 0); - phaseOrders.put("prepare", 1); - phaseOrders.put("start", 2); - phaseOrders.put("Cstart", 3); - phaseOrders.put("finish", 4); - - String prevTaskId = null; - - long apiEvtCnt = 0; - - for (T2<String, Long> evt : perfCntr.evts()) { - //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706 - String[] parsedEvt = evt.get1().split(" "); - - String taskId; - String taskPhase; - - if ("JOB".equals(parsedEvt[0])) { - taskId = parsedEvt[0]; - taskPhase = parsedEvt[1]; - } - else { - taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1]; - taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2]; - } - - if (!taskId.equals(prevTaskId)) - tasks.put(taskId, new TreeMap<Integer,Long>()); - - Integer pos = phaseOrders.get(taskPhase); - - assertNotNull("Invalid phase " + taskPhase, pos); - - tasks.get(taskId).put(pos, evt.get2()); - - prevTaskId = taskId; - - apiEvtCnt++; - } - - for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) { - Map<Integer, Long> order = task.getValue(); - - long prev = 0; - - for (Map.Entry<Integer, Long> phase : order.entrySet()) { - assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev); - - prev = phase.getValue(); - } - } - - final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance"); - - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return igfs.exists(statPath); - } - }, 20_000); - - final long apiEvtCnt0 = apiEvtCnt; - - boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) { - return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader); - } - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }, 10000); + for (boolean[] apiMode: getApiModes()) { + assert apiMode.length == 3; - if (!res) { - BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath))); + boolean useNewMapper = apiMode[0]; + boolean useNewCombiner = apiMode[1]; + boolean useNewReducer = apiMode[2]; - assert false : "Invalid API events count [exp=" + apiEvtCnt0 + - ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']'; + doTest(inFile, useNewMapper, useNewCombiner, useNewReducer); } } - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); - - super.beforeTest(); - } - /** - * Start grid with IGFS. + * Gets API mode combinations to be tested. + * Each boolean[] is { newMapper, newCombiner, newReducer } flag triplet. * - * @param gridName Grid name. - * @param igfsName IGFS name - * @param mode IGFS mode. - * @param secondaryFs Secondary file system (optional). - * @param restCfg Rest configuration string (optional). - * @return Started grid instance. - * @throws Exception If failed. - */ - protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, - @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { - FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); - - igfsCfg.setDataCacheName("dataCache"); - igfsCfg.setMetaCacheName("metaCache"); - igfsCfg.setName(igfsName); - igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); - igfsCfg.setDefaultMode(mode); - igfsCfg.setIpcEndpointConfiguration(restCfg); - igfsCfg.setSecondaryFileSystem(secondaryFs); - igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); - igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); - - CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); - - dataCacheCfg.setName("dataCache"); - dataCacheCfg.setCacheMode(PARTITIONED); - dataCacheCfg.setNearConfiguration(null); - dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); - dataCacheCfg.setBackups(0); - dataCacheCfg.setAtomicityMode(TRANSACTIONAL); - dataCacheCfg.setOffHeapMaxMemory(0); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setName("metaCache"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setGridName(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); - cfg.setFileSystemConfiguration(igfsCfg); - - cfg.setLocalHost("127.0.0.1"); - cfg.setConnectorConfiguration(null); - - return G.start(cfg); - } - - /** - * @return IGFS configuration. + * @return Arrays of booleans indicating API combinations to test. */ - @Override public FileSystemConfiguration igfsConfiguration() throws Exception { - FileSystemConfiguration fsCfg = super.igfsConfiguration(); - - secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); - - fsCfg.setSecondaryFileSystem(secondaryFs); - - return fsCfg; + protected boolean[][] getApiModes() { + return new boolean[][] { + { false, false, false }, + { false, false, true }, + { false, true, false }, + { true, false, false }, + { true, true, true }, + }; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java index 22d33a5..27a5fcd 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java @@ -25,4 +25,12 @@ public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest { @Override protected boolean compressOutputSnappy() { return true; } + + /** {@inheritDoc} */ + @Override protected boolean[][] getApiModes() { + return new boolean[][] { + { false, false, true }, + { true, true, true }, + }; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java index 30b12bd..d4cd190 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator; /** * Mapper phase of WordCount job. @@ -56,6 +57,8 @@ public class HadoopWordCount1Map extends MapReduceBase implements Mapper<LongWri output.collect(word, one); } + + HadoopErrorSimulator.instance().onMap(); } /** {@inheritDoc} */ @@ -63,5 +66,14 @@ public class HadoopWordCount1Map extends MapReduceBase implements Mapper<LongWri super.configure(job); wasConfigured = true; + + HadoopErrorSimulator.instance().onMapConfigure(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + super.close(); + + HadoopErrorSimulator.instance().onMapClose(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java index 2335911..b400d9b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java @@ -26,6 +26,7 @@ import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; +import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator; /** * Combiner and Reducer phase of WordCount job. @@ -45,6 +46,8 @@ public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Tex sum += values.next().get(); output.collect(key, new IntWritable(sum)); + + HadoopErrorSimulator.instance().onReduce(); } /** {@inheritDoc} */ @@ -52,5 +55,7 @@ public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Tex super.configure(job); wasConfigured = true; + + HadoopErrorSimulator.instance().onReduceConfigure(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java index 4b508ca..b2cfee3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java @@ -91,7 +91,7 @@ public class HadoopWordCount2 { } if (setCombiner) - job.setCombinerClass(HadoopWordCount2Reducer.class); + job.setCombinerClass(HadoopWordCount2Combiner.class); if (setReducer) { job.setReducerClass(HadoopWordCount2Reducer.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Combiner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Combiner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Combiner.java new file mode 100644 index 0000000..0d25e3c --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Combiner.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.hadoop.examples; + +import java.io.IOException; +import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator; + +/** + * Combiner function with pluggable error simulator. + */ +public class HadoopWordCount2Combiner extends HadoopWordCount2Reducer { + /** {@inheritDoc} */ + @Override protected void configError() { + HadoopErrorSimulator.instance().onCombineConfigure(); + } + + /** {@inheritDoc} */ + @Override protected void setupError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onCombineSetup(); + } + + /** {@inheritDoc} */ + @Override protected void reduceError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onCombine(); + } + + /** {@inheritDoc} */ + @Override protected void cleanupError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onCombineCleanup(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java index 0d0c128..76857e6 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator; /** * Mapper phase of WordCount job. @@ -53,17 +54,31 @@ public class HadoopWordCount2Mapper extends Mapper<Object, Text, Text, IntWritab ctx.write(word, one); } + + HadoopErrorSimulator.instance().onMap(); } /** {@inheritDoc} */ - @Override protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + super.setup(ctx); + wasSetUp = true; + + HadoopErrorSimulator.instance().onMapSetup(); + } + + /** {@inheritDoc} */ + @Override protected void cleanup(Context ctx) throws IOException, InterruptedException { + super.cleanup(ctx); + + HadoopErrorSimulator.instance().onMapCleanup(); } /** {@inheritDoc} */ @Override public void setConf(Configuration conf) { wasConfigured = true; + + HadoopErrorSimulator.instance().onMapConfigure(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java index 63a9d95..e780170 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator; /** * Combiner and Reducer phase of WordCount job. @@ -50,6 +51,15 @@ public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, In totalWordCnt.set(wordCnt); ctx.write(key, totalWordCnt); + + reduceError(); + } + + /** + * Simulates reduce error if needed. + */ + protected void reduceError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onReduce(); } /** {@inheritDoc} */ @@ -57,16 +67,47 @@ public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, In super.setup(context); wasSetUp = true; + + setupError(); + } + + /** + * Simulates setup error if needed. + */ + protected void setupError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onReduceSetup(); + } + + /** {@inheritDoc} */ + @Override protected void cleanup(Context context) throws IOException, InterruptedException { + super.cleanup(context); + + cleanupError(); + } + + /** + * Simulates cleanup error if needed. + */ + protected void cleanupError() throws IOException, InterruptedException { + HadoopErrorSimulator.instance().onReduceCleanup(); } /** {@inheritDoc} */ @Override public void setConf(Configuration conf) { wasConfigured = true; + + configError(); + } + + /** + * Simulates configuration error if needed. + */ + protected void configError() { + HadoopErrorSimulator.instance().onReduceConfigure(); } /** {@inheritDoc} */ @Override public Configuration getConf() { return null; } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 3358e18..554cbc7 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopFileSystemsTest; import org.apache.ignite.internal.processors.hadoop.HadoopGroupingTest; import org.apache.ignite.internal.processors.hadoop.HadoopJobTrackerSelfTest; import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceEmbeddedSelfTest; +import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceErrorResilienceTest; import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceTest; import org.apache.ignite.internal.processors.hadoop.HadoopNoHadoopMapReduceTest; import org.apache.ignite.internal.processors.hadoop.HadoopSerializationWrapperSelfTest; @@ -168,6 +169,7 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopNoHadoopMapReduceTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceErrorResilienceTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceEmbeddedSelfTest.class.getName()))); @@ -246,15 +248,15 @@ public class IgniteHadoopTestSuite extends TestSuite { X.println("tmp: " + tmpPath); - File install = new File(tmpPath + File.separatorChar + "__hadoop"); + final File install = new File(tmpPath + File.separatorChar + "__hadoop"); - File home = new File(install, destName); + final File home = new File(install, destName); X.println("Setting " + homeVariable + " to " + home.getAbsolutePath()); System.setProperty(homeVariable, home.getAbsolutePath()); - File successFile = new File(home, "__success"); + final File successFile = new File(home, "__success"); if (home.exists()) { if (successFile.exists()) {
