Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java?rev=602240&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java Fri Dec 7 14:01:32 2007 @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.join; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * This class provides an implementation of ResetableIterator. This + * implementation uses a byte array to store elements added to it. + */ +public class StreamBackedIterator<X extends Writable> + implements ResetableIterator<X> { + + private static class ReplayableByteInputStream extends ByteArrayInputStream { + public ReplayableByteInputStream(byte[] arr) { + super(arr); + } + public void resetStream() { + mark = 0; + reset(); + } + } + + private ByteArrayOutputStream outbuf = new ByteArrayOutputStream(); + private DataOutputStream outfbuf = new DataOutputStream(outbuf); + private ReplayableByteInputStream inbuf; + private DataInputStream infbuf; + + public StreamBackedIterator() { } + + public boolean hasNext() { + return infbuf != null && inbuf.available() > 0; + } + + public boolean next(X val) throws IOException { + if (hasNext()) { + inbuf.mark(0); + val.readFields(infbuf); + return true; + } + return false; + } + + public void replay(X val) throws IOException { + inbuf.reset(); + val.readFields(infbuf); + } + + public void reset() { + if (null != outfbuf) { + inbuf = new ReplayableByteInputStream(outbuf.toByteArray()); + infbuf = new DataInputStream(inbuf); + outfbuf = null; + } + inbuf.resetStream(); + } + + public void add(X item) throws IOException { + item.write(outfbuf); + } + + public void close() throws IOException { + if (null != infbuf) + infbuf.close(); + if (null != outfbuf) + outfbuf.close(); + } + + public void clear() { + if (null != inbuf) + inbuf.resetStream(); + outbuf.reset(); + outfbuf = new DataOutputStream(outbuf); + } +}
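A minimal usage sketch of the StreamBackedIterator just added, for orientation (hypothetical, not part of the commit; the demo class name and the IntWritable element type are illustrative assumptions): add() serializes each element into the backing buffer, reset() switches the buffer into read mode, and next()/replay() deserialize into a caller-supplied instance.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.join.StreamBackedIterator;

public class StreamBackedIteratorDemo {
  public static void main(String[] args) throws IOException {
    StreamBackedIterator<IntWritable> iter =
      new StreamBackedIterator<IntWritable>();
    IntWritable val = new IntWritable();
    for (int i = 0; i < 3; ++i) {
      val.set(i);
      iter.add(val);     // serialize val into the backing byte buffer
    }
    iter.reset();        // freeze the buffer and open it for reading
    while (iter.hasNext()) {
      iter.next(val);    // deserialize the next element into val
      System.out.println(val.get());  // prints 0, 1, 2
    }
    iter.replay(val);    // re-read the element most recently returned
    iter.close();
  }
}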
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java?rev=602240&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java Fri Dec 7 14:01:32 2007 @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.join; + +import java.io.DataOutput; +import java.io.DataInput; +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s. + */ +public class TupleWritable implements Writable, Iterable<Writable> { + + private long written; + private Writable[] values; + + /** + * Create an empty tuple with no allocated storage for writables. + */ + public TupleWritable() { } + + /** + * Initialize tuple with storage; it is unknown whether any of the given + * writables contain "written" values. + */ + public TupleWritable(Writable[] vals) { + written = 0L; + values = vals; + } + + /** + * Return true if tuple has an element at the position provided. + */ + public boolean has(int i) { + return 0 != ((1L << i) & written); + } + + /** + * Get ith Writable from Tuple. + */ + public Writable get(int i) { + return values[i]; + } + + /** + * The number of children in this Tuple. + */ + public int size() { + return values.length; + } + + /** + * {@inheritDoc} + */ + public boolean equals(Object other) { + if (other instanceof TupleWritable) { + TupleWritable that = (TupleWritable)other; + if (this.size() != that.size() || this.mask() != that.mask()) { + return false; + } + for (int i = 0; i < values.length; ++i) { + if (!has(i)) continue; + if (!values[i].equals(that.get(i))) { + return false; + } + } + return true; + } + return false; + } + + public int hashCode() { + assert false : "hashCode not designed"; + return (int)written; + } + + /** + * Return an iterator over the elements in this tuple. + * Note that this doesn't flatten the tuple; one may receive tuples + * from this iterator.
+ */ + public Iterator<Writable> iterator() { + final TupleWritable t = this; + return new Iterator<Writable>() { + long i = written; + long last = 0L; + public boolean hasNext() { + return 0L != i; + } + public Writable next() { + last = Long.lowestOneBit(i); + if (0 == last) + throw new NoSuchElementException(); + i ^= last; + // numberOfTrailingZeros returns 64 only for 0; last is nonzero here + return t.get(Long.numberOfTrailingZeros(last) % 64); + } + public void remove() { + t.written ^= last; + if (t.has(Long.numberOfTrailingZeros(last))) { + throw new IllegalStateException("Attempt to remove non-existent val"); + } + } + }; + } + + /** + * Convert Tuple to String as in the following. + * <tt>[<child1>,<child2>,...,<childn>]</tt> + */ + public String toString() { + StringBuffer buf = new StringBuffer("["); + for (int i = 0; i < values.length; ++i) { + buf.append(has(i) ? values[i].toString() : ""); + buf.append(","); + } + if (values.length != 0) + buf.setCharAt(buf.length() - 1, ']'); + else + buf.append(']'); + return buf.toString(); + } + + // Writable + + /** Writes each Writable to <code>out</code>. + * TupleWritable format: + * {@code + * <count><type1><type2>...<typen><obj1><obj2>...<objn> + * } + */ + public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, values.length); + WritableUtils.writeVLong(out, written); + for (int i = 0; i < values.length; ++i) { + Text.writeString(out, values[i].getClass().getName()); + } + for (int i = 0; i < values.length; ++i) { + if (has(i)) { + values[i].write(out); + } + } + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unchecked") // No static typeinfo on Tuples + public void readFields(DataInput in) throws IOException { + int card = WritableUtils.readVInt(in); + values = new Writable[card]; + written = WritableUtils.readVLong(in); + Class<? extends Writable>[] cls = new Class[card]; + try { + for (int i = 0; i < card; ++i) { + cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class); + } + for (int i = 0; i < card; ++i) { + values[i] = cls[i].newInstance(); + if (has(i)) { + values[i].readFields(in); + } + } + } catch (ClassNotFoundException e) { + throw (IOException)new IOException("Failed tuple init").initCause(e); + } catch (IllegalAccessException e) { + throw (IOException)new IOException("Failed tuple init").initCause(e); + } catch (InstantiationException e) { + throw (IOException)new IOException("Failed tuple init").initCause(e); + } + } + + /** + * Record that the tuple contains an element at the position provided. + */ + void setWritten(int i) { + written |= 1L << i; + } + + /** + * Record that the tuple does not contain an element at the position + * provided. + */ + void clearWritten(int i) { + written &= -1L ^ (1L << i); + } + + /** + * Clear any record of which writables have been written to, without + * releasing storage. + */ + void clearWritten() { + written = 0L; + } + + /** + * Return a bitmap recording which of the writables have been + * written to.
+ */ + long mask() { + return written; + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java?rev=602240&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java Fri Dec 7 14:01:32 2007 @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.join; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.RecordReader; + +/** + * Proxy class for a RecordReader participating in the join framework. + * This class keeps track of the "head" key-value pair for the + * provided RecordReader and keeps a store of values matching a key when + * this source is participating in a join. + */ +class WrappedRecordReader<K extends WritableComparable, + U extends Writable> + implements ComposableRecordReader<K,U> { + + private boolean empty = false; + private RecordReader<K,U> rr; + private int id; // index at which values will be inserted in collector + + private K khead; // key at the top of this RR + private U vhead; // value assoc with khead + private WritableComparator cmp; + + private ResetableIterator<U> vjoin; + + /** + * For a given RecordReader rr, occupy position id in collector. + */ + WrappedRecordReader(int id, RecordReader<K,U> rr, + Class<? extends WritableComparator> cmpcl) throws IOException { + this.id = id; + this.rr = rr; + khead = rr.createKey(); + vhead = rr.createValue(); + try { + cmp = (null == cmpcl) + ? WritableComparator.get(khead.getClass()) + : cmpcl.newInstance(); + } catch (InstantiationException e) { + throw (IOException)new IOException().initCause(e); + } catch (IllegalAccessException e) { + throw (IOException)new IOException().initCause(e); + } + vjoin = new StreamBackedIterator<U>(); + next(); + } + + /** {@inheritDoc} */ + public int id() { + return id; + } + + /** + * Return the key at the head of this RR. + */ + public K key() { + return khead; + } + + /** + * Clone the key at the head of this RR into the object supplied. + */ + public void key(K qkey) throws IOException { + WritableUtils.cloneInto(qkey, khead); + } + + /** + * Return true unless the RR (including the k,v pair stored in this + * object) is exhausted.
+ */ + public boolean hasNext() { + return !empty; + } + + /** + * Skip key-value pairs with keys less than or equal to the key provided. + */ + public void skip(K key) throws IOException { + if (hasNext()) { + while (cmp.compare(khead, key) <= 0 && next()); + } + } + + /** + * Read the next k,v pair into the head of this object; return true iff + * the proxied RR is not yet exhausted. + */ + protected boolean next() throws IOException { + empty = !rr.next(khead, vhead); + return hasNext(); + } + + /** + * Add an iterator to the collector at the position occupied by this + * RecordReader over the values in this stream paired with the key + * provided (i.e. register a stream of values from this source matching K + * with a collector). + */ + // JoinCollector comes from parent, which has + @SuppressWarnings("unchecked") // no static type for the slot this sits in + public void accept(CompositeRecordReader.JoinCollector i, K key) + throws IOException { + vjoin.clear(); + if (0 == cmp.compare(key, khead)) { + do { + vjoin.add(vhead); + } while (next() && 0 == cmp.compare(key, khead)); + } + i.add(id, vjoin); + } + + /** + * Write key-value pair at the head of this stream to the objects provided; + * get next key-value pair from proxied RR. + */ + public boolean next(K key, U value) throws IOException { + if (hasNext()) { + WritableUtils.cloneInto(key, khead); + WritableUtils.cloneInto(value, vhead); + next(); + return true; + } + return false; + } + + /** + * Request new key from proxied RR. + */ + public K createKey() { + return rr.createKey(); + } + + /** + * Request new value from proxied RR. + */ + public U createValue() { + return rr.createValue(); + } + + /** + * Request progress from proxied RR. + */ + public float getProgress() throws IOException { + return rr.getProgress(); + } + + /** + * Request position from proxied RR. + */ + public long getPos() throws IOException { + return rr.getPos(); + } + + /** + * Forward close request to proxied RR. + */ + public void close() throws IOException { + rr.close(); + } + + /** + * Implement Comparable contract (compare key at head of proxied RR + * with that of another). + */ + public int compareTo(ComposableRecordReader<K,?> other) { + return cmp.compare(key(), other.key()); + } + + /** + * Return true iff compareTo(other) returns 0. + */ + @SuppressWarnings("unchecked") // Explicit type check prior to cast + public boolean equals(Object other) { + return other instanceof ComposableRecordReader + && 0 == compareTo((ComposableRecordReader)other); + } + + public int hashCode() { + assert false : "hashCode not designed"; + return 42; + } + +}
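A brief sketch of the head-key protocol implemented by WrappedRecordReader above (hypothetical, not part of the commit: the class and its constructor are package-private, so such code would have to live in org.apache.hadoop.mapred.join, and the RecordReader argument is assumed to yield keys in sorted order):

package org.apache.hadoop.mapred.join;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.RecordReader;

class WrappedRecordReaderSketch {
  // Drain a sorted reader through the wrapper: peek, skip, then consume.
  static void drain(RecordReader<IntWritable, IntWritable> rr)
      throws IOException {
    // A null comparator class selects WritableComparator.get(IntWritable.class)
    WrappedRecordReader<IntWritable, IntWritable> w =
      new WrappedRecordReader<IntWritable, IntWritable>(0, rr, null);
    IntWritable key = w.createKey();
    IntWritable val = w.createValue();
    if (w.hasNext()) {
      w.key(key);   // peek: clone the head key without advancing
      w.skip(key);  // consume every pair whose key <= the head key
    }
    while (w.next(key, val)) {  // copy out the head pair, then advance
      System.out.println(key.get() + "\t" + val.get());
    }
    w.close();
  }
}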
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html?rev=602240&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/join/package.html Fri Dec 7 14:01:32 2007 @@ -0,0 +1,88 @@ +<HTML> + +<BODY> + +<p>Given a set of sorted datasets keyed with the same class and yielding equal +partitions, it is possible to effect a join of those datasets prior to the map. +This could save costs in re-partitioning, sorting, shuffling, and writing out +data required in the general case.</p> + +<h3><a name="Interface"></a>Interface</h3> + +<p>The attached code offers the following interface to users of these +classes.</p> + +<table> +<tr><th>property</th><th>required</th><th>value</th></tr> +<tr><td>mapred.join.expr</td><td>yes</td> + <td>Join expression to effect over input data</td></tr> +<tr><td>mapred.join.keycomparator</td><td>no</td> + <td><tt>WritableComparator</tt> class to use for comparing keys</td></tr> +<tr><td>mapred.join.define.<ident></td><td>no</td> + <td>Class mapped to identifier in join expression</td></tr> +</table> + +<p>The join expression understands the following grammar:</p> + +<pre>func ::= <ident>([<func>,]*<func>) +func ::= tbl(<class>,"<path>"); + +</pre> + +<p>Operations included in this patch are partitioned into one of two types: +join operations emitting tuples and "multi-filter" operations emitting a +single value from (but not necessarily included in) a set of input values. +For a given key, each operation will consider the cross product of all +values for all sources at that node.</p> + +<p>Identifiers supported by default:</p> + +<table> +<tr><th>identifier</th><th>type</th><th>description</th></tr> +<tr><td>inner</td><td>Join</td><td>Full inner join</td></tr> +<tr><td>outer</td><td>Join</td><td>Full outer join</td></tr> +<tr><td>override</td><td>MultiFilter</td> + <td>For a given key, prefer values from the rightmost source</td></tr> +</table> + +<p>A user of this class must set the <tt>InputFormat</tt> for the job to +<tt>CompositeInputFormat</tt> and define a join expression accepted by the +preceding grammar. For example, both of the following are acceptable:</p> + +<pre>inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class, + "hdfs://host:8020/foo/bar"), + tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class, + "hdfs://host:8020/foo/baz")) + +outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class, + "hdfs://host:8020/foo/bar"), + tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class, + "hdfs://host:8020/foo/baz")), + tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class, + "hdfs://host:8020/foo/rab")) +</pre> + +<p><tt>CompositeInputFormat</tt> includes a handful of convenience methods to +aid construction of these verbose statements.</p> + +<p>As in the second example, joins may be nested. Users may provide a +comparator class in the <tt>mapred.join.keycomparator</tt> property to specify +the ordering of their keys, or accept the default comparator as returned by +<tt>WritableComparator.get(keyclass)</tt>.</p> + +<p>Users can specify their own join operations, typically by overriding +<tt>JoinRecordReader</tt> or <tt>MultiFilterRecordReader</tt> and mapping that +class to an identifier in the join expression using the +<tt>mapred.join.define.<em>ident</em></tt> property, where <em>ident</em> is +the identifier appearing in the join expression. Users may elect to emit or +modify values passing through their join operation. Consulting the existing +operations for guidance is recommended. Adding arguments is considerably more +complex (and only partially supported), as one must also add a <tt>Node</tt> +type to the parse tree.
One is probably better off extending +<tt>RecordReader</tt> in most cases.</p> + +<a href="http://issues.apache.org/jira/browse/HADOOP-2085">JIRA</a> + +</BODY> + +</HTML> Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java?rev=602240&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java Fri Dec 7 14:01:32 2007 @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.join; + +import java.io.IOException; +import java.util.Iterator; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; +import junit.extensions.TestSetup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; + +public class TestDatamerge extends TestCase { + + private static MiniDFSCluster cluster = null; + public static Test suite() { + TestSetup setup = new TestSetup(new TestSuite(TestDatamerge.class)) { + protected void setUp() throws Exception { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster(conf, 2, true, null); + } + protected void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + }; + return setup; + } + + private static SequenceFile.Writer[] createWriters(Path testdir, + Configuration conf, int srcs, Path[] src) throws IOException { + for (int i = 0; i < srcs; ++i) { + src[i] = new Path(testdir, Integer.toString(i + 10, 36)); + } + SequenceFile.Writer out[] = new SequenceFile.Writer[srcs]; + for (int i = 0; i < srcs; ++i) { + out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf, + src[i], IntWritable.class, IntWritable.class); + } + return out; + } + + private static Path[] writeSimpleSrc(Path testdir, Configuration conf, + int srcs) throws IOException { + SequenceFile.Writer out[] = null; + Path[] src = new Path[srcs]; + try { + out = createWriters(testdir, conf, srcs, src); + final int capacity = srcs * 2 + 
1; + IntWritable key = new IntWritable(); + IntWritable val = new IntWritable(); + for (int k = 0; k < capacity; ++k) { + for (int i = 0; i < srcs; ++i) { + key.set(k % srcs == 0 ? k * srcs : k * srcs + i); + val.set(10 * k + i); + out[i].append(key, val); + if (i == k) { + // add duplicate key + out[i].append(key, val); + } + } + } + } finally { + if (out != null) { + for (int i = 0; i < srcs; ++i) { + if (out[i] != null) + out[i].close(); + } + } + } + return src; + } + + private static String stringify(IntWritable key, Writable val) { + StringBuilder sb = new StringBuilder(); + sb.append("(" + key); + sb.append("," + val + ")"); + return sb.toString(); + } + + private static abstract class SimpleCheckerBase<V extends Writable> + implements Mapper<IntWritable, V, IntWritable, IntWritable>, + Reducer<IntWritable, IntWritable, Text, Text> { + protected final static IntWritable one = new IntWritable(1); + int srcs; + public void close() { } + public void configure(JobConf job) { + srcs = job.getInt("testdatamerge.sources", 0); + assertTrue("Invalid src count: " + srcs, srcs > 0); + } + public abstract void map(IntWritable key, V val, + OutputCollector<IntWritable, IntWritable> out, Reporter reporter) + throws IOException; + public void reduce(IntWritable key, Iterator<IntWritable> values, + OutputCollector<Text, Text> output, + Reporter reporter) throws IOException { + int seen = 0; + while (values.hasNext()) { + seen += values.next().get(); + } + assertTrue("Bad count for " + key.get(), verify(key.get(), seen)); + } + public abstract boolean verify(int key, int occ); + } + + private static class InnerJoinChecker + extends SimpleCheckerBase<TupleWritable> { + public void map(IntWritable key, TupleWritable val, + OutputCollector<IntWritable, IntWritable> out, Reporter reporter) + throws IOException { + int k = key.get(); + final String kvstr = "Unexpected tuple: " + stringify(key, val); + assertTrue(kvstr, 0 == k % (srcs * srcs)); + for (int i = 0; i < val.size(); ++i) { + final int vali = ((IntWritable)val.get(i)).get(); + assertTrue(kvstr, (vali - i) * srcs == 10 * k); + } + out.collect(key, one); + } + public boolean verify(int key, int occ) { + return (key == 0 && occ == 2) || + (key != 0 && (key % (srcs * srcs) == 0) && occ == 1); + } + } + + private static class OuterJoinChecker + extends SimpleCheckerBase<TupleWritable> { + public void map(IntWritable key, TupleWritable val, + OutputCollector<IntWritable, IntWritable> out, Reporter reporter) + throws IOException { + int k = key.get(); + final String kvstr = "Unexpected tuple: " + stringify(key, val); + if (0 == k % (srcs * srcs)) { + for (int i = 0; i < val.size(); ++i) { + assertTrue(kvstr, val.get(i) instanceof IntWritable); + final int vali = ((IntWritable)val.get(i)).get(); + assertTrue(kvstr, (vali - i) * srcs == 10 * k); + } + } else { + for (int i = 0; i < val.size(); ++i) { + if (i == k % srcs) { + assertTrue(kvstr, val.get(i) instanceof IntWritable); + final int vali = ((IntWritable)val.get(i)).get(); + assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i)); + } else { + assertTrue(kvstr, !val.has(i)); + } + } + } + out.collect(key, one); + } + public boolean verify(int key, int occ) { + if (key < srcs * srcs && (key % (srcs + 1)) == 0) + return 2 == occ; + return 1 == occ; + } + } + + private static class OverrideChecker + extends SimpleCheckerBase<IntWritable> { + public void map(IntWritable key, IntWritable val, + OutputCollector<IntWritable, IntWritable> out, Reporter reporter) + throws IOException { + int k = key.get(); + 
final int vali = val.get(); + final String kvstr = "Unexpected tuple: " + stringify(key, val); + if (0 == k % (srcs * srcs)) { + assertTrue(kvstr, vali == k * 10 / srcs + srcs - 1); + } else { + final int i = k % srcs; + assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i)); + } + out.collect(key, one); + } + public boolean verify(int key, int occ) { + if (key < srcs * srcs && (key % (srcs + 1)) == 0 && key != 0) + return 2 == occ; + return 1 == occ; + } + } + + private static void joinAs(String jointype, + Class<? extends SimpleCheckerBase> c) throws Exception { + final int srcs = 4; + Configuration conf = new Configuration(); + JobConf job = new JobConf(conf, c); + Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype)); + Path[] src = writeSimpleSrc(base, conf, srcs); + job.set("mapred.join.expr", CompositeInputFormat.compose(jointype, + SequenceFileInputFormat.class, src)); + job.setInt("testdatamerge.sources", srcs); + job.setInputFormat(CompositeInputFormat.class); + job.setOutputPath(new Path(base, "out")); + + job.setMapperClass(c); + job.setReducerClass(c); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(IntWritable.class); + JobClient.runJob(job); + base.getFileSystem(job).delete(base); + } + + public void testSimpleInnerJoin() throws Exception { + joinAs("inner", InnerJoinChecker.class); + } + + public void testSimpleOuterJoin() throws Exception { + joinAs("outer", OuterJoinChecker.class); + } + + public void testSimpleOverride() throws Exception { + joinAs("override", OverrideChecker.class); + } +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java?rev=602240&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/join/TestTupleWritable.java Fri Dec 7 14:01:32 2007 @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.join; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; + +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +public class TestTupleWritable extends TestCase { + + private TupleWritable makeTuple(Writable[] writs) { + Writable[] sub1 = { writs[1], writs[2] }; + Writable[] sub3 = { writs[4], writs[5] }; + Writable[] sub2 = { writs[3], new TupleWritable(sub3), writs[6] }; + Writable[] vals = { writs[0], new TupleWritable(sub1), + new TupleWritable(sub2), writs[7], writs[8], + writs[9] }; + // [v0, [v1, v2], [v3, [v4, v5], v6], v7, v8, v9] + TupleWritable ret = new TupleWritable(vals); + for (int i = 0; i < 6; ++i) { + ret.setWritten(i); + } + ((TupleWritable)sub2[1]).setWritten(0); + ((TupleWritable)sub2[1]).setWritten(1); + ((TupleWritable)vals[1]).setWritten(0); + ((TupleWritable)vals[1]).setWritten(1); + for (int i = 0; i < 3; ++i) { + ((TupleWritable)vals[2]).setWritten(i); + } + return ret; + } + + private int verifIter(Writable[] writs, TupleWritable t, int i) { + for (Writable w : t) { + if (w instanceof TupleWritable) { + i = verifIter(writs, ((TupleWritable)w), i); + continue; + } + assertTrue("Bad value", w.equals(writs[i++])); + } + return i; + } + + public void testIterable() throws Exception { + Random r = new Random(); + Writable[] writs = { + new BooleanWritable(r.nextBoolean()), + new FloatWritable(r.nextFloat()), + new FloatWritable(r.nextFloat()), + new IntWritable(r.nextInt()), + new LongWritable(r.nextLong()), + new BytesWritable("dingo".getBytes()), + new LongWritable(r.nextLong()), + new IntWritable(r.nextInt()), + new BytesWritable("yak".getBytes()), + new IntWritable(r.nextInt()) + }; + TupleWritable t = new TupleWritable(writs); + for (int i = 0; i < 6; ++i) { + t.setWritten(i); + } + verifIter(writs, t, 0); + } + + public void testNestedIterable() throws Exception { + Random r = new Random(); + Writable[] writs = { + new BooleanWritable(r.nextBoolean()), + new FloatWritable(r.nextFloat()), + new FloatWritable(r.nextFloat()), + new IntWritable(r.nextInt()), + new LongWritable(r.nextLong()), + new BytesWritable("dingo".getBytes()), + new LongWritable(r.nextLong()), + new IntWritable(r.nextInt()), + new BytesWritable("yak".getBytes()), + new IntWritable(r.nextInt()) + }; + TupleWritable sTuple = makeTuple(writs); + assertTrue("Bad count", writs.length == verifIter(writs, sTuple, 0)); + } + + public void testWritable() throws Exception { + Random r = new Random(); + Writable[] writs = { + new BooleanWritable(r.nextBoolean()), + new FloatWritable(r.nextFloat()), + new FloatWritable(r.nextFloat()), + new IntWritable(r.nextInt()), + new LongWritable(r.nextLong()), + new BytesWritable("dingo".getBytes()), + new LongWritable(r.nextLong()), + new IntWritable(r.nextInt()), + new BytesWritable("yak".getBytes()), + new IntWritable(r.nextInt()) + }; + TupleWritable sTuple = makeTuple(writs); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + sTuple.write(new DataOutputStream(out)); + ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + TupleWritable dTuple = new TupleWritable(); + dTuple.readFields(new DataInputStream(in)); + assertTrue("Failed to 
write/read tuple", sTuple.equals(dTuple)); + } + +}
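Finally, tying the new classes together: a hedged sketch of the job wiring described in package.html, using the compose convenience method exercised by TestDatamerge above (the class name, paths, and the abbreviated mapper/output setup are illustrative assumptions, not part of the commit):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;

public class JoinJobSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(JoinJobSketch.class);
    job.setInputFormat(CompositeInputFormat.class);
    // Equivalent to the first expression in package.html:
    // inner(tbl(SequenceFileInputFormat.class, "hdfs://host:8020/foo/bar"),
    //       tbl(SequenceFileInputFormat.class, "hdfs://host:8020/foo/baz"))
    job.set("mapred.join.expr", CompositeInputFormat.compose("inner",
        SequenceFileInputFormat.class,
        new Path("hdfs://host:8020/foo/bar"),
        new Path("hdfs://host:8020/foo/baz")));
    // The map then receives (key, TupleWritable) pairs, one slot per source;
    // the mapper class, output types, and output path are set as usual, e.g.:
    job.setOutputPath(new Path("hdfs://host:8020/foo/out"));
    JobClient.runJob(job);
  }
}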