Author: ctubbsii
Date: Tue Jan 15 23:58:38 2013
New Revision: 1433751
URL: http://svn.apache.org/viewvc?rev=1433751&view=rev
Log:
ACCUMULO-532 Update contrib to reflect changes in ACCUMULO-769 and to use the
released versions of Hama BSP so we can close this ticket
Added:
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java
- copied, changed from r1431766,
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
Removed:
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
Modified:
accumulo/contrib/bsp/trunk/pom.xml
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
Modified: accumulo/contrib/bsp/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/pom.xml?rev=1433751&r1=1433750&r2=1433751&view=diff
==============================================================================
--- accumulo/contrib/bsp/trunk/pom.xml (original)
+++ accumulo/contrib/bsp/trunk/pom.xml Tue Jan 15 23:58:38 2013
@@ -19,12 +19,113 @@
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-bsp</artifactId>
<version>1.5.0-SNAPSHOT</version>
-
+
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>12</version>
+ </parent>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ <versionRange>[1.0,)</versionRange>
+ <goals>
+ <goal>process</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore />
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>integration-tests</id>
+ <activation>
+ <property>
+ <name>!skipTests</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-integration-test-jar</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <configuration>
+ <finalName>integration</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>run-integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>integration-test</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>verify-integration-tests</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<dependencies>
<dependency>
<groupId>org.apache.hama</groupId>
<artifactId>hama-core</artifactId>
- <version>0.4.0-incubating</version>
+ <version>0.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
@@ -34,7 +135,13 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
+ <version>[1.0.0,2.0.0)</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
Modified:
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
URL:
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java?rev=1433751&r1=1433750&r2=1433751&view=diff
==============================================================================
---
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
(original)
+++
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloInputFormat.java
Tue Jan 15 23:58:38 2013
@@ -27,23 +27,25 @@ import org.apache.hama.bsp.InputFormat;
import org.apache.hama.bsp.InputSplit;
import org.apache.hama.bsp.RecordReader;
+/**
+ * <p>
+ * AccumuloInputFormat class. To be used with Hama BSP.
+ * </p>
+ *
+ * @see BSPJob#setInputFormat(Class)
+ */
public class AccumuloInputFormat extends
org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat implements
InputFormat<Key,Value> {
+
public class BSPRecordReaderBase extends RecordReaderBase<Key,Value>
implements RecordReader<Key,Value> {
public BSPRecordReaderBase(InputSplit split, BSPJob job) throws
IOException {
- this.initialize((BSPRangeInputSplit) split, job.getConf());
+ this.initialize((BSPRangeInputSplit) split,
MapreduceWrapper.wrappedTaskAttemptContext(job));
}
- /*
- * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
- */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return next(currentKey, currentValue);
}
- /*
- * @see org.apache.hama.bsp.RecordReader#createKey()
- */
@Override
public Key createKey() {
if (currentKey == null) {
@@ -53,9 +55,6 @@ public class AccumuloInputFormat extends
}
}
- /*
- * @see org.apache.hama.bsp.RecordReader#createValue()
- */
@Override
public Value createValue() {
if (currentValue == null) {
@@ -65,17 +64,11 @@ public class AccumuloInputFormat extends
}
}
- /*
- * @see org.apache.hama.bsp.RecordReader#getPos()
- */
@Override
public long getPos() throws IOException {
return 0;
}
- /*
- * @see org.apache.hama.bsp.RecordReader#next(java.lang.Object,
java.lang.Object)
- */
@Override
public boolean next(Key k, Value v) throws IOException {
if (scannerIterator.hasNext()) {
@@ -108,7 +101,7 @@ public class AccumuloInputFormat extends
@Override
public InputSplit[] getSplits(BSPJob job, int arg1) throws IOException {
- List<org.apache.hadoop.mapreduce.InputSplit> splits =
getSplits(job.getConf());
+ List<org.apache.hadoop.mapreduce.InputSplit> splits =
getSplits(MapreduceWrapper.wrappedTaskAttemptContext(job));
InputSplit[] bspSplits = new BSPRangeInputSplit[splits.size()];
for (int i = 0; i < splits.size(); i++) {
bspSplits[i] = new BSPRangeInputSplit((RangeInputSplit) splits.get(i));
Modified:
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
URL:
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java?rev=1433751&r1=1433750&r2=1433751&view=diff
==============================================================================
---
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
(original)
+++
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/AccumuloOutputFormat.java
Tue Jan 15 23:58:38 2013
@@ -21,27 +21,34 @@ import java.io.IOException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.OutputFormat;
import org.apache.hama.bsp.RecordWriter;
+/**
+ * <p>
+ * AccumuloOutputFormat class. To be used with Hama BSP.
+ * </p>
+ *
+ * @see BSPJob#setOutputFormat(Class)
+ */
public class AccumuloOutputFormat extends
org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat implements
OutputFormat<Text,Mutation> {
protected static class BSPRecordWriter extends AccumuloRecordWriter
implements RecordWriter<Text,Mutation> {
- BSPRecordWriter(Configuration conf) throws AccumuloException,
AccumuloSecurityException, IOException {
- super(conf);
+
+ private BSPJob job;
+
+ BSPRecordWriter(BSPJob job) throws AccumuloException,
AccumuloSecurityException, IOException {
+ super(MapreduceWrapper.wrappedTaskAttemptContext(job));
+ this.job = job;
}
- /*
- * @see org.apache.hama.bsp.RecordWriter#close()
- */
@Override
public void close() throws IOException {
try {
- close(null);
+ close(MapreduceWrapper.wrappedTaskAttemptContext(job));
} catch (InterruptedException e) {
throw new IOException(e);
}
@@ -49,21 +56,15 @@ public class AccumuloOutputFormat extend
}
- /*
- * @see
org.apache.hama.bsp.OutputFormat#checkOutputSpecs(org.apache.hadoop.fs.FileSystem,
org.apache.hama.bsp.BSPJob)
- */
@Override
public void checkOutputSpecs(FileSystem fs, BSPJob job) throws IOException {
- checkOutputSpecs(job.getConf());
+ checkOutputSpecs(MapreduceWrapper.wrappedTaskAttemptContext(job));
}
- /*
- * @see
org.apache.hama.bsp.OutputFormat#getRecordWriter(org.apache.hadoop.fs.FileSystem,
org.apache.hama.bsp.BSPJob, java.lang.String)
- */
@Override
- public RecordWriter<Text,Mutation> getRecordWriter(FileSystem fs, BSPJob
job, String arg2) throws IOException {
+ public RecordWriter<Text,Mutation> getRecordWriter(FileSystem fs, BSPJob
job, String name) throws IOException {
try {
- return new BSPRecordWriter(job.getConf());
+ return new BSPRecordWriter(job);
} catch (Exception e) {
throw new IOException(e);
}
Added:
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java
URL:
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java?rev=1433751&view=auto
==============================================================================
---
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java
(added)
+++
accumulo/contrib/bsp/trunk/src/main/java/org/apache/accumulo/bsp/MapreduceWrapper.java
Tue Jan 15 23:58:38 2013
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.bsp;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hama.bsp.BSPJob;
+
+/**
+ * <p>
+ * MapreduceWrapper class. Provides a wrapper to wrap {@link BSPJob} into the appropriate Hadoop type required by {@link AccumuloInputFormat} and
+ * {@link AccumuloOutputFormat} static configurator methods. Useful for reusing code to set the job's configuration and not using the expected Hadoop API.
+ * </p>
+ */
+public class MapreduceWrapper {
+
+ /**
+ * Wraps a {@link BSPJob} for reading its {@link Configuration} within Accumulo MapReduce classes' protected static configuration getters.
+ *
+ * @param job
+ * the {@link BSPJob} instance to be wrapped
+ * @return an instance of {@link TaskAttemptContext} whose {@link Configuration} is the same as the job
+ */
+ public static TaskAttemptContext wrappedTaskAttemptContext(final BSPJob job) {
+ return new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+ }
+
+ /**
+ * Wraps a {@link BSPJob} for writing its {@link Configuration} within Accumulo MapReduce classes' public static configuration setters.
+ *
+ * @param job
+ * the {@link BSPJob} instance to be wrapped
+ * @return an instance of {@link Job} that exposes {@link BSPJob#getConfiguration()} via {@link Job#getConfiguration()}; no other methods of {@link Job} are
+ * implemented, so this object cannot be used for anything other than editing the {@link BSPJob}'s {@link Configuration}
+ */
+ public static Job wrappedJob(BSPJob job) {
+ final BSPJob bspJob = job;
+ try {
+ return new Job() {
+ @Override
+ public Configuration getConfiguration() {
+ return bspJob.getConfiguration();
+ }
+ };
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
Added:
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java
URL:
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java?rev=1433751&view=auto
==============================================================================
---
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java
(added)
+++
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatIT.java
Tue Jan 15 23:58:38 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.accumulo.bsp.AccumuloInputFormat;
+import org.apache.accumulo.bsp.MapreduceWrapper;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.InputSplit;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class AccumuloInputFormatIT {
+
+ static class InputFormatTestBSP<M extends Writable> extends BSP<Key,Value,Key,Value,M> {
+ Key key = null;
+ int count = 0;
+
+ @Override
+ public void bsp(BSPPeer<Key,Value,Key,Value,M> peer) throws IOException, SyncException, InterruptedException {
+ // this method reads the next key value record from file
+ KeyValuePair<Key,Value> pair;
+
+ while ((pair = peer.readNext()) != null) {
+ if (key != null) {
+ assertEquals(key.getRow().toString(), new String(pair.getValue().get()));
+ }
+
+ assertEquals(pair.getKey().getRow(), new Text(String.format("%09x", count + 1)));
+ assertEquals(new String(pair.getValue().get()), String.format("%09x", count));
+ count++;
+
+ key = new Key(pair.getKey());
+ }
+
+ peer.sync();
+ assertEquals(100, count);
+ }
+ }
+
+ @Test
+ public void testBSPInputFormat() throws Exception {
+ MockInstance mockInstance = new MockInstance("testmapinstance");
+ Connector c = mockInstance.getConnector("root", new byte[] {});
+ if (c.tableOperations().exists("testtable"))
+ c.tableOperations().delete("testtable");
+ c.tableOperations().create("testtable");
+
+ BatchWriter bw = c.createBatchWriter("testtable", new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+ m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ BSPJob bspJob = new BSPJob();
+ Job job = MapreduceWrapper.wrappedJob(bspJob);
+
+ bspJob.setInputFormat(AccumuloInputFormat.class);
+ bspJob.setBspClass(InputFormatTestBSP.class);
+ bspJob.setInputPath(new Path("test"));
+
+ AccumuloInputFormat.setInputInfo(job, "root", "".getBytes(), "testtable", new Authorizations());
+ AccumuloInputFormat.setMockInstance(job, "testmapinstance");
+
+ AccumuloInputFormat input = new AccumuloInputFormat();
+ InputSplit[] splits = input.getSplits(bspJob, 0);
+ assertEquals(splits.length, 1);
+
+ bspJob.setJar("target/integration-tests.jar");
+ bspJob.setOutputPath(new Path("target/bsp-inputformat-test"));
+ if (!bspJob.waitForCompletion(false))
+ fail("Job not finished successfully");
+ }
+
+}
Modified:
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL:
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1433751&r1=1433750&r2=1433751&view=diff
==============================================================================
---
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
(original)
+++
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
Tue Jan 15 23:58:38 2013
@@ -21,243 +21,175 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.accumulo.bsp.AccumuloInputFormat;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.bsp.MapreduceWrapper;
import org.apache.accumulo.core.client.IteratorSetting;
-import
org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator;
-import
org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.BSP;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.InputSplit;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.util.KeyValuePair;
-import org.junit.After;
import org.junit.Test;
+/**
+ *
+ */
public class AccumuloInputFormatTest {
- @After
- public void tearDown() throws Exception {}
-
- /**
- * Test basic setting & getting of max versions.
- *
- * @throws IOException
- * Signals that an I/O exception has occurred.
- */
- @Test
- public void testMaxVersions() throws IOException {
- BSPJob job = new BSPJob();
- AccumuloInputFormat.setMaxVersions(job.getConf(), 1);
- int version = AccumuloInputFormat.getMaxVersions(job.getConf());
- assertEquals(1, version);
- }
-
- @Test(expected = IOException.class)
- public void testMaxVersionsLessThan1() throws IOException {
- BSPJob job = new BSPJob();
- AccumuloInputFormat.setMaxVersions(job.getConf(), 0);
- }
-
- @Test
- public void testNoMaxVersion() throws IOException {
- BSPJob job = new BSPJob();
- assertEquals(-1, AccumuloInputFormat.getMaxVersions(job.getConf()));
- }
-
@Test
public void testSetIterator() throws IOException {
- BSPJob job = new BSPJob();
+ BSPJob bspJob = new BSPJob();
- AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1,
"WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
- Configuration conf = job.getConf();
- String iterators = conf.get("AccumuloInputFormat.iterators");
-
assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow",
iterators);
+ Job job = MapreduceWrapper.wrappedJob(bspJob);
+ AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow",
"org.apache.accumulo.core.iterators.WholeRowIterator"));
+
+ TaskAttemptContext context =
MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+ List<IteratorSetting> iterators =
AccumuloInputFormat.getIterators(context);
+ assertEquals(1, iterators.size());
+ IteratorSetting iter = iterators.get(0);
+ assertEquals(1, iter.getPriority());
+ assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator",
iter.getIteratorClass());
+ assertEquals("WholeRow", iter.getName());
+ assertEquals(0, iter.getOptions().size());
}
@Test
public void testAddIterator() throws IOException {
- BSPJob job = new BSPJob();
+ BSPJob bspJob = new BSPJob();
- AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1,
"WholeRow", WholeRowIterator.class));
- AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(2,
"Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
+ Job job = MapreduceWrapper.wrappedJob(bspJob);
+ AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow",
WholeRowIterator.class));
+ AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions",
"org.apache.accumulo.core.iterators.VersioningIterator"));
IteratorSetting iter = new IteratorSetting(3, "Count",
"org.apache.accumulo.core.iterators.CountingIterator");
iter.addOption("v1", "1");
iter.addOption("junk", "\0omg:!\\xyzzy");
- AccumuloInputFormat.addIterator(job.getConf(), iter);
+ AccumuloInputFormat.addIterator(job, iter);
- List<AccumuloIterator> list =
AccumuloInputFormat.getIterators(job.getConf());
+ TaskAttemptContext context =
MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+ List<IteratorSetting> list = AccumuloInputFormat.getIterators(context);
// Check the list size
assertTrue(list.size() == 3);
// Walk the list and make sure our settings are correct
- AccumuloIterator setting = list.get(0);
+ IteratorSetting setting = list.get(0);
assertEquals(1, setting.getPriority());
assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator",
setting.getIteratorClass());
- assertEquals("WholeRow", setting.getIteratorName());
+ assertEquals("WholeRow", setting.getName());
setting = list.get(1);
assertEquals(2, setting.getPriority());
assertEquals("org.apache.accumulo.core.iterators.VersioningIterator",
setting.getIteratorClass());
- assertEquals("Versions", setting.getIteratorName());
+ assertEquals("Versions", setting.getName());
setting = list.get(2);
assertEquals(3, setting.getPriority());
assertEquals("org.apache.accumulo.core.iterators.CountingIterator",
setting.getIteratorClass());
- assertEquals("Count", setting.getIteratorName());
+ assertEquals("Count", setting.getName());
- List<AccumuloIteratorOption> iteratorOptions =
AccumuloInputFormat.getIteratorOptions(job.getConf());
+ Map<String,String> iteratorOptions = setting.getOptions();
assertEquals(2, iteratorOptions.size());
- assertEquals("Count", iteratorOptions.get(0).getIteratorName());
- assertEquals("Count", iteratorOptions.get(1).getIteratorName());
- assertEquals("v1", iteratorOptions.get(0).getKey());
- assertEquals("1", iteratorOptions.get(0).getValue());
- assertEquals("junk", iteratorOptions.get(1).getKey());
- assertEquals("\0omg:!\\xyzzy", iteratorOptions.get(1).getValue());
+ assertTrue(iteratorOptions.containsKey("v1"));
+ assertEquals("1", iteratorOptions.get("v1"));
+ assertTrue(iteratorOptions.containsKey("junk"));
+ assertEquals("\0omg:!\\xyzzy", iteratorOptions.get("junk"));
}
@Test
- public void testIteratorOptionEncoding() throws Throwable {
+ public void testIteratorOptionEncoding() throws IOException {
+ BSPJob bspJob = new BSPJob();
String key = "colon:delimited:key";
String value = "comma,delimited,value";
+
+ Job job = MapreduceWrapper.wrappedJob(bspJob);
IteratorSetting someSetting = new IteratorSetting(1, "iterator",
"Iterator.class");
someSetting.addOption(key, value);
- BSPJob job = new BSPJob();
- AccumuloInputFormat.addIterator(job.getConf(), someSetting);
+ AccumuloInputFormat.addIterator(job, someSetting);
- final String rawConfigOpt = new AccumuloIteratorOption("iterator", key,
value).toString();
-
- assertEquals(rawConfigOpt,
job.getConf().get("AccumuloInputFormat.iterators.options"));
-
- List<AccumuloIteratorOption> opts =
AccumuloInputFormat.getIteratorOptions(job.getConf());
+ TaskAttemptContext context =
MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+ List<IteratorSetting> iters = AccumuloInputFormat.getIterators(context);
+ assertEquals(1, iters.size());
+ assertEquals("iterator", iters.get(0).getName());
+ assertEquals("Iterator.class", iters.get(0).getIteratorClass());
+ assertEquals(1, iters.get(0).getPriority());
+ Map<String,String> opts = iters.get(0).getOptions();
assertEquals(1, opts.size());
- assertEquals(opts.get(0).getKey(), key);
- assertEquals(opts.get(0).getValue(), value);
+ assertTrue(opts.containsKey(key));
+ assertEquals(value, opts.get(key));
someSetting.addOption(key + "2", value);
someSetting.setPriority(2);
someSetting.setName("it2");
- AccumuloInputFormat.addIterator(job.getConf(), someSetting);
- opts = AccumuloInputFormat.getIteratorOptions(job.getConf());
- assertEquals(3, opts.size());
- for (AccumuloIteratorOption opt : opts) {
- assertEquals(opt.getKey().substring(0, key.length()), key);
- assertEquals(opt.getValue(), value);
- }
+ AccumuloInputFormat.addIterator(job, someSetting);
+
+ context = MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+ iters = AccumuloInputFormat.getIterators(context);
+ assertEquals(2, iters.size());
+ assertEquals("iterator", iters.get(0).getName());
+ assertEquals("Iterator.class", iters.get(0).getIteratorClass());
+ assertEquals(1, iters.get(0).getPriority());
+ opts = iters.get(0).getOptions();
+ assertEquals(1, opts.size());
+ assertTrue(opts.containsKey(key));
+ assertEquals(value, opts.get(key));
+ assertEquals("it2", iters.get(1).getName());
+ assertEquals("Iterator.class", iters.get(1).getIteratorClass());
+ assertEquals(2, iters.get(1).getPriority());
+ opts = iters.get(1).getOptions();
+ assertEquals(2, opts.size());
+ assertTrue(opts.containsKey(key));
+ assertEquals(value, opts.get(key));
+ assertTrue(opts.containsKey(key + "2"));
+ assertEquals(value, opts.get(key + "2"));
}
@Test
public void testGetIteratorSettings() throws IOException {
- BSPJob job = new BSPJob();
+ BSPJob bspJob = new BSPJob();
+ Job job = MapreduceWrapper.wrappedJob(bspJob);
- AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(1,
"WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
- AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(2,
"Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
- AccumuloInputFormat.addIterator(job.getConf(), new IteratorSetting(3,
"Count", "org.apache.accumulo.core.iterators.CountingIterator"));
+ AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow",
"org.apache.accumulo.core.iterators.WholeRowIterator"));
+ AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions",
"org.apache.accumulo.core.iterators.VersioningIterator"));
+ AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count",
"org.apache.accumulo.core.iterators.CountingIterator"));
- List<AccumuloIterator> list =
AccumuloInputFormat.getIterators(job.getConf());
+ TaskAttemptContext context =
MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+ List<IteratorSetting> list = AccumuloInputFormat.getIterators(context);
// Check the list size
- assertTrue(list.size() == 3);
+ assertEquals(3, list.size());
// Walk the list and make sure our settings are correct
- AccumuloIterator setting = list.get(0);
+ IteratorSetting setting = list.get(0);
assertEquals(1, setting.getPriority());
assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator",
setting.getIteratorClass());
- assertEquals("WholeRow", setting.getIteratorName());
+ assertEquals("WholeRow", setting.getName());
setting = list.get(1);
assertEquals(2, setting.getPriority());
assertEquals("org.apache.accumulo.core.iterators.VersioningIterator",
setting.getIteratorClass());
- assertEquals("Versions", setting.getIteratorName());
+ assertEquals("Versions", setting.getName());
setting = list.get(2);
assertEquals(3, setting.getPriority());
assertEquals("org.apache.accumulo.core.iterators.CountingIterator",
setting.getIteratorClass());
- assertEquals("Count", setting.getIteratorName());
-
+ assertEquals("Count", setting.getName());
}
@Test
public void testSetRegex() throws IOException {
- BSPJob job = new BSPJob();
+ BSPJob bspJob = new BSPJob();
+ Job job = MapreduceWrapper.wrappedJob(bspJob);
String regex = ">\"*%<>\'\\";
IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
RegExFilter.setRegexs(is, regex, null, null, null, false);
- AccumuloInputFormat.addIterator(job.getConf(), is);
+ AccumuloInputFormat.addIterator(job, is);
-
assertTrue(regex.equals(AccumuloInputFormat.getIterators(job.getConf()).get(0).getIteratorName()));
+ TaskAttemptContext context =
MapreduceWrapper.wrappedTaskAttemptContext(bspJob);
+ assertEquals(regex,
AccumuloInputFormat.getIterators(context).get(0).getName());
}
- static class TestBSP extends BSP<Key,Value,Key,Value> {
- Key key = null;
- int count = 0;
-
- @Override
- public void bsp(BSPPeer<Key,Value,Key,Value> peer) throws IOException,
SyncException, InterruptedException {
- // this method reads the next key value record from file
- KeyValuePair<Key,Value> pair;
-
- while ((pair = peer.readNext()) != null) {
- if (key != null) {
- assertEquals(key.getRow().toString(), new
String(pair.getValue().get()));
- }
-
- assertEquals(pair.getKey().getRow(), new Text(String.format("%09x",
count + 1)));
- assertEquals(new String(pair.getValue().get()), String.format("%09x",
count));
- count++;
-
- key = new Key(pair.getKey());
- }
-
- peer.sync();
- assertEquals(100, count);
- }
- }
-
- @Test
- public void testBsp() throws Exception {
- MockInstance mockInstance = new MockInstance("testmapinstance");
- Connector c = mockInstance.getConnector("root", new byte[] {});
- if (c.tableOperations().exists("testtable"))
- c.tableOperations().delete("testtable");
- c.tableOperations().create("testtable");
-
- BatchWriter bw = c.createBatchWriter("testtable", 10000L, 1000L, 4);
- for (int i = 0; i < 100; i++) {
- Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
- m.put(new Text(), new Text(), new Value(String.format("%09x",
i).getBytes()));
- bw.addMutation(m);
- }
- bw.close();
-
- BSPJob job = new BSPJob(new HamaConfiguration());
- job.setInputFormat(AccumuloInputFormat.class);
- job.setBspClass(TestBSP.class);
- job.setInputPath(new Path("test"));
- AccumuloInputFormat.setInputInfo(job.getConf(), "root", "".getBytes(),
"testtable", new Authorizations());
- AccumuloInputFormat.setMockInstance(job.getConf(), "testmapinstance");
-
- AccumuloInputFormat input = new AccumuloInputFormat();
- InputSplit[] splits = input.getSplits(job, 0);
- assertEquals(splits.length, 1);
-
- job.waitForCompletion(false);
- }
}
Copied:
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java
(from r1431766,
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java)
URL:
http://svn.apache.org/viewvc/accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java?p2=accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java&p1=accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java&r1=1431766&r2=1433751&rev=1433751&view=diff
==============================================================================
---
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
(original)
+++
accumulo/contrib/bsp/trunk/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatIT.java
Tue Jan 15 23:58:38 2013
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.client.
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Iterator;
@@ -26,7 +27,9 @@ import java.util.Map.Entry;
import org.apache.accumulo.bsp.AccumuloInputFormat;
import org.apache.accumulo.bsp.AccumuloOutputFormat;
+import org.apache.accumulo.bsp.MapreduceWrapper;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mock.MockInstance;
@@ -37,6 +40,8 @@ import org.apache.accumulo.core.security
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
@@ -46,14 +51,17 @@ import org.apache.hama.bsp.sync.SyncExce
import org.apache.hama.util.KeyValuePair;
import org.junit.Test;
-public class AccumuloOutputFormatTest {
+/**
+ *
+ */
+public class AccumuloOutputFormatIT {
- static class TestBSP extends BSP<Key,Value,Text,Mutation> {
+ static class OutputFormatTestBSP<M extends Writable> extends BSP<Key,Value,Text,Mutation,M> {
Key key = null;
int count = 0;
@Override
- public void bsp(BSPPeer<Key,Value,Text,Mutation> peer) throws IOException, SyncException, InterruptedException {
+ public void bsp(BSPPeer<Key,Value,Text,Mutation,M> peer) throws IOException, SyncException, InterruptedException {
// this method reads the next key value record from file
KeyValuePair<Key,Value> pair;
@@ -73,7 +81,7 @@ public class AccumuloOutputFormatTest {
}
@Override
- public void cleanup(BSPPeer<Key,Value,Text,Mutation> peer) throws IOException {
+ public void cleanup(BSPPeer<Key,Value,Text,Mutation,M> peer) throws IOException {
Mutation m = new Mutation("total");
m.put("", "", Integer.toString(count));
peer.write(new Text("testtable2"), m);
@@ -81,7 +89,7 @@ public class AccumuloOutputFormatTest {
}
@Test
- public void testBSP() throws Exception {
+ public void testBSPOutputFormat() throws Exception {
MockInstance mockInstance = new MockInstance("testmrinstance");
Connector c = mockInstance.getConnector("root", new byte[] {});
if (c.tableOperations().exists("testtable1"))
@@ -91,7 +99,7 @@ public class AccumuloOutputFormatTest {
c.tableOperations().create("testtable1");
c.tableOperations().create("testtable2");
- BatchWriter bw = c.createBatchWriter("testtable1", 10000L, 1000L, 4);
+ BatchWriter bw = c.createBatchWriter("testtable1", new BatchWriterConfig());
for (int i = 0; i < 100; i++) {
Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
@@ -100,29 +108,33 @@ public class AccumuloOutputFormatTest {
bw.close();
Configuration conf = new Configuration();
- BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
- bsp.setJobName("Test Input Output");
+ BSPJob bspJob = new BSPJob(new HamaConfiguration(conf));
+ bspJob.setJobName("Test Input Output");
- bsp.setBspClass(TestBSP.class);
- bsp.setInputFormat(AccumuloInputFormat.class);
- bsp.setInputPath(new Path("test"));
-
- bsp.setOutputFormat(AccumuloOutputFormat.class);
- bsp.setOutputPath(new Path("test"));
-
- bsp.setOutputKeyClass(Text.class);
- bsp.setOutputValueClass(Mutation.class);
-
- AccumuloInputFormat.setInputInfo(bsp.getConf(), "root", "".getBytes(), "testtable1", new Authorizations());
- AccumuloInputFormat.setMockInstance(bsp.getConf(), "testmrinstance");
- AccumuloOutputFormat.setOutputInfo(bsp.getConf(), "root", "".getBytes(), false, "testtable2");
- AccumuloOutputFormat.setMockInstance(bsp.getConf(), "testmrinstance");
+ bspJob.setBspClass(OutputFormatTestBSP.class);
+ bspJob.setInputFormat(AccumuloInputFormat.class);
+ bspJob.setInputPath(new Path("test"));
+
+ bspJob.setOutputFormat(AccumuloOutputFormat.class);
+ bspJob.setJar("target/integration-tests.jar");
+ bspJob.setOutputPath(new Path("target/bsp-outputformat-test"));
+
+ bspJob.setOutputKeyClass(Text.class);
+ bspJob.setOutputValueClass(Mutation.class);
+
+ Job job = MapreduceWrapper.wrappedJob(bspJob);
+
+ AccumuloInputFormat.setInputInfo(job, "root", "".getBytes(), "testtable1", new Authorizations());
+ AccumuloInputFormat.setMockInstance(job, "testmrinstance");
+ AccumuloOutputFormat.setOutputInfo(job, "root", "".getBytes(), false, "testtable2");
+ AccumuloOutputFormat.setMockInstance(job, "testmrinstance");
AccumuloInputFormat input = new AccumuloInputFormat();
- InputSplit[] splits = input.getSplits(bsp, 0);
+ InputSplit[] splits = input.getSplits(bspJob, 0);
assertEquals(splits.length, 1);
- bsp.waitForCompletion(false);
+ if (!bspJob.waitForCompletion(false))
+ fail("Job not finished successfully");
Scanner scanner = c.createScanner("testtable2", new Authorizations());
Iterator<Entry<Key,Value>> iter = scanner.iterator();
@@ -131,6 +143,6 @@ public class AccumuloOutputFormatTest {
assertEquals("total", entry.getKey().getRow().toString());
assertEquals(100, Integer.parseInt(new String(entry.getValue().get())));
assertFalse(iter.hasNext());
-
}
+
}