Updated Branches:
  refs/heads/ACCUMULO-391 d344ad6d1 -> 335acb8de
Merging map/reduce test for AccumuloInputFormat with the MultiTable version. ACCUMULO-391

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/335acb8d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/335acb8d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/335acb8d

Branch: refs/heads/ACCUMULO-391
Commit: 335acb8def354990a80cc081428a28d671f7e25c
Parents: d344ad6
Author: Corey J. Nolet <[email protected]>
Authored: Wed Sep 18 21:53:59 2013 -0400
Committer: Corey J. Nolet <[email protected]>
Committed: Wed Sep 18 21:54:24 2013 -0400

----------------------------------------------------------------------
 .../mapreduce/AccumuloInputFormatTest.java      | 180 ++++---------
 .../client/mapreduce/multi/ContextFactory.java  | 174 ------------------
 2 files changed, 36 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/335acb8d/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 2c88a01..08b4b2c 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -16,20 +16,17 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
-import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit;
+import static java.util.Arrays.asList;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.addIterator;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.fetchColumns;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.getFetchedColumns;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.getIterators;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.getRanges;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setConnectorInfo;
-import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setInputTableName;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setInputTableNames;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setMockInstance;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setRanges;
 import static org.apache.accumulo.core.client.mapreduce.InputFormatBase.setScanAuthorizations;
-import static org.apache.accumulo.core.client.mapreduce.multi.ContextFactory.createMapContext;
-import static org.apache.accumulo.core.client.mapreduce.multi.ContextFactory.createTaskAttemptContext;
 import static org.apache.accumulo.core.iterators.user.RegExFilter.setRegexs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -39,15 +36,12 @@ import static org.junit.Assert.assertTrue;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.Assert;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -68,11 +62,8 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -83,7 +74,8 @@ public class AccumuloInputFormatTest {
   private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName();
   private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
   private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
-  
+  private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2";
+  
   /**
    * Check that the iterator configuration is getting stored in the Job conf correctly.
    *
@@ -226,6 +218,7 @@ public class AccumuloInputFormatTest {
   private static AssertionError e2 = null;
   
   private static class MRTester extends Configured implements Tool {
+    
     private static class TestMapper extends Mapper<Key,Value,Key,Value> {
       Key key = null;
       int count = 0;
@@ -233,10 +226,11 @@
       @Override
       protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
         try {
+          String tableName = ((InputFormatBase.RangeInputSplit)context.getInputSplit ()).getTableName ();
           if (key != null)
             assertEquals(key.getRow().toString(), new String(v.get()));
-          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
-          assertEquals(new String(v.get()), String.format("%09x", count));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
         } catch (AssertionError e) {
          e1 = e;
         }
@@ -257,22 +251,23 @@
     @Override
     public int run(String[] args) throws Exception {
       
-      if (args.length != 3) {
+      if (args.length != 4) {
         throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>");
       }
       
       String user = args[0];
       String pass = args[1];
-      String table = args[2];
-      
+      String table1 = args[2];
+      String table2 = args[3];
+      
       Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
       job.setJarByClass(this.getClass());
      
       job.setInputFormatClass(AccumuloInputFormat.class);
      
-      setConnectorInfo (job, user, new PasswordToken (pass));
-      setInputTableName (job, table);
-      setMockInstance (job, INSTANCE_NAME);
+      setConnectorInfo(job, user, new PasswordToken(pass));
+      setInputTableNames (job, asList (new String[]{table1, table2}));
+      setMockInstance(job, INSTANCE_NAME);
      
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -290,70 +285,40 @@ public class AccumuloInputFormatTest {
       assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
     }
   }
-  
-  /**
-   * A sample Mapper that verifies aspects of the input.
-   *
-   * This mapper verifies that all keys passed to it are for the expected
-   * table and that it sees exactly 100 keys.
-   *
-   */
-  static class MultitableMapper extends Mapper<Key,Value,Key,Value> {
-    private int count;
-    private Text expectedTable;
-    
-    public void expectedTable(Text t) {
-      this.expectedTable = t;
-    }
-    
-    @Override
-    protected void setup(Context context) throws IOException,
-        InterruptedException {
-      super.setup(context);
-      count = 0;
-    }
-    
-    @Override
-    protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
-      assertEquals(expectedTable.toString (), ((RangeInputSplit)context.getInputSplit ()).getTableName ());
-      ++count;
-    }
-    
-    @Override
-    protected void cleanup(Context context) throws IOException,
-        InterruptedException {
-      super.cleanup(context);
-      Assert.assertEquals (100, count);
-    }
-  }
-  
   @Test
   public void testMap() throws Exception {
     MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
     Connector c = mockInstance.getConnector("root", new PasswordToken(""));
     c.tableOperations().create(TEST_TABLE_1);
+    c.tableOperations().create(TEST_TABLE_2);
     BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig());
     for (int i = 0; i < 100; i++) {
-      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
-      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
-      bw.addMutation(m);
+      Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1)));
+      t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes()));
+      bw.addMutation(t1m);
+      Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1)));
+      t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes()));
+      bw2.addMutation(t2m);
     }
     bw.close();
+    bw2.close ();
     
-    MRTester.main(new String[] {"root", "", TEST_TABLE_1});
+    MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
     assertNull(e1);
     assertNull(e2);
   }
+  
   /**
    * Asserts that the configuration contains the expected ranges for the tables.
    */
   @Test
   public void testMultitableRangeSerialization() throws Throwable {
-    List<String> tables = Arrays.asList ("t1", "t2", "t3");
+    List<String> tables = asList ("t1", "t2", "t3");
     Job job = new Job(new Configuration());
     job.setInputFormatClass(AccumuloInputFormat.class);
-    job.setMapperClass (MultitableMapper.class);
+    job.setMapperClass (MRTester.TestMapper.class);
     job.setNumReduceTasks (0);
     setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
     setInputTableNames (job, tables);
@@ -362,10 +327,10 @@
     
     HashMap<String, Collection<Range>> tblRanges = new HashMap<String, Collection<Range>>();
     for(String tbl : tables) {
-      List<Range> ranges = Arrays.asList(
-          new Range("a", "b"),
-          new Range("c", "d"),
-          new Range("e", "f") );
+      List<Range> ranges = asList (
+          new Range ("a", "b"),
+          new Range ("c", "d"),
+          new Range ("e", "f"));
       tblRanges.put(tbl, ranges);
     }
     
@@ -387,10 +352,10 @@
    */
   @Test
   public void testMultitableIteratorSerialization() throws Throwable {
-    HashSet<String> tables = new HashSet<String>(Arrays.asList("t1", "t2"));
+    HashSet<String> tables = new HashSet<String>(asList ("t1", "t2"));
     Job job = new Job(new Configuration());
     job.setInputFormatClass (AccumuloInputFormat.class);
-    job.setMapperClass (MultitableMapper.class);
+    job.setMapperClass (MRTester.TestMapper.class);
     job.setNumReduceTasks (0);
     setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
     setInputTableNames (job, tables);
@@ -413,10 +378,10 @@
   
   @Test
   public void testMultitableColumnSerialization() throws IOException, AccumuloSecurityException {
-    HashSet<String> tables = new HashSet<String>(Arrays.asList("t1", "t2"));
+    HashSet<String> tables = new HashSet<String>(asList ("t1", "t2"));
     Job job = new Job(new Configuration());
     job.setInputFormatClass (AccumuloInputFormat.class);
-    job.setMapperClass (MultitableMapper.class);
+    job.setMapperClass (MRTester.TestMapper.class);
     job.setNumReduceTasks (0);
     setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
     setInputTableNames (job, tables);
@@ -435,79 +400,6 @@
     Collection<Pair<Text,Text>> t1actual = getFetchedColumns (job, "t1");
     assertEquals(columns.get("t1"), t1actual);
     Collection<Pair<Text,Text>> t2actual = getFetchedColumns (job, "t2");
+    assertEquals(columns.get("t2"), t2actual);
   }
-  
-  
-  /**
-   * Creates five tables, table0 through table4, that get loaded with 100 keys each.
-   *
-   * This test expects that each table is filled with 100 entries and that a sample
-   * MapReduce job is created to scan all five. We should see five input splits; one for
-   * each table.
-   *
-   * The sample job uses the TestMapper class defined locally to this test. Verification
-   * of features such as expected table and number of keys is performed via the TestMapper.
-   *
-   * @throws Throwable
-   */
-  @Test
-  public void testMultitableMap() throws Throwable {
-    MockInstance mockInstance = new MockInstance("testmapinstance");
-    Connector c = mockInstance.getConnector("root", new byte[] {});
-    StringBuilder tablesBuilder = new StringBuilder();
-    LinkedList<String> tablesList = new LinkedList<String>();
-    for(int i = 0; i < 5; ++i) {
-      String table = "table" + i;
-      tablesList.add(table);
-      writeData(c, table);
-      tablesBuilder.append(table).append(',');
-    }
-    tablesBuilder.setLength(tablesBuilder.length() - 1);
-    
-    Job job = new Job(new Configuration());
-    job.setInputFormatClass (AccumuloInputFormat.class);
-    job.setMapperClass (MultitableMapper.class);
-    job.setNumReduceTasks (0);
-    setConnectorInfo (job, "root", new PasswordToken (new byte[0]));
-    setInputTableNames (job, tablesList);
-    setScanAuthorizations (job, new Authorizations ());
-    setMockInstance (job, "testmapinstance");
-    
-    AccumuloInputFormat input = new AccumuloInputFormat ();
-    List<InputSplit> splits = input.getSplits(job);
-    assertEquals(splits.size(), 5);
-    
-    MultitableMapper mapper = (MultitableMapper) job.getMapperClass().newInstance();
-    for (InputSplit split : splits) {
-      TaskAttemptContext tac = createTaskAttemptContext (job);
-      RecordReader<Key,Value> reader = input.createRecordReader(split, tac);
-      Mapper<Key,Value,Key,Value>.Context context = createMapContext (mapper, tac, reader, null, split);
-      reader.initialize(split, context);
-      mapper.expectedTable( new Text( ((RangeInputSplit) split).getTableName () ) );
-      mapper.run(context);
-    }
-  }
-  
-  /**
-   * Writes data out to a table.
-   *
-   * The data written out is 100 entries, with the row being a number 1-100 and the value
-   * being a number one less than the row (0-99).
-   *
-   * @param c
-   * @param table
-   * @throws Throwable
-   */
-  static void writeData(Connector c, String table) throws Throwable {
-    c.tableOperations().create(table);
-    BatchWriter bw = c.createBatchWriter(table, 10000L, 1000L, 4);
-    for (int i = 0; i < 100; i++) {
-      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
-      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
-      bw.addMutation(m);
-    }
-    bw.close();
-  }
-  
 }
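The job wiring this merged test exercises is compact. Below is a minimal sketch of a two-table job, assuming the branch's setInputTableNames and RangeInputSplit.getTableName() API shown in the diff above; the class name, instance name, credentials, and table names are illustrative placeholders, not values from the commit.

// Illustrative only: a job reading two tables with AccumuloInputFormat.
import static java.util.Arrays.asList;

import java.io.IOException;

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MultiTableSketch {

  static class TableAwareMapper extends Mapper<Key,Value,Key,Value> {
    @Override
    protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
      // Each split carries its source table, so one mapper class can serve every table.
      String table = ((InputFormatBase.RangeInputSplit) context.getInputSplit()).getTableName();
      // ... per-table handling of (k, v) goes here ...
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.setInputFormatClass(AccumuloInputFormat.class);
    InputFormatBase.setConnectorInfo(job, "root", new PasswordToken(""));
    // One call configures all input tables for the job.
    InputFormatBase.setInputTableNames(job, asList("table1", "table2"));
    InputFormatBase.setMockInstance(job, "sketch_instance"); // placeholder; a real job would point at a live instance
    job.setMapperClass(TableAwareMapper.class);
    job.setMapOutputKeyClass(Key.class);
    job.setMapOutputValueClass(Value.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

This mirrors what MRTester.run(...) does in the test: the per-split table name is the only thing a mapper needs in order to stay table-agnostic.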
http://git-wip-us.apache.org/repos/asf/accumulo/blob/335acb8d/core/src/test/java/org/apache/accumulo/core/client/mapreduce/multi/ContextFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/multi/ContextFactory.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/multi/ContextFactory.java
deleted file mode 100644
index ecb28cd..0000000
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/multi/ContextFactory.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.accumulo.core.client.mapreduce.multi;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-/**
- * A factory to allow applications to deal with inconsistencies between MapReduce Context Objects API between hadoop-0.20 and later versions. This code is based
- * on org.apache.hadoop.mapreduce.ContextFactory in hadoop-mapred-0.22.0.
- */
-public class ContextFactory {
-
-  private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
-  private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
-  private static final Constructor<?> TASK_ID_CONSTRUCTOR;
-  private static final Constructor<?> MAP_CONSTRUCTOR;
-  private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
-  private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
-  private static final Class<?> TASK_TYPE_CLASS;
-  private static final boolean useV21;
-
-  static {
-    boolean v21 = true;
-    final String PACKAGE = "org.apache.hadoop.mapreduce";
-    try {
-      Class.forName(PACKAGE + ".task.JobContextImpl");
-    } catch (ClassNotFoundException cnfe) {
-      v21 = false;
-    }
-    useV21 = v21;
-    Class<?> jobContextCls;
-    Class<?> taskContextCls;
-    Class<?> mapCls;
-    Class<?> mapContextCls;
-    Class<?> innerMapContextCls;
-    try {
-      if (v21) {
-        jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl");
-        taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
-        TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType");
-        mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
-        mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
-        innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
-      } else {
-        jobContextCls = Class.forName(PACKAGE + ".JobContext");
-        taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext");
-        TASK_TYPE_CLASS = null;
-        mapContextCls = Class.forName(PACKAGE + ".MapContext");
-        mapCls = Class.forName(PACKAGE + ".Mapper");
-        innerMapContextCls = Class.forName(PACKAGE + ".Mapper$Context");
-      }
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException("Can't find class", e);
-    }
-    try {
-      JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class);
-      JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
-      TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class);
-      TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
-      if (useV21) {
-        TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, TASK_TYPE_CLASS, int.class, int.class);
-        TASK_ID_CONSTRUCTOR.setAccessible(true);
-        MAP_CONSTRUCTOR = mapCls.getConstructor();
-        MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, MapContext.class);
-        MAP_CONTEXT_IMPL_CONSTRUCTOR = mapContextCls.getDeclaredConstructor(Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
-            OutputCommitter.class, StatusReporter.class, InputSplit.class);
-        MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
-      } else {
-        TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, boolean.class, int.class, int.class);
-        TASK_ID_CONSTRUCTOR.setAccessible(true);
-        MAP_CONSTRUCTOR = null;
-        MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
-            OutputCommitter.class, StatusReporter.class, InputSplit.class);
-        MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
-      }
-      MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
-    } catch (SecurityException e) {
-      throw new IllegalArgumentException("Can't run constructor ", e);
-    } catch (NoSuchMethodException e) {
-      throw new IllegalArgumentException("Can't find constructor ", e);
-    }
-  }
-
-  public static JobContext createJobContext() {
-    return createJobContext(new Configuration());
-  }
-
-  public static JobContext createJobContext(Configuration conf) {
-    try {
-      return (JobContext) JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, new JobID("local", 0));
-    } catch (InstantiationException e) {
-      throw new IllegalArgumentException("Can't create object", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException("Can't create object", e);
-    } catch (InvocationTargetException e) {
-      throw new IllegalArgumentException("Can't create object", e);
-    }
-  }
-
-  public static TaskAttemptContext createTaskAttemptContext(JobContext job) {
-    return createTaskAttemptContext(job.getConfiguration());
-  }
-
-  public static TaskAttemptContext createTaskAttemptContext(Configuration conf) {
-    try {
-      if (useV21)
-        return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf,
-            TASK_ID_CONSTRUCTOR.newInstance("local", 0, TASK_TYPE_CLASS.getEnumConstants()[0], 0, 0));
-      else
-        return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, TASK_ID_CONSTRUCTOR.newInstance("local", 0, true, 0, 0));
-    } catch (InstantiationException e) {
-      throw new IllegalArgumentException("Can't create object", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException("Can't create object", e);
-    } catch (InvocationTargetException e) {
-      throw new IllegalArgumentException("Can't create object", e);
-    }
-  }
-
-  public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
-      RecordWriter<K2,V2> writer, InputSplit split) {
-    return createMapContext(m, tac, reader, writer, null, null, split);
-  }
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
-      RecordWriter<K2,V2> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
-    try {
-      if (useV21) {
-        Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
-        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis);
-      } else {
-        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter,
-            split);
-      }
-    } catch (InstantiationException e) {
-      throw new IllegalArgumentException("Can't create object", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException("Can't create object", e);
-    } catch (InvocationTargetException e) {
-      throw new IllegalArgumentException("Can't create object", e);
-    }
-  }
-}
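With this class gone, nothing in the test tree builds Mapper.Context objects by reflection anymore; the merged test instead submits a complete job through ToolRunner and asserts on its exit code, which is why the version-sniffing above has no remaining callers. A sketch of that replacement pattern, assuming a Tool subclass like the MRTester in the test (the class name here is illustrative):

// Illustrative only: run the whole MapReduce job via ToolRunner and
// check the exit code, rather than hand-building Mapper.Context
// objects per Hadoop version as ContextFactory did.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RunAsTool extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // ... configure and submit the Job here, as MRTester.run(...) does ...
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner parses generic Hadoop options and is stable across versions.
    System.exit(ToolRunner.run(new Configuration(), new RunAsTool(), args));
  }
}

Because this path touches only public, version-stable Hadoop entry points, the test no longer needs per-version reflection to drive a mapper.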
