ORC-52 Add support for org.apache.hadoop.mapreduce InputFormat and OutputFormat. (omalley)
Fixes #27 Signed-off-by: Owen O'Malley <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/3bb5ce53 Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/3bb5ce53 Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/3bb5ce53 Branch: refs/heads/master Commit: 3bb5ce532180fcaa03fa6d13d5829a18a153ad6e Parents: 545fe37 Author: Owen O'Malley <[email protected]> Authored: Tue May 24 14:28:36 2016 -0700 Committer: Owen O'Malley <[email protected]> Committed: Wed Jun 1 20:56:28 2016 -0700 ---------------------------------------------------------------------- java/core/src/java/org/apache/orc/OrcConf.java | 31 +- java/mapreduce/pom.xml | 7 + .../org/apache/orc/mapred/OrcInputFormat.java | 62 ++- .../src/java/org/apache/orc/mapred/OrcKey.java | 90 +++ .../src/java/org/apache/orc/mapred/OrcMap.java | 1 - .../orc/mapred/OrcMapredRecordReader.java | 551 +++++++++++++++++++ .../orc/mapred/OrcMapredRecordWriter.java | 283 ++++++++++ .../org/apache/orc/mapred/OrcOutputFormat.java | 45 +- .../org/apache/orc/mapred/OrcRecordReader.java | 547 ------------------ .../org/apache/orc/mapred/OrcRecordWriter.java | 277 ---------- .../java/org/apache/orc/mapred/OrcStruct.java | 8 + .../org/apache/orc/mapred/OrcTimestamp.java | 2 +- .../java/org/apache/orc/mapred/OrcValue.java | 69 +++ .../apache/orc/mapreduce/OrcInputFormat.java | 71 +++ .../orc/mapreduce/OrcMapreduceRecordReader.java | 119 ++++ .../orc/mapreduce/OrcMapreduceRecordWriter.java | 83 +++ .../apache/orc/mapreduce/OrcOutputFormat.java | 70 +++ .../test/org/apache/orc/mapred/TestMrUnit.java | 223 ++++++++ .../apache/orc/mapred/TestOrcOutputFormat.java | 299 ++++++++++ .../org/apache/orc/mapred/TestOrcStruct.java | 5 + .../orc/mapred/other/TestOrcOutputFormat.java | 249 --------- .../mapreduce/TestMapreduceOrcOutputFormat.java | 214 +++++++ .../org/apache/orc/mapreduce/TestMrUnit.java | 203 +++++++ java/pom.xml | 1 + 24 files changed, 2393 
insertions(+), 1117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/core/src/java/org/apache/orc/OrcConf.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java index df1b410..a575e18 100644 --- a/java/core/src/java/org/apache/orc/OrcConf.java +++ b/java/core/src/java/org/apache/orc/OrcConf.java @@ -101,9 +101,18 @@ public enum OrcConf { "The maximum size of the file to read for finding the file tail. This\n" + "is primarily used for streaming ingest to read intermediate\n" + "footers while the file is still open"), - SCHEMA("orc.schema", "orc.schema", null, - "The schema that the user desires to read or write. The values are\n" + + MAPRED_INPUT_SCHEMA("orc.mapred.input.schema", null, null, + "The schema that the user desires to read. The values are\n" + "interpreted using TypeDescription.fromString."), + MAPRED_SHUFFLE_KEY_SCHEMA("orc.mapred.map.output.key.schema", null, null, + "The schema of the MapReduce shuffle key. The values are\n" + + "interpreted using TypeDescription.fromString."), + MAPRED_SHUFFLE_VALUE_SCHEMA("orc.mapred.map.output.value.schema", null, null, + "The schema of the MapReduce shuffle value. The values are\n" + + "interpreted using TypeDescription.fromString."), + MAPRED_OUTPUT_SCHEMA("orc.mapred.output.schema", null, null, + "The schema that the user desires to write. The values are\n" + + "interpreted using TypeDescription.fromString."), INCLUDE_COLUMNS("orc.include.columns", "hive.io.file.readcolumn.ids", null, "The list of comma separated column ids that should be read with 0\n" + "being the first column, 1 being the next, and so on. 
."), @@ -151,7 +160,7 @@ public enum OrcConf { } if (result == null && conf != null) { result = conf.get(attribute); - if (result == null) { + if (result == null && hiveConfName != null) { result = conf.get(hiveConfName); } } @@ -170,6 +179,10 @@ public enum OrcConf { return getLong(null, conf); } + public void setLong(Configuration conf, long value) { + conf.setLong(attribute, value); + } + public String getString(Properties tbl, Configuration conf) { String value = lookupValue(tbl, conf); return value == null ? (String) defaultValue : value; @@ -179,6 +192,10 @@ public enum OrcConf { return getString(null, conf); } + public void setString(Configuration conf, String value) { + conf.set(attribute, value); + } + public boolean getBoolean(Properties tbl, Configuration conf) { String value = lookupValue(tbl, conf); if (value != null) { @@ -191,6 +208,10 @@ public enum OrcConf { return getBoolean(null, conf); } + public void setBoolean(Configuration conf, boolean value) { + conf.setBoolean(attribute, value); + } + public double getDouble(Properties tbl, Configuration conf) { String value = lookupValue(tbl, conf); if (value != null) { @@ -202,4 +223,8 @@ public enum OrcConf { public double getDouble(Configuration conf) { return getDouble(null, conf); } + + public void setDouble(Configuration conf, double value) { + conf.setDouble(attribute, value); + } } http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/pom.xml ---------------------------------------------------------------------- diff --git a/java/mapreduce/pom.xml b/java/mapreduce/pom.xml index 3b38a40..0b48c82 100644 --- a/java/mapreduce/pom.xml +++ b/java/mapreduce/pom.xml @@ -118,6 +118,13 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.mrunit</groupId> + <artifactId>mrunit</artifactId> + <version>${mrunit.version}</version> + <scope>test</scope> + <classifier>hadoop2</classifier> + </dependency> + <dependency> <groupId>org.mockito</groupId> 
<artifactId>mockito-all</artifactId> <version>${mockito.version}</version> http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java index 78e75f7..ac8ca61 100644 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java @@ -25,7 +25,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.RecordReader; @@ -46,7 +46,7 @@ import org.apache.orc.TypeDescription; /** * A MapReduce/Hive input format for ORC files. 
*/ -public class OrcInputFormat<V extends Writable> +public class OrcInputFormat<V extends WritableComparable> extends FileInputFormat<NullWritable, V> { /** @@ -56,7 +56,8 @@ public class OrcInputFormat<V extends Writable> * @param columnsStr the comma separated list of column ids * @return a boolean array */ - static boolean[] parseInclude(TypeDescription schema, String columnsStr) { + public static boolean[] parseInclude(TypeDescription schema, + String columnsStr) { if (columnsStr == null || schema.getCategory() != TypeDescription.Category.STRUCT) { return null; @@ -82,39 +83,41 @@ public class OrcInputFormat<V extends Writable> public static void setSearchArgument(Configuration conf, SearchArgument sarg, String[] columnNames) { - Output out = new Output(); + Output out = new Output(100000); new Kryo().writeObject(out, sarg); - conf.set(OrcConf.KRYO_SARG.getAttribute(), - Base64.encodeBase64String(out.toBytes())); + OrcConf.KRYO_SARG.setString(conf, Base64.encodeBase64String(out.toBytes())); StringBuilder buffer = new StringBuilder(); - for(int i=0; i < columnNames.length; ++i) { + for (int i = 0; i < columnNames.length; ++i) { if (i != 0) { buffer.append(','); } buffer.append(columnNames[i]); } - conf.set(OrcConf.SARG_COLUMNS.getAttribute(), buffer.toString()); + OrcConf.SARG_COLUMNS.setString(conf, buffer.toString()); } - @Override - public RecordReader<NullWritable, V> - getRecordReader(InputSplit inputSplit, - JobConf conf, - Reporter reporter) throws IOException { - FileSplit split = (FileSplit) inputSplit; - Reader file = OrcFile.createReader(split.getPath(), - OrcFile.readerOptions(conf) - .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))); + /** + * Build the Reader.Options object based on the JobConf and the range of + * bytes. 
+ * @param conf the job configuration + * @param start the byte offset to start reading + * @param length the number of bytes to read + * @return the options to read with + */ + public static Reader.Options buildOptions(Configuration conf, + Reader reader, + long start, + long length) { TypeDescription schema = - TypeDescription.fromString(OrcConf.SCHEMA.getString(conf)); + TypeDescription.fromString(OrcConf.MAPRED_INPUT_SCHEMA.getString(conf)); Reader.Options options = new Reader.Options() - .range(split.getStart(), split.getLength()) + .range(start, length) .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf)) .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)); - if (schema == null) { - schema = file.getSchema(); - } else { + if (schema != null) { options.schema(schema); + } else { + schema = reader.getSchema(); } options.include(parseInclude(schema, OrcConf.INCLUDE_COLUMNS.getString(conf))); @@ -126,6 +129,19 @@ public class OrcInputFormat<V extends Writable> new Kryo().readObject(new Input(sargBytes), SearchArgumentImpl.class); options.searchArgument(sarg, sargColumns.split(",")); } - return new OrcRecordReader(file, options); + return options; + } + + @Override + public RecordReader<NullWritable, V> + getRecordReader(InputSplit inputSplit, + JobConf conf, + Reporter reporter) throws IOException { + FileSplit split = (FileSplit) inputSplit; + Reader file = OrcFile.createReader(split.getPath(), + OrcFile.readerOptions(conf) + .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))); + return new OrcMapredRecordReader<>(file, buildOptions(conf, + file, split.getStart(), split.getLength())); + } } http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java new file mode 100644 index 0000000..ad94e32 ---
/dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.mapred; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.orc.OrcConf; +import org.apache.orc.TypeDescription; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * This type provides a wrapper for OrcStruct so that it can be sent through + * the MapReduce shuffle as a key. + * + * The user should set the JobConf with orc.mapred.key.type with the type + * string of the type. 
+ */ +public final class OrcKey + implements WritableComparable<OrcKey>, JobConfigurable { + + public WritableComparable key; + + public OrcKey(WritableComparable key) { + this.key = key; + } + + public OrcKey() { + key = null; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + key.write(dataOutput); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + key.readFields(dataInput); + } + + @Override + public void configure(JobConf conf) { + if (key == null) { + TypeDescription schema = + TypeDescription.fromString(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA + .getString(conf)); + key = OrcStruct.createValue(schema); + } + } + + @Override + public int compareTo(OrcKey o) { + return key.compareTo(o.key); + } + + @Override + public boolean equals(Object o) { + if (o == null || key == null) { + return false; + } else if (o.getClass() != getClass()) { + return false; + } else { + return key.equals(((OrcKey) o).key); + } + } + + @Override + public int hashCode() { + return key.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java index cf47827..38c152c 100644 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java @@ -45,7 +45,6 @@ public final class OrcMap<K extends WritableComparable, @Override public void write(DataOutput output) throws IOException { - Iterator<Map.Entry<K,V>> itr = entrySet().iterator(); output.writeInt(size()); for(Map.Entry<K,V> entry: entrySet()) { K key = entry.getKey(); http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java 
---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java new file mode 100644 index 0000000..ddbc396 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java @@ -0,0 +1,551 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.orc.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; + +/** + * This record reader implements the org.apache.hadoop.mapred API. 
+ * @param <V> the root type of the file + */ +public class OrcMapredRecordReader<V extends WritableComparable> + implements org.apache.hadoop.mapred.RecordReader<NullWritable, V> { + private final TypeDescription schema; + private final RecordReader batchReader; + private final VectorizedRowBatch batch; + private int rowInBatch; + + protected OrcMapredRecordReader(Reader fileReader, + Reader.Options options) throws IOException { + this.batchReader = fileReader.rows(options); + if (options.getSchema() == null) { + schema = fileReader.getSchema(); + } else { + schema = options.getSchema(); + } + this.batch = schema.createRowBatch(); + rowInBatch = 0; + } + + /** + * If the current batch is empty, get a new one. + * @return true if we have rows available. + * @throws IOException + */ + boolean ensureBatch() throws IOException { + if (rowInBatch >= batch.size) { + rowInBatch = 0; + return batchReader.nextBatch(batch); + } + return true; + } + + @Override + public boolean next(NullWritable key, V value) throws IOException { + if (!ensureBatch()) { + return false; + } + if (schema.getCategory() == TypeDescription.Category.STRUCT) { + OrcStruct result = (OrcStruct) value; + List<TypeDescription> children = schema.getChildren(); + int numberOfChildren = children.size(); + for(int i=0; i < numberOfChildren; ++i) { + result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch, + children.get(i), result.getFieldValue(i))); + } + } else { + nextValue(batch.cols[0], rowInBatch, schema, value); + } + rowInBatch += 1; + return true; + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public V createValue() { + return (V) OrcStruct.createValue(schema); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + batchReader.close(); + } + + @Override + public float getProgress() throws IOException { + return 0; + } + + static BooleanWritable 
nextBoolean(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + BooleanWritable result; + if (previous == null || previous.getClass() != BooleanWritable.class) { + result = new BooleanWritable(); + } else { + result = (BooleanWritable) previous; + } + result.set(((LongColumnVector) vector).vector[row] != 0); + return result; + } else { + return null; + } + } + + static ByteWritable nextByte(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + ByteWritable result; + if (previous == null || previous.getClass() != ByteWritable.class) { + result = new ByteWritable(); + } else { + result = (ByteWritable) previous; + } + result.set((byte) ((LongColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + static ShortWritable nextShort(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + ShortWritable result; + if (previous == null || previous.getClass() != ShortWritable.class) { + result = new ShortWritable(); + } else { + result = (ShortWritable) previous; + } + result.set((short) ((LongColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + static IntWritable nextInt(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + IntWritable result; + if (previous == null || previous.getClass() != IntWritable.class) { + result = new IntWritable(); + } else { + result = (IntWritable) previous; + } + result.set((int) ((LongColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + static LongWritable nextLong(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls 
|| !vector.isNull[row]) { + LongWritable result; + if (previous == null || previous.getClass() != LongWritable.class) { + result = new LongWritable(); + } else { + result = (LongWritable) previous; + } + result.set(((LongColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + static FloatWritable nextFloat(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + FloatWritable result; + if (previous == null || previous.getClass() != FloatWritable.class) { + result = new FloatWritable(); + } else { + result = (FloatWritable) previous; + } + result.set((float) ((DoubleColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + static DoubleWritable nextDouble(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + DoubleWritable result; + if (previous == null || previous.getClass() != DoubleWritable.class) { + result = new DoubleWritable(); + } else { + result = (DoubleWritable) previous; + } + result.set(((DoubleColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + static Text nextString(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + Text result; + if (previous == null || previous.getClass() != Text.class) { + result = new Text(); + } else { + result = (Text) previous; + } + BytesColumnVector bytes = (BytesColumnVector) vector; + result.set(bytes.vector[row], bytes.start[row], bytes.length[row]); + return result; + } else { + return null; + } + } + + static BytesWritable nextBinary(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + BytesWritable result; + if (previous == null || previous.getClass() != 
BytesWritable.class) { + result = new BytesWritable(); + } else { + result = (BytesWritable) previous; + } + BytesColumnVector bytes = (BytesColumnVector) vector; + result.set(bytes.vector[row], bytes.start[row], bytes.length[row]); + return result; + } else { + return null; + } + } + + static HiveDecimalWritable nextDecimal(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + HiveDecimalWritable result; + if (previous == null || previous.getClass() != HiveDecimalWritable.class) { + result = new HiveDecimalWritable(); + } else { + result = (HiveDecimalWritable) previous; + } + result.set(((DecimalColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + static DateWritable nextDate(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + DateWritable result; + if (previous == null || previous.getClass() != DateWritable.class) { + result = new DateWritable(); + } else { + result = (DateWritable) previous; + } + int date = (int) ((LongColumnVector) vector).vector[row]; + result.set(date); + return result; + } else { + return null; + } + } + + static OrcTimestamp nextTimestamp(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + OrcTimestamp result; + if (previous == null || previous.getClass() != OrcTimestamp.class) { + result = new OrcTimestamp(); + } else { + result = (OrcTimestamp) previous; + } + TimestampColumnVector tcv = (TimestampColumnVector) vector; + result.setTime(tcv.time[row]); + result.setNanos(tcv.nanos[row]); + return result; + } else { + return null; + } + } + + static OrcStruct nextStruct(ColumnVector vector, + int row, + TypeDescription schema, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || 
!vector.isNull[row]) { + OrcStruct result; + List<TypeDescription> childrenTypes = schema.getChildren(); + int numChildren = childrenTypes.size(); + if (previous == null || previous.getClass() != OrcStruct.class) { + result = new OrcStruct(schema); + } else { + result = (OrcStruct) previous; + } + StructColumnVector struct = (StructColumnVector) vector; + for(int f=0; f < numChildren; ++f) { + result.setFieldValue(f, nextValue(struct.fields[f], row, + childrenTypes.get(f), result.getFieldValue(f))); + } + return result; + } else { + return null; + } + } + + static OrcUnion nextUnion(ColumnVector vector, + int row, + TypeDescription schema, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + OrcUnion result; + List<TypeDescription> childrenTypes = schema.getChildren(); + if (previous == null || previous.getClass() != OrcUnion.class) { + result = new OrcUnion(schema); + } else { + result = (OrcUnion) previous; + } + UnionColumnVector union = (UnionColumnVector) vector; + byte tag = (byte) union.tags[row]; + result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag), + result.getObject())); + return result; + } else { + return null; + } + } + + static OrcList nextList(ColumnVector vector, + int row, + TypeDescription schema, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + OrcList result; + List<TypeDescription> childrenTypes = schema.getChildren(); + TypeDescription valueType = childrenTypes.get(0); + if (previous == null || + previous.getClass() != ArrayList.class) { + result = new OrcList(schema); + } else { + result = (OrcList) previous; + } + ListColumnVector list = (ListColumnVector) vector; + int length = (int) list.lengths[row]; + int offset = (int) list.offsets[row]; + result.ensureCapacity(length); + int oldLength = result.size(); + int idx = 0; + while (idx < length && idx < oldLength) { + result.set(idx, 
nextValue(list.child, offset + idx, valueType, + result.get(idx))); + idx += 1; + } + if (length < oldLength) { + for(int i= oldLength - 1; i >= length; --i) { + result.remove(i); + } + } else if (oldLength < length) { + while (idx < length) { + result.add(nextValue(list.child, offset + idx, valueType, null)); + idx += 1; + } + } + return result; + } else { + return null; + } + } + + static OrcMap nextMap(ColumnVector vector, + int row, + TypeDescription schema, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + MapColumnVector map = (MapColumnVector) vector; + int length = (int) map.lengths[row]; + int offset = (int) map.offsets[row]; + OrcMap result; + List<TypeDescription> childrenTypes = schema.getChildren(); + TypeDescription keyType = childrenTypes.get(0); + TypeDescription valueType = childrenTypes.get(1); + if (previous == null || + previous.getClass() != OrcMap.class) { + result = new OrcMap(schema); + } else { + result = (OrcMap) previous; + // I couldn't think of a good way to reuse the keys and value objects + // without even more allocations, so take the easy and safe approach. 
+ result.clear(); + } + for(int e=0; e < length; ++e) { + result.put(nextValue(map.keys, e + offset, keyType, null), + nextValue(map.values, e + offset, valueType, null)); + } + return result; + } else { + return null; + } + } + + public static WritableComparable nextValue(ColumnVector vector, + int row, + TypeDescription schema, + Object previous) { + switch (schema.getCategory()) { + case BOOLEAN: + return nextBoolean(vector, row, previous); + case BYTE: + return nextByte(vector, row, previous); + case SHORT: + return nextShort(vector, row, previous); + case INT: + return nextInt(vector, row, previous); + case LONG: + return nextLong(vector, row, previous); + case FLOAT: + return nextFloat(vector, row, previous); + case DOUBLE: + return nextDouble(vector, row, previous); + case STRING: + case CHAR: + case VARCHAR: + return nextString(vector, row, previous); + case BINARY: + return nextBinary(vector, row, previous); + case DECIMAL: + return nextDecimal(vector, row, previous); + case DATE: + return nextDate(vector, row, previous); + case TIMESTAMP: + return nextTimestamp(vector, row, previous); + case STRUCT: + return nextStruct(vector, row, schema, previous); + case UNION: + return nextUnion(vector, row, schema, previous); + case LIST: + return nextList(vector, row, schema, previous); + case MAP: + return nextMap(vector, row, schema, previous); + default: + throw new IllegalArgumentException("Unknown type " + schema); + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java new file mode 100644 index 0000000..59f89f7 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java @@ -0,0 +1,283 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.mapred; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import 
org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class OrcMapredRecordWriter<V extends Writable> + implements RecordWriter<NullWritable, V> { + private final Writer writer; + private final VectorizedRowBatch batch; + private final TypeDescription schema; + private final boolean isTopStruct; + + public OrcMapredRecordWriter(Writer writer) { + this.writer = writer; + schema = writer.getSchema(); + this.batch = schema.createRowBatch(); + isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT; + } + + static void setLongValue(ColumnVector vector, int row, long value) { + ((LongColumnVector) vector).vector[row] = value; + } + + static void setDoubleValue(ColumnVector vector, int row, double value) { + ((DoubleColumnVector) vector).vector[row] = value; + } + + static void setBinaryValue(ColumnVector vector, int row, + BinaryComparable value) { + ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0, + value.getLength()); + } + + static void setBinaryValue(ColumnVector vector, int row, + BinaryComparable value, int maxLength) { + ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0, + Math.min(maxLength, value.getLength())); + } + + private static final ThreadLocal<byte[]> SPACE_BUFFER = + new ThreadLocal<byte[]>() { + @Override + protected byte[] initialValue() { + byte[] result = new byte[100]; + Arrays.fill(result, (byte) ' '); + return result; + } + }; + + static void setCharValue(BytesColumnVector vector, + int row, + Text value, + int length) { + // we need to trim or pad the string with spaces to required length + int 
actualLength = value.getLength(); + if (actualLength >= length) { + setBinaryValue(vector, row, value, length); + } else { + byte[] spaces = SPACE_BUFFER.get(); + if (length - actualLength > spaces.length) { + spaces = new byte[length - actualLength]; + Arrays.fill(spaces, (byte)' '); + SPACE_BUFFER.set(spaces); + } + vector.setConcat(row, value.getBytes(), 0, actualLength, spaces, 0, + length - actualLength); + } + } + + static void setStructValue(TypeDescription schema, + StructColumnVector vector, + int row, + OrcStruct value) { + List<TypeDescription> children = schema.getChildren(); + for(int c=0; c < value.getNumFields(); ++c) { + setColumn(children.get(c), vector.fields[c], row, value.getFieldValue(c)); + } + } + + static void setUnionValue(TypeDescription schema, + UnionColumnVector vector, + int row, + OrcUnion value) { + List<TypeDescription> children = schema.getChildren(); + int tag = value.getTag() & 0xff; + vector.tags[row] = tag; + setColumn(children.get(tag), vector.fields[tag], row, value.getObject()); + } + + + static void setListValue(TypeDescription schema, + ListColumnVector vector, + int row, + OrcList value) { + TypeDescription elemType = schema.getChildren().get(0); + vector.offsets[row] = vector.childCount; + vector.lengths[row] = value.size(); + vector.childCount += vector.lengths[row]; + vector.child.ensureSize(vector.childCount, vector.offsets[row] != 0); + for(int e=0; e < vector.lengths[row]; ++e) { + setColumn(elemType, vector.child, (int) vector.offsets[row] + e, + (Writable) value.get(e)); + } + } + + static void setMapValue(TypeDescription schema, + MapColumnVector vector, + int row, + OrcMap<?,?> value) { + TypeDescription keyType = schema.getChildren().get(0); + TypeDescription valueType = schema.getChildren().get(1); + vector.offsets[row] = vector.childCount; + vector.lengths[row] = value.size(); + vector.childCount += vector.lengths[row]; + vector.keys.ensureSize(vector.childCount, vector.offsets[row] != 0); + 
vector.values.ensureSize(vector.childCount, vector.offsets[row] != 0); + int e = 0; + for(Map.Entry<?,?> entry: value.entrySet()) { + setColumn(keyType, vector.keys, (int) vector.offsets[row] + e, + (Writable) entry.getKey()); + setColumn(valueType, vector.values, (int) vector.offsets[row] + e, + (Writable) entry.getValue()); + e += 1; + } + } + + public static void setColumn(TypeDescription schema, + ColumnVector vector, + int row, + Writable value) { + if (value == null) { + vector.noNulls = false; + vector.isNull[row] = true; + } else { + switch (schema.getCategory()) { + case BOOLEAN: + setLongValue(vector, row, ((BooleanWritable) value).get() ? 1 : 0); + break; + case BYTE: + setLongValue(vector, row, ((ByteWritable) value).get()); + break; + case SHORT: + setLongValue(vector, row, ((ShortWritable) value).get()); + break; + case INT: + setLongValue(vector, row, ((IntWritable) value).get()); + break; + case LONG: + setLongValue(vector, row, ((LongWritable) value).get()); + break; + case FLOAT: + setDoubleValue(vector, row, ((FloatWritable) value).get()); + break; + case DOUBLE: + setDoubleValue(vector, row, ((DoubleWritable) value).get()); + break; + case STRING: + setBinaryValue(vector, row, (Text) value); + break; + case CHAR: + setCharValue((BytesColumnVector) vector, row, (Text) value, + schema.getMaxLength()); + break; + case VARCHAR: + setBinaryValue(vector, row, (Text) value, schema.getMaxLength()); + break; + case BINARY: + setBinaryValue(vector, row, (BytesWritable) value); + break; + case DATE: + setLongValue(vector, row, ((DateWritable) value).getDays()); + break; + case TIMESTAMP: + ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value); + break; + case DECIMAL: + ((DecimalColumnVector) vector).set(row, (HiveDecimalWritable) value); + break; + case STRUCT: + setStructValue(schema, (StructColumnVector) vector, row, + (OrcStruct) value); + break; + case UNION: + setUnionValue(schema, (UnionColumnVector) vector, row, + (OrcUnion) value); + 
break; + case LIST: + setListValue(schema, (ListColumnVector) vector, row, (OrcList) value); + break; + case MAP: + setMapValue(schema, (MapColumnVector) vector, row, (OrcMap) value); + break; + default: + throw new IllegalArgumentException("Unknown type " + schema); + } + } + } + + @Override + public void write(NullWritable nullWritable, V v) throws IOException { + // if the batch is full, write it out. + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + + // add the new row + int row = batch.size++; + // skip over the OrcKey or OrcValue + if (v instanceof OrcKey) { + v = (V)((OrcKey) v).key; + } else if (v instanceof OrcValue) { + v = (V)((OrcValue) v).value; + } + if (isTopStruct) { + for(int f=0; f < schema.getChildren().size(); ++f) { + setColumn(schema.getChildren().get(f), batch.cols[f], row, + ((OrcStruct) v).getFieldValue(f)); + } + } else { + setColumn(schema, batch.cols[0], row, v); + } + } + + @Override + public void close(Reporter reporter) throws IOException { + if (batch.size != 0) { + writer.addRowBatch(batch); + batch.reset(); + } + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java index 6186c83..341fbcd 100644 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java @@ -18,6 +18,7 @@ package org.apache.orc.mapred; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; @@ -34,9 +35,35 @@ import org.apache.orc.Writer; import java.io.IOException; +/** + * An ORC output format that satisfies the 
org.apache.hadoop.mapred API. + */ public class OrcOutputFormat<V extends Writable> extends FileOutputFormat<NullWritable, V> { + /** + * This function builds the options for the ORC Writer based on the JobConf. + * @param conf the job configuration + * @return a new options object + */ + public static OrcFile.WriterOptions buildOptions(Configuration conf) { + return OrcFile.writerOptions(conf) + .version(OrcFile.Version.byName(OrcConf.WRITE_FORMAT.getString(conf))) + .setSchema(TypeDescription.fromString(OrcConf.MAPRED_OUTPUT_SCHEMA + .getString(conf))) + .compress(CompressionKind.valueOf(OrcConf.COMPRESS.getString(conf))) + .encodingStrategy(OrcFile.EncodingStrategy.valueOf + (OrcConf.ENCODING_STRATEGY.getString(conf))) + .bloomFilterColumns(OrcConf.BLOOM_FILTER_COLUMNS.getString(conf)) + .bloomFilterFpp(OrcConf.BLOOM_FILTER_FPP.getDouble(conf)) + .blockSize(OrcConf.BLOCK_SIZE.getLong(conf)) + .blockPadding(OrcConf.BLOCK_PADDING.getBoolean(conf)) + .stripeSize(OrcConf.STRIPE_SIZE.getLong(conf)) + .rowIndexStride((int) OrcConf.ROW_INDEX_STRIDE.getLong(conf)) + .bufferSize((int) OrcConf.BUFFER_SIZE.getLong(conf)) + .paddingTolerance(OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf)); + } + @Override public RecordWriter<NullWritable, V> getRecordWriter(FileSystem fileSystem, JobConf conf, @@ -45,21 +72,7 @@ public class OrcOutputFormat<V extends Writable> ) throws IOException { Path path = getTaskOutputPath(conf, name); Writer writer = OrcFile.createWriter(path, - OrcFile.writerOptions(conf) - .fileSystem(fileSystem) - .version(OrcFile.Version.byName(OrcConf.WRITE_FORMAT.getString(conf))) - .setSchema(TypeDescription.fromString(OrcConf.SCHEMA.getString(conf))) - .compress(CompressionKind.valueOf(OrcConf.COMPRESS.getString(conf))) - .encodingStrategy(OrcFile.EncodingStrategy.valueOf - (OrcConf.ENCODING_STRATEGY.getString(conf))) - .bloomFilterColumns(OrcConf.BLOOM_FILTER_COLUMNS.getString(conf)) - .bloomFilterFpp(OrcConf.BLOOM_FILTER_FPP.getDouble(conf)) - 
.blockSize(OrcConf.BLOCK_SIZE.getLong(conf)) - .blockPadding(OrcConf.BLOCK_PADDING.getBoolean(conf)) - .stripeSize(OrcConf.STRIPE_SIZE.getLong(conf)) - .rowIndexStride((int) OrcConf.ROW_INDEX_STRIDE.getLong(conf)) - .bufferSize((int) OrcConf.BUFFER_SIZE.getLong(conf)) - .paddingTolerance(OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf))); - return new OrcRecordWriter<>(writer); + buildOptions(conf).fileSystem(fileSystem)); + return new OrcMapredRecordWriter<>(writer); } } http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java deleted file mode 100644 index f6bf635..0000000 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java +++ /dev/null @@ -1,547 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.orc.mapred; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.serde2.io.DateWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.orc.Reader; -import org.apache.orc.RecordReader; -import org.apache.orc.TypeDescription; - -public class OrcRecordReader<V extends WritableComparable> - implements org.apache.hadoop.mapred.RecordReader<NullWritable, V> { - private final TypeDescription schema; - private final RecordReader batchReader; - private final VectorizedRowBatch batch; - private int rowInBatch; - - protected OrcRecordReader(Reader fileReader, - Reader.Options options) throws IOException { - this.batchReader = fileReader.rows(options); - if (options.getSchema() == 
null) { - schema = fileReader.getSchema(); - } else { - schema = options.getSchema(); - } - this.batch = schema.createRowBatch(); - rowInBatch = 0; - } - - /** - * If the current batch is empty, get a new one. - * @return true if we have rows available. - * @throws IOException - */ - boolean ensureBatch() throws IOException { - if (rowInBatch >= batch.size) { - rowInBatch = 0; - return batchReader.nextBatch(batch); - } - return true; - } - - @Override - public boolean next(NullWritable key, V value) throws IOException { - if (!ensureBatch()) { - return false; - } - if (schema.getCategory() == TypeDescription.Category.STRUCT) { - OrcStruct result = (OrcStruct) value; - List<TypeDescription> children = schema.getChildren(); - int numberOfChildren = children.size(); - for(int i=0; i < numberOfChildren; ++i) { - result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch, - children.get(i), result.getFieldValue(i))); - } - } else { - nextValue(batch.cols[0], rowInBatch, schema, value); - } - rowInBatch += 1; - return true; - } - - @Override - public NullWritable createKey() { - return NullWritable.get(); - } - - @Override - public V createValue() { - return (V) OrcStruct.createValue(schema); - } - - @Override - public long getPos() throws IOException { - return 0; - } - - @Override - public void close() throws IOException { - batchReader.close(); - } - - @Override - public float getProgress() throws IOException { - return 0; - } - - static BooleanWritable nextBoolean(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - BooleanWritable result; - if (previous == null || previous.getClass() != BooleanWritable.class) { - result = new BooleanWritable(); - } else { - result = (BooleanWritable) previous; - } - result.set(((LongColumnVector) vector).vector[row] != 0); - return result; - } else { - return null; - } - } - - static ByteWritable nextByte(ColumnVector vector, - int row, - 
Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - ByteWritable result; - if (previous == null || previous.getClass() != ByteWritable.class) { - result = new ByteWritable(); - } else { - result = (ByteWritable) previous; - } - result.set((byte) ((LongColumnVector) vector).vector[row]); - return result; - } else { - return null; - } - } - - static ShortWritable nextShort(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - ShortWritable result; - if (previous == null || previous.getClass() != ShortWritable.class) { - result = new ShortWritable(); - } else { - result = (ShortWritable) previous; - } - result.set((short) ((LongColumnVector) vector).vector[row]); - return result; - } else { - return null; - } - } - - static IntWritable nextInt(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - IntWritable result; - if (previous == null || previous.getClass() != IntWritable.class) { - result = new IntWritable(); - } else { - result = (IntWritable) previous; - } - result.set((int) ((LongColumnVector) vector).vector[row]); - return result; - } else { - return null; - } - } - - static LongWritable nextLong(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - LongWritable result; - if (previous == null || previous.getClass() != LongWritable.class) { - result = new LongWritable(); - } else { - result = (LongWritable) previous; - } - result.set(((LongColumnVector) vector).vector[row]); - return result; - } else { - return null; - } - } - - static FloatWritable nextFloat(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - FloatWritable result; - if 
(previous == null || previous.getClass() != FloatWritable.class) { - result = new FloatWritable(); - } else { - result = (FloatWritable) previous; - } - result.set((float) ((DoubleColumnVector) vector).vector[row]); - return result; - } else { - return null; - } - } - - static DoubleWritable nextDouble(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - DoubleWritable result; - if (previous == null || previous.getClass() != DoubleWritable.class) { - result = new DoubleWritable(); - } else { - result = (DoubleWritable) previous; - } - result.set(((DoubleColumnVector) vector).vector[row]); - return result; - } else { - return null; - } - } - - static Text nextString(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - Text result; - if (previous == null || previous.getClass() != Text.class) { - result = new Text(); - } else { - result = (Text) previous; - } - BytesColumnVector bytes = (BytesColumnVector) vector; - result.set(bytes.vector[row], bytes.start[row], bytes.length[row]); - return result; - } else { - return null; - } - } - - static BytesWritable nextBinary(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - BytesWritable result; - if (previous == null || previous.getClass() != BytesWritable.class) { - result = new BytesWritable(); - } else { - result = (BytesWritable) previous; - } - BytesColumnVector bytes = (BytesColumnVector) vector; - result.set(bytes.vector[row], bytes.start[row], bytes.length[row]); - return result; - } else { - return null; - } - } - - static HiveDecimalWritable nextDecimal(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - HiveDecimalWritable result; - if (previous == 
null || previous.getClass() != HiveDecimalWritable.class) { - result = new HiveDecimalWritable(); - } else { - result = (HiveDecimalWritable) previous; - } - result.set(((DecimalColumnVector) vector).vector[row]); - return result; - } else { - return null; - } - } - - static DateWritable nextDate(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - DateWritable result; - if (previous == null || previous.getClass() != DateWritable.class) { - result = new DateWritable(); - } else { - result = (DateWritable) previous; - } - int date = (int) ((LongColumnVector) vector).vector[row]; - result.set(date); - return result; - } else { - return null; - } - } - - static OrcTimestamp nextTimestamp(ColumnVector vector, - int row, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - OrcTimestamp result; - if (previous == null || previous.getClass() != OrcTimestamp.class) { - result = new OrcTimestamp(); - } else { - result = (OrcTimestamp) previous; - } - TimestampColumnVector tcv = (TimestampColumnVector) vector; - result.setTime(tcv.time[row]); - result.setNanos(tcv.nanos[row]); - return result; - } else { - return null; - } - } - - static OrcStruct nextStruct(ColumnVector vector, - int row, - TypeDescription schema, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - OrcStruct result; - List<TypeDescription> childrenTypes = schema.getChildren(); - int numChildren = childrenTypes.size(); - if (previous == null || previous.getClass() != OrcStruct.class) { - result = new OrcStruct(schema); - } else { - result = (OrcStruct) previous; - } - StructColumnVector struct = (StructColumnVector) vector; - for(int f=0; f < numChildren; ++f) { - result.setFieldValue(f, nextValue(struct.fields[f], row, - childrenTypes.get(f), result.getFieldValue(f))); - } - return result; - } else 
{ - return null; - } - } - - static OrcUnion nextUnion(ColumnVector vector, - int row, - TypeDescription schema, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - OrcUnion result; - List<TypeDescription> childrenTypes = schema.getChildren(); - if (previous == null || previous.getClass() != OrcUnion.class) { - result = new OrcUnion(schema); - } else { - result = (OrcUnion) previous; - } - UnionColumnVector union = (UnionColumnVector) vector; - byte tag = (byte) union.tags[row]; - result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag), - result.getObject())); - return result; - } else { - return null; - } - } - - static OrcList nextList(ColumnVector vector, - int row, - TypeDescription schema, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if (vector.noNulls || !vector.isNull[row]) { - OrcList result; - List<TypeDescription> childrenTypes = schema.getChildren(); - TypeDescription valueType = childrenTypes.get(0); - if (previous == null || - previous.getClass() != ArrayList.class) { - result = new OrcList(schema); - } else { - result = (OrcList) previous; - } - ListColumnVector list = (ListColumnVector) vector; - int length = (int) list.lengths[row]; - int offset = (int) list.offsets[row]; - result.ensureCapacity(length); - int oldLength = result.size(); - int idx = 0; - while (idx < length && idx < oldLength) { - result.set(idx, nextValue(list.child, offset + idx, valueType, - result.get(idx))); - idx += 1; - } - if (length < oldLength) { - for(int i= oldLength - 1; i >= length; --i) { - result.remove(i); - } - } else if (oldLength < length) { - while (idx < length) { - result.add(nextValue(list.child, offset + idx, valueType, null)); - idx += 1; - } - } - return result; - } else { - return null; - } - } - - static OrcMap nextMap(ColumnVector vector, - int row, - TypeDescription schema, - Object previous) { - if (vector.isRepeating) { - row = 0; - } - if 
(vector.noNulls || !vector.isNull[row]) { - MapColumnVector map = (MapColumnVector) vector; - int length = (int) map.lengths[row]; - int offset = (int) map.offsets[row]; - OrcMap result; - List<TypeDescription> childrenTypes = schema.getChildren(); - TypeDescription keyType = childrenTypes.get(0); - TypeDescription valueType = childrenTypes.get(1); - if (previous == null || - previous.getClass() != OrcMap.class) { - result = new OrcMap(schema); - } else { - result = (OrcMap) previous; - // I couldn't think of a good way to reuse the keys and value objects - // without even more allocations, so take the easy and safe approach. - result.clear(); - } - for(int e=0; e < length; ++e) { - result.put(nextValue(map.keys, e + offset, keyType, null), - nextValue(map.values, e + offset, valueType, null)); - } - return result; - } else { - return null; - } - } - - static WritableComparable nextValue(ColumnVector vector, - int row, - TypeDescription schema, - Object previous) { - switch (schema.getCategory()) { - case BOOLEAN: - return nextBoolean(vector, row, previous); - case BYTE: - return nextByte(vector, row, previous); - case SHORT: - return nextShort(vector, row, previous); - case INT: - return nextInt(vector, row, previous); - case LONG: - return nextLong(vector, row, previous); - case FLOAT: - return nextFloat(vector, row, previous); - case DOUBLE: - return nextDouble(vector, row, previous); - case STRING: - case CHAR: - case VARCHAR: - return nextString(vector, row, previous); - case BINARY: - return nextBinary(vector, row, previous); - case DECIMAL: - return nextDecimal(vector, row, previous); - case DATE: - return nextDate(vector, row, previous); - case TIMESTAMP: - return nextTimestamp(vector, row, previous); - case STRUCT: - return nextStruct(vector, row, schema, previous); - case UNION: - return nextUnion(vector, row, schema, previous); - case LIST: - return nextList(vector, row, schema, previous); - case MAP: - return nextMap(vector, row, schema, previous); - 
default: - throw new IllegalArgumentException("Unknown type " + schema); - } - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java deleted file mode 100644 index 4237656..0000000 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.orc.mapred; - -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.serde2.io.DateWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.io.BinaryComparable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.orc.TypeDescription; -import org.apache.orc.Writer; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -public class OrcRecordWriter<V extends Writable> - implements RecordWriter<NullWritable, V> { - private final Writer writer; - private final VectorizedRowBatch batch; - private final TypeDescription schema; - private final boolean isTopStruct; - - public OrcRecordWriter(Writer writer) { - this.writer = writer; - schema = writer.getSchema(); 
- this.batch = schema.createRowBatch(); - isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT; - } - - static void setLongValue(ColumnVector vector, int row, long value) { - ((LongColumnVector) vector).vector[row] = value; - } - - static void setDoubleValue(ColumnVector vector, int row, double value) { - ((DoubleColumnVector) vector).vector[row] = value; - } - - static void setBinaryValue(ColumnVector vector, int row, - BinaryComparable value) { - ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0, - value.getLength()); - } - - static void setBinaryValue(ColumnVector vector, int row, - BinaryComparable value, int maxLength) { - ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0, - Math.min(maxLength, value.getLength())); - } - - private static final ThreadLocal<byte[]> SPACE_BUFFER = - new ThreadLocal<byte[]>() { - @Override - protected byte[] initialValue() { - byte[] result = new byte[100]; - Arrays.fill(result, (byte) ' '); - return result; - } - }; - - static void setCharValue(BytesColumnVector vector, - int row, - Text value, - int length) { - // we need to trim or pad the string with spaces to required length - int actualLength = value.getLength(); - if (actualLength >= length) { - setBinaryValue(vector, row, value, length); - } else { - byte[] spaces = SPACE_BUFFER.get(); - if (length - actualLength > spaces.length) { - spaces = new byte[length - actualLength]; - Arrays.fill(spaces, (byte)' '); - SPACE_BUFFER.set(spaces); - } - vector.setConcat(row, value.getBytes(), 0, actualLength, spaces, 0, - length - actualLength); - } - } - - static void setStructValue(TypeDescription schema, - StructColumnVector vector, - int row, - OrcStruct value) { - List<TypeDescription> children = schema.getChildren(); - for(int c=0; c < value.getNumFields(); ++c) { - setColumn(children.get(c), vector.fields[c], row, value.getFieldValue(c)); - } - } - - static void setUnionValue(TypeDescription schema, - UnionColumnVector vector, - int row, - 
OrcUnion value) { - List<TypeDescription> children = schema.getChildren(); - int tag = value.getTag() & 0xff; - vector.tags[row] = tag; - setColumn(children.get(tag), vector.fields[tag], row, value.getObject()); - } - - - static void setListValue(TypeDescription schema, - ListColumnVector vector, - int row, - OrcList value) { - TypeDescription elemType = schema.getChildren().get(0); - vector.offsets[row] = vector.childCount; - vector.lengths[row] = value.size(); - vector.childCount += vector.lengths[row]; - vector.child.ensureSize(vector.childCount, vector.offsets[row] != 0); - for(int e=0; e < vector.lengths[row]; ++e) { - setColumn(elemType, vector.child, (int) vector.offsets[row] + e, - (Writable) value.get(e)); - } - } - - static void setMapValue(TypeDescription schema, - MapColumnVector vector, - int row, - OrcMap<?,?> value) { - TypeDescription keyType = schema.getChildren().get(0); - TypeDescription valueType = schema.getChildren().get(1); - vector.offsets[row] = vector.childCount; - vector.lengths[row] = value.size(); - vector.childCount += vector.lengths[row]; - vector.keys.ensureSize(vector.childCount, vector.offsets[row] != 0); - vector.values.ensureSize(vector.childCount, vector.offsets[row] != 0); - int e = 0; - for(Map.Entry<?,?> entry: value.entrySet()) { - setColumn(keyType, vector.keys, (int) vector.offsets[row] + e, - (Writable) entry.getKey()); - setColumn(valueType, vector.values, (int) vector.offsets[row] + e, - (Writable) entry.getValue()); - e += 1; - } - } - - static void setColumn(TypeDescription schema, - ColumnVector vector, - int row, - Writable value) { - if (value == null) { - vector.noNulls = false; - vector.isNull[row] = true; - } else { - switch (schema.getCategory()) { - case BOOLEAN: - setLongValue(vector, row, ((BooleanWritable) value).get() ? 
1 : 0); - break; - case BYTE: - setLongValue(vector, row, ((ByteWritable) value).get()); - break; - case SHORT: - setLongValue(vector, row, ((ShortWritable) value).get()); - break; - case INT: - setLongValue(vector, row, ((IntWritable) value).get()); - break; - case LONG: - setLongValue(vector, row, ((LongWritable) value).get()); - break; - case FLOAT: - setDoubleValue(vector, row, ((FloatWritable) value).get()); - break; - case DOUBLE: - setDoubleValue(vector, row, ((DoubleWritable) value).get()); - break; - case STRING: - setBinaryValue(vector, row, (Text) value); - break; - case CHAR: - setCharValue((BytesColumnVector) vector, row, (Text) value, - schema.getMaxLength()); - break; - case VARCHAR: - setBinaryValue(vector, row, (Text) value, schema.getMaxLength()); - break; - case BINARY: - setBinaryValue(vector, row, (BytesWritable) value); - break; - case DATE: - setLongValue(vector, row, ((DateWritable) value).getDays()); - break; - case TIMESTAMP: - ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value); - break; - case DECIMAL: - ((DecimalColumnVector) vector).set(row, (HiveDecimalWritable) value); - break; - case STRUCT: - setStructValue(schema, (StructColumnVector) vector, row, - (OrcStruct) value); - break; - case UNION: - setUnionValue(schema, (UnionColumnVector) vector, row, - (OrcUnion) value); - break; - case LIST: - setListValue(schema, (ListColumnVector) vector, row, (OrcList) value); - break; - case MAP: - setMapValue(schema, (MapColumnVector) vector, row, (OrcMap) value); - break; - default: - throw new IllegalArgumentException("Unknown type " + schema); - } - } - } - - @Override - public void write(NullWritable nullWritable, V v) throws IOException { - // if the batch is full, write it out. 
- if (batch.size == batch.getMaxSize()) { - writer.addRowBatch(batch); - batch.reset(); - } - - // add the new row - int row = batch.size++; - if (isTopStruct) { - for(int f=0; f < schema.getChildren().size(); ++f) { - setColumn(schema.getChildren().get(f), batch.cols[f], row, - ((OrcStruct) v).getFieldValue(f)); - } - } else { - setColumn(schema, batch.cols[0], row, v); - } - } - - @Override - public void close(Reporter reporter) throws IOException { - if (batch.size != 0) { - writer.addRowBatch(batch); - batch.reset(); - } - writer.close(); - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java index 2dd749b..51e0d60 100644 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java @@ -82,6 +82,14 @@ public final class OrcStruct implements WritableComparable<OrcStruct> { } /** + * Get the schema for this object. + * @return the schema object + */ + public TypeDescription getSchema() { + return schema; + } + + /** * Set all of the fields in the struct * @param values the list of values for each of the fields. 
*/ http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java index ee97b9f..5564177 100644 --- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java @@ -17,12 +17,12 @@ */ package org.apache.orc.mapred; - import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Date; import java.sql.Timestamp; import java.util.Date; http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java new file mode 100644 index 0000000..dc9912d --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.mapred; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.orc.OrcConf; +import org.apache.orc.TypeDescription; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * This type provides a wrapper for OrcStruct so that it can be sent through + * the MapReduce shuffle as a value. + * + * The user should set the JobConf with orc.mapred.value.type with the type + * string of the type. + */ +public final class OrcValue implements Writable, JobConfigurable { + + public WritableComparable value; + + public OrcValue(WritableComparable value) { + this.value = value; + } + + public OrcValue() { + value = null; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + value.write(dataOutput); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + value.readFields(dataInput); + } + + @Override + public void configure(JobConf conf) { + if (value == null) { + TypeDescription schema = + TypeDescription.fromString(OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA + .getString(conf)); + value = OrcStruct.createValue(schema); + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java new file mode 100644 index 0000000..9427e78 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + + +import java.io.IOException; + +import org.apache.hadoop.io.NullWritable; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; + +/** + * An ORC input format that satisfies the org.apache.hadoop.mapreduce API. + */ +public class OrcInputFormat<V extends WritableComparable> + extends FileInputFormat<NullWritable, V> { + + /** + * Put the given SearchArgument into the configuration for an OrcInputFormat. 
+ * @param conf the configuration to modify + * @param sarg the SearchArgument to put in the configuration + * @param columnNames the list of column names for the SearchArgument + */ + public static void setSearchArgument(Configuration conf, + SearchArgument sarg, + String[] columnNames) { + org.apache.orc.mapred.OrcInputFormat.setSearchArgument(conf, sarg, + columnNames); + } + + @Override + public RecordReader<NullWritable, V> + createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext + ) throws IOException, InterruptedException { + FileSplit split = (FileSplit) inputSplit; + Configuration conf = taskAttemptContext.getConfiguration(); + Reader file = OrcFile.createReader(split.getPath(), + OrcFile.readerOptions(conf) + .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))); + return new OrcMapreduceRecordReader<>(file, + org.apache.orc.mapred.OrcInputFormat.buildOptions(conf, + file, split.getStart(), split.getLength())); + } +}
