How can I feed multiple Avro inputs to two AvroMappers and send the outputs of both
mappers to a single AvroReducer?

I have written code for a single input that goes to one AvroMapper, whose output
goes to a single AvroReducer:




import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class Test extends Configured implements Tool {
        
        static class TargetMapper extends AvroMapper<GenericRecord, 
Pair&lt;Utf8,
GenericRecord>> {
            public void map(GenericRecord line, AvroCollector<Pair&lt;Utf8,
GenericRecord>> collector, Reporter reporter) throws IOException {
                try{                                                            
                        collector.collect(new Pair<Utf8, GenericRecord>("data", 
line));
                }
                catch (Exception e) {
                                System.out.println("Error message "+ 
e.getMessage());
                        }
            }
        }
        
        static class TargetReducer extends AvroReducer<Utf8, GenericRecord,
GenericRecord> {                
            
                public void reduce(Utf8 key, Iterable<GenericRecord> values,
AvroCollector<GenericRecord> collector, Reporter reporter) throws
IOException 
            {
                        try{
                        Configuration conf = getConf();                         
                        Path inputDir = new Path("/user/cts336339/avro");
                            Schema output_schema = readOutputSchema(inputDir, 
conf);
                            //System.out.println("output_schema " + 
output_schema);                     
                        
                        GenericRecord output = new 
GenericData.Record(output_schema); //
Record for output
                        System.out.println("inside for loop" + output);
                        GenericRecord location = new
GenericData.Record(output_schema.getField("Location").schema());        //Record
for Location Field
                        GenericRecord data = new
GenericData.Record(output_schema.getField("Data").schema());            
//Record for
Data Field
        
                        for (GenericRecord value : values){
                                //System.out.println("inside for loop" + value);
                                location.put("Location_key",value.get("locid"));
                                location.put("Location_value","0");
                        
                                data.put("Data_key","0");
                                data.put("Data_value",value.get("rec"));
                                
                                output.put("Location",location);
                                output.put("Data",data);
                        }               
                        collector.collect(output);
                        }catch(Exception e){
                                System.out.println("Error message "+ 
e.getMessage());
                        }
            }
          }

          @Override
          public int run(String[] args) throws Exception {
            
            if (args.length != 3) {
              System.err.printf(
                "Usage: %s [generic options] <input> <output> <schema-file>\n",
                getClass().getSimpleName());
              ToolRunner.printGenericCommandUsage(System.err);
              return -1;
            }       
            String input = args[0];
            String output = args[1];
            String schemaFile = args[2];
            //System.out.println("schema1" + schemaFile);           
            
            JobConf conf = new JobConf(getConf(), getClass());      
            conf.setJobName("Avro");
                    
            FileInputFormat.addInputPath(conf, new Path(input));
            FileOutputFormat.setOutputPath(conf, new Path(output));
            
            Path inputDir = new Path("/user/cts336339/avro");
            Schema schema = readInputSchema(inputDir, conf);
            
            Schema schema1 = new Schema.Parser().parse(new File(schemaFile));
            //System.out.println("schema1" + schema1);      
            
            AvroJob.setInputSchema(conf, schema);           
            AvroJob.setMapOutputSchema(conf,
Pair.getPairSchema(schema.create(Schema.Type.STRING), schema));
            AvroJob.setOutputSchema(conf, schema1);
            
            AvroJob.setMapperClass(conf, TargetMapper.class);
            AvroJob.setReducerClass(conf, TargetReducer.class);
          
            JobClient.runJob(conf); 
            return 0;
          }
          
          private Schema readInputSchema(Path inputDir, Configuration conf) 
throws
IOException {
                    FsInput fsInput = null;
                    FileReader reader = null;
                    try {
                              fsInput = new FsInput(new Path(inputDir, 
"Users2.avro"), conf);
                              reader = DataFileReader.openReader(fsInput, new
GenericDatumReader());
                              return reader.getSchema();
                    } finally {
                              IOUtils.closeStream(fsInput);
                              IOUtils.closeStream(reader);
                    }
          }       
          private static Schema readOutputSchema(Path inputDir, Configuration 
conf)
throws IOException {
                    FsInput fsInput = null;
                    FileReader reader = null;
                    try {
                              fsInput = new FsInput(new Path(inputDir, 
"AvroWithNull.avro"),
conf);
                              reader = DataFileReader.openReader(fsInput, new
GenericDatumReader());
                      return reader.getSchema();
                    } finally {
                              IOUtils.closeStream(fsInput);
                              IOUtils.closeStream(reader);
                    }
          }       
          public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Test(), args);
            System.exit(exitCode);
          }
}




--
View this message in context: 
http://apache-avro.679487.n3.nabble.com/How-to-provide-the-multiple-avro-inputs-to-Two-avroMappers-and-same-mapper-outputs-to-single-avroRed-tp4026644.html
Sent from the Avro - Developers mailing list archive at Nabble.com.

Reply via email to