Author: ddas
Date: Thu May 15 04:31:20 2008
New Revision: 656585
URL: http://svn.apache.org/viewvc?rev=656585&view=rev
Log:
HADOOP-3221. Adds org.apache.hadoop.mapred.lib.NLineInputFormat, which splits
files into splits each of N lines. N can be specified by configuration property
mapred.line.input.format.linespermap, which defaults to 1. Contributed by
Amareshwari Sriramadasu.
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656585&r1=656584&r2=656585&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 15 04:31:20 2008
@@ -86,6 +86,11 @@
be corrupt, retain all copies and mark the block as corrupt.
(Lohit Vjayarenu via rangadi)
+ HADOOP-3221. Adds org.apache.hadoop.mapred.lib.NLineInputFormat, which
+ splits files into splits each of N lines. N can be specified by
+ configuration property "mapred.line.input.format.linespermap", which
+ defaults to 1. (Amareshwari Sriramadasu via ddas)
+
IMPROVEMENTS
HADOOP-2928. Remove deprecated FileSystem.getContentLength().
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=656585&r1=656584&r2=656585&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Thu May 15 04:31:20 2008
@@ -1057,6 +1057,13 @@
</description>
</property>
+ <property>
+ <name>mapred.line.input.format.linespermap</name>
+ <value>1</value>
+ <description> Number of lines per split in NLineInputFormat.
+ </description>
+ </property>
+
<!-- ipc properties -->
<property>
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=656585&view=auto
==============================================================================
---
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
(added)
+++
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
Thu May 15 04:31:20 2008
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+
+/**
+ * NLineInputFormat which splits N lines of input as one split.
+ *
+ * In many "pleasantly" parallel applications, each process/mapper
+ * processes the same input file(s), but the computations are
+ * controlled by different parameters (referred to as "parameter sweeps").
+ * One way to achieve this is to specify a set of parameters
+ * (one set per line) as input in a control file, which is the input path
+ * to the map-reduce application, whereas the input dataset is specified
+ * via a config variable in JobConf.
+ *
+ * The NLineInputFormat can be used in such applications: it splits
+ * the input file such that, by default, one line is fed as
+ * a value to one map task, and the key is the offset,
+ * i.e. (k,v) is (LongWritable, Text).
+ * The splits are created with an empty list of location hints.
+ *
+ * N is read from the configuration property
+ * "mapred.line.input.format.linespermap" and defaults to 1. If N is not
+ * positive, the N-line boundary is never reached and every non-empty file
+ * becomes a single split.
+ */
+
+public class NLineInputFormat extends FileInputFormat<LongWritable, Text>
+                              implements JobConfigurable {
+  /** Number of lines per split; overwritten by {@link #configure(JobConf)}. */
+  private int N = 1;
+
+  /**
+   * Reports the split as the current status and returns a
+   * {@link LineRecordReader} over it.
+   *
+   * @param genericSplit the split to read; cast to {@link FileSplit}
+   * @param job the job configuration handed to the reader
+   * @param reporter receives the split's string form as status
+   * @return a reader over the lines of the split
+   * @throws IOException if the reader cannot be created
+   */
+  public RecordReader<LongWritable, Text> getRecordReader(
+                                            InputSplit genericSplit,
+                                            JobConf job,
+                                            Reporter reporter)
+  throws IOException {
+    reporter.setStatus(genericSplit.toString());
+    return new LineRecordReader(job, (FileSplit) genericSplit);
+  }
+
+  /**
+   * Logically splits the set of input files for the job, splits N lines
+   * of the input as one split.
+   *
+   * Every input file is read once, line by line, to find the byte range
+   * covered by each group of N lines. Lines left over at the end of a file
+   * form one final, shorter split.
+   *
+   * @param job the job configuration, used to list and open the input files
+   * @param numSplits ignored; the number of splits depends only on N and
+   *                  on the number of lines in the input
+   * @return the splits of all input files, in file order
+   * @throws IOException if an input path is a directory or cannot be read
+   * @see org.apache.hadoop.mapred.FileInputFormat#getSplits(JobConf, int)
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits)
+  throws IOException {
+    ArrayList<FileSplit> splits = new ArrayList<FileSplit>();
+    for (Path fileName : listPaths(job)) {
+      FileSystem fs = fileName.getFileSystem(job);
+      // No separate exists() call: the status lookup has already succeeded
+      // by the time the directory flag can be inspected.
+      FileStatus status = fs.getFileStatus(fileName);
+      if (status.isDir()) {
+        throw new IOException("Not a file: " + fileName);
+      }
+      FSDataInputStream in = fs.open(fileName);
+      LineReader lr = null;
+      try {
+        lr = new LineReader(in, job);
+        Text line = new Text();
+        int numLines = 0;   // lines collected for the split being built
+        long begin = 0;     // byte offset at which that split starts
+        long length = 0;    // bytes consumed so far for that split
+        int num;
+        // readLine() reports the bytes it consumed; a non-positive value
+        // ends the scan.
+        while ((num = lr.readLine(line)) > 0) {
+          numLines++;
+          length += num;
+          if (numLines == N) {
+            splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+            begin += length;
+            length = 0;
+            numLines = 0;
+          }
+        }
+        // Trailing lines that did not fill a whole group of N.
+        if (numLines != 0) {
+          splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+        }
+      } finally {
+        // The reader is presumed to close the stream it wraps (the code has
+        // always relied on that). If the reader was never created, close
+        // the stream directly so that it does not leak.
+        if (lr != null) {
+          lr.close();
+        } else {
+          in.close();
+        }
+      }
+    }
+    return splits.toArray(new FileSplit[splits.size()]);
+  }
+
+  /**
+   * Reads N from "mapred.line.input.format.linespermap", falling back to 1
+   * when the property is not set.
+   *
+   * @param conf the job configuration to read the property from
+   */
+  public void configure(JobConf conf) {
+    N = conf.getInt("mapred.line.input.format.linespermap", 1);
+  }
+}
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java?rev=656585&view=auto
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
(added)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
Thu May 15 04:31:20 2008
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+/**
+ * Tests that NLineInputFormat produces splits holding the configured
+ * number of lines.
+ */
+public class TestLineInputFormat extends TestCase {
+  private static final int MAX_LENGTH = 200;
+
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null;
+
+  static {
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestLineInputFormat");
+
+  /**
+   * Writes files with a growing, randomly stepped number of lines and
+   * checks the splits computed for each of them with 5 lines per split.
+   */
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf();
+    Path file = new Path(workDir, "test.txt");
+
+    int seed = new Random().nextInt();
+    // report the seed so that a failing run can be reproduced
+    System.out.println("seed = " + seed);
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+    int numLinesPerMap = 5;
+    job.setInt("mapred.line.input.format.linespermap", numLinesPerMap);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH/10) + 1) {
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+      checkFormat(job, numLinesPerMap);
+    }
+  }
+
+  // A reporter that does nothing
+  private static final Reporter voidReporter = Reporter.NULL;
+
+  /**
+   * Computes the splits for the job's input and reads every one of them.
+   * All splits but the last must hold exactly <code>expectedN</code> lines;
+   * the last one holds the remainder, i.e. between 1 and
+   * <code>expectedN</code> lines.
+   *
+   * @param job the job whose input paths and lines-per-split are used
+   * @param expectedN the number of lines configured per split
+   * @throws IOException if the splits cannot be computed or read
+   */
+  void checkFormat(JobConf job, int expectedN) throws IOException {
+    NLineInputFormat format = new NLineInputFormat();
+    format.configure(job);
+    int ignoredNumSplits = 1;
+    InputSplit[] splits = format.getSplits(job, ignoredNumSplits);
+
+    for (int j = 0; j < splits.length; j++) {
+      assertEquals("There are no split locations", 0,
+                   splits[j].getLocations().length);
+      RecordReader<LongWritable, Text> reader =
+        format.getRecordReader(splits[j], job, voidReporter);
+      assertEquals("reader class is LineRecordReader.",
+                   LineRecordReader.class, reader.getClass());
+      LongWritable key = reader.createKey();
+      assertEquals("Key class is LongWritable.",
+                   LongWritable.class, key.getClass());
+      Text value = reader.createValue();
+      assertEquals("Value class is Text.", Text.class, value.getClass());
+
+      int count = 0;
+      try {
+        while (reader.next(key, value)) {
+          count++;
+        }
+      } finally {
+        reader.close();
+      }
+
+      if (j < splits.length - 1) {
+        assertEquals("number of lines in split is " + expectedN ,
+                     expectedN, count);
+      } else {
+        // the last split takes whatever is left over, but is never empty
+        assertTrue("last split has " + count + " lines, expected 1 to "
+                   + expectedN, count >= 1 && count <= expectedN);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestLineInputFormat().testFormat();
+  }
+}