Author: cdouglas
Date: Thu Jul 10 16:22:31 2008
New Revision: 675806
URL: http://svn.apache.org/viewvc?rev=675806&view=rev
Log:
HADOOP-3721. Refactor CompositeRecordReader and related mapred.join classes
to make them clearer.
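The central change is the ResetableIterator contract: next() and replay() now
return boolean rather than returning void or throwing
UnsupportedOperationException, so callers can tell an empty or exhausted source
from a successful read. A simplified sketch of the interface after this patch,
assembled from the diffs below (javadoc abridged):

    public interface ResetableIterator<T extends Writable> {
      public boolean hasNext();                        // may report false positives
      public boolean next(T val) throws IOException;   // false when exhausted
      public boolean replay(T val) throws IOException; // false if nothing to replay
      public void reset();
      public void add(T item) throws IOException;
      public void close() throws IOException;
      public void clear();
    }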
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/JoinRecordReader.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ResetableIterator.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/StreamBackedIterator.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/FakeIF.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jul 10 16:22:31 2008
@@ -100,6 +100,9 @@
HADOOP-3726. Throw exceptions from TestCLI setup and teardown instead of
swallowing them. (Steve Loughran via cdouglas)
+ HADOOP-3721. Refactor CompositeRecordReader and related mapred.join classes
+ to make them clearer. (cdouglas)
+
Release 0.18.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java Thu Jul 10 16:22:31 2008
@@ -65,8 +65,9 @@
return false;
}
- public void replay(X val) throws IOException {
+ public boolean replay(X val) throws IOException {
WritableUtils.cloneInto(val, hold);
+ return true;
}
public void reset() {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java Thu Jul 10 16:22:31 2008
@@ -149,11 +149,7 @@
class JoinCollector {
private K key;
private ResetableIterator<X>[] iters;
- private long partial = 0L;
- private long replaymask = 0L;
- private int start = 0;
private int pos = -1;
- private int iterpos = -1;
private boolean first = true;
/**
@@ -190,10 +186,8 @@
*/
public void reset(K key) {
this.key = key;
- start = 0;
- pos = 0;
first = true;
- partial = 0L;
+ pos = iters.length - 1;
for (int i = 0; i < iters.length; ++i) {
iters[i].reset();
}
@@ -205,12 +199,10 @@
public void clear() {
key = null;
pos = -1;
- first = true;
for (int i = 0; i < iters.length; ++i) {
iters[i].clear();
iters[i] = EMPTY;
}
- partial = 0L;
}
/**
@@ -228,52 +220,42 @@
*/
@SuppressWarnings("unchecked") // No static typeinfo on Tuples
protected boolean next(TupleWritable val) throws IOException {
- if (pos < 0) {
- clear();
- return false;
- }
- int i = start;
- if (first) { // Find first iterator with elements
- for (; i < iters.length && !iters[i].hasNext(); ++i);
- if (iters.length <= i) { // no children had key
- clear();
- return false;
- }
- start = i;
- for (int j = i; j < iters.length; ++j) {
- if (iters[j].hasNext()) {
- partial |= 1 << j;
+ if (first) {
+ int i = -1;
+ for (pos = 0; pos < iters.length; ++pos) {
+ if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
+ i = pos;
+ val.setWritten(i);
}
}
- iterpos = pos = iters.length - 1;
+ pos = i;
first = false;
- } else { // Copy all elements in partial into tuple
- for (; i < iterpos; ++i) {
- if ((partial & (1 << i)) != 0) {
- iters[i].replay((X)val.get(i));
- val.setWritten(i);
- }
+ if (pos < 0) {
+ clear();
+ return false;
}
+ return true;
}
- long partialwritten = val.mask();
- if (iters[i].next((X)val.get(i))) {
- val.setWritten(i);
+ while (0 <= pos && !(iters[pos].hasNext() &&
+ iters[pos].next((X)val.get(pos)))) {
+ --pos;
}
- for (++i; i < iters.length; ++i) {
- iters[i].reset();
- if (iters[i].hasNext() && iters[i].next((X)val.get(i))) {
+ if (pos < 0) {
+ clear();
+ return false;
+ }
+ val.setWritten(pos);
+ for (int i = 0; i < pos; ++i) {
+ if (iters[i].replay((X)val.get(i))) {
val.setWritten(i);
}
}
- iterpos = iters.length - 1;
- for (; iterpos > pos && !iters[iterpos].hasNext(); --iterpos);
- if (!iters[iterpos].hasNext()) {
- for (; !(pos < 0 || iters[pos].hasNext()); --pos);
- iterpos = pos;
- }
- replaymask = val.mask();
- if ((replaymask ^ partialwritten) == 0L) {
- return next(val);
+ while (pos + 1 < iters.length) {
+ ++pos;
+ iters[pos].reset();
+ if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
+ val.setWritten(pos);
+ }
}
return true;
}
@@ -282,18 +264,19 @@
* Replay the last Tuple emitted.
*/
@SuppressWarnings("unchecked") // No static typeinfo on Tuples
- public void replay(TupleWritable val) throws IOException {
+ public boolean replay(TupleWritable val) throws IOException {
// The last emitted tuple might have drawn on an empty source;
// it can't be cleared prematurely, b/c there may be more duplicate
// keys in iterator positions < pos
- if (first) {
- throw new IllegalStateException();
- }
+ assert !first;
+ boolean ret = false;
for (int i = 0; i < iters.length; ++i) {
- if ((replaymask & (1 << i)) != 0) {
- iters[i].replay((X)val.get(i));
+ if (iters[i].replay((X)val.get(i))) {
+ val.setWritten(i);
+ ret = true;
}
}
+ return ret;
}
/**
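The rewritten JoinCollector.next drops the partial/replaymask bookkeeping and
enumerates the cross product of duplicate values per key like a mixed-radix
counter: pos scans right-to-left for the first source that can still advance,
sources to its left replay their current values, and sources to its right are
reset and re-advanced. A hypothetical trace, assuming three sources holding
{a1,a2}, {b1}, {c1,c2} for the current key:

    (a1, b1, c1)   // first call: every source advanced once, pos = 2
    (a1, b1, c2)   // source 2 advances; sources 0 and 1 replay
    (a2, b1, c1)   // sources 2 and 1 exhausted; 0 advances, 1 and 2 reset
    (a2, b1, c2)   // source 2 advances again
                   // the next call finds nothing to advance and returns false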
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/JoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/JoinRecordReader.java?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/JoinRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/JoinRecordReader.java Thu Jul 10 16:22:31 2008
@@ -91,8 +91,8 @@
return jc.flush(val);
}
- public void replay(TupleWritable val) throws IOException {
- jc.replay(val);
+ public boolean replay(TupleWritable val) throws IOException {
+ return jc.replay(val);
}
public void reset() {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java Thu Jul 10 16:22:31 2008
@@ -129,8 +129,9 @@
return ret;
}
- public void replay(V val) throws IOException {
+ public boolean replay(V val) throws IOException {
WritableUtils.cloneInto(val, emit(ivalue));
+ return true;
}
public void reset() {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java Thu Jul 10 16:22:31 2008
@@ -377,7 +377,8 @@
throw new IOException("Error gathering splits from child RReader");
}
if (i > 0 && splits[i-1].length != tmp.length) {
- throw new IOException("Inconsistent split cardinality from child");
+ throw new IOException("Inconsistent split cardinality from child " +
+ i + " (" + splits[i-1].length + "/" + tmp.length + ")");
}
splits[i] = tmp;
}
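With the added detail, a cardinality mismatch now reports which child disagreed
and the two split counts, e.g. (hypothetical values):

    java.io.IOException: Inconsistent split cardinality from child 2 (4/3)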
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ResetableIterator.java?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ResetableIterator.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/ResetableIterator.java Thu Jul 10 16:22:31 2008
@@ -35,10 +35,10 @@
public void close() throws IOException { }
public void clear() { }
public boolean next(U val) throws IOException {
- throw new UnsupportedOperationException();
+ return false;
}
- public void replay(U val) throws IOException {
- throw new UnsupportedOperationException();
+ public boolean replay(U val) throws IOException {
+ return false;
}
public void add(U item) throws IOException {
throw new UnsupportedOperationException();
@@ -46,7 +46,8 @@
}
/**
- * True iff a call to next will succeed.
+ * True if a call to next may return a value. False positives are
+ * permitted, but not false negatives.
*/
public boolean hasNext();
@@ -63,7 +64,7 @@
/**
* Assign last value returned to actual.
*/
- public void replay(T val) throws IOException;
+ public boolean replay(T val) throws IOException;
/**
* Set iterator to return to the start of its range. Must be called after
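Because hasNext() may now report a false positive, the boolean returned by
next() is authoritative; readers pair the two calls, as in this fragment from
the CompositeRecordReader diff above:

    while (0 <= pos && !(iters[pos].hasNext() &&
        iters[pos].next((X)val.get(pos)))) {
      --pos; // source could not produce a value; fall back to the previous one
    }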
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/StreamBackedIterator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/StreamBackedIterator.java?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/StreamBackedIterator.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/StreamBackedIterator.java Thu Jul 10 16:22:31 2008
@@ -62,9 +62,12 @@
return false;
}
- public void replay(X val) throws IOException {
+ public boolean replay(X val) throws IOException {
inbuf.reset();
+ if (0 == inbuf.available())
+ return false;
val.readFields(infbuf);
+ return true;
}
public void reset() {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/FakeIF.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/FakeIF.java?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/FakeIF.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/FakeIF.java Thu Jul 10 16:22:31 2008
@@ -22,14 +22,18 @@
import java.io.DataOutput;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
-public class FakeIF
- implements InputFormat<IncomparableKey,NullWritable> {
+public class FakeIF<K,V>
+ implements InputFormat<K,V>, JobConfigurable {
public static class FakeSplit implements InputSplit {
public void write(DataOutput out) throws IOException { }
@@ -38,6 +42,22 @@
public String[] getLocations() { return new String[0]; }
}
+ public static void setKeyClass(JobConf job, Class<?> k) {
+ job.setClass("test.fakeif.keyclass", k, WritableComparable.class);
+ }
+
+ public static void setValClass(JobConf job, Class<?> v) {
+ job.setClass("test.fakeif.valclass", v, Writable.class);
+ }
+
+ private Class<?> keyclass;
+ private Class<?> valclass;
+
+ public void configure(JobConf job) {
+ keyclass = job.getClass("test.fakeif.keyclass", IncomparableKey.class, WritableComparable.class);
+ valclass = job.getClass("test.fakeif.valclass", NullWritable.class, WritableComparable.class);
+ }
+
public FakeIF() { }
public void validateInput(JobConf conf) { }
@@ -46,13 +66,18 @@
return new InputSplit[] { new FakeSplit() };
}
- public RecordReader<IncomparableKey,NullWritable> getRecordReader(
+ public RecordReader<K,V> getRecordReader(
InputSplit ignored, JobConf conf, Reporter reporter) {
- return new RecordReader<IncomparableKey,NullWritable>() {
- public boolean next(IncomparableKey key, NullWritable value)
- throws IOException { return false; }
- public IncomparableKey createKey() { return new IncomparableKey(); }
- public NullWritable createValue() { return NullWritable.get(); }
+ return new RecordReader<K,V>() {
+ public boolean next(K key, V value) throws IOException { return false; }
+ @SuppressWarnings("unchecked")
+ public K createKey() {
+ return (K)ReflectionUtils.newInstance(keyclass, null);
+ }
+ @SuppressWarnings("unchecked")
+ public V createValue() {
+ return (V)ReflectionUtils.newInstance(valclass, null);
+ }
public long getPos() throws IOException { return 0L; }
public void close() throws IOException { }
public float getProgress() throws IOException { return 0.0f; }
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java?rev=675806&r1=675805&r2=675806&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java Thu Jul 10 16:22:31 2008
@@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
@@ -41,6 +42,7 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -247,6 +249,97 @@
joinAs("override", OverrideChecker.class);
}
+ public void testNestedJoin() throws Exception {
+ // outer(inner(S1,...,Sn),outer(S1,...Sn))
+ final int SOURCES = 3;
+ final int ITEMS = (SOURCES + 1) * (SOURCES + 1);
+ JobConf job = new JobConf();
+ Path base = cluster.getFileSystem().makeQualified(new Path("/nested"));
+ int[][] source = new int[SOURCES][];
+ for (int i = 0; i < SOURCES; ++i) {
+ source[i] = new int[ITEMS];
+ for (int j = 0; j < ITEMS; ++j) {
+ source[i][j] = (i + 2) * (j + 1);
+ }
+ }
+ Path[] src = new Path[SOURCES];
+ SequenceFile.Writer out[] = createWriters(base, job, SOURCES, src);
+ IntWritable k = new IntWritable();
+ for (int i = 0; i < SOURCES; ++i) {
+ IntWritable v = new IntWritable();
+ v.set(i);
+ for (int j = 0; j < ITEMS; ++j) {
+ k.set(source[i][j]);
+ out[i].append(k, v);
+ }
+ out[i].close();
+ }
+ out = null;
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("outer(inner(");
+ for (int i = 0; i < SOURCES; ++i) {
+ sb.append(
+ CompositeInputFormat.compose(SequenceFileInputFormat.class,
+ src[i].toString()));
+ if (i + 1 != SOURCES) sb.append(",");
+ }
+ sb.append("),outer(");
+ sb.append(CompositeInputFormat.compose(FakeIF.class,"foobar"));
+ sb.append(",");
+ for (int i = 0; i < SOURCES; ++i) {
+ sb.append(
+ CompositeInputFormat.compose(SequenceFileInputFormat.class,
+ src[i].toString()));
+ sb.append(",");
+ }
+ sb.append(CompositeInputFormat.compose(FakeIF.class,"raboof") + "))");
+ job.set("mapred.join.expr", sb.toString());
+ job.setInputFormat(CompositeInputFormat.class);
+ Path outf = new Path(base, "out");
+ FileOutputFormat.setOutputPath(job, outf);
+ FakeIF.setKeyClass(job, IntWritable.class);
+ FakeIF.setValClass(job, IntWritable.class);
+
+ job.setMapperClass(IdentityMapper.class);
+ job.setReducerClass(IdentityReducer.class);
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(TupleWritable.class);
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ JobClient.runJob(job);
+
+ FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
+ assertEquals(1, outlist.length);
+ assertTrue(0 < outlist[0].getLen());
+ SequenceFile.Reader r =
+ new SequenceFile.Reader(cluster.getFileSystem(),
+ outlist[0].getPath(), job);
+ TupleWritable v = new TupleWritable();
+ while (r.next(k, v)) {
+ assertFalse(((TupleWritable)v.get(1)).has(0));
+ assertFalse(((TupleWritable)v.get(1)).has(SOURCES + 1));
+ boolean chk = true;
+ int ki = k.get();
+ for (int i = 2; i < SOURCES + 2; ++i) {
+ if ((ki % i) == 0 && ki <= i * ITEMS) {
+ assertEquals(i - 2, ((IntWritable)
+ ((TupleWritable)v.get(1)).get((i - 1))).get());
+ } else chk = false;
+ }
+ if (chk) { // present in all sources; chk inner
+ assertTrue(v.has(0));
+ for (int i = 0; i < SOURCES; ++i)
+ assertTrue(((TupleWritable)v.get(0)).has(i));
+ } else { // should not be present in inner join
+ assertFalse(v.has(0));
+ }
+ }
+ r.close();
+ base.getFileSystem(job).delete(base, true);
+
+ }
+
public void testConfiguredInputFormat() throws Exception {
JobConf conf = new JobConf();
conf.set("mapred.join.expr", CompositeInputFormat.compose(