Author: tomwhite
Date: Fri Jul 18 03:27:37 2008
New Revision: 677872
URL: http://svn.apache.org/viewvc?rev=677872&view=rev
Log:
HADOOP-372. Add support for multiple input paths with a different InputFormat
and Mapper for each path. Contributed by Chris Smith.
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=677872&r1=677871&r2=677872&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jul 18 03:27:37 2008
@@ -29,6 +29,9 @@
HADOOP-2325. Require Java 6. (cutting)
+ HADOOP-372. Add support for multiple input paths with a different
+ InputFormat and Mapper for each path. (Chris Smith via tomwhite)
+
NEW FEATURES
HADOOP-3341. Allow streaming jobs to specify the field separator for map
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java?rev=677872&view=auto
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java
(added)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java
Fri Jul 18 03:27:37 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputFormat} that delegates behaviour of paths to multiple other
+ * InputFormats.
+ *
+ * @see FileInputFormat#addInputPath(JobConf, Path, Class, Class)
+ */
+public class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
+
+ @Deprecated
+ public void validateInput(JobConf conf) throws IOException {
+ JobConf confCopy = new JobConf(conf);
+ Map<Path, InputFormat> formatMap = FileInputFormat.getInputFormatMap(conf);
+ for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
+ Path path = entry.getKey();
+ InputFormat format = entry.getValue();
+ FileInputFormat.setInputPaths(confCopy, path);
+ format.validateInput(confCopy);
+ }
+ }
+
+ public InputSplit[] getSplits(JobConf conf, int numSplits) throws
IOException {
+
+ JobConf confCopy = new JobConf(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ Map<Path, InputFormat> formatMap = FileInputFormat.getInputFormatMap(conf);
+ Map<Path, Class<? extends Mapper>> mapperMap = FileInputFormat
+ .getMapperTypeMap(conf);
+ Map<Class<? extends InputFormat>, List<Path>> formatPaths
+ = new HashMap<Class<? extends InputFormat>, List<Path>>();
+
+ // First, build a map of InputFormats to Paths
+ for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
+ if (!formatPaths.containsKey(entry.getValue().getClass())) {
+ formatPaths.put(entry.getValue().getClass(), new LinkedList<Path>());
+ }
+
+ formatPaths.get(entry.getValue().getClass()).add(entry.getKey());
+ }
+
+ for (Entry<Class<? extends InputFormat>, List<Path>> formatEntry :
+ formatPaths.entrySet()) {
+ Class<? extends InputFormat> formatClass = formatEntry.getKey();
+ InputFormat format = (InputFormat) ReflectionUtils.newInstance(
+ formatClass, conf);
+ List<Path> paths = formatEntry.getValue();
+
+ Map<Class<? extends Mapper>, List<Path>> mapperPaths
+ = new HashMap<Class<? extends Mapper>, List<Path>>();
+
+ // Now, for each set of paths that have a common InputFormat, build
+ // a map of Mappers to the paths they're used for
+ for (Path path : paths) {
+ Class<? extends Mapper> mapperClass = mapperMap.get(path);
+ if (!mapperPaths.containsKey(mapperClass)) {
+ mapperPaths.put(mapperClass, new LinkedList<Path>());
+ }
+
+ mapperPaths.get(mapperClass).add(path);
+ }
+
+ // Now each set of paths that has a common InputFormat and Mapper can
+ // be added to the same job, and split together.
+ for (Entry<Class<? extends Mapper>, List<Path>> mapEntry : mapperPaths
+ .entrySet()) {
+ paths = mapEntry.getValue();
+ Class<? extends Mapper> mapperClass = mapEntry.getKey();
+
+ if (mapperClass == null) {
+ mapperClass = conf.getMapperClass();
+ }
+
+ FileInputFormat.setInputPaths(confCopy, paths.toArray(new Path[paths
+ .size()]));
+
+ // Get splits for each input path and tag with InputFormat
+ // and Mapper types by wrapping in a TaggedInputSplit.
+ InputSplit[] pathSplits = format.getSplits(confCopy, numSplits);
+ for (InputSplit pathSplit : pathSplits) {
+ splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(),
+ mapperClass));
+ }
+ }
+ }
+
+ return splits.toArray(new InputSplit[splits.size()]);
+ }
+
+ @SuppressWarnings("unchecked")
+ public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
+ Reporter reporter) throws IOException {
+
+ // Find the InputFormat and then the RecordReader from the
+ // TaggedInputSplit.
+
+ TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+ InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+ .newInstance(taggedInputSplit.getInputFormatClass(), conf);
+ return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
+ reporter);
+ }
+}
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java?rev=677872&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java
(added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java
Fri Jul 18 03:27:37 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link Mapper} that delegates behaviour of paths to multiple other
+ * mappers.
+ *
+ * @see FileInputFormat#addInputPath(JobConf, Path, Class, Class)
+ */
+public class DelegatingMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2,
V2> {
+
+ private JobConf conf;
+
+ private Mapper<K1, V1, K2, V2> mapper;
+
+ @SuppressWarnings("unchecked")
+ public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
+ Reporter reporter) throws IOException {
+
+ if (mapper == null) {
+ // Find the Mapper from the TaggedInputSplit.
+ TaggedInputSplit inputSplit = (TaggedInputSplit)
reporter.getInputSplit();
+ mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
+ .getMapperClass(), conf);
+ }
+ mapper.map(key, value, outputCollector, reporter);
+ }
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ public void close() throws IOException {
+ if (mapper != null) {
+ mapper.close();
+ }
+ }
+
+}
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=677872&r1=677871&r2=677872&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
Fri Jul 18 03:27:37 2008
@@ -21,7 +21,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -398,8 +401,105 @@
conf.set("mapred.input.dir", dirs == null ? dirStr :
dirs + StringUtils.COMMA_STR + dirStr);
}
+
+ /**
+ * Add a {@link Path} with a custom {@link InputFormat} to the list of
+ * inputs for the map-reduce job.
+ *
+ * @param conf The configuration of the job
+ * @param path {@link Path} to be added to the list of inputs for the job
+ * @param inputFormatClass {@link InputFormat} class to use for this path
+ */
+ public static void addInputPath(JobConf conf, Path path,
+ Class<? extends InputFormat> inputFormatClass) {
+
+ String inputFormatMapping = path.toString() + ";"
+ + inputFormatClass.getName();
+ String inputFormats = conf.get("mapred.input.dir.formats");
+ conf.set("mapred.input.dir.formats",
+ inputFormats == null ? inputFormatMapping : inputFormats + ","
+ + inputFormatMapping);
+
+ conf.setInputFormat(DelegatingInputFormat.class);
+ }
+
+ /**
+ * Add a {@link Path} with a custom {@link InputFormat} and
+ * {@link Mapper} to the list of inputs for the map-reduce job.
+ *
+ * @param conf The configuration of the job
+ * @param path {@link Path} to be added to the list of inputs for the job
+ * @param inputFormatClass {@link InputFormat} class to use for this path
+ * @param mapperClass {@link Mapper} class to use for this path
+ */
+ public static void addInputPath(JobConf conf, Path path,
+ Class<? extends InputFormat> inputFormatClass,
+ Class<? extends Mapper> mapperClass) {
+
+ addInputPath(conf, path, inputFormatClass);
+
+ String mapperMapping = path.toString() + ";" + mapperClass.getName();
+ String mappers = conf.get("mapred.input.dir.mappers");
+ conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
+ : mappers + "," + mapperMapping);
+
+ conf.setMapperClass(DelegatingMapper.class);
+ }
- // This method escapes commas in the glob pattern of the given paths.
+ /**
+ * Retrieves a map of {@link Path}s to the {@link InputFormat} class
+ * that should be used for them.
+ *
+ * @param conf The configuration of the job
+ * @see #addInputPath(JobConf, Path, Class)
+ * @return A map of paths to inputformats for the job
+ */
+ static Map<Path, InputFormat> getInputFormatMap(JobConf conf) {
+ Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
+ String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
+ for (String pathMapping : pathMappings) {
+ String[] split = pathMapping.split(";");
+ InputFormat inputFormat;
+ try {
+ inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
+ .getClassByName(split[1]), conf);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ m.put(new Path(split[0]), inputFormat);
+ }
+ return m;
+ }
+
+ /**
+ * Retrieves a map of {@link Path}s to the {@link Mapper} class that
+ * should be used for them.
+ *
+ * @param conf The configuration of the job
+ * @see #addInputPath(JobConf, Path, Class, Class)
+ * @return A map of paths to mappers for the job
+ */
+ @SuppressWarnings("unchecked")
+ static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobConf conf) {
+ if (conf.get("mapred.input.dir.mappers") == null) {
+ return Collections.emptyMap();
+ }
+ Map<Path, Class<? extends Mapper>> m = new HashMap<Path, Class<? extends
Mapper>>();
+ String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
+ for (String pathMapping : pathMappings) {
+ String[] split = pathMapping.split(";");
+ Class<? extends Mapper> mapClass;
+ try {
+ mapClass = (Class<? extends Mapper>) conf.getClassByName(split[1]);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ m.put(new Path(split[0]), mapClass);
+ }
+ return m;
+ }
+
+ // This method escapes commas in the glob pattern of the given paths.
private static String[] getPathStrings(String commaSeparatedPaths) {
int length = commaSeparatedPaths.length();
int curlyOpen = 0;
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java?rev=677872&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java
(added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java
Fri Jul 18 03:27:37 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputSplit} that tags another InputSplit with extra data for use by
+ * {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
+ */
+public class TaggedInputSplit implements Configurable, InputSplit {
+
+ private Class<? extends InputSplit> inputSplitClass;
+
+ private InputSplit inputSplit;
+
+ private Class<? extends InputFormat> inputFormatClass;
+
+ private Class<? extends Mapper> mapperClass;
+
+ private Configuration conf;
+
+ public TaggedInputSplit() {
+ // Default constructor.
+ }
+
+ /**
+ * Creates a new TaggedInputSplit.
+ *
+ * @param inputSplit The InputSplit to be tagged
+ * @param conf The configuration to use
+ * @param inputFormatClass The InputFormat class to use for this job
+ * @param mapperClass The Mapper class to use for this job
+ */
+ public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
+ Class<? extends InputFormat> inputFormatClass,
+ Class<? extends Mapper> mapperClass) {
+ this.inputSplitClass = inputSplit.getClass();
+ this.inputSplit = inputSplit;
+ this.conf = conf;
+ this.inputFormatClass = inputFormatClass;
+ this.mapperClass = mapperClass;
+ }
+
+ /**
+ * Retrieves the original InputSplit.
+ *
+ * @return The InputSplit that was tagged
+ */
+ public InputSplit getInputSplit() {
+ return inputSplit;
+ }
+
+ /**
+ * Retrieves the InputFormat class to use for this split.
+ *
+ * @return The InputFormat class to use
+ */
+ public Class<? extends InputFormat> getInputFormatClass() {
+ return inputFormatClass;
+ }
+
+ /**
+ * Retrieves the Mapper class to use for this split.
+ *
+ * @return The Mapper class to use
+ */
+ public Class<? extends Mapper> getMapperClass() {
+ return mapperClass;
+ }
+
+ public long getLength() throws IOException {
+ return inputSplit.getLength();
+ }
+
+ public String[] getLocations() throws IOException {
+ return inputSplit.getLocations();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void readFields(DataInput in) throws IOException {
+ inputSplitClass = (Class<? extends InputSplit>) readClass(in);
+ inputSplit = (InputSplit) ReflectionUtils
+ .newInstance(inputSplitClass, conf);
+ inputSplit.readFields(in);
+ inputFormatClass = (Class<? extends InputFormat>) readClass(in);
+ mapperClass = (Class<? extends Mapper>) readClass(in);
+ }
+
+ private Class<?> readClass(DataInput in) throws IOException {
+ String className = Text.readString(in);
+ try {
+ return conf.getClassByName(className);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("readObject can't find class", e);
+ }
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, inputSplitClass.getName());
+ inputSplit.write(out);
+ Text.writeString(out, inputFormatClass.getName());
+ Text.writeString(out, mapperClass.getName());
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+}
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java?rev=677872&view=auto
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java
(added)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java
Fri Jul 18 03:27:37 2008
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+public class TestDelegatingInputFormat extends TestCase {
+
+ public void testSplitting() throws Exception {
+ JobConf conf = new JobConf();
+ conf.set("fs.hdfs.impl",
+ "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
+ MiniDFSCluster dfs = null;
+ try {
+ dfs = new MiniDFSCluster(conf, 4, true, new String[] { "/rack0",
+ "/rack0", "/rack1", "/rack1" }, new String[] { "host0", "host1",
+ "host2", "host3" });
+ FileSystem fs = dfs.getFileSystem();
+
+ Path path = getPath("/foo/bar", fs);
+ Path path2 = getPath("/foo/baz", fs);
+ Path path3 = getPath("/bar/bar", fs);
+ Path path4 = getPath("/bar/baz", fs);
+
+ final int numSplits = 100;
+
+ FileInputFormat.addInputPath(conf, path, TextInputFormat.class,
+ MapClass.class);
+ FileInputFormat.addInputPath(conf, path2, TextInputFormat.class,
+ MapClass2.class);
+ FileInputFormat.addInputPath(conf, path3, KeyValueTextInputFormat.class,
+ MapClass.class);
+ FileInputFormat.addInputPath(conf, path4, TextInputFormat.class,
+ MapClass2.class);
+ DelegatingInputFormat inFormat = new DelegatingInputFormat();
+ InputSplit[] splits = inFormat.getSplits(conf, numSplits);
+
+ int[] bins = new int[3];
+ for (InputSplit split : splits) {
+ assertTrue(split instanceof TaggedInputSplit);
+ final TaggedInputSplit tis = (TaggedInputSplit) split;
+ int index = -1;
+
+ if (tis.getInputFormatClass().equals(KeyValueTextInputFormat.class)) {
+ // path3
+ index = 0;
+ } else if (tis.getMapperClass().equals(MapClass.class)) {
+ // path
+ index = 1;
+ } else {
+ // path2 and path4
+ index = 2;
+ }
+
+ bins[index]++;
+ }
+
+ // Each bin is a unique combination of a Mapper and InputFormat, and
+ // DelegatingInputFormat should split each bin into numSplits splits,
+ // regardless of the number of paths that use that Mapper/InputFormat
+ for (int count : bins) {
+ assertEquals(numSplits, count);
+ }
+
+ assertTrue(true);
+ } finally {
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+ }
+
+ static Path getPath(final String location, final FileSystem fs)
+ throws IOException {
+ Path path = new Path(location);
+
+ // create a multi-block file on hdfs
+ DataOutputStream out = fs.create(path, true, 4096, (short) 2, 512, null);
+ for (int i = 0; i < 1000; ++i) {
+ out.writeChars("Hello\n");
+ }
+ out.close();
+
+ return path;
+ }
+
+ static class MapClass implements Mapper<String, String, String, String> {
+
+ public void map(String key, String value,
+ OutputCollector<String, String> output, Reporter reporter)
+ throws IOException {
+ }
+
+ public void configure(JobConf job) {
+ }
+
+ public void close() throws IOException {
+ }
+ }
+
+ static class MapClass2 extends MapClass {
+ }
+
+}
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=677872&r1=677871&r2=677872&view=diff
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java
(original)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java
Fri Jul 18 03:27:37 2008
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.BlockLocation;
@@ -83,4 +85,52 @@
}
}
}
+
+ public void testAddInputPathWithFormat() {
+ final JobConf conf = new JobConf();
+ FileInputFormat.addInputPath(conf, new Path("/foo"),
TextInputFormat.class);
+ FileInputFormat.addInputPath(conf, new Path("/bar"),
+ KeyValueTextInputFormat.class);
+ final Map<Path, InputFormat> inputs = FileInputFormat
+ .getInputFormatMap(conf);
+ assertEquals(TextInputFormat.class, inputs.get(new
Path("/foo")).getClass());
+ assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+ .getClass());
+ }
+
+ public void testAddInputPathWithMapper() {
+ final JobConf conf = new JobConf();
+ FileInputFormat.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
+ MapClass.class);
+ FileInputFormat.addInputPath(conf, new Path("/bar"),
+ KeyValueTextInputFormat.class, MapClass2.class);
+ final Map<Path, InputFormat> inputs = FileInputFormat
+ .getInputFormatMap(conf);
+ final Map<Path, Class<? extends Mapper>> maps = FileInputFormat
+ .getMapperTypeMap(conf);
+
+ assertEquals(TextInputFormat.class, inputs.get(new
Path("/foo")).getClass());
+ assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+ .getClass());
+ assertEquals(MapClass.class, maps.get(new Path("/foo")));
+ assertEquals(MapClass2.class, maps.get(new Path("/bar")));
+ }
+
+ static class MapClass implements Mapper<String, String, String, String> {
+
+ public void map(String key, String value,
+ OutputCollector<String, String> output, Reporter reporter)
+ throws IOException {
+ }
+
+ public void configure(JobConf job) {
+ }
+
+ public void close() throws IOException {
+ }
+ }
+
+ static class MapClass2 extends MapClass {
+ }
+
}