Author: ddas
Date: Thu Aug 7 04:49:16 2008
New Revision: 683592
URL: http://svn.apache.org/viewvc?rev=683592&view=rev
Log:
HADOOP-2302. Provides a comparator for numerical sorting of key fields.
Contributed by Devaraj Das.
Added:
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamKeyValUtil.java
hadoop/core/trunk/src/core/org/apache/hadoop/util/UTF8ByteArrayUtils.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldHelper.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=683592&r1=683591&r2=683592&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Aug 7 04:49:16 2008
@@ -67,6 +67,9 @@
the TaskTracker, refactor Hadoop Metrics as an implementation of the api.
(Ari Rabkin via acmurthy)
+ HADOOP-2302. Provides a comparator for numerical sorting of key fields.
+ (ddas)
+
IMPROVEMENTS
HADOOP-3732. Delay intialization of datanode block verification till
Modified:
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=683592&r1=683591&r2=683592&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
(original)
+++
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
Thu Aug 7 04:49:16 2008
@@ -36,6 +36,7 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.LineRecordReader.LineReader;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.UTF8ByteArrayUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
@@ -332,7 +333,7 @@
key.set(line, 0, length);
val.set("");
} else {
- UTF8ByteArrayUtils.splitKeyVal(line, 0, length, key, val, pos,
separator.length);
+ StreamKeyValUtil.splitKeyVal(line, 0, length, key, val, pos,
separator.length);
}
} catch (CharacterCodingException e) {
LOG.warn(StringUtils.stringifyException(e));
Added:
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamKeyValUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamKeyValUtil.java?rev=683592&view=auto
==============================================================================
---
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamKeyValUtil.java
(added)
+++
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamKeyValUtil.java
Thu Aug 7 04:49:16 2008
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+
+public class StreamKeyValUtil {
+
+ /**
+ * Find the first occured tab in a UTF-8 encoded string
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @param start starting offset
+ * @param length no. of bytes
+ * @return position that first tab occures otherwise -1
+ */
+ public static int findTab(byte [] utf, int start, int length) {
+ for(int i=start; i<(start+length); i++) {
+ if (utf[i]==(byte)'\t') {
+ return i;
+ }
+ }
+ return -1;
+ }
+ /**
+ * Find the first occured tab in a UTF-8 encoded string
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @return position that first tab occures otherwise -1
+ */
+ public static int findTab(byte [] utf) {
+ return org.apache.hadoop.util.UTF8ByteArrayUtils.findNthByte(utf, 0,
+ utf.length, (byte)'\t', 1);
+ }
+
+ /**
+ * split a UTF-8 byte array into key and value
+ * assuming that the delimilator is at splitpos.
+ * @param utf utf-8 encoded string
+ * @param start starting offset
+ * @param length no. of bytes
+ * @param key contains key upon the method is returned
+ * @param val contains value upon the method is returned
+ * @param splitPos the split pos
+ * @param separatorLength the length of the separator between key and value
+ * @throws IOException
+ */
+ public static void splitKeyVal(byte[] utf, int start, int length,
+ Text key, Text val, int splitPos,
+ int separatorLength) throws IOException {
+ if (splitPos<start || splitPos >= (start+length))
+ throw new IllegalArgumentException("splitPos must be in the range " +
+ "[" + start + ", " + (start+length) +
"]: " + splitPos);
+ int keyLen = (splitPos-start);
+ byte [] keyBytes = new byte[keyLen];
+ System.arraycopy(utf, start, keyBytes, 0, keyLen);
+ int valLen = (start+length)-splitPos-separatorLength;
+ byte [] valBytes = new byte[valLen];
+ System.arraycopy(utf, splitPos+separatorLength, valBytes, 0, valLen);
+ key.set(keyBytes);
+ val.set(valBytes);
+ }
+
+ /**
+ * split a UTF-8 byte array into key and value
+ * assuming that the delimilator is at splitpos.
+ * @param utf utf-8 encoded string
+ * @param start starting offset
+ * @param length no. of bytes
+ * @param key contains key upon the method is returned
+ * @param val contains value upon the method is returned
+ * @param splitPos the split pos
+ * @throws IOException
+ */
+ public static void splitKeyVal(byte[] utf, int start, int length,
+ Text key, Text val, int splitPos) throws
IOException {
+ splitKeyVal(utf, start, length, key, val, splitPos, 1);
+ }
+
+
+ /**
+ * split a UTF-8 byte array into key and value
+ * assuming that the delimilator is at splitpos.
+ * @param utf utf-8 encoded string
+ * @param key contains key upon the method is returned
+ * @param val contains value upon the method is returned
+ * @param splitPos the split pos
+ * @param separatorLength the length of the separator between key and value
+ * @throws IOException
+ */
+ public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos,
+ int separatorLength)
+ throws IOException {
+ splitKeyVal(utf, 0, utf.length, key, val, splitPos, separatorLength);
+ }
+
+ /**
+ * split a UTF-8 byte array into key and value
+ * assuming that the delimilator is at splitpos.
+ * @param utf utf-8 encoded string
+ * @param key contains key upon the method is returned
+ * @param val contains value upon the method is returned
+ * @param splitPos the split pos
+ * @throws IOException
+ */
+ public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos)
+ throws IOException {
+ splitKeyVal(utf, 0, utf.length, key, val, splitPos, 1);
+ }
+
+ /**
+ * Read a utf8 encoded line from a data input stream.
+ * @param lineReader LineReader to read the line from.
+ * @param out Text to read into
+ * @return number of bytes read
+ * @throws IOException
+ */
+ public static int readLine(LineReader lineReader, Text out)
+ throws IOException {
+ out.clear();
+ return lineReader.readLine(out);
+ }
+
+}
Modified:
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?rev=683592&r1=683591&r2=683592&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
(original)
+++
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
Thu Aug 7 04:49:16 2008
@@ -21,10 +21,13 @@
import java.io.IOException;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.LineRecordReader.LineReader;
/**
* General utils for byte array containing UTF-8 encoded strings
+ * @deprecated use {@link org.apache.hadoop.util.UTF8ByteArrayUtils} and
+ * {@link StreamKeyValUtil} instead
*/
public class UTF8ByteArrayUtils {
@@ -34,14 +37,11 @@
* @param start starting offset
* @param length no. of bytes
* @return position that first tab occures otherwise -1
+ * @deprecated use {@link StreamKeyValUtil#findTab(byte[], int, int)}
*/
+ @Deprecated
public static int findTab(byte [] utf, int start, int length) {
- for(int i=start; i<(start+length); i++) {
- if (utf[i]==(byte)'\t') {
- return i;
- }
- }
- return -1;
+ return StreamKeyValUtil.findTab(utf, start, length);
}
/**
@@ -51,14 +51,13 @@
* @param end ending position
* @param b the byte to find
* @return position that first byte occures otherwise -1
+ * @deprecated use
+ * {@link org.apache.hadoop.util.UTF8ByteArrayUtils#findByte(byte[], int,
+ * int, byte)}
*/
+ @Deprecated
public static int findByte(byte [] utf, int start, int end, byte b) {
- for(int i=start; i<end; i++) {
- if (utf[i]==b) {
- return i;
- }
- }
- return -1;
+ return org.apache.hadoop.util.UTF8ByteArrayUtils.findByte(utf, start, end,
b);
}
/**
@@ -68,22 +67,13 @@
* @param end ending position
* @param b the bytes to find
* @return position that first byte occures otherwise -1
+ * @deprecated use
+ * {@link org.apache.hadoop.util.UTF8ByteArrayUtils#findBytes(byte[], int,
+ * int, byte[])}
*/
+ @Deprecated
public static int findBytes(byte [] utf, int start, int end, byte[] b) {
- int matchEnd = end - b.length;
- for(int i=start; i<=matchEnd; i++) {
- boolean matched = true;
- for(int j=0; j<b.length; j++) {
- if (utf[i+j] != b[j]) {
- matched = false;
- break;
- }
- }
- if (matched) {
- return i;
- }
- }
- return -1;
+ return org.apache.hadoop.util.UTF8ByteArrayUtils.findBytes(utf, start,
end, b);
}
/**
@@ -94,18 +84,14 @@
* @param b the byte to find
* @param n the desired occurrence of the given byte
* @return position that nth occurrence of the given byte if exists;
otherwise -1
+ * @deprecated use
+ * {@link org.apache.hadoop.util.UTF8ByteArrayUtils#findNthByte(byte[], int,
+ * int, byte, int)}
*/
+ @Deprecated
public static int findNthByte(byte [] utf, int start, int length, byte b,
int n) {
- int pos = -1;
- int nextStart = start;
- for (int i = 0; i < n; i++) {
- pos = findByte(utf, nextStart, length, b);
- if (pos < 0) {
- return pos;
- }
- nextStart = pos + 1;
- }
- return pos;
+ return org.apache.hadoop.util.UTF8ByteArrayUtils.findNthByte(utf, start,
+ length, b, n);
}
/**
@@ -114,18 +100,24 @@
* @param b the byte to find
* @param n the desired occurrence of the given byte
* @return position that nth occurrence of the given byte if exists;
otherwise -1
+ * @deprecated use
+ * {@link org.apache.hadoop.util.UTF8ByteArrayUtils#findNthByte(byte[],
+ * byte, int)}
*/
+ @Deprecated
public static int findNthByte(byte [] utf, byte b, int n) {
- return findNthByte(utf, 0, utf.length, b, n);
+ return org.apache.hadoop.util.UTF8ByteArrayUtils.findNthByte(utf, b, n);
}
/**
* Find the first occured tab in a UTF-8 encoded string
* @param utf a byte array containing a UTF-8 encoded string
* @return position that first tab occures otherwise -1
+ * @deprecated use {@link StreamKeyValUtil#findTab(byte[])}
*/
+ @Deprecated
public static int findTab(byte [] utf) {
- return findNthByte(utf, 0, utf.length, (byte)'\t', 1);
+ return StreamKeyValUtil.findTab(utf);
}
/**
@@ -138,22 +130,17 @@
* @param val contains value upon the method is returned
* @param splitPos the split pos
* @param separatorLength the length of the separator between key and value
+ * @deprecated use
+ * {@link StreamKeyValUtil#splitKeyVal(byte[], int, int, Text, Text,
+ * int, int)}
* @throws IOException
*/
+ @Deprecated
public static void splitKeyVal(byte[] utf, int start, int length,
Text key, Text val, int splitPos,
int separatorLength) throws IOException {
- if (splitPos<start || splitPos >= (start+length))
- throw new IllegalArgumentException("splitPos must be in the range " +
- "[" + start + ", " + (start+length) +
"]: " + splitPos);
- int keyLen = (splitPos-start);
- byte [] keyBytes = new byte[keyLen];
- System.arraycopy(utf, start, keyBytes, 0, keyLen);
- int valLen = (start+length)-splitPos-separatorLength;
- byte [] valBytes = new byte[valLen];
- System.arraycopy(utf, splitPos+separatorLength, valBytes, 0, valLen);
- key.set(keyBytes);
- val.set(valBytes);
+ StreamKeyValUtil.splitKeyVal(utf, start,
+ length, key, val, splitPos, separatorLength);
}
/**
@@ -165,11 +152,14 @@
* @param key contains key upon the method is returned
* @param val contains value upon the method is returned
* @param splitPos the split pos
+ * @deprecated use
+ * {@link StreamKeyValUtil#splitKeyVal(byte[], int, int, Text, Text, int)}
* @throws IOException
*/
+ @Deprecated
public static void splitKeyVal(byte[] utf, int start, int length,
Text key, Text val, int splitPos) throws
IOException {
- splitKeyVal(utf, start, length, key, val, splitPos, 1);
+ StreamKeyValUtil.splitKeyVal(utf, start, length, key, val, splitPos);
}
@@ -181,12 +171,15 @@
* @param val contains value upon the method is returned
* @param splitPos the split pos
* @param separatorLength the length of the separator between key and value
+ * @deprecated use
+ * {@link StreamKeyValUtil#splitKeyVal(byte[], Text, Text, int, int)}
* @throws IOException
*/
+ @Deprecated
public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos,
int separatorLength)
throws IOException {
- splitKeyVal(utf, 0, utf.length, key, val, splitPos, separatorLength);
+ StreamKeyValUtil.splitKeyVal(utf, key, val, splitPos, separatorLength);
}
/**
@@ -196,23 +189,28 @@
* @param key contains key upon the method is returned
* @param val contains value upon the method is returned
* @param splitPos the split pos
+ * @deprecated use
+ * {@link StreamKeyValUtil#splitKeyVal(byte[], Text, Text, int)}
* @throws IOException
*/
+ @Deprecated
public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos)
throws IOException {
- splitKeyVal(utf, 0, utf.length, key, val, splitPos, 1);
+ StreamKeyValUtil.splitKeyVal(utf, key, val, splitPos);
}
/**
* Read a utf8 encoded line from a data input stream.
* @param lineReader LineReader to read the line from.
* @param out Text to read into
- * @return number of bytes read
+ * @return number of bytes read
+ * @deprecated use
+ * {@link StreamKeyValUtil#readLine(LineRecordReader.LineReader, Text)}
* @throws IOException
*/
+ @Deprecated
public static int readLine(LineReader lineReader, Text out)
throws IOException {
- out.clear();
- return lineReader.readLine(out);
+ return StreamKeyValUtil.readLine(lineReader, out);
}
}
Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/UTF8ByteArrayUtils.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/UTF8ByteArrayUtils.java?rev=683592&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/UTF8ByteArrayUtils.java
(added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/UTF8ByteArrayUtils.java
Thu Aug 7 04:49:16 2008
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+public class UTF8ByteArrayUtils {
+ /**
+ * Find the first occurrence of the given byte b in a UTF-8 encoded string
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @param start starting offset
+ * @param end ending position
+ * @param b the byte to find
+ * @return position that first byte occures otherwise -1
+ */
+ public static int findByte(byte [] utf, int start, int end, byte b) {
+ for(int i=start; i<end; i++) {
+ if (utf[i]==b) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Find the first occurrence of the given bytes b in a UTF-8 encoded string
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @param start starting offset
+ * @param end ending position
+ * @param b the bytes to find
+ * @return position that first byte occures otherwise -1
+ */
+ public static int findBytes(byte [] utf, int start, int end, byte[] b) {
+ int matchEnd = end - b.length;
+ for(int i=start; i<=matchEnd; i++) {
+ boolean matched = true;
+ for(int j=0; j<b.length; j++) {
+ if (utf[i+j] != b[j]) {
+ matched = false;
+ break;
+ }
+ }
+ if (matched) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Find the nth occurrence of the given byte b in a UTF-8 encoded string
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @param start starting offset
+ * @param length the length of byte array
+ * @param b the byte to find
+ * @param n the desired occurrence of the given byte
+ * @return position that nth occurrence of the given byte if exists;
otherwise -1
+ */
+ public static int findNthByte(byte [] utf, int start, int length, byte b,
int n) {
+ int pos = -1;
+ int nextStart = start;
+ for (int i = 0; i < n; i++) {
+ pos = findByte(utf, nextStart, length, b);
+ if (pos < 0) {
+ return pos;
+ }
+ nextStart = pos + 1;
+ }
+ return pos;
+ }
+
+ /**
+ * Find the nth occurrence of the given byte b in a UTF-8 encoded string
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @param b the byte to find
+ * @param n the desired occurrence of the given byte
+ * @return position that nth occurrence of the given byte if exists;
otherwise -1
+ */
+ public static int findNthByte(byte [] utf, byte b, int n) {
+ return findNthByte(utf, 0, utf.length, b, n);
+ }
+
+}
+
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=683592&r1=683591&r2=683592&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Thu Aug
7 04:49:16 2008
@@ -39,6 +39,8 @@
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
+import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
@@ -510,6 +512,58 @@
theClass, RawComparator.class);
}
+ /**
+ * Set the {@link KeyFieldBasedComparator} options used to compare keys.
+ *
+ * @param keySpec the key specification of the form -k pos1[,pos2], where,
+ * pos is of the form f[.c][opts], where f is the number
+ * of the key field to use, and c is the number of the first character from
+ * the beginning of the field. Fields and character posns are numbered
+ * starting with 1; a character position of zero in pos2 indicates the
+ * field's last character. If '.c' is omitted from pos1, it defaults to 1
+ * (the beginning of the field); if omitted from pos2, it defaults to 0
+ * (the end of the field). opts are ordering options. The supported options
+ * are:
+ * -n, (Sort numerically)
+ * -r, (Reverse the result of comparison)
+ */
+ public void setKeyFieldComparatorOptions(String keySpec) {
+ setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
+ set("mapred.text.key.comparator.options", keySpec);
+ }
+
+ /**
+ * Get the {@link KeyFieldBasedComparator} options
+ */
+ public String getKeyFieldComparatorOption() {
+ return get("mapred.text.key.comparator.options");
+ }
+
+ /**
+ * Set the {@link KeyFieldBasedPartitioner} options used for
+ * {@link Partitioner}
+ *
+ * @param keySpec the key specification of the form -k pos1[,pos2], where,
+ * pos is of the form f[.c][opts], where f is the number
+ * of the key field to use, and c is the number of the first character from
+ * the beginning of the field. Fields and character posns are numbered
+ * starting with 1; a character position of zero in pos2 indicates the
+ * field's last character. If '.c' is omitted from pos1, it defaults to 1
+ * (the beginning of the field); if omitted from pos2, it defaults to 0
+ * (the end of the field).
+ */
+ public void setKeyFieldPartitionerOptions(String keySpec) {
+ setPartitionerClass(KeyFieldBasedPartitioner.class);
+ set("mapred.text.key.partitioner.options", keySpec);
+ }
+
+ /**
+ * Get the {@link KeyFieldBasedPartitioner} options
+ */
+ public String getKeyFieldPartitionerOption() {
+ return get("mapred.text.key.partitioner.options");
+ }
+
/**
* Get the user defined {@link WritableComparable} comparator for
* grouping keys of inputs to the reduce.
@@ -1261,6 +1315,5 @@
}
return null;
}
-
}
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java?rev=683592&view=auto
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java
(added)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java
Thu Aug 7 04:49:16 2008
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.util.List;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This comparator implementation provides a subset of the features provided
+ * by the Unix/GNU Sort. In particular, the supported features are:
+ * -n, (Sort numerically)
+ * -r, (Reverse the result of comparison)
+ * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
+ * of the field to use, and c is the number of the first character from the
+ * beginning of the field. Fields and character posns are numbered starting
+ * with 1; a character position of zero in pos2 indicates the field's last
+ * character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
+ * of the field); if omitted from pos2, it defaults to 0 (the end of the
+ * field). opts are ordering options (any of 'nr' as described above).
+ * We assume that the fields in the key are separated by
+ * map.output.key.field.separator.
+ */
+
+public class KeyFieldBasedComparator<K, V> extends WritableComparator
+implements JobConfigurable {
+ private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
+ private static final byte NEGATIVE = (byte)'-';
+ private static final byte ZERO = (byte)'0';
+ private static final byte DECIMAL = (byte)'.';
+
+ public void configure(JobConf job) {
+ String option = job.getKeyFieldComparatorOption();
+ String keyFieldSeparator = job.get("map.output.key.field.separator","\t");
+ keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
+ keyFieldHelper.parseOption(option);
+ }
+
+ public KeyFieldBasedComparator() {
+ super(Text.class);
+ }
+
+
+ public int compare(byte[] b1, int s1, int l1,
+ byte[] b2, int s2, int l2) {
+ int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+ int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+ List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
+ if (allKeySpecs.size() == 0) {
+ return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
+ }
+ int []lengthIndicesFirst = keyFieldHelper.getWordLengths(b1, s1+n1, s1+l1);
+ int []lengthIndicesSecond = keyFieldHelper.getWordLengths(b2, s2+n2,
s2+l2);
+ for (KeyDescription keySpec : allKeySpecs) {
+ int startCharFirst = keyFieldHelper.getStartOffset(b1, s1+n1, s1+l1,
lengthIndicesFirst,
+ keySpec);
+ int endCharFirst = keyFieldHelper.getEndOffset(b1, s1+n1, s1+l1,
lengthIndicesFirst,
+ keySpec);
+ int startCharSecond = keyFieldHelper.getStartOffset(b2, s2+n2, s2+l2,
lengthIndicesSecond,
+ keySpec);
+ int endCharSecond = keyFieldHelper.getEndOffset(b2, s2+n2, s2+l2,
lengthIndicesSecond,
+ keySpec);
+ int result;
+ if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2,
+ startCharSecond, endCharSecond, keySpec)) != 0) {
+ return result;
+ }
+ }
+ return 0;
+ }
+
+ private int compareByteSequence(byte[] first, int start1, int end1,
+ byte[] second, int start2, int end2, KeyDescription key) {
+ if (start1 == -1) {
+ if (key.reverse) {
+ return 1;
+ }
+ return -1;
+ }
+ if (start2 == -1) {
+ if (key.reverse) {
+ return -1;
+ }
+ return 1;
+ }
+ int compareResult = 0;
+ if (!key.numeric) {
+ compareResult = compareBytes(first, start1, end1, second, start2, end2);
+ }
+ if (key.numeric) {
+ compareResult = numericalCompare (first, start1, end1, second, start2,
end2);
+ }
+ if (key.reverse) {
+ return -compareResult;
+ }
+ return compareResult;
+ }
+
+ private int numericalCompare (byte[] a, int start1, int end1,
+ byte[] b, int start2, int end2) {
+ int i = start1;
+ int j = start2;
+ int mul = 1;
+ byte first_a = a[i];
+ byte first_b = b[j];
+ if (first_a == NEGATIVE) {
+ if (first_b != NEGATIVE) {
+ //check for cases like -0.0 and 0.0 (they should be declared equal)
+ return oneNegativeCompare(a,start1+1,end1,b,start2,end2);
+ }
+ i++;
+ }
+ if (first_b == NEGATIVE) {
+ if (first_a != NEGATIVE) {
+ //check for cases like 0.0 and -0.0 (they should be declared equal)
+ return -oneNegativeCompare(b,start2+1,end2,a,start1,end1);
+ }
+ j++;
+ }
+ if (first_b == NEGATIVE && first_a == NEGATIVE) {
+ mul = -1;
+ }
+
+ //skip over ZEROs
+ while (i <= end1) {
+ if (a[i] != ZERO) {
+ break;
+ }
+ i++;
+ }
+ while (j <= end2) {
+ if (b[j] != ZERO) {
+ break;
+ }
+ j++;
+ }
+
+ //skip over equal characters and stopping at the first nondigit char
+ //The nondigit character could be '.'
+ while (i <= end1 && j <= end2) {
+ if (!isdigit(a[i]) || a[i] != b[j]) {
+ break;
+ }
+ i++; j++;
+ }
+ if (i <= end1) {
+ first_a = a[i];
+ }
+ if (j <= end2) {
+ first_b = b[j];
+ }
+ //store the result of the difference. This could be final result if the
+ //number of digits in the mantissa is the same in both the numbers
+ int firstResult = first_a - first_b;
+
+ //check whether we hit a decimal in the earlier scan
+ if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) ||
+ (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) {
+ return ((mul < 0) ? -decimalCompare(a,i,end1,b,j,end2) :
+ decimalCompare(a,i,end1,b,j,end2));
+ }
+ //check the number of digits in the mantissa of the numbers
+ int numRemainDigits_a = 0;
+ int numRemainDigits_b = 0;
+ while (i <= end1) {
+ //if we encounter a non-digit treat the corresponding number as being
+ //smaller
+ if (isdigit(a[i++])) {
+ numRemainDigits_a++;
+ } else break;
+ }
+ while (j <= end2) {
+ //if we encounter a non-digit treat the corresponding number as being
+ //smaller
+ if (isdigit(b[j++])) {
+ numRemainDigits_b++;
+ } else break;
+ }
+ int ret = numRemainDigits_a - numRemainDigits_b;
+ if (ret == 0) {
+ return ((mul < 0) ? -firstResult : firstResult);
+ } else {
+ return ((mul < 0) ? -ret : ret);
+ }
+ }
+ private boolean isdigit(byte b) {
+ if ('0' <= b && b <= '9') {
+ return true;
+ }
+ return false;
+ }
+ private int decimalCompare(byte[] a, int i, int end1,
+ byte[] b, int j, int end2) {
+ if (i > end1) {
+ //if a[] has nothing remaining
+ return -decimalCompare1(b, ++j, end2);
+ }
+ if (j > end2) {
+ //if b[] has nothing remaining
+ return decimalCompare1(a, ++i, end1);
+ }
+ if (a[i] == DECIMAL && b[j] == DECIMAL) {
+ while (i <= end1 && j <= end2) {
+ if (a[i] != b[j]) {
+ if (isdigit(a[i]) && isdigit(b[j])) {
+ return a[i] - b[j];
+ }
+ if (isdigit(a[i])) {
+ return 1;
+ }
+ if (isdigit(b[j])) {
+ return -1;
+ }
+ return 0;
+ }
+ i++; j++;
+ }
+ if (i > end1 && j > end2) {
+ return 0;
+ }
+
+ if (i > end1) {
+ //check whether there is a non-ZERO digit after potentially
+ //a number of ZEROs (e.g., a=.4444, b=.444400004)
+ return -decimalCompare1(b, j, end2);
+ }
+ if (j > end2) {
+ //check whether there is a non-ZERO digit after potentially
+ //a number of ZEROs (e.g., b=.4444, a=.444400004)
+ return decimalCompare1(a, i, end1);
+ }
+ }
+ else if (a[i] == DECIMAL) {
+ return decimalCompare1(a, ++i, end1);
+ }
+ else if (b[j] == DECIMAL) {
+ return -decimalCompare1(b, ++j, end2);
+ }
+ return 0;
+ }
+
+ private int decimalCompare1(byte[] a, int i, int end) {
+ while (i <= end) {
+ if (a[i] == ZERO) {
+ i++;
+ continue;
+ }
+ if (isdigit(a[i])) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ return 0;
+ }
+
+ private int oneNegativeCompare(byte[] a, int start1, int end1,
+ byte[] b, int start2, int end2) {
+ //here a[] is negative and b[] is positive
+ //We have to ascertain whether the number contains any digits.
+ //If it does, then it is a smaller number for sure. If not,
+ //then we need to scan b[] to find out whether b[] has a digit
+ //If b[] does contain a digit, then b[] is certainly
+ //greater. If not, that is, both a[] and b[] don't contain
+ //digits then they should be considered equal.
+ if (!isZero(a, start1, end1)) {
+ return -1;
+ }
+ //reached here - this means that a[] is a ZERO
+ if (!isZero(b, start2, end2)) {
+ return -1;
+ }
+ //reached here - both numbers are basically ZEROs and hence
+ //they should compare equal
+ return 0;
+ }
+
+ private boolean isZero(byte a[], int start, int end) {
+ //check for zeros in the significand part as well as the decimal part
+ //note that we treat the non-digit characters as ZERO
+ int i = start;
+ //we check the significand for being a ZERO
+ while (i <= end) {
+ if (a[i] != ZERO) {
+ if (a[i] != DECIMAL && isdigit(a[i])) {
+ return false;
+ }
+ break;
+ }
+ i++;
+ }
+
+ if (i != (end+1) && a[i++] == DECIMAL) {
+ //we check the decimal part for being a ZERO
+ while (i <= end) {
+ if (a[i] != ZERO) {
+ if (isdigit(a[i])) {
+ return false;
+ }
+ break;
+ }
+ i++;
+ }
+ }
+ return true;
+ }
+}
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java?rev=683592&r1=683591&r2=683592&view=diff
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
(original)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
Thu Aug 7 04:49:16 2008
@@ -18,37 +18,83 @@
package org.apache.hadoop.mapred.lib;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;
+ /**
+ * Defines a way to partition keys based on certain key fields (also see
+ * {@link KeyFieldBasedComparator}).
+ * The key specification supported is of the form -k pos1[,pos2], where,
+ * pos is of the form f[.c][opts], where f is the number
+ * of the key field to use, and c is the number of the first character from
+ * the beginning of the field. Fields and character posns are numbered
+ * starting with 1; a character position of zero in pos2 indicates the
+ * field's last character. If '.c' is omitted from pos1, it defaults to 1
+ * (the beginning of the field); if omitted from pos2, it defaults to 0
+ * (the end of the field).
+ *
+ */
public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {
+ private static final Log LOG =
LogFactory.getLog(KeyFieldBasedPartitioner.class.getName());
private int numOfPartitionFields;
-
- private String keyFieldSeparator;
+
+ private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
public void configure(JobConf job) {
- this.keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
- this.numOfPartitionFields = job.getInt("num.key.fields.for.partition", 0);
+ String keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
+ keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
+ if (job.get("num.key.fields.for.partition") != null) {
+ LOG.warn("Using deprecated num.key.fields.for.partition. " +
+ "Use mapred.text.key.partitioner.options instead");
+ this.numOfPartitionFields = job.getInt("num.key.fields.for.partition",0);
+ keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
+ } else {
+ String option = job.getKeyFieldPartitionerOption();
+ keyFieldHelper.parseOption(option);
+ }
}
- /** Use [EMAIL PROTECTED] Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
- String partitionKeyStr = key.toString();
- String[] fields = partitionKeyStr.split(this.keyFieldSeparator);
- if (this.numOfPartitionFields > 0
- && this.numOfPartitionFields < fields.length) {
- StringBuffer sb = new StringBuffer();
- for (int i = 0; i < this.numOfPartitionFields; i++) {
- sb.append(fields[i]).append(this.keyFieldSeparator);
- }
- partitionKeyStr = sb.toString();
- if (partitionKeyStr.length() > 0) {
- partitionKeyStr = partitionKeyStr.substring(0,
- partitionKeyStr.length() - 1);
- }
+ byte[] keyBytes;
+
+ List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
+ if (allKeySpecs.size() == 0) {
+ return (key.toString().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+ }
+
+ try {
+ keyBytes = key.toString().getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("The current system does not " +
+ "support UTF-8 encoding!", e);
+ }
+ int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
+ keyBytes.length);
+ int currentHash = 0;
+ for (KeyDescription keySpec : allKeySpecs) {
+ int startChar = keyFieldHelper.getStartOffset(keyBytes, 0,
keyBytes.length,
+ lengthIndicesFirst, keySpec);
+ int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
+ lengthIndicesFirst, keySpec);
+ currentHash = hashCode(keyBytes, startChar, endChar,
+ currentHash);
}
- return (partitionKeyStr.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+ return (currentHash & Integer.MAX_VALUE) % numReduceTasks;
}
+
+ /**
+  * Folds the bytes b[start..end] (end inclusive) into a running hash,
+  * using the same 31*h + x recurrence on every byte.
+  *
+  * @param b the key bytes
+  * @param start index of the first byte to hash; a negative value means
+  *        the key field was not found and leaves the hash untouched
+  * @param end index of the last byte to hash; values past the end of
+  *        the array are clamped to the last valid index
+  * @param currentHash the hash accumulated so far
+  * @return the updated hash
+  */
+ protected int hashCode(byte[] b, int start, int end, int currentHash) {
+   //getStartOffset() reports a missing field as -1; indexing with it
+   //would throw ArrayIndexOutOfBoundsException, so skip such a field.
+   if (start < 0) {
+     return currentHash;
+   }
+   //getEndOffset() may hand back the exclusive bound (keyBytes.length);
+   //never read past the last byte of the array.
+   int last = Math.min(end, b.length - 1);
+   for (int i = start; i <= last; i++) {
+     currentHash = 31*currentHash + b[i];
+   }
+   return currentHash;
+ }
+
}
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldHelper.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldHelper.java?rev=683592&view=auto
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldHelper.java
(added)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/KeyFieldHelper.java
Thu Aug 7 04:49:16 2008
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.util.UTF8ByteArrayUtils;
+
+/**
+ * This is used in {@link KeyFieldBasedComparator} &
+ * {@link KeyFieldBasedPartitioner}. Defines all the methods
+ * for parsing key specifications. The key specification is of the form:
+ * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
+ * of the field to use, and c is the number of the first character from the
+ * beginning of the field. Fields and character posns are numbered starting
+ * with 1; a character position of zero in pos2 indicates the field's last
+ * character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
+ * of the field); if omitted from pos2, it defaults to 0 (the end of the
+ * field). opts are ordering options (supported options are 'nr').
+ */
+
+class KeyFieldHelper {
+
+ /** One parsed key specification of the form pos1[,pos2]. */
+ protected static class KeyDescription {
+   //1-based number of the field where the key starts
+   int beginFieldIdx = 1;
+   //1-based character position inside the begin field
+   int beginChar = 1;
+   //1-based number of the field where the key ends; 0 means no end field
+   //was given and the rest of the key is used
+   int endFieldIdx = 0;
+   //character position inside the end field; 0 means the field's last
+   //character
+   int endChar = 0;
+   //set by the 'n' ordering option
+   boolean numeric = false;
+   //set by the 'r' ordering option
+   boolean reverse = false;
+ }
+
+ private List<KeyDescription> allKeySpecs = new ArrayList<KeyDescription>();
+ private byte[] keyFieldSeparator;
+ private boolean keySpecSeen = false;
+
+ /**
+  * Stores the field separator as its UTF-8 byte sequence.
+  *
+  * @param keyFieldSeparator the separator between key fields
+  * @throws RuntimeException if UTF-8 is not supported by the platform
+  */
+ public void setKeyFieldSeparator(String keyFieldSeparator) {
+   byte[] encoded;
+   try {
+     encoded = keyFieldSeparator.getBytes("UTF-8");
+   } catch (UnsupportedEncodingException e) {
+     throw new RuntimeException("The current system does not " +
+         "support UTF-8 encoding!", e);
+   }
+   this.keyFieldSeparator = encoded;
+ }
+
+ /** Required for backcompatibility with num.key.fields.for.partition in
+ * {@link KeyFieldBasedPartitioner} */
+ public void setKeyFieldSpec(int start, int end) {
+   //a range whose end precedes its start is silently ignored
+   if (end < start) {
+     return;
+   }
+   //register a key spanning fields start..end; character positions keep
+   //their KeyDescription defaults
+   KeyDescription spec = new KeyDescription();
+   spec.beginFieldIdx = start;
+   spec.endFieldIdx = end;
+   allKeySpecs.add(spec);
+   keySpecSeen = true;
+ }
+
+ /**
+  * @return the key specifications collected so far; this is the internal
+  *         list itself, not a copy
+  */
+ public List<KeyDescription> keySpecs() {
+   return this.allKeySpecs;
+ }
+
+ public int[] getWordLengths(byte []b, int start, int end) {
+   //Given a string like "hello how are you", it returns an array
+   //like [4, 5, 3, 3, 3], where the first element is the number of
+   //fields and the following elements are the byte lengths of the
+   //individual fields. The range scanned is b[start..end), end exclusive.
+   if (!keySpecSeen) {
+     //if there were no key specs, then the whole key is one word
+     return new int[] {1};
+   }
+   int[] lengths = new int[10];
+   int currLenLengths = lengths.length;
+   int idx = 1;
+   int pos;
+   while ((pos = UTF8ByteArrayUtils.findBytes(b, start, end,
+       keyFieldSeparator)) != -1) {
+     if (++idx == currLenLengths) {
+       //out of room: double the array and keep what was recorded so far
+       int[] temp = lengths;
+       lengths = new int[(currLenLengths = currLenLengths*2)];
+       System.arraycopy(temp, 0, lengths, 0, temp.length);
+     }
+     lengths[idx - 1] = pos - start;
+     //Step over the whole separator, not just its first byte. The offset
+     //calculations add keyFieldSeparator.length per field, so the lengths
+     //recorded here must not include trailing separator bytes. Always
+     //advance by at least one byte so the scan cannot stall.
+     start = pos + Math.max(1, keyFieldSeparator.length);
+   }
+
+   if (start != end) {
+     //whatever follows the last separator is the final field
+     lengths[idx] = end - start;
+   }
+   lengths[0] = idx; //number of words is the first element
+   return lengths;
+ }
+ public int getStartOffset(byte[]b, int start, int end,
+     int []lengthIndices, KeyDescription k) {
+   //if -k2.5,2 is the keyspec, the startChar is lengthIndices[1] + 5
+   //note that the [0]'th element is the number of fields in the key
+   if (lengthIndices[0] < k.beginFieldIdx) {
+     //the key has fewer fields than the spec asks for
+     return -1;
+   }
+   //bytes taken by the fields (and their separators) before the begin field
+   int skipped = 0;
+   int field = 1;
+   while (field < k.beginFieldIdx) {
+     skipped += lengthIndices[field] + keyFieldSeparator.length;
+     field++;
+   }
+   //the requested character must lie inside the key
+   int available = end - start;
+   if (skipped + k.beginChar > available) {
+     return -1;
+   }
+   return start + skipped + k.beginChar - 1;
+ }
+ public int getEndOffset(byte[]b, int start, int end,
+     int []lengthIndices, KeyDescription k) {
+   //Returns the index of the LAST byte of the key (inclusive), where the
+   //key occupies b[start..end) with end exclusive.
+   //if -k2,2.8 is the keyspec, the endChar is lengthIndices[1] + 8
+   //note that the [0]'th element is the number of fields in the key
+   //
+   //Every fallback below answers end - 1 rather than end: the computed
+   //path returns an inclusive index, and callers iterate with i <= end,
+   //so handing back the exclusive bound would read one byte too far.
+   if (k.endFieldIdx == 0) {
+     //there is no end field specified for this keyspec. So the remaining
+     //part of the key is considered in its entirety.
+     return end - 1;
+   }
+   if (lengthIndices[0] >= k.endFieldIdx) {
+     int position = 0;
+     int i;
+     for (i = 1; i < k.endFieldIdx; i++) {
+       position += lengthIndices[i] + keyFieldSeparator.length;
+     }
+     if (k.endChar == 0) {
+       //character position zero stands for the end field's last character
+       position += lengthIndices[i];
+     }
+     if (position + k.endChar <= (end - start)) {
+       return start + position + k.endChar - 1;
+     }
+     //the requested character lies beyond the key: use the key's last byte
+     return end - 1;
+   }
+   //the key has fewer fields than the spec asks for: use the key's last byte
+   return end - 1;
+ }
+ /**
+  * Parses a whitespace-separated option string made of the global flags
+  * -n, -r, -nr and any number of -k key specifications, and records the
+  * resulting key descriptions.
+  *
+  * @param option the option string; null or empty leaves the helper
+  *        untouched
+  */
+ public void parseOption(String option) {
+   if (option == null || option.length() == 0) {
+     //we will have only default comparison
+     return;
+   }
+   KeyDescription global = new KeyDescription();
+   StringTokenizer args = new StringTokenizer(option);
+   while (args.hasMoreTokens()) {
+     String arg = args.nextToken();
+     if (arg.equals("-n")) {
+       global.numeric = true;
+     } else if (arg.equals("-r")) {
+       global.reverse = true;
+     } else if (arg.equals("-nr")) {
+       global.numeric = true;
+       global.reverse = true;
+     } else if (arg.startsWith("-k")) {
+       //parseKey may pull the following token when the spec is "-k <arg>"
+       KeyDescription parsed = parseKey(arg, args);
+       if (parsed != null) {
+         allKeySpecs.add(parsed);
+         keySpecSeen = true;
+       }
+     }
+   }
+   //keys without ordering options of their own inherit the global ones
+   for (KeyDescription spec : allKeySpecs) {
+     if (!spec.reverse && !spec.numeric) {
+       spec.reverse = global.reverse;
+       spec.numeric = global.numeric;
+     }
+   }
+   //no -k at all: the global description stands in as the only key
+   if (allKeySpecs.isEmpty()) {
+     allKeySpecs.add(global);
+   }
+ }
+
+ /**
+  * Parses one -k key specification of the form f[.c][nr][,f[.c][nr]].
+  *
+  * @param arg the current token, either "-k" alone or "-k" immediately
+  *        followed by the specification
+  * @param args the remaining option tokens; one more token is consumed
+  *        from it when arg is just "-k"
+  * @return the parsed description, or null when no specification text
+  *         follows "-k"
+  * @throws NumberFormatException if a field or character position is not
+  *         an integer
+  * @throws IllegalArgumentException if an unexpected token is found where
+  *         ',' or an ordering option is expected
+  */
+ private KeyDescription parseKey(String arg, StringTokenizer args) {
+ //we allow for -k<arg> and -k <arg>
+ String keyArgs = null;
+ if (arg.length() == 2) {
+ if (args.hasMoreTokens()) {
+ keyArgs = args.nextToken();
+ }
+ } else {
+ keyArgs = arg.substring(2);
+ }
+ if (keyArgs == null || keyArgs.length() == 0) {
+ return null;
+ }
+ //'n', 'r', '.' and ',' are delimiters that are also returned as tokens
+ StringTokenizer st = new StringTokenizer(keyArgs,"nr.,",true);
+
+ KeyDescription key = new KeyDescription();
+
+ String token;
+ //the key is of the form 1[.3][nr][,1.5][nr]
+ if (st.hasMoreTokens()) {
+ token = st.nextToken();
+ //the first token must be a number
+ key.beginFieldIdx = Integer.parseInt(token);
+ }
+ if (st.hasMoreTokens()) {
+ token = st.nextToken();
+ //optional ".c" part of pos1
+ //NOTE(review): nextToken() is called without a hasMoreTokens() check,
+ //so a spec ending in '.' (or ',' below) would raise
+ //NoSuchElementException rather than IllegalArgumentException
+ if (token.equals(".")) {
+ token = st.nextToken();
+ key.beginChar = Integer.parseInt(token);
+ if (st.hasMoreTokens()) {
+ token = st.nextToken();
+ } else {
+ return key;
+ }
+ }
+ //consume the ordering options of pos1; stop at the first other token
+ do {
+ if (token.equals("n")) {
+ key.numeric = true;
+ }
+ else if (token.equals("r")) {
+ key.reverse = true;
+ }
+ else break;
+ if (st.hasMoreTokens()) {
+ token = st.nextToken();
+ } else {
+ return key;
+ }
+ } while (true);
+ //anything left at this point must be ",pos2"
+ if (token.equals(",")) {
+ token = st.nextToken();
+ //the first token must be a number
+ key.endFieldIdx = Integer.parseInt(token);
+ if (st.hasMoreTokens()) {
+ token = st.nextToken();
+ //optional ".c" part of pos2
+ if (token.equals(".")) {
+ token = st.nextToken();
+ key.endChar = Integer.parseInt(token);
+ if (st.hasMoreTokens()) {
+ token = st.nextToken();
+ } else {
+ return key;
+ }
+ }
+ //only ordering options may follow pos2; anything else is rejected
+ do {
+ if (token.equals("n")) {
+ key.numeric = true;
+ }
+ else if (token.equals("r")) {
+ key.reverse = true;
+ }
+ else {
+ throw new IllegalArgumentException("Invalid -k argument. " +
+ "Must be of the form -k pos1,[pos2], where pos is of the form "
+
+ "f[.c]nr");
+ }
+ if (st.hasMoreTokens()) {
+ token = st.nextToken();
+ } else {
+ break;
+ }
+ } while (true);
+ }
+ return key;
+ }
+ throw new IllegalArgumentException("Invalid -k argument. " +
+ "Must be of the form -k pos1,[pos2], where pos is of the form " +
+ "f[.c]nr");
+ }
+ return key;
+ }
+ private void printKey(KeyDescription key) {
+   //debugging aid: dumps every field of a parsed key spec to stdout,
+   //one line per field, followed by an end marker
+   String[] report = {
+     "key.beginFieldIdx: " + key.beginFieldIdx,
+     "key.beginChar: " + key.beginChar,
+     "key.endFieldIdx: " + key.endFieldIdx,
+     "key.endChar: " + key.endChar,
+     "key.numeric: " + key.numeric,
+     "key.reverse: " + key.reverse,
+     "parseKey over"
+   };
+   for (String entry : report) {
+     System.out.println(entry);
+   }
+ }
+}
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=683592&view=auto
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
(added)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
Thu Aug 7 04:49:16 2008
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+
+public class TestKeyFieldBasedComparator extends HadoopTestCase {
+ JobConf conf;
+ String line1 = "123 -123 005120 123.9 0.01 0.18 010 10.1 4444 011 011 234";
+ String line2 = "134 -12 005100 123.10 -1.01 0.19 02 10.0 4444.1";
+
+ /**
+  * Sets the test case up with the LOCAL_MR and LOCAL_FS modes and
+  * creates the job configuration shared by all runs.
+  */
+ public TestKeyFieldBasedComparator() throws IOException {
+   super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+   this.conf = createJobConf();
+ }
+ /**
+  * Runs one job that sorts line1 and line2 with the given comparator
+  * options and checks the order in which they appear in the output.
+  *
+  * @param keySpec the options handed to the key field comparator
+  * @param expect 1 if line1 must come out first, 2 if line2 must
+  * @throws Exception if the job cannot be set up or run
+  */
+ public void configure(String keySpec, int expect) throws Exception {
+   Path testdir = new Path("build/test/test.mapred.spill");
+   Path inDir = new Path(testdir, "in");
+   Path outDir = new Path(testdir, "out");
+   FileSystem fs = getFileSystem();
+   fs.delete(testdir, true);
+   conf.setInputFormat(TextInputFormat.class);
+   FileInputFormat.setInputPaths(conf, inDir);
+   FileOutputFormat.setOutputPath(conf, outDir);
+   conf.setOutputKeyClass(Text.class);
+   conf.setOutputValueClass(LongWritable.class);
+
+   conf.setNumMapTasks(1);
+   conf.setNumReduceTasks(2);
+
+   conf.setOutputFormat(TextOutputFormat.class);
+   conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
+   conf.setKeyFieldComparatorOptions(keySpec);
+   conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
+   conf.set("map.output.key.field.separator", " ");
+   conf.setMapperClass(InverseMapper.class);
+   conf.setReducerClass(IdentityReducer.class);
+   if (!fs.mkdirs(testdir)) {
+     throw new IOException("Mkdirs failed to create " + testdir.toString());
+   }
+   if (!fs.mkdirs(inDir)) {
+     throw new IOException("Mkdirs failed to create " + inDir.toString());
+   }
+   // set up the input data: one file holding both lines
+   Path inFile = new Path(inDir, "part0");
+   FileOutputStream fos = new FileOutputStream(inFile.toString());
+   try {
+     fos.write((line1 + "\n").getBytes());
+     fos.write((line2 + "\n").getBytes());
+   } finally {
+     // release the file handle even when a write fails
+     fos.close();
+   }
+   JobClient jc = new JobClient(conf);
+   RunningJob r_job = jc.submitJob(conf);
+   while (!r_job.isComplete()) {
+     Thread.sleep(1000);
+   }
+
+   if (!r_job.isSuccessful()) {
+     fail("Oops! The job broke due to an unexpected error");
+   }
+   Path[] outputFiles = FileUtil.stat2Paths(
+       getFileSystem().listStatus(outDir,
+           new OutputLogFilter()));
+   if (outputFiles.length > 0) {
+     InputStream is = getFileSystem().open(outputFiles[0]);
+     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+     try {
+       //make sure we get what we expect as the first line, and also
+       //that we have two lines (both the lines must end up in the same
+       //reducer since the partitioner takes the same key spec for all
+       //lines)
+       String line = reader.readLine();
+       assertNotNull("first output line is missing", line);
+       if (expect == 1) {
+         assertTrue(line.startsWith(line1));
+       } else if (expect == 2) {
+         assertTrue(line.startsWith(line2));
+       }
+       line = reader.readLine();
+       assertNotNull("second output line is missing", line);
+       if (expect == 1) {
+         assertTrue(line.startsWith(line2));
+       } else if (expect == 2) {
+         assertTrue(line.startsWith(line1));
+       }
+     } finally {
+       // close the reader even when an assertion above fails
+       reader.close();
+     }
+   }
+ }
+ public void testBasicUnixComparator() throws Exception {
+ configure("-k1,1n", 1);
+ configure("-k2,2n", 1);
+ configure("-k2.2,2n", 2);
+ configure("-k3.4,3n", 2);
+ configure("-k3.2,3.3n -k4,4n", 2);
+ configure("-k3.2,3.3n -k4,4nr", 1);
+ configure("-k2.4,2.4n", 2);
+ configure("-k7,7", 1);
+ configure("-k7,7n", 2);
+ configure("-k8,8n", 2);
+ configure("-k9,9n", 1);
+ configure("-k11,11",2);
+ configure("-k10,10",2);
+ }
+}