Author: billie
Date: Tue Jan 15 22:14:07 2013
New Revision: 1433695
URL: http://svn.apache.org/viewvc?rev=1433695&view=rev
Log:
ACCUMULO-730 converted input and output format tests to use local MR. No more
ContextFactory.
Removed:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithOutputFormat.java
Modified:
accumulo/trunk/core/pom.xml
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
accumulo/trunk/examples/simple/pom.xml
accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
accumulo/trunk/pom.xml
Modified: accumulo/trunk/core/pom.xml
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/pom.xml?rev=1433695&r1=1433694&r2=1433695&view=diff
==============================================================================
--- accumulo/trunk/core/pom.xml (original)
+++ accumulo/trunk/core/pom.xml Tue Jan 15 22:14:07 2013
@@ -106,6 +106,14 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
</dependencies>
</profile>
</profiles>
Modified:
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1433695&r1=1433694&r2=1433695&view=diff
==============================================================================
---
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
(original)
+++
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
Tue Jan 15 22:14:07 2013
@@ -17,92 +17,177 @@
package org.apache.accumulo.core.client.mapreduce;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileFilter;
import java.io.IOException;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.ContextFactory;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
public class AccumuloFileOutputFormatTest {
- static Job job;
- static TaskAttemptContext tac;
- static Path f = null;
-
- @Before
- public void setup() throws IOException {
- job = new Job();
-
- Path file = new Path("target/");
- f = new Path(file, "_temporary");
- job.getConfiguration().set("mapred.output.dir", file.toString());
-
- tac = ContextFactory.createTaskAttemptContext(job);
- }
-
- @After
- public void teardown() throws IOException {
- if (f != null && f.getFileSystem(job.getConfiguration()).exists(f)) {
- f.getFileSystem(job.getConfiguration()).delete(f, true);
- }
+ public static TemporaryFolder folder = new TemporaryFolder();
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ folder.create();
+
+ MockInstance mockInstance = new MockInstance("testinstance");
+ Connector c = mockInstance.getConnector("root", new byte[] {});
+ c.tableOperations().create("emptytable");
+ c.tableOperations().create("testtable");
+ c.tableOperations().create("badtable");
+ BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig());
+ Mutation m = new Mutation("Key");
+ m.put("", "", "");
+ bw.addMutation(m);
+ bw.close();
+ bw = c.createBatchWriter("badtable", new BatchWriterConfig());
+ m = new Mutation("r1");
+ m.put("cf1", "cq1", "A&B");
+ m.put("cf1", "cq1", "A&B");
+ m.put("cf1", "cq2", "A&");
+ bw.addMutation(m);
+ bw.close();
+ }
+
+ @AfterClass
+ public static void teardown() throws IOException {
+ folder.delete();
}
@Test
- public void testEmptyWrite() throws IOException, InterruptedException {
+ public void testEmptyWrite() throws Exception {
handleWriteTests(false);
}
@Test
- public void testRealWrite() throws IOException, InterruptedException {
+ public void testRealWrite() throws Exception {
handleWriteTests(true);
}
- public void handleWriteTests(boolean content) throws IOException,
InterruptedException {
- AccumuloFileOutputFormat afof = new AccumuloFileOutputFormat();
- RecordWriter<Key,Value> rw = afof.getRecordWriter(tac);
-
- if (content)
- rw.write(new Key("Key"), new Value("".getBytes()));
-
- Path file = afof.getDefaultWorkFile(tac, ".rf");
- System.out.println(file);
- rw.close(tac);
-
- if (content)
- assertTrue(file.getFileSystem(job.getConfiguration()).exists(file));
- else
- assertFalse(file.getFileSystem(job.getConfiguration()).exists(file));
- file.getFileSystem(tac.getConfiguration()).delete(file.getParent(), true);
+ private static class MRTester extends Configured implements Tool {
+ private static class BadKeyMapper extends Mapper<Key,Value,Key,Value> {
+ int index = 0;
+
+ @Override
+ protected void map(Key key, Value value, Context context) throws
IOException, InterruptedException {
+ try {
+ try {
+ context.write(key, value);
+ if (index == 2)
+ assertTrue(false);
+ } catch (Exception e) {
+ assertEquals(2, index);
+ }
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ index++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
InterruptedException {
+ try {
+ assertEquals(2, index);
+ } catch (AssertionError e) {
+ e2 = e;
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 4) {
+ throw new IllegalArgumentException("Usage : " +
MRTester.class.getName() + " <user> <pass> <table> <outputfile>");
+ }
+
+ String user = args[0];
+ String pass = args[1];
+ String table = args[2];
+
+ Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" +
System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+ Authorizations authorizations;
+ authorizations = Constants.NO_AUTHS;
+
+ AccumuloInputFormat.setInputInfo(job.getConfiguration(), user,
pass.getBytes(), table, authorizations);
+ AccumuloInputFormat.setMockInstance(job.getConfiguration(),
"testinstance");
+ AccumuloFileOutputFormat.setOutputPath(job, new Path(args[3]));
+
+ job.setMapperClass("badtable".equals(table) ? BadKeyMapper.class :
Mapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new
MRTester(), args));
+ }
+ }
+
+ public void handleWriteTests(boolean content) throws Exception {
+ File f = folder.newFile();
+ f.delete();
+ MRTester.main(new String[] {"root", "", content ? "testtable" :
"emptytable", f.getAbsolutePath()});
+
+ assertTrue(f.exists());
+ File[] files = f.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.getName().startsWith("part-m-");
+ }
+ });
+ if (content) {
+ assertEquals(1, files.length);
+ assertTrue(files[0].exists());
+ } else {
+ assertEquals(0, files.length);
+ }
}
@Test
- public void writeBadVisibility() throws IOException, InterruptedException {
- AccumuloFileOutputFormat afof = new AccumuloFileOutputFormat();
- RecordWriter<Key,Value> rw = afof.getRecordWriter(tac);
-
- Path file = afof.getDefaultWorkFile(tac, ".rf");
-
- rw.write(new Key("r1", "cf1", "cq1", "A&B"), new Value("".getBytes()));
- rw.write(new Key("r1", "cf1", "cq2", "A&B"), new Value("".getBytes()));
- try {
- rw.write(new Key("r1", "cf1", "cq2", "A&"), new Value("".getBytes()));
- assertFalse(true);
- } catch (Exception e) {}
-
- file.getFileSystem(tac.getConfiguration()).delete(file.getParent(), true);
+ public void writeBadVisibility() throws Exception {
+ File f = folder.newFile();
+ f.delete();
+ MRTester.main(new String[] {"root", "", "badtable", f.getAbsolutePath()});
+ assertNull(e1);
+ assertNull(e2);
}
-
+
@Test
public void validateConfiguration() throws IOException, InterruptedException
{
@@ -112,6 +197,7 @@ public class AccumuloFileOutputFormatTes
long d = 10l;
String e = "type";
+ Job job = new Job();
AccumuloFileOutputFormat.setReplication(job, a);
AccumuloFileOutputFormat.setFileBlockSize(job, b);
AccumuloFileOutputFormat.setDataBlockSize(job, c);
Modified:
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1433695&r1=1433694&r2=1433695&view=diff
==============================================================================
---
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
(original)
+++
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
Tue Jan 15 22:14:07 2013
@@ -17,6 +17,7 @@
package org.apache.accumulo.core.client.mapreduce;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
@@ -24,28 +25,27 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
-import
org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.ContextFactory;
+import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Test;
@@ -62,7 +62,7 @@ public class AccumuloInputFormatTest {
*/
@Test
public void testMaxVersions() throws IOException {
- JobContext job = ContextFactory.createJobContext();
+ Job job = new Job();
AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 1);
int version = AccumuloInputFormat.getMaxVersions(job.getConfiguration());
assertEquals(1, version);
@@ -76,26 +76,29 @@ public class AccumuloInputFormatTest {
*/
@Test(expected = IOException.class)
public void testMaxVersionsLessThan1() throws IOException {
- JobContext job = ContextFactory.createJobContext();
+ Job job = new Job();
AccumuloInputFormat.setMaxVersions(job.getConfiguration(), 0);
}
/**
* Test no max version configured.
+ *
+ * @throws IOException
*/
@Test
- public void testNoMaxVersion() {
- JobContext job = ContextFactory.createJobContext();
+ public void testNoMaxVersion() throws IOException {
+ Job job = new Job();
assertEquals(-1,
AccumuloInputFormat.getMaxVersions(job.getConfiguration()));
}
/**
* Check that the iterator configuration is getting stored in the Job conf
correctly.
- * @throws IOException
+ *
+ * @throws IOException
*/
@Test
public void testSetIterator() throws IOException {
- JobContext job = ContextFactory.createJobContext();
+ Job job = new Job();
IteratorSetting is = new IteratorSetting(1, "WholeRow",
"org.apache.accumulo.core.iterators.WholeRowIterator");
AccumuloInputFormat.addIterator(job.getConfiguration(), is);
@@ -107,8 +110,8 @@ public class AccumuloInputFormatTest {
}
@Test
- public void testAddIterator() {
- JobContext job = ContextFactory.createJobContext();
+ public void testAddIterator() throws IOException {
+ Job job = new Job();
AccumuloInputFormat.addIterator(job.getConfiguration(), new
IteratorSetting(1, "WholeRow", WholeRowIterator.class));
AccumuloInputFormat.addIterator(job.getConfiguration(), new
IteratorSetting(2, "Versions",
"org.apache.accumulo.core.iterators.VersioningIterator"));
@@ -179,10 +182,12 @@ public class AccumuloInputFormatTest {
/**
* Test getting iterator settings for multiple iterators set
+ *
+ * @throws IOException
*/
@Test
- public void testGetIteratorSettings() {
- JobContext job = ContextFactory.createJobContext();
+ public void testGetIteratorSettings() throws IOException {
+ Job job = new Job();
AccumuloInputFormat.addIterator(job.getConfiguration(), new
IteratorSetting(1, "WholeRow",
"org.apache.accumulo.core.iterators.WholeRowIterator"));
AccumuloInputFormat.addIterator(job.getConfiguration(), new
IteratorSetting(2, "Versions",
"org.apache.accumulo.core.iterators.VersioningIterator"));
@@ -212,8 +217,8 @@ public class AccumuloInputFormatTest {
}
@Test
- public void testSetRegex() {
- JobContext job = ContextFactory.createJobContext();
+ public void testSetRegex() throws IOException {
+ Job job = new Job();
String regex = ">\"*%<>\'\\";
@@ -224,18 +229,71 @@ public class AccumuloInputFormatTest {
assertTrue(regex.equals(AccumuloInputFormat.getIterators(job.getConfiguration()).get(0).getName()));
}
- static class TestMapper extends Mapper<Key,Value,Key,Value> {
- Key key = null;
- int count = 0;
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
+
+ private static class MRTester extends Configured implements Tool {
+ private static class TestMapper extends Mapper<Key,Value,Key,Value> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ protected void map(Key k, Value v, Context context) throws IOException,
InterruptedException {
+ try {
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", count));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
InterruptedException {
+ try {
+ assertEquals(100, count);
+ } catch (AssertionError e) {
+ e2 = e;
+ }
+ }
+ }
@Override
- protected void map(Key k, Value v, Context context) throws IOException,
InterruptedException {
- if (key != null)
- assertEquals(key.getRow().toString(), new String(v.get()));
- assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
- assertEquals(new String(v.get()), String.format("%09x", count));
- key = new Key(k);
- count++;
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 3) {
+ throw new IllegalArgumentException("Usage : " +
MRTester.class.getName() + " <user> <pass> <table>");
+ }
+
+ String user = args[0];
+ String pass = args[1];
+ String table = args[2];
+
+ Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" +
System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.setInputInfo(job.getConfiguration(), user,
pass.getBytes(), table, Constants.NO_AUTHS);
+ AccumuloInputFormat.setMockInstance(job.getConfiguration(),
"testmapinstance");
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new
MRTester(), args));
}
}
@@ -252,54 +310,8 @@ public class AccumuloInputFormatTest {
}
bw.close();
- Job job = new Job(new Configuration());
- job.setInputFormatClass(AccumuloInputFormat.class);
- job.setMapperClass(TestMapper.class);
- job.setNumReduceTasks(0);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root",
"".getBytes(), "testtable", new Authorizations());
- AccumuloInputFormat.setMockInstance(job.getConfiguration(),
"testmapinstance");
-
- AccumuloInputFormat input = new AccumuloInputFormat();
- List<InputSplit> splits = input.getSplits(job);
- assertEquals(splits.size(), 1);
-
- TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
- for (InputSplit split : splits) {
- TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
- RecordReader<Key,Value> reader = input.createRecordReader(split, tac);
- Mapper<Key,Value,Key,Value>.Context context =
ContextFactory.createMapContext(mapper, tac, reader, null, split);
- reader.initialize(split, context);
- mapper.run(context);
- }
- }
-
- @Test
- public void testSimple() throws Exception {
- MockInstance mockInstance = new MockInstance("testmapinstance");
- Connector c = mockInstance.getConnector("root", new byte[] {});
- c.tableOperations().create("testtable2");
- BatchWriter bw = c.createBatchWriter("testtable2", new
BatchWriterConfig());
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x",
i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- JobContext job = ContextFactory.createJobContext();
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root",
"".getBytes(), "testtable2", new Authorizations());
- AccumuloInputFormat.setMockInstance(job.getConfiguration(),
"testmapinstance");
- AccumuloInputFormat input = new AccumuloInputFormat();
- RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
- RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
- rr.initialize(ris, tac);
-
- TestMapper mapper = new TestMapper();
- Mapper<Key,Value,Key,Value>.Context context =
ContextFactory.createMapContext(mapper, tac, rr, null, ris);
- rr.initialize(ris, tac);
- while (rr.nextKeyValue()) {
- mapper.map(rr.getCurrentKey(), rr.getCurrentValue(),
(TestMapper.Context) context);
- }
+ MRTester.main(new String[] {"root", "", "testtable"});
+ assertNull(e1);
+ assertNull(e2);
}
}
Modified:
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java?rev=1433695&r1=1433694&r2=1433695&view=diff
==============================================================================
---
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
(original)
+++
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
Tue Jan 15 22:14:07 2013
@@ -18,13 +18,14 @@ package org.apache.accumulo.core.client.
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
@@ -34,42 +35,87 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.ContextFactory;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
/**
*
*/
public class AccumuloOutputFormatTest {
- static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
- Key key = null;
- int count = 0;
+ private static AssertionError e1 = null;
+
+ private static class MRTester extends Configured implements Tool {
+ private static class TestMapper extends Mapper<Key,Value,Text,Mutation> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ protected void map(Key k, Value v, Context context) throws IOException,
InterruptedException {
+ try {
+ if (key != null)
+ assertEquals(key.getRow().toString(), new String(v.get()));
+ assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(v.get()), String.format("%09x", count));
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ key = new Key(k);
+ count++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
InterruptedException {
+ Mutation m = new Mutation("total");
+ m.put("", "", Integer.toString(count));
+ context.write(new Text(), m);
+ }
+ }
@Override
- protected void map(Key k, Value v, Context context) throws IOException,
InterruptedException {
- if (key != null)
- assertEquals(key.getRow().toString(), new String(v.get()));
- assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
- assertEquals(new String(v.get()), String.format("%09x", count));
- key = new Key(k);
- count++;
+ public int run(String[] args) throws Exception {
+
+ if (args.length != 4) {
+ throw new IllegalArgumentException("Usage : " +
MRTester.class.getName() + " <user> <pass> <inputtable> <outputtable>");
+ }
+
+ String user = args[0];
+ String pass = args[1];
+ String table1 = args[2];
+ String table2 = args[3];
+
+ Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" +
System.currentTimeMillis());
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+
+ AccumuloInputFormat.setInputInfo(job.getConfiguration(), user,
pass.getBytes(), table1, Constants.NO_AUTHS);
+ AccumuloInputFormat.setMockInstance(job.getConfiguration(),
"testmrinstance");
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(AccumuloOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Mutation.class);
+
+ AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), user,
pass.getBytes(), false, table2);
+ AccumuloOutputFormat.setMockInstance(job.getConfiguration(),
"testmrinstance");
+
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
}
- @Override
- protected void cleanup(Context context) throws IOException,
InterruptedException {
- super.cleanup(context);
- Mutation m = new Mutation("total");
- m.put("", "", Integer.toString(count));
- try {
- context.write(new Text(), m);
- } catch (NullPointerException e) {}
+ public static void main(String[] args) throws Exception {
+ assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new
MRTester(), args));
}
}
@@ -87,34 +133,8 @@ public class AccumuloOutputFormatTest {
}
bw.close();
- Job job = new Job();
- job.setInputFormatClass(AccumuloInputFormat.class);
- job.setMapperClass(TestMapper.class);
- job.setOutputFormatClass(AccumuloOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Mutation.class);
- job.setNumReduceTasks(0);
- AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root",
"".getBytes(), "testtable1", new Authorizations());
- AccumuloInputFormat.setMockInstance(job.getConfiguration(),
"testmrinstance");
- AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), "root",
"".getBytes(), false, "testtable2");
- AccumuloOutputFormat.setMockInstance(job.getConfiguration(),
"testmrinstance");
-
- AccumuloInputFormat input = new AccumuloInputFormat();
- List<InputSplit> splits = input.getSplits(job);
- assertEquals(splits.size(), 1);
-
- AccumuloOutputFormat output = new AccumuloOutputFormat();
-
- TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
- for (InputSplit split : splits) {
- TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
- RecordReader<Key,Value> reader = input.createRecordReader(split, tac);
- RecordWriter<Text,Mutation> writer = output.getRecordWriter(tac);
- Mapper<Key,Value,Text,Mutation>.Context context =
ContextFactory.createMapContext(mapper, tac, reader, writer, split);
- reader.initialize(split, context);
- mapper.run(context);
- writer.close(context);
- }
+ MRTester.main(new String[] {"root", "", "testtable1", "testtable2"});
+ assertNull(e1);
Scanner scanner = c.createScanner("testtable2", new Authorizations());
Iterator<Entry<Key,Value>> iter = scanner.iterator();
Modified:
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java?rev=1433695&r1=1433694&r2=1433695&view=diff
==============================================================================
---
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
(original)
+++
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
Tue Jan 15 22:14:07 2013
@@ -17,7 +17,7 @@
package org.apache.accumulo.core.client.mapreduce;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -26,28 +26,26 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import
org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyValue;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.ContextFactory;
+import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
public class AccumuloRowInputFormatTest {
@@ -55,9 +53,11 @@ public class AccumuloRowInputFormatTest
private static final String ROW2 = "row2";
private static final String ROW3 = "row3";
private static final String COLF1 = "colf1";
- private transient final List<Entry<Key,Value>> row1;
- private transient final List<Entry<Key,Value>> row2;
- private transient final List<Entry<Key,Value>> row3;
+ private static List<Entry<Key,Value>> row1;
+ private static List<Entry<Key,Value>> row2;
+ private static List<Entry<Key,Value>> row3;
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
public AccumuloRowInputFormatTest() {
row1 = new ArrayList<Entry<Key,Value>>();
@@ -69,7 +69,7 @@ public class AccumuloRowInputFormatTest
row3 = new ArrayList<Entry<Key,Value>>();
row3.add(new KeyValue(new Key(ROW3, COLF1, "colq5"), "v5".getBytes()));
}
-
+
public static void checkLists(final List<Entry<Key,Value>> first, final
List<Entry<Key,Value>> second) {
assertEquals("Sizes should be the same.", first.size(), second.size());
for (int i = 0; i < first.size(); i++) {
@@ -79,59 +79,118 @@ public class AccumuloRowInputFormatTest
}
public static void checkLists(final List<Entry<Key,Value>> first, final
Iterator<Entry<Key,Value>> second) {
- int entryIndex = 0; // NOPMD
+ int entryIndex = 0;
while (second.hasNext()) {
final Entry<Key,Value> entry = second.next();
assertEquals("Keys should be equal", first.get(entryIndex).getKey(),
entry.getKey());
assertEquals("Values should be equal", first.get(entryIndex).getValue(),
entry.getValue());
- entryIndex++; // NOPMD
+ entryIndex++;
}
}
public static void insertList(final BatchWriter writer, final
List<Entry<Key,Value>> list) throws MutationsRejectedException {
for (Entry<Key,Value> e : list) {
final Key key = e.getKey();
- final Mutation mutation = new Mutation(key.getRow()); // NOPMD
- ColumnVisibility colVisibility = new
ColumnVisibility(key.getColumnVisibility()); // NOPMD
+ final Mutation mutation = new Mutation(key.getRow());
+ ColumnVisibility colVisibility = new
ColumnVisibility(key.getColumnVisibility());
mutation.put(key.getColumnFamily(), key.getColumnQualifier(),
colVisibility, key.getTimestamp(), e.getValue());
writer.addMutation(mutation);
}
}
+ private static class MRTester extends Configured implements Tool { // Tool that runs a job reading through AccumuloRowInputFormat; started via ToolRunner in main()
+ private static class TestMapper extends
Mapper<Text,PeekingIterator<Entry<Key,Value>>,Key,Value> { // compares each input record with the expected row1/row2/row3 lists
+ int count = 0; // number of map() calls so far; selects which expected row to compare against
+
+ @Override
+ protected void map(Text k, PeekingIterator<Entry<Key,Value>> v, Context
context) throws IOException, InterruptedException {
+ try {
+ switch (count) {
+ case 0: // first record: ROW1
+ assertEquals("Current key should be " + ROW1, new Text(ROW1), k);
+ checkLists(row1, v);
+ break;
+ case 1: // second record: ROW2
+ assertEquals("Current key should be " + ROW2, new Text(ROW2), k);
+ checkLists(row2, v);
+ break;
+ case 2: // third record: ROW3
+ assertEquals("Current key should be " + ROW3, new Text(ROW3), k);
+ checkLists(row3, v);
+ break;
+ default:
+ assertTrue(false); // a fourth or later record is unexpected
+ }
+ } catch (AssertionError e) {
+ e1 = e; // stored in a static field instead of being thrown; test() asserts it is null after the job (a later failure overwrites an earlier one)
+ }
+ count++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
InterruptedException {
+ try {
+ assertEquals(3, count); // exactly three records must have been mapped
+ } catch (AssertionError e) {
+ e2 = e; // stored for test() to check after the job
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception { // args: <user> <pass> <table>; returns 0 if the job succeeded, 1 otherwise
+
+ if (args.length != 3) {
+ throw new IllegalArgumentException("Usage : " +
MRTester.class.getName() + " <user> <pass> <table>");
+ }
+
+ String user = args[0];
+ String pass = args[1];
+ String table = args[2];
+
+ Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" +
System.currentTimeMillis()); // job name is the class simple name plus the current time in millis
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(AccumuloRowInputFormat.class);
+
+ AccumuloRowInputFormat.setInputInfo(job.getConfiguration(), user,
pass.getBytes(), table, Constants.NO_AUTHS); // NOTE(review): getBytes() uses the platform default charset
+ AccumuloRowInputFormat.setMockInstance(job.getConfiguration(),
"instance1"); // same instance name that test() passes to new MockInstance(...)
+
+ job.setMapperClass(TestMapper.class);
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(NullOutputFormat.class); // TestMapper never calls context.write(), so no real output is needed
+
+ job.setNumReduceTasks(0); // zero reduce tasks
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new
MRTester(), args)); // fails the calling test unless run() returned 0
+ }
+ }
+
@Test
- public void test() throws AccumuloException, AccumuloSecurityException,
TableExistsException, TableNotFoundException, IOException, InterruptedException
{
+ public void test() throws Exception {
final MockInstance instance = new MockInstance("instance1");
final Connector conn = instance.getConnector("root", "".getBytes());
conn.tableOperations().create("test");
- BatchWriter writer = null; // NOPMD
+ BatchWriter writer = null;
try {
writer = conn.createBatchWriter("test", new BatchWriterConfig());
- insertList(writer, row1);
- insertList(writer, row2);
- insertList(writer, row3);
+ insertList(writer, row1);
+ insertList(writer, row2);
+ insertList(writer, row3);
} finally {
if (writer != null) {
- writer.close();
+ writer.close();
}
}
- final JobContext job = ContextFactory.createJobContext();
- AccumuloRowInputFormat.setInputInfo(job.getConfiguration(), "root",
"".getBytes(), "test", new Authorizations());
- AccumuloRowInputFormat.setMockInstance(job.getConfiguration(),
"instance1");
- final AccumuloRowInputFormat crif = new AccumuloRowInputFormat();
- final RangeInputSplit ris = new RangeInputSplit();
- final TaskAttemptContext tac =
ContextFactory.createTaskAttemptContext(job);
- final RecordReader<Text,PeekingIterator<Entry<Key,Value>>> recReader =
crif.createRecordReader(ris, tac);
- recReader.initialize(ris, tac);
-
- assertTrue("Next key value should be true.", recReader.nextKeyValue());
- assertEquals("Current key should be " + ROW1, new Text(ROW1),
recReader.getCurrentKey());
- checkLists(row1, recReader.getCurrentValue());
- assertTrue("Next key value should be true.", recReader.nextKeyValue());
- assertEquals("Current key should be " + ROW2, new Text(ROW2),
recReader.getCurrentKey());
- checkLists(row2, recReader.getCurrentValue());
- assertTrue("Next key value should be true.", recReader.nextKeyValue());
- assertEquals("Current key should be " + ROW3, new Text(ROW3),
recReader.getCurrentKey());
- checkLists(row3, recReader.getCurrentValue());
- assertFalse("Next key value should be false.", recReader.nextKeyValue());
+ MRTester.main(new String[] {"root", "", "test"});
+ assertNull(e1);
+ assertNull(e2);
}
}
Modified: accumulo/trunk/examples/simple/pom.xml
URL:
http://svn.apache.org/viewvc/accumulo/trunk/examples/simple/pom.xml?rev=1433695&r1=1433694&r2=1433695&view=diff
==============================================================================
--- accumulo/trunk/examples/simple/pom.xml (original)
+++ accumulo/trunk/examples/simple/pom.xml Tue Jan 15 22:14:07 2013
@@ -59,6 +59,14 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
</dependencies>
</profile>
</profiles>
Modified:
accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java?rev=1433695&r1=1433694&r2=1433695&view=diff
==============================================================================
---
accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
(original)
+++
accumulo/trunk/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
Tue Jan 15 22:14:07 2013
@@ -24,30 +24,33 @@ import java.util.Map.Entry;
import junit.framework.TestCase;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import
org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.util.ContextFactory;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
public class ChunkInputFormatTest extends TestCase {
- private static final Logger log = Logger.getLogger(ChunkInputStream.class);
- List<Entry<Key,Value>> data;
- List<Entry<Key,Value>> baddata;
+ private static AssertionError e0 = null;
+ private static AssertionError e1 = null;
+ private static AssertionError e2 = null;
+ private static IOException e3 = null;
+
+ private static final Authorizations AUTHS = new Authorizations("A", "B",
"C", "D");
+
+ private static List<Entry<Key,Value>> data;
+ private static List<Entry<Key,Value>> baddata;
{
data = new ArrayList<Entry<Key,Value>>();
@@ -72,7 +75,149 @@ public class ChunkInputFormatTest extend
assertEquals(e1.getValue(), e2.getValue());
}
- public void test() throws IOException, InterruptedException,
AccumuloException, AccumuloSecurityException, TableExistsException,
TableNotFoundException {
+ public static class CIFTester extends Configured implements Tool { // Tool that runs a ChunkInputFormat job using the mapper class named in args[4]
+ public static class TestMapper extends
Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> { // expects exactly two records and checks their keys and stream contents
+ int count = 0; // number of map() calls so far; selects which expected record to compare against
+
+ @Override
+ protected void map(List<Entry<Key,Value>> key, InputStream value,
Context context) throws IOException, InterruptedException {
+ byte[] b = new byte[20];
+ int read;
+ try {
+ switch (count) {
+ case 0: // first record: keys data[0], data[1]; content "asdfjkl;"
+ assertEquals(key.size(), 2);
+ entryEquals(key.get(0), data.get(0));
+ entryEquals(key.get(1), data.get(1));
+ assertEquals(read = value.read(b), 8);
+ assertEquals(new String(b, 0, read), "asdfjkl;");
+ assertEquals(read = value.read(b), -1); // a second read must report end of stream
+ break;
+ case 1: // second record: keys data[4], data[5]; content "qwertyuiop"
+ assertEquals(key.size(), 2);
+ entryEquals(key.get(0), data.get(4));
+ entryEquals(key.get(1), data.get(5));
+ assertEquals(read = value.read(b), 10);
+ assertEquals(new String(b, 0, read), "qwertyuiop");
+ assertEquals(read = value.read(b), -1); // a second read must report end of stream
+ break;
+ default:
+ assertTrue(false); // a third or later record is unexpected
+ }
+ } catch (AssertionError e) {
+ e1 = e; // stored in a static field instead of being thrown; the calling test asserts it is null
+ } finally {
+ value.close(); // the stream is closed even when an assertion failed
+ }
+ count++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
InterruptedException {
+ try {
+ assertEquals(2, count); // exactly two records must have been mapped
+ } catch (AssertionError e) {
+ e2 = e; // stored for the calling test to check
+ }
+ }
+ }
+
+ public static class TestNoClose extends
Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> { // reads part of the first stream, then requests the next record without closing it
+ int count = 0;
+
+ @Override
+ protected void map(List<Entry<Key,Value>> key, InputStream value,
Context context) throws IOException, InterruptedException {
+ byte[] b = new byte[5]; // 5-byte buffer, so only a 5-byte prefix of the data is read
+ int read;
+ try {
+ switch (count) {
+ case 0:
+ assertEquals(read = value.read(b), 5);
+ assertEquals(new String(b, 0, read), "asdfj");
+ break;
+ default:
+ assertTrue(false); // a second map() call is unexpected
+ }
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ count++;
+ try {
+ context.nextKeyValue(); // value was not closed in this method; an IOException is the expected outcome
+ assertTrue(false); // reached only if nextKeyValue() did not throw; this AssertionError is not caught in map()
+ } catch (IOException ioe) {
+ e3 = ioe; // stored; testErrorOnNextWithoutClose() asserts it is non-null
+ }
+ }
+ }
+
+ public static class TestBadData extends
Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> { // checks keys against baddata and expects read() and close() on the value to throw
+ @Override
+ protected void map(List<Entry<Key,Value>> key, InputStream value,
Context context) throws IOException, InterruptedException {
+ byte[] b = new byte[20];
+ try {
+ assertEquals(key.size(), 2);
+ entryEquals(key.get(0), baddata.get(0));
+ entryEquals(key.get(1), baddata.get(1));
+ } catch (AssertionError e) {
+ e0 = e; // key mismatch is stored, not thrown
+ }
+ try {
+ value.read(b); // expected to throw
+ try {
+ assertTrue(false); // reached only if read() did not throw
+ } catch (AssertionError e) {
+ e1 = e;
+ }
+ } catch (Exception e) {} // an exception from read() is the expected path and is deliberately ignored
+ try {
+ value.close(); // expected to throw
+ try {
+ assertTrue(false); // reached only if close() did not throw
+ } catch (AssertionError e) {
+ e2 = e;
+ }
+ } catch (Exception e) {} // an exception from close() is the expected path and is deliberately ignored
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception { // returns 0 if the job succeeded, 1 otherwise
+ if (args.length != 5) {
+ throw new IllegalArgumentException("Usage : " +
CIFTester.class.getName() + " <instance name> <user> <pass> <table>
<mapperClass>");
+ }
+
+ String instance = args[0];
+ String user = args[1];
+ String pass = args[2];
+ String table = args[3];
+
+ Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" +
System.currentTimeMillis()); // job name is the class simple name plus the current time in millis
+ job.setJarByClass(this.getClass());
+
+ job.setInputFormatClass(ChunkInputFormat.class);
+
+ ChunkInputFormat.setInputInfo(job.getConfiguration(), user,
pass.getBytes(), table, AUTHS); // NOTE(review): getBytes() uses the platform default charset
+ ChunkInputFormat.setMockInstance(job.getConfiguration(), instance);
+
+ job.setMapperClass((Class<? extends Mapper>) Class.forName(args[4])); // mapper is looked up by class name at runtime; the cast uses the raw Mapper type and is unchecked
+ job.setMapOutputKeyClass(Key.class);
+ job.setMapOutputValueClass(Value.class);
+ job.setOutputFormatClass(NullOutputFormat.class); // none of the mappers above call context.write()
+
+ job.setNumReduceTasks(0); // zero reduce tasks
+
+ job.waitForCompletion(true);
+
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static int main(String[] args) throws Exception { // returns the exit code from ToolRunner.run() so tests can assert on it
+ return ToolRunner.run(CachedConfiguration.getInstance(), new
CIFTester(), args);
+ }
+ }
+
+ public void test() throws Exception {
MockInstance instance = new MockInstance("instance1");
Connector conn = instance.getConnector("root", "".getBytes());
conn.tableOperations().create("test");
@@ -86,44 +231,12 @@ public class ChunkInputFormatTest extend
}
bw.close();
- JobContext job = ContextFactory.createJobContext();
- ChunkInputFormat.setInputInfo(job.getConfiguration(), "root",
"".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
- ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance1");
- ChunkInputFormat cif = new ChunkInputFormat();
- RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac =
ContextFactory.createTaskAttemptContext(job.getConfiguration());
- RecordReader<List<Entry<Key,Value>>,InputStream> rr =
cif.createRecordReader(ris, tac);
- rr.initialize(ris, tac);
-
- assertTrue(rr.nextKeyValue());
- List<Entry<Key,Value>> info = rr.getCurrentKey();
- InputStream cis = rr.getCurrentValue();
- byte[] b = new byte[20];
- int read;
- assertEquals(info.size(), 2);
- entryEquals(info.get(0), data.get(0));
- entryEquals(info.get(1), data.get(1));
- assertEquals(read = cis.read(b), 8);
- assertEquals(new String(b, 0, read), "asdfjkl;");
- assertEquals(read = cis.read(b), -1);
- cis.close();
-
- assertTrue(rr.nextKeyValue());
- info = rr.getCurrentKey();
- cis = rr.getCurrentValue();
- assertEquals(info.size(), 2);
- entryEquals(info.get(0), data.get(4));
- entryEquals(info.get(1), data.get(5));
- assertEquals(read = cis.read(b), 10);
- assertEquals(new String(b, 0, read), "qwertyuiop");
- assertEquals(read = cis.read(b), -1);
- cis.close();
-
- assertFalse(rr.nextKeyValue());
+ assertEquals(0, CIFTester.main(new String[] {"instance1", "root", "",
"test", CIFTester.TestMapper.class.getName()}));
+ assertNull(e1);
+ assertNull(e2);
}
- public void testErrorOnNextWithoutClose() throws IOException,
InterruptedException, AccumuloException, AccumuloSecurityException,
TableNotFoundException,
- TableExistsException {
+ public void testErrorOnNextWithoutClose() throws Exception {
MockInstance instance = new MockInstance("instance2");
Connector conn = instance.getConnector("root", "".getBytes());
conn.tableOperations().create("test");
@@ -137,33 +250,13 @@ public class ChunkInputFormatTest extend
}
bw.close();
- JobContext job = ContextFactory.createJobContext();
- ChunkInputFormat.setInputInfo(job.getConfiguration(), "root",
"".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
- ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance2");
- ChunkInputFormat cif = new ChunkInputFormat();
- RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac =
ContextFactory.createTaskAttemptContext(job.getConfiguration());
- RecordReader<List<Entry<Key,Value>>,InputStream> crr =
cif.createRecordReader(ris, tac);
- crr.initialize(ris, tac);
-
- assertTrue(crr.nextKeyValue());
- InputStream cis = crr.getCurrentValue();
- byte[] b = new byte[5];
- int read;
- assertEquals(read = cis.read(b), 5);
- assertEquals(new String(b, 0, read), "asdfj");
-
- try {
- crr.nextKeyValue();
- assertNotNull(null);
- } catch (Exception e) {
- log.debug("EXCEPTION " + e.getMessage());
- assertNull(null);
- }
+ assertEquals(1, CIFTester.main(new String[] {"instance2", "root", "",
"test", CIFTester.TestNoClose.class.getName()}));
+ assertNull(e1);
+ assertNull(e2);
+ assertNotNull(e3);
}
- public void testInfoWithoutChunks() throws IOException,
InterruptedException, AccumuloException, AccumuloSecurityException,
TableNotFoundException,
- TableExistsException {
+ public void testInfoWithoutChunks() throws Exception {
MockInstance instance = new MockInstance("instance3");
Connector conn = instance.getConnector("root", "".getBytes());
conn.tableOperations().create("test");
@@ -176,35 +269,9 @@ public class ChunkInputFormatTest extend
}
bw.close();
- JobContext job = ContextFactory.createJobContext();
- ChunkInputFormat.setInputInfo(job.getConfiguration(), "root",
"".getBytes(), "test", new Authorizations("A", "B", "C", "D"));
- ChunkInputFormat.setMockInstance(job.getConfiguration(), "instance3");
- ChunkInputFormat cif = new ChunkInputFormat();
- RangeInputSplit ris = new RangeInputSplit();
- TaskAttemptContext tac =
ContextFactory.createTaskAttemptContext(job.getConfiguration());
- RecordReader<List<Entry<Key,Value>>,InputStream> crr =
cif.createRecordReader(ris, tac);
- crr.initialize(ris, tac);
-
- assertTrue(crr.nextKeyValue());
- List<Entry<Key,Value>> info = crr.getCurrentKey();
- InputStream cis = crr.getCurrentValue();
- byte[] b = new byte[20];
- assertEquals(info.size(), 2);
- entryEquals(info.get(0), baddata.get(0));
- entryEquals(info.get(1), baddata.get(1));
- try {
- cis.read(b);
- assertNotNull(null);
- } catch (Exception e) {
- log.debug("EXCEPTION " + e.getMessage());
- assertNull(null);
- }
- try {
- cis.close();
- assertNotNull(null);
- } catch (Exception e) {
- log.debug("EXCEPTION " + e.getMessage());
- assertNull(null);
- }
+ assertEquals(0, CIFTester.main(new String[] {"instance3", "root", "",
"test", CIFTester.TestBadData.class.getName()}));
+ assertNull(e0);
+ assertNull(e1);
+ assertNull(e2);
}
}
Modified: accumulo/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/accumulo/trunk/pom.xml?rev=1433695&r1=1433694&r2=1433695&view=diff
==============================================================================
--- accumulo/trunk/pom.xml (original)
+++ accumulo/trunk/pom.xml Tue Jan 15 22:14:07 2013
@@ -208,6 +208,7 @@
<configuration>
<formats>
<format>xml</format>
+ <format>html</format>
</formats>
</configuration>
</plugin>
@@ -531,6 +532,7 @@
<slf4j.version>1.6.1</slf4j.version>
<hadoop.version>2.0.2-alpha</hadoop.version>
<avro.version>1.5.3</avro.version>
+ <httpclient.version>3.1</httpclient.version>
</properties>
<dependencyManagement>
<dependencies>
@@ -546,6 +548,12 @@
<version>${avro.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
</profile>