CRUNCH-93: Moving test support classes - Moved InMemoryEmitter to o.a.c.mem.emit package - Created a TestContext in CrunchTestSupport class for unit testing of various components - Deleted unit test code from DoFn e.g. configurationForTest - All things are retrived from context - Implement InMemorycontext that has only config elements for MemoryPipeline
Signed-off-by: Rahul Sharma <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1cee024c Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1cee024c Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1cee024c Branch: refs/heads/master Commit: 1cee024c65938606d0d5794fbc4d10a1135f0f65 Parents: 14132b0 Author: Rahul Sharma <[email protected]> Authored: Fri Oct 12 21:40:52 2012 +0530 Committer: Rahul Sharma <[email protected]> Committed: Fri Oct 19 11:01:52 2012 +0530 ---------------------------------------------------------------------- crunch-test/pom.xml | 5 + .../org/apache/crunch/test/CrunchTestSupport.java | 70 ++++++++++++++- .../java/org/apache/crunch/test/TestCounters.java | 41 +++++++++ .../apache/crunch/test/CrunchTestSupportTest.java | 53 +++++++++++ crunch/pom.xml | 7 ++ crunch/src/main/java/org/apache/crunch/DoFn.java | 50 ++--------- .../java/org/apache/crunch/fn/CompositeMapFn.java | 6 -- .../main/java/org/apache/crunch/fn/PairMapFn.java | 5 - .../crunch/impl/mem/collect/MemCollection.java | 59 ++++++++++++- .../crunch/impl/mem/emit/InMemoryEmitter.java | 57 ++++++++++++ .../crunch/io/avro/AvroFileReaderFactory.java | 1 - .../apache/crunch/io/seq/SeqFileReaderFactory.java | 1 - .../crunch/io/seq/SeqFileTableReaderFactory.java | 2 - .../crunch/io/text/TextFileReaderFactory.java | 1 - .../org/apache/crunch/test/InMemoryEmitter.java | 57 ------------ .../java/org/apache/crunch/test/TestCounters.java | 41 --------- .../apache/crunch/types/avro/AvroTableType.java | 12 --- .../java/org/apache/crunch/types/avro/Avros.java | 34 ------- .../apache/crunch/types/writable/Writables.java | 31 ------- .../org/apache/crunch/lib/join/JoinFnTestBase.java | 9 +- .../java/org/apache/crunch/test/CountersTest.java | 11 +-- .../org/apache/crunch/types/avro/AvrosTest.java | 28 ++++-- 22 files changed, 321 insertions(+), 260 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-test/pom.xml b/crunch-test/pom.xml index 6caf8ed..6378f31 100644 --- a/crunch-test/pom.xml +++ b/crunch-test/pom.xml @@ -59,6 +59,11 @@ under the License. <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> <dependency> <groupId>org.hamcrest</groupId> http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java ---------------------------------------------------------------------- diff --git a/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java b/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java index 7f74931..fc1a77a 100644 --- a/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java +++ b/crunch-test/src/main/java/org/apache/crunch/test/CrunchTestSupport.java @@ -17,14 +17,76 @@ */ package org.apache.crunch.test; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.junit.Rule; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** - * A temporary workaround for Scala tests to use when working with Rule annotations - * until it gets fixed in JUnit 4.11. + * A temporary workaround for Scala tests to use when working with Rule + * annotations until it gets fixed in JUnit 4.11. */ public class CrunchTestSupport { @Rule - public TemporaryPath tempDir = new TemporaryPath( - "crunch.tmp.dir", "hadoop.tmp.dir"); + public TemporaryPath tempDir = new TemporaryPath("crunch.tmp.dir", "hadoop.tmp.dir"); + + /** + * The method creates a {@linkplain TaskInputOutputContext} which can be used + * in unit tests. The context servers very limited purpose. You can only + * operate with counters, taskAttempId ,status and configuration while using + * this context. + */ + public static <KI, VI, KO, VO> TaskInputOutputContext<KI, VI, KO, VO> getTestContext(final Configuration config) { + TaskInputOutputContext<KI, VI, KO, VO> context = Mockito.mock(TaskInputOutputContext.class); + TestCounters.clearCounters(); + final StateHolder holder = new StateHolder(); + + Mockito.when(context.getCounter(Mockito.any(Enum.class))).then(new Answer<Counter>() { + @Override + public Counter answer(InvocationOnMock invocation) throws Throwable { + Enum<?> counter = (Enum<?>) invocation.getArguments()[0]; + return TestCounters.getCounter(counter); + } + + }); + + Mockito.when(context.getCounter(Mockito.anyString(), Mockito.anyString())).then(new Answer<Counter>() { + @Override + public Counter answer(InvocationOnMock invocation) throws Throwable { + String group = (String) invocation.getArguments()[0]; + String name = (String) invocation.getArguments()[0]; + return TestCounters.getCounter(group, name); + } + + }); + + Mockito.when(context.getConfiguration()).thenReturn(config); + Mockito.when(context.getTaskAttemptID()).thenReturn(new TaskAttemptID()); + + Mockito.when(context.getStatus()).then(new Answer<String>() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return holder.internalStatus; + } + }); + + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + holder.internalStatus = (String) invocation.getArguments()[0]; + return null; + } + }).when(context).setStatus(Mockito.anyString()); + + return context; + + } + + static class StateHolder { + private String internalStatus; + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java ---------------------------------------------------------------------- diff --git a/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java b/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java new file mode 100644 index 0000000..bcb4da1 --- /dev/null +++ b/crunch-test/src/main/java/org/apache/crunch/test/TestCounters.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.test; + +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; + +/** + * A utility class used during unit testing to update and read counters. + */ +public class TestCounters { + + private static Counters COUNTERS = new Counters(); + + public static Counter getCounter(Enum<?> e) { + return COUNTERS.findCounter(e); + } + + public static Counter getCounter(String group, String name) { + return COUNTERS.findCounter(group, name); + } + + public static void clearCounters() { + COUNTERS = new Counters(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java ---------------------------------------------------------------------- diff --git a/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java b/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java new file mode 100644 index 0000000..65b86d9 --- /dev/null +++ b/crunch-test/src/test/java/org/apache/crunch/test/CrunchTestSupportTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.junit.Test; + +public class CrunchTestSupportTest { + + enum CT { + ONE, + }; + + @Test + public void testContext() { + Configuration sampleConfig = new Configuration(); + String status = "test"; + TaskInputOutputContext<?, ?, ?, ?> testContext = CrunchTestSupport.getTestContext(sampleConfig); + assertEquals(sampleConfig, testContext.getConfiguration()); + TaskAttemptID taskAttemptID = testContext.getTaskAttemptID(); + assertEquals(taskAttemptID, testContext.getTaskAttemptID()); + assertNotNull(taskAttemptID); + assertNull(testContext.getStatus()); + testContext.setStatus(status); + assertEquals(status, testContext.getStatus()); + assertEquals(0, testContext.getCounter(CT.ONE).getValue()); + testContext.getCounter(CT.ONE).increment(1); + assertEquals(1, testContext.getCounter(CT.ONE).getValue()); + testContext.getCounter(CT.ONE).increment(4); + assertEquals(5, testContext.getCounter(CT.ONE).getValue()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/pom.xml ---------------------------------------------------------------------- diff --git a/crunch/pom.xml b/crunch/pom.xml index 5dcc489..39dcc75 100644 --- a/crunch/pom.xml +++ b/crunch/pom.xml @@ -75,6 +75,13 @@ under the License. <artifactId>jackson-mapper-asl</artifactId> <scope>provided</scope> </dependency> + + <dependency> + <groupId>javassist</groupId> + <artifactId>javassist</artifactId> + <version>3.12.1.GA</version> + </dependency> + <!-- Both Protobufs and Thrift are supported as derived serialization types, and you can use http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/DoFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/DoFn.java b/crunch/src/main/java/org/apache/crunch/DoFn.java index 7d516de..8d7cc17 100644 --- a/crunch/src/main/java/org/apache/crunch/DoFn.java +++ b/crunch/src/main/java/org/apache/crunch/DoFn.java @@ -19,7 +19,6 @@ package org.apache.crunch; import java.io.Serializable; -import org.apache.crunch.test.TestCounters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -38,8 +37,6 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext; */ public abstract class DoFn<S, T> implements Serializable { private transient TaskInputOutputContext<?, ?, ?, ?> context; - private transient Configuration testConf; - private transient String internalStatus; /** * Configure this DoFn. Subclasses may override this method to modify the @@ -61,8 +58,8 @@ public abstract class DoFn<S, T> implements Serializable { * this method to do appropriate initialization. * * <p> - * Called during the setup of the job instance this {@code DoFn} is - * associated with. + * Called during the setup of the job instance this {@code DoFn} is associated + * with. * </p> * */ @@ -110,16 +107,6 @@ public abstract class DoFn<S, T> implements Serializable { } /** - * Sets a {@code Configuration} instance to be used during unit tests. - * - * @param conf - * The Configuration instance. - */ - public void setConfigurationForTest(Configuration conf) { - this.testConf = conf; - } - - /** * Returns an estimate of how applying this function to a {@link PCollection} * will cause it to change in side. The optimizer uses these estimates to * decide where to break up dependent MR jobs into separate Map and Reduce @@ -138,25 +125,14 @@ public abstract class DoFn<S, T> implements Serializable { } protected Configuration getConfiguration() { - if (context != null) { - return context.getConfiguration(); - } else if (testConf != null) { - return testConf; - } - return null; + return context.getConfiguration(); } protected Counter getCounter(Enum<?> counterName) { - if (context == null) { - return TestCounters.getCounter(counterName); - } return context.getCounter(counterName); } protected Counter getCounter(String groupName, String counterName) { - if (context == null) { - return TestCounters.getCounter(groupName, counterName); - } return context.getCounter(groupName, counterName); } @@ -169,31 +145,19 @@ public abstract class DoFn<S, T> implements Serializable { } protected void progress() { - if (context != null) { - context.progress(); - } + context.progress(); } protected TaskAttemptID getTaskAttemptID() { - if (context != null) { - return context.getTaskAttemptID(); - } else { - return new TaskAttemptID(); - } + return context.getTaskAttemptID(); } protected void setStatus(String status) { - if (context != null) { - context.setStatus(status); - } - this.internalStatus = status; + context.setStatus(status); } protected String getStatus() { - if (context != null) { - return context.getStatus(); - } - return internalStatus; + return context.getStatus(); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java index a41daf4..4714fe4 100644 --- a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java +++ b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java @@ -61,10 +61,4 @@ public class CompositeMapFn<R, S, T> extends MapFn<R, T> { first.configure(conf); second.configure(conf); } - - @Override - public void setConfigurationForTest(Configuration conf) { - first.setConfigurationForTest(conf); - second.setConfigurationForTest(conf); - } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java index 63634ef..b25a6d8 100644 --- a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java +++ b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java @@ -55,9 +55,4 @@ public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> { values.cleanup(null); } - @Override - public void setConfigurationForTest(Configuration conf) { - keys.setConfigurationForTest(conf); - values.setConfigurationForTest(conf); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java index a79ec2b..61bb1e7 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -17,8 +17,13 @@ */ package org.apache.crunch.impl.mem.collect; +import java.lang.reflect.Method; import java.util.Collection; +import javassist.util.proxy.MethodFilter; +import javassist.util.proxy.MethodHandler; +import javassist.util.proxy.ProxyFactory; + import org.apache.crunch.DoFn; import org.apache.crunch.FilterFn; import org.apache.crunch.MapFn; @@ -30,14 +35,20 @@ import org.apache.crunch.Pipeline; import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mem.emit.InMemoryEmitter; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.lib.Sample; import org.apache.crunch.lib.Sort; import org.apache.crunch.materialize.pobject.CollectionPObject; -import org.apache.crunch.test.InMemoryEmitter; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -103,6 +114,7 @@ public class MemCollection<S> implements PCollection<S> { @Override public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) { InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>(); + doFn.setContext(getInMemoryContext(getPipeline().getConfiguration())); doFn.initialize(); for (S s : collect) { doFn.process(s, emitter); @@ -214,4 +226,49 @@ public class MemCollection<S> implements PCollection<S> { public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) { return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType())); } + + /** + * The method creates a {@link TaskInputOutputContext} that will just provide + * {@linkplain Configuration}. The method has been implemented with javaassist + * as there are API changes in versions of Hadoop. In hadoop 1.0.3 the + * {@linkplain TaskInputOutputContext} is abstract class while in version 2 + * the same is an interface. + * <p> + * Note: The intention of this is to provide the bare essentials that are + * required to make the {@linkplain MemPipeline} work. It lacks even the basic + * things that can proved some support for unit testing pipeline. + */ + private static TaskInputOutputContext<?, ?, ?, ?> getInMemoryContext(final Configuration conf) { + ProxyFactory factory = new ProxyFactory(); + Class<TaskInputOutputContext> superType = TaskInputOutputContext.class; + Class[] types = new Class[0]; + Object[] args = new Object[0]; + if (superType.isInterface()) { + factory.setInterfaces(new Class[] { superType }); + } else { + types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class, + StatusReporter.class }; + args = new Object[] { conf, new TaskAttemptID(), null, null, null }; + factory.setSuperclass(superType); + } + factory.setFilter(new MethodFilter() { + @Override + public boolean isHandled(Method m) { + return m.getName().equals("getConfiguration"); + } + }); + MethodHandler handler = new MethodHandler() { + @Override + public Object invoke(Object arg0, Method arg1, Method arg2, Object[] arg3) throws Throwable { + return conf; + } + }; + try { + Object newInstance = factory.create(types, args, handler); + return (TaskInputOutputContext<?, ?, ?, ?>) newInstance; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java new file mode 100644 index 0000000..6976615 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem.emit; + +import java.util.List; + +import org.apache.crunch.Emitter; + +import com.google.common.collect.Lists; + +/** + * An {@code Emitter} instance that writes emitted records to a backing + * {@code List}. + * + * @param <T> + */ +public class InMemoryEmitter<T> implements Emitter<T> { + + private final List<T> output; + + public InMemoryEmitter() { + this(Lists.<T> newArrayList()); + } + + public InMemoryEmitter(List<T> output) { + this.output = output; + } + + @Override + public void emit(T emitted) { + output.add(emitted); + } + + @Override + public void flush() { + + } + + public List<T> getOutput() { + return output; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java index d1940cc..6f21dd2 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java @@ -69,7 +69,6 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> { @Override public Iterator<T> read(FileSystem fs, final Path path) { - this.mapFn.setConfigurationForTest(conf); this.mapFn.initialize(); try { FsInput fsi = new FsInput(path, fs.getConf()); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java index 050c0fc..ad1b81b 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java @@ -54,7 +54,6 @@ public class SeqFileReaderFactory<T> implements FileReaderFactory<T> { @Override public Iterator<T> read(FileSystem fs, final Path path) { - mapFn.setConfigurationForTest(conf); mapFn.initialize(); try { final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java index 7c34a75..20c749a 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java @@ -59,9 +59,7 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K @Override public Iterator<Pair<K, V>> read(FileSystem fs, final Path path) { - keyMapFn.setConfigurationForTest(conf); keyMapFn.initialize(); - valueMapFn.setConfigurationForTest(conf); valueMapFn.initialize(); try { final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java index 5a512fc..a0c48e0 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java @@ -63,7 +63,6 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> { mapFn = ((CompositeMapFn) input).getSecond(); } } - mapFn.setConfigurationForTest(conf); mapFn.initialize(); FSDataInputStream is; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java b/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java deleted file mode 100644 index 1e0acb9..0000000 --- a/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.test; - -import java.util.List; - -import org.apache.crunch.Emitter; - -import com.google.common.collect.Lists; - -/** - * An {@code Emitter} instance that writes emitted records to a backing - * {@code List}. - * - * @param <T> - */ -public class InMemoryEmitter<T> implements Emitter<T> { - - private final List<T> output; - - public InMemoryEmitter() { - this(Lists.<T> newArrayList()); - } - - public InMemoryEmitter(List<T> output) { - this.output = output; - } - - @Override - public void emit(T emitted) { - output.add(emitted); - } - - @Override - public void flush() { - - } - - public List<T> getOutput() { - return output; - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/test/TestCounters.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/test/TestCounters.java b/crunch/src/main/java/org/apache/crunch/test/TestCounters.java deleted file mode 100644 index bcb4da1..0000000 --- a/crunch/src/main/java/org/apache/crunch/test/TestCounters.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.test; - -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.Counters; - -/** - * A utility class used during unit testing to update and read counters. - */ -public class TestCounters { - - private static Counters COUNTERS = new Counters(); - - public static Counter getCounter(Enum<?> e) { - return COUNTERS.findCounter(e); - } - - public static Counter getCounter(String group, String name) { - return COUNTERS.findCounter(group, name); - } - - public static void clearCounters() { - COUNTERS = new Counters(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java index 285b423..5416c4f 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java @@ -57,12 +57,6 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT } @Override - public void setConfigurationForTest(Configuration conf) { - keyMapFn.setConfigurationForTest(conf); - valueMapFn.setConfigurationForTest(conf); - } - - @Override public void initialize() { keyMapFn.setContext(getContext()); valueMapFn.setContext(getContext()); @@ -99,12 +93,6 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT } @Override - public void setConfigurationForTest(Configuration conf) { - firstMapFn.setConfigurationForTest(conf); - secondMapFn.setConfigurationForTest(conf); - } - - @Override public void initialize() { firstMapFn.setContext(getContext()); secondMapFn.setContext(getContext()); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java index 969af1d..4a83db5 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -291,11 +291,6 @@ public class Avros { } @Override - public void setConfigurationForTest(Configuration conf) { - mapFn.setConfigurationForTest(conf); - } - - @Override public void initialize() { this.mapFn.setContext(getContext()); } @@ -335,11 +330,6 @@ public class Avros { } @Override - public void setConfigurationForTest(Configuration conf) { - mapFn.setConfigurationForTest(conf); - } - - @Override public void initialize() { this.mapFn.setContext(getContext()); } @@ -378,11 +368,6 @@ public class Avros { } @Override - public void setConfigurationForTest(Configuration conf) { - mapFn.setConfigurationForTest(conf); - } - - @Override public void initialize() { this.mapFn.setContext(getContext()); } @@ -410,11 +395,6 @@ public class Avros { } @Override - public void setConfigurationForTest(Configuration conf) { - mapFn.setConfigurationForTest(conf); - } - - @Override public void initialize() { this.mapFn.setContext(getContext()); } @@ -460,13 +440,6 @@ public class Avros { } @Override - public void setConfigurationForTest(Configuration conf) { - for (MapFn fn : fns) { - fn.setConfigurationForTest(conf); - } - } - - @Override public void initialize() { for (MapFn fn : fns) { fn.setContext(getContext()); @@ -527,13 +500,6 @@ public class Avros { } @Override - public void setConfigurationForTest(Configuration conf) { - for (MapFn fn : fns) { - fn.setConfigurationForTest(conf); - } - } - - @Override public void initialize() { this.schema = new Schema.Parser().parse(jsonSchema); for (MapFn fn : fns) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java index 23bc7f5..5e305b8 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java @@ -297,13 +297,6 @@ public class Writables { } @Override - public void setConfigurationForTest(Configuration conf) { - for (MapFn fn : fns) { - fn.setConfigurationForTest(conf); - } - } - - @Override public void initialize() { for (MapFn fn : fns) { fn.setContext(getContext()); @@ -353,12 +346,6 @@ public class Writables { } } - @Override - public void setConfigurationForTest(Configuration conf) { - for (MapFn fn : fns) { - fn.setConfigurationForTest(conf); - } - } @Override public void initialize() { @@ -439,10 +426,6 @@ public class Writables { mapFn.configure(conf); } - @Override - public void setConfigurationForTest(Configuration conf) { - mapFn.setConfigurationForTest(conf); - } @Override public void initialize() { @@ -474,10 +457,6 @@ public class Writables { mapFn.configure(conf); } - @Override - public void setConfigurationForTest(Configuration conf) { - mapFn.setConfigurationForTest(conf); - } @Override public void initialize() { @@ -516,11 +495,6 @@ public class Writables { } @Override - public void setConfigurationForTest(Configuration conf) { - mapFn.setConfigurationForTest(conf); - } - - @Override public void initialize() { mapFn.setContext(getContext()); } @@ -551,11 +525,6 @@ public class Writables { } @Override - public void setConfigurationForTest(Configuration conf) { - mapFn.setConfigurationForTest(conf); - } - - @Override public void initialize() { mapFn.setContext(getContext()); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java index 741899e..9e4337f 100644 --- a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java +++ b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.crunch.Emitter; import org.apache.crunch.Pair; +import org.apache.crunch.test.CrunchTestSupport; import org.apache.crunch.test.StringWrapper; import org.apache.hadoop.conf.Configuration; import org.junit.Before; @@ -41,7 +42,7 @@ public abstract class JoinFnTestBase { @Before public void setUp() { joinFn = getJoinFn(); - joinFn.setConfigurationForTest(new Configuration()); + joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration())); joinFn.initialize(); emitter = mock(Emitter.class); } @@ -67,13 +68,11 @@ public abstract class JoinFnTestBase { } - protected abstract void checkOutput( - Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter); + protected abstract void checkOutput(Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter); protected abstract JoinFn<StringWrapper, StringWrapper, String> getJoinFn(); - protected List<Pair<StringWrapper, String>> createValuePairList(StringWrapper leftValue, - String rightValue) { + protected List<Pair<StringWrapper, String>> createValuePairList(StringWrapper leftValue, String rightValue) { Pair<StringWrapper, String> valuePair = Pair.of(leftValue, rightValue); List<Pair<StringWrapper, String>> valuePairList = Lists.newArrayList(); valuePairList.add(valuePair); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/test/java/org/apache/crunch/test/CountersTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/test/CountersTest.java b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java index 3df7657..66f854e 100644 --- a/crunch/src/test/java/org/apache/crunch/test/CountersTest.java +++ b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java @@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; -import org.junit.After; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; /** @@ -35,12 +35,11 @@ public class CountersTest { THREE }; - @After - public void after() { - TestCounters.clearCounters(); - } - public static class CTFn extends DoFn<String, String> { + CTFn() { + setContext(CrunchTestSupport.getTestContext(new Configuration())); + } + @Override public void process(String input, Emitter<String> emitter) { getCounter(CT.ONE).increment(1); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1cee024c/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java index 801829d..dabf0fe 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java @@ -37,13 +37,16 @@ import org.apache.crunch.Pair; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.test.CrunchTestSupport; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.junit.Test; import com.google.common.collect.ImmutableList; @@ -186,6 +189,11 @@ public class AvrosTest { @SuppressWarnings("rawtypes") public void testWritables() throws Exception { AvroType at = Avros.writables(LongWritable.class); + + TaskInputOutputContext<?, ?, ?, ?> testContext = CrunchTestSupport.getTestContext(new Configuration()); + at.getInputMapFn().setContext(testContext); + at.getOutputMapFn().setContext(testContext); + LongWritable lw = new LongWritable(1729L); assertEquals(lw, at.getInputMapFn().map(at.getOutputMapFn().map(lw))); } @@ -222,9 +230,9 @@ public class AvrosTest { public void testIsPrimitive_PrimitiveMappedType() { assertTrue(Avros.isPrimitive(Avros.ints())); } - + @Test - public void testIsPrimitive_TruePrimitiveValue(){ + public void testIsPrimitive_TruePrimitiveValue() { AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT), new DeepCopier.NoOpDeepCopier()); assertTrue(Avros.isPrimitive(truePrimitiveAvroType)); } @@ -294,20 +302,20 @@ public class AvrosTest { assertEquals(pair, doubleMappedPair); } - + @Test - public void testPairOutputMapFn_VerifyNoObjectReuse(){ + public void testPairOutputMapFn_VerifyNoObjectReuse() { StringWrapper stringWrapper = new StringWrapper("Test"); - - Pair<Integer,StringWrapper> pair = Pair.of(1, stringWrapper); - + + Pair<Integer, StringWrapper> pair = Pair.of(1, stringWrapper); + AvroType<Pair<Integer, StringWrapper>> pairType = Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class)); - + pairType.getOutputMapFn().initialize(); - + Object outputMappedValueA = pairType.getOutputMapFn().map(pair); Object outputMappedValueB = pairType.getOutputMapFn().map(pair); - + assertEquals(outputMappedValueA, outputMappedValueB); assertNotSame(outputMappedValueA, outputMappedValueB); }
