http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/pom.xml ---------------------------------------------------------------------- diff --git a/java/mapreduce/pom.xml b/java/mapreduce/pom.xml new file mode 100644 index 0000000..8792e1c --- /dev/null +++ b/java/mapreduce/pom.xml @@ -0,0 +1,148 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.orc</groupId> + <artifactId>orc</artifactId> + <version>1.1.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>orc-mapreduce</artifactId> + <packaging>jar</packaging> + <name>ORC MapReduce</name> + + <dependencies> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + <version>2.0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + <version>1.1.0-SNAPSHOT</version> + </dependency> + + <!-- inter-project --> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + <version>${kryo.version}</version> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + 
<artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>${commons-codec.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet.jsp</groupId> + <artifactId>jsp-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet.jsp</groupId> + <artifactId>jsp-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- test inter-project --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + 
</dependencies> + + <build> + <sourceDirectory>${basedir}/src/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/test</testSourceDirectory> + <testResources> + <testResource> + <directory>${basedir}/src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + </plugins> + </build> +</project>
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java new file mode 100644 index 0000000..78e75f7 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.mapred; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; + + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; + +/** + * A MapReduce/Hive input format for ORC files. + */ +public class OrcInputFormat<V extends Writable> + extends FileInputFormat<NullWritable, V> { + + /** + * Convert a string with a comma separated list of column ids into the + * array of boolean that match the schemas. + * @param schema the schema for the reader + * @param columnsStr the comma separated list of column ids + * @return a boolean array + */ + static boolean[] parseInclude(TypeDescription schema, String columnsStr) { + if (columnsStr == null || + schema.getCategory() != TypeDescription.Category.STRUCT) { + return null; + } + boolean[] result = new boolean[schema.getMaximumId() + 1]; + result[0] = true; + List<TypeDescription> types = schema.getChildren(); + for(String idString: columnsStr.split(",")) { + TypeDescription type = types.get(Integer.parseInt(idString)); + for(int c=type.getId(); c <= type.getMaximumId(); ++c) { + result[c] = true; + } + } + return result; + } + + /** + * Put the given SearchArgument into the configuration for an OrcInputFormat. 
+ * @param conf the configuration to modify + * @param sarg the SearchArgument to put in the configuration + * @param columnNames the list of column names for the SearchArgument + */ + public static void setSearchArgument(Configuration conf, + SearchArgument sarg, + String[] columnNames) { + Output out = new Output(); + new Kryo().writeObject(out, sarg); + conf.set(OrcConf.KRYO_SARG.getAttribute(), + Base64.encodeBase64String(out.toBytes())); + StringBuilder buffer = new StringBuilder(); + for(int i=0; i < columnNames.length; ++i) { + if (i != 0) { + buffer.append(','); + } + buffer.append(columnNames[i]); + } + conf.set(OrcConf.SARG_COLUMNS.getAttribute(), buffer.toString()); + } + + @Override + public RecordReader<NullWritable, V> + getRecordReader(InputSplit inputSplit, + JobConf conf, + Reporter reporter) throws IOException { + FileSplit split = (FileSplit) inputSplit; + Reader file = OrcFile.createReader(split.getPath(), + OrcFile.readerOptions(conf) + .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))); + TypeDescription schema = + TypeDescription.fromString(OrcConf.SCHEMA.getString(conf)); + Reader.Options options = new Reader.Options() + .range(split.getStart(), split.getLength()) + .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf)) + .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)); + if (schema == null) { + schema = file.getSchema(); + } else { + options.schema(schema); + } + options.include(parseInclude(schema, + OrcConf.INCLUDE_COLUMNS.getString(conf))); + String kryoSarg = OrcConf.KRYO_SARG.getString(conf); + String sargColumns = OrcConf.SARG_COLUMNS.getString(conf); + if (kryoSarg != null && sargColumns != null) { + byte[] sargBytes = Base64.decodeBase64(kryoSarg); + SearchArgument sarg = + new Kryo().readObject(new Input(sargBytes), SearchArgumentImpl.class); + options.searchArgument(sarg, sargColumns.split(",")); + } + return new OrcRecordReader(file, options); + } +} 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.orc.mapred;

import org.apache.hadoop.io.Writable;
import org.apache.orc.TypeDescription;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

/**
 * An ArrayList implementation that implements Writable.
 * Serialization format: an int element count, then per element a boolean
 * non-null flag followed by the element's own serialization when non-null.
 * @param <E> the element type, which must be Writable
 */
public class OrcList<E extends Writable>
    extends ArrayList<E> implements Writable {
  // schema of the list's element type, used to materialize elements on read
  private final TypeDescription childSchema;

  public OrcList(TypeDescription schema) {
    childSchema = schema.getChildren().get(0);
  }

  public OrcList(TypeDescription schema, int initialCapacity) {
    super(initialCapacity);
    childSchema = schema.getChildren().get(0);
  }

  @Override
  public void write(DataOutput output) throws IOException {
    Iterator<E> itr = iterator();
    output.writeInt(size());
    while (itr.hasNext()) {
      E obj = itr.next();
      // a boolean flag per element lets null entries round-trip
      output.writeBoolean(obj != null);
      if (obj != null) {
        obj.write(output);
      }
    }
  }

  @Override
  public void readFields(DataInput input) throws IOException {
    clear();
    int size = input.readInt();
    ensureCapacity(size);
    for(int i=0; i < size; ++i) {
      if (input.readBoolean()) {
        // createValue returns the Writable matching childSchema; the cast is
        // safe as long as E agrees with the schema this list was built with
        @SuppressWarnings("unchecked")
        E obj = (E) OrcStruct.createValue(childSchema);
        obj.readFields(input);
        add(obj);
      } else {
        add(null);
      }
    }
  }
}

/**
 * A TreeMap implementation that implements Writable.
 * Serialization format: an int entry count, then per entry a flag byte
 * (bit 2 = key present, bit 1 = value present) followed by the key and
 * value serializations for whichever are present.
 * @param <K> the key type, which must be Writable
 * @param <V> the value type, which must be Writable
 */
final class OrcMap<K extends Writable, V extends Writable>
    extends TreeMap<K, V> implements Writable {
  private final TypeDescription keySchema;
  private final TypeDescription valueSchema;

  public OrcMap(TypeDescription schema) {
    keySchema = schema.getChildren().get(0);
    valueSchema = schema.getChildren().get(1);
  }

  @Override
  public void write(DataOutput output) throws IOException {
    // FIX: removed an iterator local that was created but never used;
    // the for-each below already walks the entry set.
    output.writeInt(size());
    for(Map.Entry<K,V> entry: entrySet()) {
      K key = entry.getKey();
      V value = entry.getValue();
      // flag byte: bit 2 marks a non-null key, bit 1 a non-null value
      output.writeByte((key == null ? 0 : 2) | (value == null ? 0 : 1));
      if (key != null) {
        key.write(output);
      }
      if (value != null) {
        value.write(output);
      }
    }
  }

  @Override
  public void readFields(DataInput input) throws IOException {
    clear();
    int size = input.readInt();
    for(int i=0; i < size; ++i) {
      byte flag = input.readByte();
      K key;
      V value;
      if ((flag & 2) != 0) {
        key = (K) OrcStruct.createValue(keySchema);
        key.readFields(input);
      } else {
        key = null;
      }
      if ((flag & 1) != 0) {
        value = (V) OrcStruct.createValue(valueSchema);
        value.readFields(input);
      } else {
        value = null;
      }
      put(key, value);
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.mapred;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.Progressable;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;

/**
 * An old-API (mapred) output format that writes ORC files.
 * All writer tuning is pulled from the OrcConf keys in the job
 * configuration.
 */
public class OrcOutputFormat<V extends Writable>
    extends FileOutputFormat<NullWritable, V> {

  @Override
  public RecordWriter<NullWritable, V> getRecordWriter(FileSystem fileSystem,
                                                       JobConf conf,
                                                       String name,
                                                       Progressable progressable
                                                       ) throws IOException {
    Path path = getTaskOutputPath(conf, name);
    // Translate each relevant OrcConf setting into a writer option,
    // one step at a time for readability.
    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf);
    opts.fileSystem(fileSystem);
    opts.version(OrcFile.Version.byName(OrcConf.WRITE_FORMAT.getString(conf)));
    opts.setSchema(TypeDescription.fromString(OrcConf.SCHEMA.getString(conf)));
    opts.compress(CompressionKind.valueOf(OrcConf.COMPRESS.getString(conf)));
    opts.encodingStrategy(OrcFile.EncodingStrategy.valueOf(
        OrcConf.ENCODING_STRATEGY.getString(conf)));
    opts.bloomFilterColumns(OrcConf.BLOOM_FILTER_COLUMNS.getString(conf));
    opts.bloomFilterFpp(OrcConf.BLOOM_FILTER_FPP.getDouble(conf));
    opts.blockSize(OrcConf.BLOCK_SIZE.getLong(conf));
    opts.blockPadding(OrcConf.BLOCK_PADDING.getBoolean(conf));
    opts.stripeSize(OrcConf.STRIPE_SIZE.getLong(conf));
    opts.rowIndexStride((int) OrcConf.ROW_INDEX_STRIDE.getLong(conf));
    opts.bufferSize((int) OrcConf.BUFFER_SIZE.getLong(conf));
    opts.paddingTolerance(OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf));
    Writer writer = OrcFile.createWriter(path, opts);
    return new OrcRecordWriter<>(writer);
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.orc.mapred;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

/**
 * An old-API (mapred) RecordReader that materializes ORC rows as Writables.
 * Rows are pulled from the underlying vectorized reader one batch at a time
 * and converted lazily; the static nextXxx helpers all follow the same
 * pattern: collapse to row 0 when the vector is repeating, return null for
 * a null cell, and reuse the caller's previous object when its class
 * matches to avoid per-row allocation.
 */
public class OrcRecordReader<V extends Writable>
    implements org.apache.hadoop.mapred.RecordReader<NullWritable, V> {
  private final TypeDescription schema;    // schema the rows are decoded with
  private final RecordReader batchReader;  // underlying vectorized reader
  private final VectorizedRowBatch batch;  // current batch of decoded columns
  private int rowInBatch;                  // next row within batch to hand out

  protected OrcRecordReader(Reader fileReader,
                            Reader.Options options) throws IOException {
    this.batchReader = fileReader.rows(options);
    if (options.getSchema() == null) {
      schema = fileReader.getSchema();
    } else {
      schema = options.getSchema();
    }
    this.batch = schema.createRowBatch();
    rowInBatch = 0;
  }

  /**
   * If the current batch is empty, get a new one.
   * @return true if we have rows available.
   * @throws IOException
   */
  boolean ensureBatch() throws IOException {
    if (rowInBatch >= batch.size) {
      rowInBatch = 0;
      return batchReader.nextBatch(batch);
    }
    return true;
  }

  @Override
  public boolean next(NullWritable key, V value) throws IOException {
    if (!ensureBatch()) {
      return false;
    }
    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
      // a struct row maps each batch column onto a field of the OrcStruct
      OrcStruct result = (OrcStruct) value;
      List<TypeDescription> children = schema.getChildren();
      int numberOfChildren = children.size();
      for(int i=0; i < numberOfChildren; ++i) {
        result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
            children.get(i), result.getFieldValue(i)));
      }
    } else {
      nextValue(batch.cols[0], rowInBatch, schema, value);
    }
    rowInBatch += 1;
    return true;
  }

  @Override
  public NullWritable createKey() {
    return NullWritable.get();
  }

  @Override
  public V createValue() {
    return (V) OrcStruct.createValue(schema);
  }

  @Override
  public long getPos() throws IOException {
    return 0;
  }

  @Override
  public void close() throws IOException {
    batchReader.close();
  }

  @Override
  public float getProgress() throws IOException {
    return 0;
  }

  // Each helper: repeating vectors store one value at row 0; a cell is
  // non-null when noNulls is set or its isNull slot is false; the previous
  // object is reused only when it is exactly the expected class.
  static BooleanWritable nextBoolean(ColumnVector vector,
                                     int row,
                                     Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      BooleanWritable result;
      if (previous == null || previous.getClass() != BooleanWritable.class) {
        result = new BooleanWritable();
      } else {
        result = (BooleanWritable) previous;
      }
      result.set(((LongColumnVector) vector).vector[row] != 0);
      return result;
    } else {
      return null;
    }
  }

  static ByteWritable nextByte(ColumnVector vector,
                               int row,
                               Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      ByteWritable result;
      if (previous == null || previous.getClass() != ByteWritable.class) {
        result = new ByteWritable();
      } else {
        result = (ByteWritable) previous;
      }
      result.set((byte) ((LongColumnVector) vector).vector[row]);
      return result;
    } else {
      return null;
    }
  }

  static ShortWritable nextShort(ColumnVector vector,
                                 int row,
                                 Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      ShortWritable result;
      if (previous == null || previous.getClass() != ShortWritable.class) {
        result = new ShortWritable();
      } else {
        result = (ShortWritable) previous;
      }
      result.set((short) ((LongColumnVector) vector).vector[row]);
      return result;
    } else {
      return null;
    }
  }

  static IntWritable nextInt(ColumnVector vector,
                             int row,
                             Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      IntWritable result;
      if (previous == null || previous.getClass() != IntWritable.class) {
        result = new IntWritable();
      } else {
        result = (IntWritable) previous;
      }
      result.set((int) ((LongColumnVector) vector).vector[row]);
      return result;
    } else {
      return null;
    }
  }

  static LongWritable nextLong(ColumnVector vector,
                               int row,
                               Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      LongWritable result;
      if (previous == null || previous.getClass() != LongWritable.class) {
        result = new LongWritable();
      } else {
        result = (LongWritable) previous;
      }
      result.set(((LongColumnVector) vector).vector[row]);
      return result;
    } else {
      return null;
    }
  }

  static FloatWritable nextFloat(ColumnVector vector,
                                 int row,
                                 Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      FloatWritable result;
      if (previous == null || previous.getClass() != FloatWritable.class) {
        result = new FloatWritable();
      } else {
        result = (FloatWritable) previous;
      }
      result.set((float) ((DoubleColumnVector) vector).vector[row]);
      return result;
    } else {
      return null;
    }
  }

  static DoubleWritable nextDouble(ColumnVector vector,
                                   int row,
                                   Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      DoubleWritable result;
      if (previous == null || previous.getClass() != DoubleWritable.class) {
        result = new DoubleWritable();
      } else {
        result = (DoubleWritable) previous;
      }
      result.set(((DoubleColumnVector) vector).vector[row]);
      return result;
    } else {
      return null;
    }
  }

  static Text nextString(ColumnVector vector,
                         int row,
                         Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      Text result;
      if (previous == null || previous.getClass() != Text.class) {
        result = new Text();
      } else {
        result = (Text) previous;
      }
      BytesColumnVector bytes = (BytesColumnVector) vector;
      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
      return result;
    } else {
      return null;
    }
  }

  static BytesWritable nextBinary(ColumnVector vector,
                                  int row,
                                  Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      BytesWritable result;
      if (previous == null || previous.getClass() != BytesWritable.class) {
        result = new BytesWritable();
      } else {
        result = (BytesWritable) previous;
      }
      BytesColumnVector bytes = (BytesColumnVector) vector;
      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
      return result;
    } else {
      return null;
    }
  }

  static HiveDecimalWritable nextDecimal(ColumnVector vector,
                                         int row,
                                         Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      HiveDecimalWritable result;
      if (previous == null ||
          previous.getClass() != HiveDecimalWritable.class) {
        result = new HiveDecimalWritable();
      } else {
        result = (HiveDecimalWritable) previous;
      }
      result.set(((DecimalColumnVector) vector).vector[row]);
      return result;
    } else {
      return null;
    }
  }

  static DateWritable nextDate(ColumnVector vector,
                               int row,
                               Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      DateWritable result;
      if (previous == null || previous.getClass() != DateWritable.class) {
        result = new DateWritable();
      } else {
        result = (DateWritable) previous;
      }
      // dates are stored as days since epoch in the long vector
      int date = (int) ((LongColumnVector) vector).vector[row];
      result.set(date);
      return result;
    } else {
      return null;
    }
  }

  static OrcTimestamp nextTimestamp(ColumnVector vector,
                                    int row,
                                    Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      OrcTimestamp result;
      if (previous == null || previous.getClass() != OrcTimestamp.class) {
        result = new OrcTimestamp();
      } else {
        result = (OrcTimestamp) previous;
      }
      TimestampColumnVector tcv = (TimestampColumnVector) vector;
      result.setTime(tcv.time[row]);
      result.setNanos(tcv.nanos[row]);
      return result;
    } else {
      return null;
    }
  }

  static OrcStruct nextStruct(ColumnVector vector,
                              int row,
                              TypeDescription schema,
                              Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      OrcStruct result;
      List<TypeDescription> childrenTypes = schema.getChildren();
      int numChildren = childrenTypes.size();
      if (previous == null || previous.getClass() != OrcStruct.class) {
        result = new OrcStruct(schema);
      } else {
        result = (OrcStruct) previous;
      }
      StructColumnVector struct = (StructColumnVector) vector;
      for(int f=0; f < numChildren; ++f) {
        result.setFieldValue(f, nextValue(struct.fields[f], row,
            childrenTypes.get(f), result.getFieldValue(f)));
      }
      return result;
    } else {
      return null;
    }
  }

  static OrcUnion nextUnion(ColumnVector vector,
                            int row,
                            TypeDescription schema,
                            Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      OrcUnion result;
      List<TypeDescription> childrenTypes = schema.getChildren();
      if (previous == null || previous.getClass() != OrcUnion.class) {
        result = new OrcUnion(schema);
      } else {
        result = (OrcUnion) previous;
      }
      UnionColumnVector union = (UnionColumnVector) vector;
      byte tag = (byte) union.tags[row];
      result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag),
          result.getObject()));
      return result;
    } else {
      return null;
    }
  }

  static OrcList nextList(ColumnVector vector,
                          int row,
                          TypeDescription schema,
                          Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      OrcList result;
      List<TypeDescription> childrenTypes = schema.getChildren();
      TypeDescription valueType = childrenTypes.get(0);
      // BUG FIX: compare against OrcList.class, not ArrayList.class.
      // previous is always an OrcList (created by OrcStruct.createValue or
      // by this method), so the ArrayList.class comparison never matched
      // and the reuse branch below was dead code, allocating a fresh list
      // every row.  nextMap already uses the OrcMap.class pattern.
      if (previous == null ||
          previous.getClass() != OrcList.class) {
        result = new OrcList(schema);
      } else {
        result = (OrcList) previous;
      }
      ListColumnVector list = (ListColumnVector) vector;
      int length = (int) list.lengths[row];
      int offset = (int) list.offsets[row];
      result.ensureCapacity(length);
      int oldLength = result.size();
      int idx = 0;
      // overwrite the elements we can reuse in place
      while (idx < length && idx < oldLength) {
        result.set(idx, nextValue(list.child, offset + idx, valueType,
            result.get(idx)));
        idx += 1;
      }
      if (length < oldLength) {
        // shrink from the tail so indexes stay valid while removing
        for(int i= oldLength - 1; i >= length; --i) {
          result.remove(i);
        }
      } else if (oldLength < length) {
        while (idx < length) {
          result.add(nextValue(list.child, offset + idx, valueType, null));
          idx += 1;
        }
      }
      return result;
    } else {
      return null;
    }
  }

  static OrcMap nextMap(ColumnVector vector,
                        int row,
                        TypeDescription schema,
                        Object previous) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (vector.noNulls || !vector.isNull[row]) {
      MapColumnVector map = (MapColumnVector) vector;
      int length = (int) map.lengths[row];
      int offset = (int) map.offsets[row];
      OrcMap result;
      List<TypeDescription> childrenTypes = schema.getChildren();
      TypeDescription keyType = childrenTypes.get(0);
      TypeDescription valueType = childrenTypes.get(1);
      if (previous == null ||
          previous.getClass() != OrcMap.class) {
        result = new OrcMap(schema);
      } else {
        result = (OrcMap) previous;
        // I couldn't think of a good way to reuse the keys and value objects
        // without even more allocations, so take the easy and safe approach.
        result.clear();
      }
      for(int e=0; e < length; ++e) {
        result.put(nextValue(map.keys, e + offset, keyType, null),
            nextValue(map.values, e + offset, valueType, null));
      }
      return result;
    } else {
      return null;
    }
  }

  /**
   * Convert one cell of a column vector into a Writable, dispatching on
   * the schema's category and reusing previous where possible.
   */
  static Writable nextValue(ColumnVector vector,
                            int row,
                            TypeDescription schema,
                            Object previous) {
    switch (schema.getCategory()) {
      case BOOLEAN:
        return nextBoolean(vector, row, previous);
      case BYTE:
        return nextByte(vector, row, previous);
      case SHORT:
        return nextShort(vector, row, previous);
      case INT:
        return nextInt(vector, row, previous);
      case LONG:
        return nextLong(vector, row, previous);
      case FLOAT:
        return nextFloat(vector, row, previous);
      case DOUBLE:
        return nextDouble(vector, row, previous);
      case STRING:
      case CHAR:
      case VARCHAR:
        return nextString(vector, row, previous);
      case BINARY:
        return nextBinary(vector, row, previous);
      case DECIMAL:
        return nextDecimal(vector, row, previous);
      case DATE:
        return nextDate(vector, row, previous);
      case TIMESTAMP:
        return nextTimestamp(vector, row, previous);
      case STRUCT:
        return nextStruct(vector, row, schema, previous);
      case UNION:
        return nextUnion(vector, row, schema, previous);
      case LIST:
        return nextList(vector, row, schema, previous);
      case MAP:
        return nextMap(vector, row, schema, previous);
      default:
        throw new IllegalArgumentException("Unknown type " + schema);
    }
  }
}
+ */ + +package org.apache.orc.mapred; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class OrcRecordWriter<V extends Writable> + implements RecordWriter<NullWritable, V> { + private final Writer writer; + private final VectorizedRowBatch batch; + private final TypeDescription schema; + private final boolean isTopStruct; + + public OrcRecordWriter(Writer writer) { + this.writer = writer; + schema = writer.getSchema(); 
+ this.batch = schema.createRowBatch(); + isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT; + } + + static void setLongValue(ColumnVector vector, int row, long value) { + ((LongColumnVector) vector).vector[row] = value; + } + + static void setDoubleValue(ColumnVector vector, int row, double value) { + ((DoubleColumnVector) vector).vector[row] = value; + } + + static void setBinaryValue(ColumnVector vector, int row, + BinaryComparable value) { + ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0, + value.getLength()); + } + + static void setBinaryValue(ColumnVector vector, int row, + BinaryComparable value, int maxLength) { + ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0, + Math.min(maxLength, value.getLength())); + } + + private static final ThreadLocal<byte[]> SPACE_BUFFER = + new ThreadLocal<byte[]>() { + @Override + protected byte[] initialValue() { + byte[] result = new byte[100]; + Arrays.fill(result, (byte) ' '); + return result; + } + }; + + static void setCharValue(BytesColumnVector vector, + int row, + Text value, + int length) { + // we need to trim or pad the string with spaces to required length + int actualLength = value.getLength(); + if (actualLength >= length) { + setBinaryValue(vector, row, value, length); + } else { + byte[] spaces = SPACE_BUFFER.get(); + if (length - actualLength > spaces.length) { + spaces = new byte[length - actualLength]; + Arrays.fill(spaces, (byte)' '); + SPACE_BUFFER.set(spaces); + } + vector.setConcat(row, value.getBytes(), 0, actualLength, spaces, 0, + length - actualLength); + } + } + + static void setStructValue(TypeDescription schema, + StructColumnVector vector, + int row, + OrcStruct value) { + List<TypeDescription> children = schema.getChildren(); + for(int c=0; c < value.getNumFields(); ++c) { + setColumn(children.get(c), vector.fields[c], row, value.getFieldValue(c)); + } + } + + static void setUnionValue(TypeDescription schema, + UnionColumnVector vector, + int row, + 
OrcUnion value) { + List<TypeDescription> children = schema.getChildren(); + int tag = value.getTag() & 0xff; + vector.tags[row] = tag; + setColumn(children.get(tag), vector.fields[tag], row, value.getObject()); + } + + + static void setListValue(TypeDescription schema, + ListColumnVector vector, + int row, + OrcList value) { + TypeDescription elemType = schema.getChildren().get(0); + vector.offsets[row] = vector.childCount; + vector.lengths[row] = value.size(); + vector.childCount += vector.lengths[row]; + vector.child.ensureSize(vector.childCount, vector.offsets[row] != 0); + for(int e=0; e < vector.lengths[row]; ++e) { + setColumn(elemType, vector.child, (int) vector.offsets[row] + e, + (Writable) value.get(e)); + } + } + + static void setMapValue(TypeDescription schema, + MapColumnVector vector, + int row, + OrcMap<?,?> value) { + TypeDescription keyType = schema.getChildren().get(0); + TypeDescription valueType = schema.getChildren().get(1); + vector.offsets[row] = vector.childCount; + vector.lengths[row] = value.size(); + vector.childCount += vector.lengths[row]; + vector.keys.ensureSize(vector.childCount, vector.offsets[row] != 0); + vector.values.ensureSize(vector.childCount, vector.offsets[row] != 0); + int e = 0; + for(Map.Entry<?,?> entry: value.entrySet()) { + setColumn(keyType, vector.keys, (int) vector.offsets[row] + e, + (Writable) entry.getKey()); + setColumn(valueType, vector.values, (int) vector.offsets[row] + e, + (Writable) entry.getValue()); + e += 1; + } + } + + static void setColumn(TypeDescription schema, + ColumnVector vector, + int row, + Writable value) { + if (value == null) { + vector.noNulls = false; + vector.isNull[row] = true; + } else { + switch (schema.getCategory()) { + case BOOLEAN: + setLongValue(vector, row, ((BooleanWritable) value).get() ? 
1 : 0); + break; + case BYTE: + setLongValue(vector, row, ((ByteWritable) value).get()); + break; + case SHORT: + setLongValue(vector, row, ((ShortWritable) value).get()); + break; + case INT: + setLongValue(vector, row, ((IntWritable) value).get()); + break; + case LONG: + setLongValue(vector, row, ((LongWritable) value).get()); + break; + case FLOAT: + setDoubleValue(vector, row, ((FloatWritable) value).get()); + break; + case DOUBLE: + setDoubleValue(vector, row, ((DoubleWritable) value).get()); + break; + case STRING: + setBinaryValue(vector, row, (Text) value); + break; + case CHAR: + setCharValue((BytesColumnVector) vector, row, (Text) value, + schema.getMaxLength()); + break; + case VARCHAR: + setBinaryValue(vector, row, (Text) value, schema.getMaxLength()); + break; + case BINARY: + setBinaryValue(vector, row, (BytesWritable) value); + break; + case DATE: + setLongValue(vector, row, ((DateWritable) value).getDays()); + break; + case TIMESTAMP: + ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value); + break; + case DECIMAL: + ((DecimalColumnVector) vector).set(row, (HiveDecimalWritable) value); + break; + case STRUCT: + setStructValue(schema, (StructColumnVector) vector, row, + (OrcStruct) value); + break; + case UNION: + setUnionValue(schema, (UnionColumnVector) vector, row, + (OrcUnion) value); + break; + case LIST: + setListValue(schema, (ListColumnVector) vector, row, (OrcList) value); + break; + case MAP: + setMapValue(schema, (MapColumnVector) vector, row, (OrcMap) value); + break; + default: + throw new IllegalArgumentException("Unknown type " + schema); + } + } + } + + @Override + public void write(NullWritable nullWritable, V v) throws IOException { + // if the batch is full, write it out. 
+ if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + + // add the new row + int row = batch.size++; + if (isTopStruct) { + for(int f=0; f < schema.getChildren().size(); ++f) { + setColumn(schema.getChildren().get(f), batch.cols[f], row, + ((OrcStruct) v).getFieldValue(f)); + } + } else { + setColumn(schema, batch.cols[0], row, v); + } + } + + @Override + public void close(Reporter reporter) throws IOException { + if (batch.size != 0) { + writer.addRowBatch(batch); + batch.reset(); + } + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java new file mode 100644 index 0000000..74b3b28 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.orc.mapred; + +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.orc.TypeDescription; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public final class OrcStruct implements Writable { + + private Writable[] fields; + private final TypeDescription schema; + + public OrcStruct(TypeDescription schema) { + this.schema = schema; + fields = new Writable[schema.getChildren().size()]; + } + + public Writable getFieldValue(int fieldIndex) { + return fields[fieldIndex]; + } + + public void setFieldValue(int fieldIndex, Writable value) { + fields[fieldIndex] = value; + } + + public int getNumFields() { + return fields.length; + } + + @Override + public void write(DataOutput output) throws IOException { + for(int f=0; f < fields.length; ++f) { + output.writeBoolean(fields[f] != null); + if (fields[f] != null) { + fields[f].write(output); + } + } + } + + @Override + public void readFields(DataInput input) throws IOException { + for(int f=0; f < fields.length; ++f) { + if (input.readBoolean()) { + if (fields[f] == null) { + fields[f] = createValue(schema.getChildren().get(f)); + } + fields[f].readFields(input); + } else { + fields[f] = null; + } + } + } + + public void setFieldValue(String fieldName, Writable value) { + int fieldIdx = schema.getFieldNames().indexOf(fieldName); + if (fieldIdx == -1) { + throw new IllegalArgumentException("Field " + fieldName + + " not found in " + schema); + } + fields[fieldIdx] = 
value; + } + + public Writable getFieldValue(String fieldName) { + int fieldIdx = schema.getFieldNames().indexOf(fieldName); + if (fieldIdx == -1) { + throw new IllegalArgumentException("Field " + fieldName + + " not found in " + schema); + } + return fields[fieldIdx]; + } + + @Override + public boolean equals(Object other) { + if (other == null || other.getClass() != OrcStruct.class) { + return false; + } else { + OrcStruct oth = (OrcStruct) other; + if (fields.length != oth.fields.length) { + return false; + } + for(int i=0; i < fields.length; ++i) { + if (fields[i] == null) { + if (oth.fields[i] != null) { + return false; + } + } else { + if (!fields[i].equals(oth.fields[i])) { + return false; + } + } + } + return true; + } + } + + @Override + public int hashCode() { + int result = fields.length; + for(Object field: fields) { + if (field != null) { + result ^= field.hashCode(); + } + } + return result; + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append("{"); + for(int i=0; i < fields.length; ++i) { + if (i != 0) { + buffer.append(", "); + } + buffer.append(fields[i]); + } + buffer.append("}"); + return buffer.toString(); + } + + /* Routines for stubbing into Writables */ + + public static Writable createValue(TypeDescription type) { + switch (type.getCategory()) { + case BOOLEAN: return new BooleanWritable(); + case BYTE: return new ByteWritable(); + case SHORT: return new ShortWritable(); + case INT: return new IntWritable(); + case LONG: return new LongWritable(); + case FLOAT: return new FloatWritable(); + case DOUBLE: return new DoubleWritable(); + case BINARY: return new BytesWritable(); + case CHAR: + case VARCHAR: + case STRING: + return new Text(); + case DATE: + return new DateWritable(); + case TIMESTAMP: + return new OrcTimestamp(); + case DECIMAL: + return new HiveDecimalWritable(); + case STRUCT: { + OrcStruct result = new OrcStruct(type); + int c = 0; + for(TypeDescription child: 
type.getChildren()) { + result.setFieldValue(c++, createValue(child)); + } + return result; + } + case UNION: return new OrcUnion(type); + case LIST: return new OrcList(type); + case MAP: return new OrcMap(type); + default: + throw new IllegalArgumentException("Unknown type " + type); + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java new file mode 100644 index 0000000..200a966 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.mapred; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.Timestamp; + +/** + * A Timestamp implementation that implements Writable. 
+ */ +public class OrcTimestamp extends Timestamp implements Writable { + + public OrcTimestamp() { + super(0); + } + + public OrcTimestamp(long time) { + super(time); + } + + public OrcTimestamp(String timeStr) { + super(0); + Timestamp t = Timestamp.valueOf(timeStr); + setTime(t.getTime()); + setNanos(t.getNanos()); + } + + @Override + public void write(DataOutput output) throws IOException { + output.writeLong(getTime()); + output.writeInt(getNanos()); + } + + @Override + public void readFields(DataInput input) throws IOException { + setTime(input.readLong()); + setNanos(input.readInt()); + } + + public void set(String timeStr) { + Timestamp t = Timestamp.valueOf(timeStr); + setTime(t.getTime()); + setNanos(t.getNanos()); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcUnion.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcUnion.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcUnion.java new file mode 100644 index 0000000..3e7c909 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcUnion.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.mapred; + +import org.apache.hadoop.io.Writable; +import org.apache.orc.TypeDescription; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * An in-memory representation of a union type. + */ +public final class OrcUnion implements Writable { + private byte tag; + private Writable object; + private final TypeDescription schema; + + public OrcUnion(TypeDescription schema) { + this.schema = schema; + } + + public void set(byte tag, Writable object) { + this.tag = tag; + this.object = object; + } + + public byte getTag() { + return tag; + } + + public Writable getObject() { + return object; + } + + @Override + public boolean equals(Object other) { + if (other == null || other.getClass() != OrcUnion.class) { + return false; + } + OrcUnion oth = (OrcUnion) other; + if (tag != oth.tag) { + return false; + } else if (object == null) { + return oth.object == null; + } else { + return object.equals(oth.object); + } + } + + @Override + public int hashCode() { + int result = tag; + if (object != null) { + result ^= object.hashCode(); + } + return result; + } + + @Override + public String toString() { + return "uniontype(" + Integer.toString(tag & 0xff) + ", " + object + ")"; + } + + @Override + public void write(DataOutput output) throws IOException { + output.writeByte(tag); + output.writeBoolean(object != null); + if (object != null) { + object.write(output); + } + } + + @Override + public void readFields(DataInput input) throws IOException { + byte oldTag = tag; + tag = input.readByte(); + if (input.readBoolean()) { + if (oldTag != tag || object == null) { + object = OrcStruct.createValue(schema.getChildren().get(tag)); + } + object.readFields(input); + } else { + object = null; + } + } +} 
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/package-info.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/package-info.java b/java/mapreduce/src/java/org/apache/orc/mapred/package-info.java new file mode 100644 index 0000000..c95c1f2 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapred/package-info.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <p> + * This package provides convenient access to ORC files using Hadoop's + * MapReduce InputFormat and OutputFormat. + * </p> + * + * <p> + * For reading, set the InputFormat to OrcInputFormat and your map will + * receive a stream of OrcStruct objects for each row. (Note that ORC files + * may have any type as the root object instead of structs and then the + * object type will be the appropriate one.) 
+ * </p> + * + * <p> + * The mapping of types is: + * <table summary="Mapping of ORC types to Writable types" + * border="1"> + * <thead> + * <col width="25%"> + * <col width="75%"> + * <tr><th>ORC Type</th><th>Writable Type</th></tr> + * </thead> + * <tbody> + * <tr><td>array</td><td>OrcList</td></tr> + * <tr><td>binary</td><td>BytesWritable</td></tr> + * <tr><td>bigint</td><td>LongWritable</td></tr> + * <tr><td>boolean</td><td>BooleanWritable</td></tr> + * <tr><td>char</td><td>Text</td></tr> + * <tr><td>date</td><td>DateWritable</td></tr> + * <tr><td>decimal</td><td>HiveDecimalWritable</td></tr> + * <tr><td>double</td><td>DoubleWritable</td></tr> + * <tr><td>float</td><td>FloatWritable</td></tr> + * <tr><td>int</td><td>IntWritable</td></tr> + * <tr><td>map</td><td>OrcMap</td></tr> + * <tr><td>smallint</td><td>ShortWritable</td></tr> + * <tr><td>string</td><td>Text</td></tr> + * <tr><td>struct</td><td>OrcStruct</td></tr> + * <tr><td>timestamp</td><td>OrcTimestamp</td></tr> + * <tr><td>tinyint</td><td>ByteWritable</td></tr> + * <tr><td>uniontype</td><td>OrcUnion</td></tr> + * <tr><td>varchar</td><td>Text</td></tr> + * </tbody> + * </table> + * </p> + * + * <p> + * For writing, set the OutputFormat to OrcOutputFormat and define the + * property "orc.schema" in your configuration. The property defines the + * type of the file and uses the Hive type strings, such as + * "struct<x:int,y:string,z:timestamp>" for a row with an integer, + * string, and timestamp. You can create an example object using: + *<pre>{@code + *String typeStr = "struct<x:int,y:string,z:timestamp>"; + *OrcStruct row = (OrcStruct) OrcStruct.createValue( + * TypeDescription.fromString(typeStr)); + *}</pre> + * </p> + * + * <p> + * Please look at the OrcConf class for the configuration knobs that are + * available. 
+ * </p> + */ +package org.apache.orc.mapred; http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcList.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcList.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcList.java new file mode 100644 index 0000000..4ac7ca9 --- /dev/null +++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcList.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.mapred; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.orc.TypeDescription; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class TestOrcList { + + static void cloneWritable(Writable source, + Writable destination) throws IOException { + DataOutputBuffer out = new DataOutputBuffer(1024); + source.write(out); + out.flush(); + DataInputBuffer in = new DataInputBuffer(); + in.reset(out.getData(), out.getLength()); + destination.readFields(in); + } + + @Test + public void testRead() throws IOException { + TypeDescription type = + TypeDescription.createList(TypeDescription.createInt()); + OrcList<IntWritable> expected = new OrcList<>(type); + OrcList<IntWritable> actual = new OrcList<>(type); + expected.add(new IntWritable(123)); + expected.add(new IntWritable(456)); + expected.add(new IntWritable(789)); + assertNotEquals(expected, actual); + cloneWritable(expected, actual); + assertEquals(expected, actual); + expected.clear(); + cloneWritable(expected, actual); + assertEquals(expected, actual); + expected.add(null); + expected.add(new IntWritable(500)); + cloneWritable(expected, actual); + assertEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcMap.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcMap.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcMap.java new file mode 100644 index 0000000..34e1feb --- /dev/null +++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcMap.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.mapred;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.orc.TypeDescription;
import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

public class TestOrcMap {

  @Test
  public void testRead() throws IOException {
    TypeDescription schema =
        TypeDescription.createMap(TypeDescription.createInt(),
            TypeDescription.createLong());
    OrcMap<IntWritable, LongWritable> expected = new OrcMap<>(schema);
    OrcMap<IntWritable, LongWritable> actual = new OrcMap<>(schema);
    // a populated map round-trips through Writable serialization
    expected.put(new IntWritable(999), new LongWritable(1111));
    expected.put(new IntWritable(888), new LongWritable(2222));
    expected.put(new IntWritable(777), new LongWritable(3333));
    assertNotEquals(expected, actual);
    TestOrcList.cloneWritable(expected, actual);
    assertEquals(expected, actual);
    // an empty map round-trips
    expected.clear();
    TestOrcList.cloneWritable(expected, actual);
    assertEquals(expected, actual);
    // null values round-trip
    expected.put(new IntWritable(666), null);
    expected.put(new IntWritable(1), new LongWritable(777));
    TestOrcList.cloneWritable(expected, actual);
    assertEquals(expected, actual);
  }
}
+ */ + +package org.apache.orc.mapred; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.orc.TypeDescription; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class TestOrcStruct { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testRead() throws IOException { + TypeDescription type = + TypeDescription.createStruct() + .addField("f1", TypeDescription.createInt()) + .addField("f2", TypeDescription.createLong()) + .addField("f3", TypeDescription.createString()); + OrcStruct expected = new OrcStruct(type); + OrcStruct actual = new OrcStruct(type); + assertEquals(3, expected.getNumFields()); + expected.setFieldValue(0, new IntWritable(1)); + expected.setFieldValue(1, new LongWritable(2)); + expected.setFieldValue(2, new Text("wow")); + assertEquals(147710, expected.hashCode()); + assertNotEquals(expected, actual); + TestOrcList.cloneWritable(expected, actual); + assertEquals(expected, actual); + expected.setFieldValue(0, null); + expected.setFieldValue(1, null); + expected.setFieldValue(2, null); + TestOrcList.cloneWritable(expected, actual); + assertEquals(expected, actual); + assertEquals(3, expected.hashCode()); + expected.setFieldValue(1, new LongWritable(111)); + assertEquals(111, ((LongWritable) expected.getFieldValue(1)).get()); + TestOrcList.cloneWritable(expected, actual); + assertEquals(expected, actual); + } + + @Test + public void testFieldAccess() { + OrcStruct struct = new OrcStruct(TypeDescription.fromString + ("struct<i:int,j:double,k:string>")); + struct.setFieldValue("j", new DoubleWritable(1.5)); + struct.setFieldValue("k", new Text("Moria")); + struct.setFieldValue(0, new IntWritable(42)); + 
assertEquals(new IntWritable(42), struct.getFieldValue("i")); + assertEquals(new DoubleWritable(1.5), struct.getFieldValue(1)); + assertEquals(new Text("Moria"), struct.getFieldValue("k")); + } + + @Test + public void testBadFieldRead() { + OrcStruct struct = new OrcStruct(TypeDescription.fromString + ("struct<i:int,j:double,k:string>")); + thrown.expect(IllegalArgumentException.class); + struct.getFieldValue("bad"); + } + + @Test + public void testBadFieldWrite() { + OrcStruct struct = new OrcStruct(TypeDescription.fromString + ("struct<i:int,j:double,k:string>")); + thrown.expect(IllegalArgumentException.class); + struct.setFieldValue("bad", new Text("foobar")); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcTimestamp.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcTimestamp.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcTimestamp.java new file mode 100644 index 0000000..925eb8a --- /dev/null +++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcTimestamp.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.mapred; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class TestOrcTimestamp { + + @Test + public void testRead() throws IOException { + OrcTimestamp expected = new OrcTimestamp("2016-04-01 12:34:56.9"); + OrcTimestamp actual = new OrcTimestamp(); + assertNotEquals(expected, actual); + TestOrcList.cloneWritable(expected, actual); + assertEquals(expected, actual); + assertEquals("2016-04-01 12:34:56.9", actual.toString()); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcUnion.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcUnion.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcUnion.java new file mode 100644 index 0000000..82fd94f --- /dev/null +++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcUnion.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.mapred; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.orc.TypeDescription; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class TestOrcUnion { + + @Test + public void testRead() throws IOException { + TypeDescription type = + TypeDescription.fromString("uniontype<int,bigint,string>"); + OrcUnion expected = new OrcUnion(type); + OrcUnion actual = new OrcUnion(type); + expected.set((byte) 2, new Text("foo")); + assertEquals(131367, expected.hashCode()); + assertNotEquals(expected, actual); + TestOrcList.cloneWritable(expected, actual); + assertEquals(expected, actual); + expected.set((byte) 0, new IntWritable(111)); + TestOrcList.cloneWritable(expected, actual); + assertEquals(expected, actual); + expected.set((byte)1, new LongWritable(4567)); + TestOrcList.cloneWritable(expected, actual); + assertEquals(expected, actual); + expected.set((byte) 1, new LongWritable(12345)); + TestOrcList.cloneWritable(expected, actual); + assertEquals(expected, actual); + expected.set((byte) 1, null); + TestOrcList.cloneWritable(expected, actual); + assertEquals(expected, actual); + } +}
