I have developed a custom InputFormat for reading NetCDF files in
Hadoop; here is the code:
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.io.InputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.io.NetCDFArrayWritable;
import java.util.List;
import ucar.nc2.*;
import ucar.nc2.iosp.*;
import ucar.nc2.iosp.netcdf3.*;
import ucar.unidata.io.*;
import ucar.nc2.dataset.*;
import ucar.ma2.Array;
import ucar.ma2.ArrayFloat;
import java.util.Arrays;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
public class NetCDFInputFormat extends FileInputFormat<Text,
NetCDFArrayWritable> {
private static final Log LOG
= LogFactory.getLog(NetCDFInputFormat.class.getName());
private NetCDFInfo getNetCDFInfo(Path file, FileSystem fs, JobConf
job)
{
//traverse header and return chunk start and size arrays
NetCDFInfo result = new NetCDFInfo();//library call
NetcdfFile ncFile;
Variable v;
ncFile = null;
try {
ncFile = NetcdfDataset.openFile(file.toString(), null);
v = ncFile.findVariable("rsut");
//List<Variable> vs = ncFile.getVariables();
//v = vs.get(vs.size()-1);
LOG.info("Variable is "+ v.getFullName());
result.fileSize = ncFile.vfileSize;
result.recStart = ncFile.vrecStart;
Long[] metaArray = v.reallyReadMeta().toArray(new
Long[(int)(ncFile.vnumRecs)]);
result.chunkStarts =ArrayUtils.toPrimitive(metaArray);
//result.chunkSizes = nc.chunkSizes;
result.numRecs = ncFile.vnumRecs;
result.recSize = ncFile.vrecSize;
result.smallRecSize = ncFile.vsmallRecSize;
//result.shape = v.shape;
} catch (Exception e)
{
LOG.info("Bad... "+ e);
}
try{if (ncFile!=null)ncFile.close();}catch (Exception e)
{LOG.info("Bad2... "+e);}
return result;
}
@Override
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
FileStatus[] files = listStatus(job);
LOG.info( "[SAMAN] beginning of getSplits" );
LOG.info( "[SAMAN] " + files.length );
// Save the number of input files in the job-conf
job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have
valid files
if (file.isDir()) {
throw new IOException("Not a file: "+ file.getPath());
}
LOG.info ("[net] adding "+file.getPath());
totalSize += file.getLen();
}
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
//long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
// minSplitSize);
// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job);
long length = file.getLen();
LOG.info("get file len of "+file.getPath());
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);
if ((length != 0) && isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
NetCDFInfo netInfo = getNetCDFInfo(path, fs, job);
long recStart = netInfo.recStart;
long[] chunkSizes = netInfo.chunkSizes;
long[] chunkStarts = netInfo.chunkStarts;
long smallSize = netInfo.smallRecSize;
long recSize = netInfo.recSize;
long splitSize = 0;
int chunkIndex = 0;
long bytesRemaining = chunkStarts[chunkStarts.length-1] +
recSize - recStart - 2*smallSize;
long thisStart = recStart; //file position
long thisChunk = 0;
long blockNo = 1;
while ( bytesRemaining > 0) {
while ( chunkIndex < chunkStarts.length &&
chunkStarts[chunkIndex] < blockNo * blockSize ) {
chunkIndex++;
}
long tempStart = thisStart;
long endChunk;
if (chunkIndex >= chunkStarts.length) {
splitSize = chunkStarts[chunkStarts.length-1] + recSize -
thisStart - smallSize;
//bytesRemaining should be 0 after this round
}
else {
splitSize = chunkStarts[chunkIndex] - thisStart - smallSize;
thisStart = chunkStarts[chunkIndex];
}
endChunk = chunkIndex;
LOG.info("[net] split "+path+ " start "+tempStart+" size "
+splitSize +" from chunk " +thisChunk +" to "+ endChunk);
blockNo++;
String[] splitHosts = getSplitHosts(blkLocations, tempStart,
splitSize, clusterMap);
FileSplit split = new FileSplit(path, tempStart, splitSize,
splitHosts);
split.getFileSplit().startChunk = thisChunk;
split.getFileSplit().endChunk = endChunk;
splits.add(split);
bytesRemaining -= splitSize;
LOG.info("[net] split " +path+" remaining "+bytesRemaining);
thisChunk = endChunk;
}
} else if (length != 0) {
String[] splitHosts =
getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(new FileSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
LOG.info("Total # of splits: " + splits.size());
for (int i=0; i<splits.size(); i++)
{
FileSplit split = splits.get(i);
LOG.info("split "+(i+1)+" "+ split.getStart()+ "
"+split.getLength() + " " + split.getFileSplit().startChunk +" "+
split.getFileSplit().endChunk);
}
InputSplit[] test = splits.toArray(new FileSplit[splits.size()]);
for( int i = 0; i < test.length; i++ ){
LOG.info( "[SAMAN] Final Split " +
((FileSplit)test[i]).getLength() + " " + ((FileSplit)test[i]).getPath() );
}
return splits.toArray(new FileSplit[splits.size()]);
}
@Override
public RecordReader<Text, NetCDFArrayWritable> getRecordReader(
InputSplit genericSplit,
JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new NetCDFReaderWithCaching(job, (FileSplit) genericSplit);
}
}
And here is my record reader:
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.io.NetCDFArrayWritable;
import java.util.List;
import ucar.nc2.*;
import ucar.nc2.iosp.*;
import ucar.nc2.iosp.netcdf3.*;
import ucar.unidata.io.*;
import ucar.nc2.dataset.*;
import ucar.ma2.Array;
import ucar.ma2.ArrayFloat;
import java.util.Arrays;
/**
* Treats keys as offset in file and value as array.
*/
public class NetCDFReaderWithCaching implements RecordReader<Text,
NetCDFArrayWritable> {
private static final Log LOG
= LogFactory.getLog(NetCDFReaderWithCaching.class.getName());
private long start;
private long pos;
private long pos1D = 0; // 1D Index for dimensions [2:n]
private long end;
private int nDims;
private NetcdfFile ncFile;
private Variable v;
private List<Dimension> dimensions;
private long max1DSize = 1; // total number of elements could be
stored in dimensions [2:n]
private Text key = new Text();
private NetCDFArrayWritable value = new NetCDFArrayWritable();
private String sectionLocator = null;
private Array chunk = null;
private float[] my;
private FloatWritable[] fw;
private long previousTime = System.nanoTime();
public NetCDFReaderWithCaching( Configuration job, FileSplit split )
throws IOException {
initialize( split );
}
public void initialize(FileSplit genericSplit) throws IOException {
FileSplit fSplit = (FileSplit) genericSplit;
start = fSplit.getFileSplit().startChunk; //split.getStart();
end = fSplit.getFileSplit().endChunk; //start + split.getLength();
final Path file = fSplit.getPath();
LOG.info("Map is reading from input: " + file +" start chunk "+
start+" end chunk "+end);
ncFile = NetcdfDataset.openFile(file.toString(), null);
List<Variable> vs = ncFile.getVariables();
v = vs.get(vs.size()-1);
LOG.info("Variable is "+ v.getFullName());
dimensions = v.getDimensions();
this.pos = start;
StringBuilder sb = new StringBuilder();
for (int i=0; i<dimensions.size()-1; i++) {
sb.append(",:");
}
sectionLocator = sb.toString();
/*
for ( int i=0; i < dimensions.size(); i++ ){
max1DSize *= dimensions.get(i).getLength();
}
*/
//max1DSize = chunk.getSize();
//LOG.info( "max1DSize is " + max1DSize );
}
private void convert1DtoND( long OneDIndex, long[] NDIndex ){
for( int i = dimensions.size()-1; i >= 1; i-- ){
int temp = 1;
for( int j = dimensions.size()-1; j > i; j-- ){
temp *= dimensions.get(j).getLength();
}
temp = (int)(OneDIndex / temp);
temp = temp % dimensions.get(i).getLength();
NDIndex[i-1] = temp;
}
}
public Text createKey(){
return new Text();
}
public NetCDFArrayWritable createValue(){
return new NetCDFArrayWritable();
}
/** Read a line. */
public synchronized boolean next( Text key, NetCDFArrayWritable value
)
throws IOException {
long time1, time2, time3, time4, time5, time6;
long[] NDIndex = new long[dimensions.size()-1];
time1 = System.nanoTime();
convert1DtoND( pos1D, NDIndex );
time2 = System.nanoTime();
String keyword = new String();
keyword += pos;
for ( int i = 0; i < NDIndex.length; i++ )
keyword = keyword + "-" +NDIndex[i];
time3 = System.nanoTime();
time4 = System.nanoTime();
time5 = 0;
LOG.info( "[SAMAN] position is " + pos + " and end is " + end );
if (pos < end) {
key.set(keyword);
time5=0;
time4 = System.nanoTime();
if( pos1D == 0 ){
try{
LOG.info( "[SAMAN]
rsut("+pos+":"+pos+sectionLocator+")" );
chunk =
ncFile.readSection("rsut("+pos+":"+pos+sectionLocator+")");
time5 = System.nanoTime();
if (chunk == null) {LOG.info("chunk is null");return
false;}
LOG.info(chunk.getSize()+" elements and
"+chunk.getSizeBytes()+" bytes, shape is
"+Arrays.toString(chunk.getShape()));
max1DSize = chunk.getSize();
my = (float[])chunk.get1DJavaArray(Float.class);
fw = new FloatWritable[my.length];
for (int i=0; i< fw.length; i++) {
fw[i]=new FloatWritable(my[i]);
}
} catch (ucar.ma2.InvalidRangeException e)
{
LOG.info("section error " + e);
}
}
LOG.info("[YIQI] "+(System.nanoTime()-previousTime));
previousTime = System.nanoTime();
FloatWritable[] fwResult = new FloatWritable[dimensions.size()+1];
fwResult[0] = new FloatWritable((float)pos);
for( int i = 1; i < dimensions.size(); i++ ){
fwResult[i] = new FloatWritable((float)(NDIndex[i-1]));
}
fwResult[dimensions.size()] = new FloatWritable(
(float)my[(int)pos1D] );
// ADDED BY SAMAN
String floatNumbers = new String();
//for( int i = 0; i < fw.length; i++ ){
// floatNumbers = floatNumbers + fw[i] + " ";
//}
//LOG.info( "[SAMAN] " + floatNumbers );
String location = new String();
location = location + pos + ":" + pos;
for( int i = 0; i < NDIndex.length; i++ ){
location = location + "," + NDIndex[i] + ":" + NDIndex[i];
}
////LOG.info( "[SAMAN] location was (" + location + ")" );
value.set(fwResult);
if( pos1D == max1DSize-1 ){
////LOG.info( "[SAMAN] pos1D " + pos1D + " reached " +
max1DSize );
pos++;
pos1D=0;
}else{
////LOG.info( "[SAMAN] pos1D " + pos1D + " less than " +
max1DSize );
pos1D++;
}
//pos ++;
return true;
}
LOG.info("Reaching chunk end");
return false;
}
/**
* Get the progress within the split
*/
public float getProgress() {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float)(end - start));
}
}
public synchronized long getPos() throws IOException {
return pos;
}
public synchronized void close() throws IOException {
if (ncFile != null) {
ncFile.close();
}
}
}
I have tested these classes with a sample Hadoop job and everything works
fine. Next, I tried to integrate this code with Hive; to do so, I
implemented the SerDe class below:
package org.apache.hadoop.hive.serde2;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import
org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.NetCDFArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.CharArrayReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
* Created by saman on 7/31/15.
*/
public final class NetCDFSerde extends AbstractSerDe {
public static final Log LOG =
LogFactory.getLog(NetCDFSerde.class.getName());
private ObjectInspector inspector;
private String[] outputFields;
private int numCols;
private List<Float> row;
@Override
public void initialize(final Configuration conf, final Properties
tbl) throws SerDeException {
System.out.println( "[Serde] initializing Serde!" );
final List<String> columnNames =
Arrays.asList(tbl.getProperty(serdeConstants.LIST_COLUMNS)
.split(","));
numCols = columnNames.size();
final List<ObjectInspector> columnOIs = new
ArrayList<ObjectInspector>(numCols);
for (int i = 0; i < numCols; i++) {
columnOIs.add(PrimitiveObjectInspectorFactory.javaFloatObjectInspector);
}
inspector =
ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,
columnOIs);
outputFields = new String[numCols];
row = new ArrayList<Float>(numCols);
for (int i = 0; i < numCols; i++) {
row.add(null);
}
}
private char getProperty(final Properties tbl, final String
property, final char def) {
final String val = tbl.getProperty(property);
if (val != null) {
return val.charAt(0);
}
return def;
}
@Override
public Writable serialize(Object obj, ObjectInspector objInspector)
throws SerDeException {
System.out.println( "[Serde] We are in Serialize, which returns
null." );
//return null;
final StructObjectInspector outputRowOI =
(StructObjectInspector) objInspector;
final List<? extends StructField> outputFieldRefs =
outputRowOI.getAllStructFieldRefs();
if (outputFieldRefs.size() != numCols) {
throw new SerDeException("Cannot serialize the object
because there are "
+ outputFieldRefs.size() + " fields but the table has "
+ numCols + " columns.");
}
FloatWritable[] floatArray = new FloatWritable[numCols];
for (int c = 0; c < numCols; c++) {
final Object field =
outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c));
final ObjectInspector fieldOI =
outputFieldRefs.get(c).getFieldObjectInspector();
final FloatObjectInspector fieldStringOI =
(FloatObjectInspector) fieldOI;
//outputFields[c] =
fieldStringOI.getPrimitiveJavaObject(field);
System.out.println( "[Serde] " +
(Float)(fieldStringOI.getPrimitiveJavaObject(field)) );
floatArray[c].set((Float)(fieldStringOI.getPrimitiveJavaObject(field)));
}
NetCDFArrayWritable result = new NetCDFArrayWritable();
result.set( floatArray );
return result;
}
@Override
public Object deserialize(final Writable blob) throws
SerDeException {
FloatWritable[] records =
(FloatWritable[])(((NetCDFArrayWritable)blob).toArray());
System.out.println( "[Serde] Im in deserialize" );
try{
for( int i = 0; i < numCols; i++ ){
System.out.println( "[Serde] I've got " +
records[i].get() );
row.set( i, Float.valueOf( records[i].get() ) );
}
return row;
}catch ( final Exception e ){
throw new SerDeException();
}
}
@Override
public ObjectInspector getObjectInspector() throws SerDeException {
return inspector;
}
@Override
public Class<? extends Writable> getSerializedClass() {
return NetCDFArrayWritable.class;
}
@Override
public SerDeStats getSerDeStats() {
return null;
}
}
Then I tried running Hive queries on the table I created. Queries for
which Hive does not launch a MapReduce job, such as `SELECT * FROM netcdf;`,
work fine. However, when a query needs a MapReduce job, such as
`SELECT MAX(val) FROM netcdf;`, I receive the error below:
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:179)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.MapOperator.getNominalPath(MapOperator.java:399)
    at org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:459)
    at org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1071)
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:503)
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:170)
    ... 8 more
Printing all the logs shows that my input reader works fine, but I don't
understand why Hive cannot retrieve the input file paths in the map tasks.
Could anybody help me with this issue?