Hello,

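You can extend the FileOutputFormat class and return your own RecordWriter. Here is an example that writes the values for each key to a separate file named after that key: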

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MultipleTextOutputFormatByPredicates<K, V> extends FileOutputFormat<K, V> {

    // Writes each value to a file named after its key, one file per distinct key.
    protected static class MultipleOutputByPredicatesLineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected TaskAttemptContext job;
        // Note: the codec is stored, but the streams opened in write() are not
        // wrapped in it, so the output is written uncompressed.
        protected CompressionCodec codec;
        protected String extension = "";
        // One open stream per output file, keyed by the sanitized key string.
        protected Map<String, DataOutputStream> outMap;
        private final byte[] keyValueSeparator;

        public MultipleOutputByPredicatesLineRecordWriter(CompressionCodec codec,
                String keyValueSeparator, TaskAttemptContext job) {
            this.job = job;
            this.codec = codec;
            if (null != codec)
                this.extension = codec.getDefaultExtension();
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
            outMap = new HashMap<String, DataOutputStream>();
        }

        public MultipleOutputByPredicatesLineRecordWriter(CompressionCodec codec, TaskAttemptContext job) {
            this(codec, "\t", job);
        }

        /**
         * Write the object to the byte stream, handling Text as a special case.
         *
         * @param o
         *            the object to print
         * @param out
         *            the stream to write to
         * @throws IOException
         *             if the write throws, we pass it on
         */
        private void writeObject(Object o, DataOutputStream out) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        @Override
        public synchronized void write(K key, V value) throws IOException {
            // Records with a null key or a null value are skipped.
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey || nullValue) {
                return;
            }
            // The key only selects the output file; ':' is replaced to keep the name valid.
            String sPredicate = key.toString().replace(':', '_');
            DataOutputStream out = outMap.get(sPredicate);
            if (null == out) {
                // First record for this key: create the file in the job output directory.
                // overwrite=false, so this fails if the file already exists.
                Path file = new Path(job.getConfiguration().get("mapred.output.dir"), sPredicate);
                FileSystem fs = file.getFileSystem(job.getConfiguration());
                FSDataOutputStream fileOut = fs.create(file, false);
                outMap.put(sPredicate, fileOut);
                out = fileOut;
            }
            // The key itself is not written; each line is the separator followed by the value.
            out.write(keyValueSeparator);
            writeObject(value, out);
            out.write(newline);
        }

        @Override
        public synchronized void close(TaskAttemptContext context) throws IOException {
            // Close every stream that was opened during write().
            for (DataOutputStream out : outMap.values())
                out.close();
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t");
        CompressionCodec codec = null;
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        }
        return new MultipleOutputByPredicatesLineRecordWriter<K, V>(codec, keyValueSeparator, job);
    }
}
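
If you want to try the core idea without a cluster, here is a minimal sketch in plain Java of what the record writer above does: it lazily opens one output stream per key and closes them all at the end. The class name PerKeyFileWriter and the keys in main are made up for illustration and are not part of Hadoop. It uses java.nio instead of the Hadoop FileSystem API, so treat it as a sketch of the technique rather than a drop-in replacement.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone illustration; the name is not a Hadoop class.
public class PerKeyFileWriter implements AutoCloseable {
    private final Path outputDir;
    // One open stream per output file, keyed by the sanitized key.
    private final Map<String, OutputStream> streams = new HashMap<>();

    public PerKeyFileWriter(Path outputDir) {
        this.outputDir = outputDir;
    }

    public synchronized void write(String key, String value) throws IOException {
        if (key == null || value == null) {
            return; // skip incomplete records, as the Hadoop version does
        }
        // The key only selects the file; ':' is replaced to keep the name valid.
        String fileName = key.replace(':', '_');
        OutputStream out = streams.get(fileName);
        if (out == null) {
            // CREATE_NEW fails if the file already exists (like overwrite=false).
            out = Files.newOutputStream(outputDir.resolve(fileName),
                    StandardOpenOption.CREATE_NEW);
            streams.put(fileName, out);
        }
        out.write(value.getBytes(StandardCharsets.UTF_8));
        out.write('\n');
    }

    @Override
    public synchronized void close() throws IOException {
        for (OutputStream out : streams.values()) {
            out.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("per-key-demo");
        try (PerKeyFileWriter writer = new PerKeyFileWriter(dir)) {
            writer.write("rdf:type", "a");
            writer.write("rdf:label", "b");
            writer.write("rdf:type", "c");
        }
        // Values for the same key end up in the same file.
        System.out.println(Files.readAllLines(dir.resolve("rdf_type"))); // prints [a, c]
    }
}
```

Keeping the streams in a map means each file is created only once per writer, which is also why the Hadoop version can fail if a second task attempt tries to create the same file.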

Thanks,
Farhan

On Mon, Dec 28, 2009 at 12:27 PM, Huazhong Ning <[email protected]> wrote:

> Hi all,
>
> I need your help with multiple-file output. I have many big files, and I would
> like the processing result of each file to be written to a separate file. I know
> that in the old Hadoop APIs the class MultipleOutputFormat serves this
> purpose, but I cannot find an equivalent class in the new APIs. Does anybody
> know how to solve this problem with the new APIs?
> Thanks a lot.
>
> Ning, Huazhong