Added:
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java?rev=1235548&view=auto
==============================================================================
---
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java
(added)
+++
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java
Tue Jan 24 23:21:58 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+import junit.framework.TestCase;
+
+public class TestMRKeyFieldBasedPartitioner extends TestCase {
+
+ /**
+ * Test is key-field-based partitioned works with empty key.
+ */
+ public void testEmptyKey() throws Exception {
+ int numReducers = 10;
+ KeyFieldBasedPartitioner<Text, Text> kfbp =
+ new KeyFieldBasedPartitioner<Text, Text>();
+ Configuration conf = new Configuration();
+ conf.setInt("num.key.fields.for.partition", 10);
+ kfbp.setConf(conf);
+ assertEquals("Empty key should map to 0th partition",
+ 0, kfbp.getPartition(new Text(), new Text(), numReducers));
+
+ // check if the hashcode is correct when no keyspec is specified
+ kfbp = new KeyFieldBasedPartitioner<Text, Text>();
+ conf = new Configuration();
+ kfbp.setConf(conf);
+ String input = "abc\tdef\txyz";
+ int hashCode = input.hashCode();
+ int expectedPartition = kfbp.getPartition(hashCode, numReducers);
+ assertEquals("Partitioner doesnt work as expected", expectedPartition,
+ kfbp.getPartition(new Text(input), new Text(), numReducers));
+
+ // check if the hashcode is correct with specified keyspec
+ kfbp = new KeyFieldBasedPartitioner<Text, Text>();
+ conf = new Configuration();
+ conf.set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, "-k2,2");
+ kfbp.setConf(conf);
+ String expectedOutput = "def";
+ byte[] eBytes = expectedOutput.getBytes();
+ hashCode = kfbp.hashCode(eBytes, 0, eBytes.length - 1, 0);
+ expectedPartition = kfbp.getPartition(hashCode, numReducers);
+ assertEquals("Partitioner doesnt work as expected", expectedPartition,
+ kfbp.getPartition(new Text(input), new Text(), numReducers));
+
+ // test with invalid end index in keyspecs
+ kfbp = new KeyFieldBasedPartitioner<Text, Text>();
+ conf = new Configuration();
+ conf.set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, "-k2,5");
+ kfbp.setConf(conf);
+ expectedOutput = "def\txyz";
+ eBytes = expectedOutput.getBytes();
+ hashCode = kfbp.hashCode(eBytes, 0, eBytes.length - 1, 0);
+ expectedPartition = kfbp.getPartition(hashCode, numReducers);
+ assertEquals("Partitioner doesnt work as expected", expectedPartition,
+ kfbp.getPartition(new Text(input), new Text(), numReducers));
+
+ // test with 0 end index in keyspecs
+ kfbp = new KeyFieldBasedPartitioner<Text, Text>();
+ conf = new Configuration();
+ conf.set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, "-k2");
+ kfbp.setConf(conf);
+ expectedOutput = "def\txyz";
+ eBytes = expectedOutput.getBytes();
+ hashCode = kfbp.hashCode(eBytes, 0, eBytes.length - 1, 0);
+ expectedPartition = kfbp.getPartition(hashCode, numReducers);
+ assertEquals("Partitioner doesnt work as expected", expectedPartition,
+ kfbp.getPartition(new Text(input), new Text(), numReducers));
+
+ // test with invalid keyspecs
+ kfbp = new KeyFieldBasedPartitioner<Text, Text>();
+ conf = new Configuration();
+ conf.set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, "-k10");
+ kfbp.setConf(conf);
+ assertEquals("Partitioner doesnt work as expected", 0,
+ kfbp.getPartition(new Text(input), new Text(), numReducers));
+
+ // test with multiple keyspecs
+ kfbp = new KeyFieldBasedPartitioner<Text, Text>();
+ conf = new Configuration();
+ conf.set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, "-k2,2 -k4,4");
+ kfbp.setConf(conf);
+ input = "abc\tdef\tpqr\txyz";
+ expectedOutput = "def";
+ eBytes = expectedOutput.getBytes();
+ hashCode = kfbp.hashCode(eBytes, 0, eBytes.length - 1, 0);
+ expectedOutput = "xyz";
+ eBytes = expectedOutput.getBytes();
+ hashCode = kfbp.hashCode(eBytes, 0, eBytes.length - 1, hashCode);
+ expectedPartition = kfbp.getPartition(hashCode, numReducers);
+ assertEquals("Partitioner doesnt work as expected", expectedPartition,
+ kfbp.getPartition(new Text(input), new Text(), numReducers));
+
+ // test with invalid start index in keyspecs
+ kfbp = new KeyFieldBasedPartitioner<Text, Text>();
+ conf = new Configuration();
+ conf.set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, "-k2,2 -k30,21
-k4,4 -k5");
+ kfbp.setConf(conf);
+ expectedOutput = "def";
+ eBytes = expectedOutput.getBytes();
+ hashCode = kfbp.hashCode(eBytes, 0, eBytes.length - 1, 0);
+ expectedOutput = "xyz";
+ eBytes = expectedOutput.getBytes();
+ hashCode = kfbp.hashCode(eBytes, 0, eBytes.length - 1, hashCode);
+ expectedPartition = kfbp.getPartition(hashCode, numReducers);
+ assertEquals("Partitioner doesnt work as expected", expectedPartition,
+ kfbp.getPartition(new Text(input), new Text(), numReducers));
+ }
+}
Propchange:
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java?rev=1235548&view=auto
==============================================================================
---
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java
(added)
+++
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java
Tue Jan 24 23:21:58 2012
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TestTotalOrderPartitioner extends TestCase {
+
+ private static final Text[] splitStrings = new Text[] {
+ // -inf // 0
+ new Text("aabbb"), // 1
+ new Text("babbb"), // 2
+ new Text("daddd"), // 3
+ new Text("dddee"), // 4
+ new Text("ddhee"), // 5
+ new Text("dingo"), // 6
+ new Text("hijjj"), // 7
+ new Text("n"), // 8
+ new Text("yak"), // 9
+ };
+
+ static class Check<T> {
+ T data;
+ int part;
+ Check(T data, int part) {
+ this.data = data;
+ this.part = part;
+ }
+ }
+
+ private static final ArrayList<Check<Text>> testStrings =
+ new ArrayList<Check<Text>>();
+ static {
+ testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+ testStrings.add(new Check<Text>(new Text("aaabb"), 0));
+ testStrings.add(new Check<Text>(new Text("aabbb"), 1));
+ testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
+ testStrings.add(new Check<Text>(new Text("babbb"), 2));
+ testStrings.add(new Check<Text>(new Text("baabb"), 1));
+ testStrings.add(new Check<Text>(new Text("yai"), 8));
+ testStrings.add(new Check<Text>(new Text("yak"), 9));
+ testStrings.add(new Check<Text>(new Text("z"), 9));
+ testStrings.add(new Check<Text>(new Text("ddngo"), 5));
+ testStrings.add(new Check<Text>(new Text("hi"), 6));
+ };
+
+ private static <T extends WritableComparable<?>> Path writePartitionFile(
+ String testname, Configuration conf, T[] splits) throws IOException {
+ final FileSystem fs = FileSystem.getLocal(conf);
+ final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
+ ).makeQualified(fs);
+ Path p = new Path(testdir, testname + "/_partition.lst");
+ TotalOrderPartitioner.setPartitionFile(conf, p);
+ conf.setInt("mapred.reduce.tasks", splits.length + 1);
+ SequenceFile.Writer w = null;
+ try {
+ w = SequenceFile.createWriter(fs, conf, p,
+ splits[0].getClass(), NullWritable.class,
+ SequenceFile.CompressionType.NONE);
+ for (int i = 0; i < splits.length; ++i) {
+ w.append(splits[i], NullWritable.get());
+ }
+ } finally {
+ if (null != w)
+ w.close();
+ }
+ return p;
+ }
+
+ public void testTotalOrderMemCmp() throws Exception {
+ TotalOrderPartitioner<Text,NullWritable> partitioner =
+ new TotalOrderPartitioner<Text,NullWritable>();
+ Configuration conf = new Configuration();
+ Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+ "totalordermemcmp", conf, splitStrings);
+ conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
+ try {
+ partitioner.setConf(conf);
+ NullWritable nw = NullWritable.get();
+ for (Check<Text> chk : testStrings) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(conf).delete(p, true);
+ }
+ }
+
+ public void testTotalOrderBinarySearch() throws Exception {
+ TotalOrderPartitioner<Text,NullWritable> partitioner =
+ new TotalOrderPartitioner<Text,NullWritable>();
+ Configuration conf = new Configuration();
+ Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+ "totalorderbinarysearch", conf, splitStrings);
+ conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+ conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
+ try {
+ partitioner.setConf(conf);
+ NullWritable nw = NullWritable.get();
+ for (Check<Text> chk : testStrings) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(conf).delete(p, true);
+ }
+ }
+
+ public static class ReverseStringComparator implements RawComparator<Text> {
+ public int compare(Text a, Text b) {
+ return -a.compareTo(b);
+ }
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+ int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+ return -1 * WritableComparator.compareBytes(b1, s1+n1, l1-n1,
+ b2, s2+n2, l2-n2);
+ }
+ }
+
+ public void testTotalOrderCustomComparator() throws Exception {
+ TotalOrderPartitioner<Text,NullWritable> partitioner =
+ new TotalOrderPartitioner<Text,NullWritable>();
+ Configuration conf = new Configuration();
+ Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
+ Arrays.sort(revSplitStrings, new ReverseStringComparator());
+ Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
+ "totalordercustomcomparator", conf, revSplitStrings);
+ conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
+ conf.setClass("mapred.mapoutput.key.class", Text.class, Object.class);
+ conf.setClass("mapred.output.key.comparator.class",
+ ReverseStringComparator.class, RawComparator.class);
+ ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
+ revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+ revCheck.add(new Check<Text>(new Text("aaabb"), 9));
+ revCheck.add(new Check<Text>(new Text("aabbb"), 9));
+ revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
+ revCheck.add(new Check<Text>(new Text("babbb"), 8));
+ revCheck.add(new Check<Text>(new Text("baabb"), 8));
+ revCheck.add(new Check<Text>(new Text("yai"), 1));
+ revCheck.add(new Check<Text>(new Text("yak"), 1));
+ revCheck.add(new Check<Text>(new Text("z"), 0));
+ revCheck.add(new Check<Text>(new Text("ddngo"), 4));
+ revCheck.add(new Check<Text>(new Text("hi"), 3));
+ try {
+ partitioner.setConf(conf);
+ NullWritable nw = NullWritable.get();
+ for (Check<Text> chk : revCheck) {
+ assertEquals(chk.data.toString(), chk.part,
+ partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
+ }
+ } finally {
+ p.getFileSystem(conf).delete(p, true);
+ }
+ }
+}
Propchange:
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java
------------------------------------------------------------------------------
svn:eol-style = native