ORC-72. Add benchmarks to ORC.
Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/825a9441 Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/825a9441 Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/825a9441 Branch: refs/heads/orc-72 Commit: 825a9441fdcdb8ceb392ae80bed0324e93f7b07d Parents: 37b939b Author: Owen O'Malley <omal...@apache.org> Authored: Tue Jun 14 10:00:15 2016 -0700 Committer: Owen O'Malley <omal...@apache.org> Committed: Mon Oct 10 13:59:16 2016 -0700 ---------------------------------------------------------------------- java/bench/.gitignore | 5 + java/bench/fetch-data.sh | 6 + java/bench/pom.xml | 138 ++++ .../hadoop/fs/TrackingLocalFileSystem.java | 57 ++ .../hadoop/hive/ql/io/orc/VectorToWritable.java | 70 ++ .../src/java/org/apache/orc/bench/AvroScan.java | 48 ++ .../java/org/apache/orc/bench/AvroWriter.java | 368 ++++++++++ .../orc/bench/ColumnProjectionBenchmark.java | 192 +++++ .../java/org/apache/orc/bench/CsvReader.java | 175 +++++ .../src/java/org/apache/orc/bench/CsvScan.java | 40 ++ .../org/apache/orc/bench/FullReadBenchmark.java | 222 ++++++ .../java/org/apache/orc/bench/GithubToAvro.java | 42 ++ .../java/org/apache/orc/bench/GithubToJson.java | 51 ++ .../java/org/apache/orc/bench/GithubToOrc.java | 48 ++ .../org/apache/orc/bench/GithubToParquet.java | 63 ++ .../java/org/apache/orc/bench/JsonReader.java | 278 ++++++++ .../src/java/org/apache/orc/bench/JsonScan.java | 61 ++ .../src/java/org/apache/orc/bench/OrcScan.java | 46 ++ .../java/org/apache/orc/bench/ParquetScan.java | 54 ++ .../org/apache/orc/bench/RandomGenerator.java | 523 ++++++++++++++ .../org/apache/orc/bench/SalesGenerator.java | 200 ++++++ .../java/org/apache/orc/bench/SalesToAvro.java | 40 ++ .../java/org/apache/orc/bench/SalesToJson.java | 49 ++ .../java/org/apache/orc/bench/SalesToOrc.java | 42 ++ .../org/apache/orc/bench/SalesToParquet.java | 61 ++ .../java/org/apache/orc/bench/TaxiToAvro.java | 53 ++ 
.../java/org/apache/orc/bench/TaxiToJson.java | 93 +++ .../java/org/apache/orc/bench/TaxiToOrc.java | 108 +++ .../org/apache/orc/bench/TaxiToParquet.java | 75 ++ java/bench/src/main/resources/github.schema | 702 +++++++++++++++++++ java/bench/src/main/resources/log4j.properties | 6 + java/bench/src/main/resources/nyc-taxi.schema | 21 + .../java/org/apache/orc/TypeDescription.java | 2 +- java/pom.xml | 1 + .../src/java/org/apache/orc/tools/FileDump.java | 1 - 35 files changed, 3939 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/.gitignore ---------------------------------------------------------------------- diff --git a/java/bench/.gitignore b/java/bench/.gitignore new file mode 100644 index 0000000..babcae6 --- /dev/null +++ b/java/bench/.gitignore @@ -0,0 +1,5 @@ +.*.crc +*.json.gz +*.avro +*.parquet +*.orc http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/fetch-data.sh ---------------------------------------------------------------------- diff --git a/java/bench/fetch-data.sh b/java/bench/fetch-data.sh new file mode 100644 index 0000000..79e77ff --- /dev/null +++ b/java/bench/fetch-data.sh @@ -0,0 +1,6 @@ +#!/usr/bin/bash +mkdir -p data/nyc +(cd data/nyc; wget https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-{11..12}.csv) +(cd data/nyc; gzip *.csv) +mkdir -p data/github +(cd data/github; wget http://data.githubarchive.org/2015-11-{01..15}-{0..23}.json.gz) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/pom.xml ---------------------------------------------------------------------- diff --git a/java/bench/pom.xml b/java/bench/pom.xml new file mode 100644 index 0000000..019bdf0 --- /dev/null +++ b/java/bench/pom.xml @@ -0,0 +1,138 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not 
use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.orc</groupId> + <artifactId>orc</artifactId> + <version>1.2.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>orc-benchmarks</artifactId> + <packaging>jar</packaging> + <name>ORC Benchmarks</name> + <description> + Benchmarks for comparing ORC, Parquet, and Avro performance. 
+ </description> + + <dependencies> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + + <!-- inter-project --> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.8.1</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + <version>1.4</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>2.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <version>1.12</version> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>1.12</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>1.7.5</version> + </dependency> + + <!-- test inter-project --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <sourceDirectory>${basedir}/src/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/test</testSourceDirectory> + <testResources> + <testResource> + <directory>${basedir}/src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + 
<manifest> + <mainClass>org.apache.orc.bench.TaxiToOrc</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>cmake</id> + <build> + <directory>${build.dir}/mapreduce</directory> + </build> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java b/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java new file mode 100644 index 0000000..0440495 --- /dev/null +++ b/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; + +public class TrackingLocalFileSystem extends RawLocalFileSystem { + + class TrackingFileInputStream extends RawLocalFileSystem.LocalFSFileInputStream { + public TrackingFileInputStream(Path f) throws IOException { + super(f); + } + + public int read() throws IOException { + statistics.incrementReadOps(1); + return super.read(); + } + + public int read(byte[] b, int off, int len) throws IOException { + statistics.incrementReadOps(1); + return super.read(b, off, len); + } + + public int read(long position, byte[] b, int off, int len) throws IOException { + statistics.incrementReadOps(1); + return super.read(position, b, off, len); + } + } + + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + if (!exists(f)) { + throw new FileNotFoundException(f.toString()); + } + return new FSDataInputStream(new BufferedFSInputStream( + new TrackingFileInputStream(f), bufferSize)); + } + + public FileSystem.Statistics getLocalStatistics() { + return statistics; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java b/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java new file mode 100644 index 0000000..ae8e8da --- /dev/null +++ b/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; +import org.apache.orc.TypeDescription; + +import java.util.List; + +/** + * This class is just here to provide a public API to some of the ORC internal + * methods. 
+ */ +public class VectorToWritable { + public static ObjectInspector createObjectInspector(TypeDescription schema) { + // convert the type descr to protobuf types + List<OrcProto.Type> types = OrcUtils.getOrcTypes(schema); + // convert the protobuf types to an ObjectInspector + return OrcStruct.createObjectInspector(0, types); + } + + public static Object createValue(VectorizedRowBatch batch, + int row, + TypeDescription schema, + Object previous) { + if(schema.getCategory() == TypeDescription.Category.STRUCT) { + List<TypeDescription> children = schema.getChildren(); + int numberOfChildren = children.size(); + OrcStruct result; + if(previous != null && previous.getClass() == OrcStruct.class) { + result = (OrcStruct)previous; + if(result.getNumFields() != numberOfChildren) { + result.setNumFields(numberOfChildren); + } + } else { + result = new OrcStruct(numberOfChildren); + previous = result; + } + + for(int i = 0; i < numberOfChildren; ++i) { + result.setFieldValue(i, RecordReaderImpl.nextValue(batch.cols[i], row, + children.get(i), result.getFieldValue(i))); + } + } else { + previous = RecordReaderImpl.nextValue(batch.cols[0], row, schema, + previous); + } + ; + return previous; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/AvroScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/AvroScan.java b/java/bench/src/java/org/apache/orc/bench/AvroScan.java new file mode 100644 index 0000000..fcb8fce --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/AvroScan.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.TypeDescription; + +public class AvroScan { + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + long rowCount = 0; + for(String filename: args) { + FsInput file = new FsInput(new Path(filename), conf); + DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + DataFileReader<GenericRecord> dataFileReader = + new DataFileReader<>(file, datumReader); + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(record); + rowCount += 1; + } + } + System.out.println("Rows read: " + rowCount); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/AvroWriter.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java new file mode 100644 index 0000000..ca0984b --- /dev/null +++ 
b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeException; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import 
org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Properties; + +public class AvroWriter { + + static Properties setHiveSchema(TypeDescription schema) { + if (schema.getCategory() != TypeDescription.Category.STRUCT) { + throw new IllegalArgumentException("Assumes struct type as root, not " + + schema); + } + StringBuilder fieldNames = new StringBuilder(); + StringBuilder fieldTypes = new StringBuilder(); + List<String> childNames = schema.getFieldNames(); + List<TypeDescription> childTypes = schema.getChildren(); + for(int f=0; f < childNames.size(); ++f) { + if (f != 0) { + fieldNames.append(','); + fieldTypes.append(','); + } + fieldNames.append(childNames.get(f)); + fieldTypes.append(childTypes.get(f).toString()); + } + Properties properties = new Properties(); + properties.put("columns", fieldNames.toString()); + properties.put("columns.types", fieldTypes.toString()); + return properties; + } + + static Schema createAvroSchema(TypeDescription schema, + Configuration conf + ) throws IOException, AvroSerdeException { + Properties properties = setHiveSchema(schema); + return AvroSerdeUtils.determineSchemaOrThrowException(conf, properties); + } + + interface AvroConverter { + Object convert(ColumnVector vector, int row); + } + + private static class BooleanConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + LongColumnVector vector = (LongColumnVector) cv; + return vector.vector[row] != 0; + } else { + return null; + } + } + } + + private static class IntConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + LongColumnVector vector = (LongColumnVector) cv; + return (int) vector.vector[row]; + } else { + return null; + } + } + } + + private static class 
LongConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + LongColumnVector vector = (LongColumnVector) cv; + return vector.vector[row]; + } else { + return null; + } + } + } + + private static class FloatConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + DoubleColumnVector vector = (DoubleColumnVector) cv; + return (float) vector.vector[row]; + } else { + return null; + } + } + } + + private static class DoubleConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + DoubleColumnVector vector = (DoubleColumnVector) cv; + return vector.vector[row]; + } else { + return null; + } + } + } + + private static class StringConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + BytesColumnVector vector = (BytesColumnVector) cv; + return new String(vector.vector[row], vector.start[row], + vector.length[row]); + } else { + return null; + } + } + } + + private static class BinaryConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + BytesColumnVector vector = (BytesColumnVector) cv; + return ByteBuffer.wrap(vector.vector[row], vector.start[row], + vector.length[row]); + } else { + return null; + } + } + } + + private static class TimestampConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + TimestampColumnVector vector = (TimestampColumnVector) cv; + return vector.time[row]; + } else { + return null; + 
} + } + } + + private static class DecimalConverter implements AvroConverter { + final int scale; + DecimalConverter(int scale) { + this.scale = scale; + } + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + DecimalColumnVector vector = (DecimalColumnVector) cv; + return AvroSerdeUtils.getBufferFromDecimal( + vector.vector[row].getHiveDecimal(), scale); + } else { + return null; + } + } + } + + private static class ListConverter implements AvroConverter { + final Schema avroSchema; + final AvroConverter childConverter; + + ListConverter(TypeDescription schema, Schema avroSchema) { + this.avroSchema = avroSchema; + childConverter = createConverter(schema.getChildren().get(0), + removeNullable(avroSchema.getElementType())); + } + + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + ListColumnVector vector = (ListColumnVector) cv; + int offset = (int) vector.offsets[row]; + int length = (int) vector.lengths[row]; + GenericData.Array result = new GenericData.Array(length, avroSchema); + for(int i=0; i < length; ++i) { + result.add(childConverter.convert(vector.child, offset + i)); + } + return result; + } else { + return null; + } + } + } + + private static class StructConverter implements AvroConverter { + final Schema avroSchema; + final AvroConverter[] childConverters; + + StructConverter(TypeDescription schema, Schema avroSchema) { + this.avroSchema = avroSchema; + List<TypeDescription> childrenTypes = schema.getChildren(); + childConverters = new AvroConverter[childrenTypes.size()]; + List<Schema.Field> fields = avroSchema.getFields(); + for(int f=0; f < childConverters.length; ++f) { + childConverters[f] = createConverter(childrenTypes.get(f), + removeNullable(fields.get(f).schema())); + } + } + + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || 
!cv.isNull[row]) { + StructColumnVector vector = (StructColumnVector) cv; + GenericData.Record result = new GenericData.Record(avroSchema); + for(int f=0; f < childConverters.length; ++f) { + result.put(f, childConverters[f].convert(vector.fields[f], row)); + } + return result; + } else { + return null; + } + } + } + + static AvroConverter createConverter(TypeDescription types, + Schema avroSchema) { + switch (types.getCategory()) { + case BINARY: + return new BinaryConverter(); + case BOOLEAN: + return new BooleanConverter(); + case BYTE: + case SHORT: + case INT: + return new IntConverter(); + case LONG: + return new LongConverter(); + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case CHAR: + case VARCHAR: + case STRING: + return new StringConverter(); + case TIMESTAMP: + return new TimestampConverter(); + case DECIMAL: + return new DecimalConverter(types.getScale()); + case LIST: + return new ListConverter(types, avroSchema); + case STRUCT: + return new StructConverter(types, avroSchema); + default: + throw new IllegalArgumentException("Unhandled type " + types); + } + } + + /** + * Remove the union(null, ...) wrapper around the schema. + * + * All of the types in Hive are nullable and in Avro those are represented + * by wrapping each type in a union type with the void type. 
+ * @param avro The avro type + * @return The avro type with the nullable layer removed + */ + static Schema removeNullable(Schema avro) { + while (avro.getType() == Schema.Type.UNION) { + List<Schema> children = avro.getTypes(); + if (children.size() == 2 && + children.get(0).getType() == Schema.Type.NULL) { + avro = children.get(1); + } else { + break; + } + } + return avro; + } + + private final AvroConverter[] converters; + private final DataFileWriter writer; + private final GenericRecord record; + + public AvroWriter(Path path, TypeDescription schema, + Configuration conf, + String compression) throws IOException, AvroSerdeException { + List<TypeDescription> childTypes = schema.getChildren(); + Schema avroSchema = createAvroSchema(schema, conf); + List<Schema.Field> avroFields = avroSchema.getFields(); + converters = new AvroConverter[childTypes.size()]; + for(int c=0; c < converters.length; ++c) { + converters[c] = createConverter(childTypes.get(c), + removeNullable(avroFields.get(c).schema())); + } + GenericDatumWriter gdw = new GenericDatumWriter(avroSchema); + writer = new DataFileWriter(gdw); + if (compression != null & !"".equals(compression)) { + writer.setCodec(CodecFactory.fromString(compression)); + } + writer.create(avroSchema, path.getFileSystem(conf).create(path)); + record = new GenericData.Record(avroSchema); + } + + public void writeBatch(VectorizedRowBatch batch) throws IOException { + for(int r=0; r < batch.size; ++r) { + for(int f=0; f < batch.cols.length; ++f) { + record.put(f, converters[f].convert(batch.cols[f], r)); + } + writer.append(record); + } + } + + public void close() throws IOException { + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java 
b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java new file mode 100644 index 0000000..4641108 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench; + +import com.google.gson.JsonStreamParser; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrackingLocalFileSystem; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.iq80.snappy.SnappyInputStream; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.List; 
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

/**
 * JMH benchmark that measures the cost of reading only the first two
 * columns ("column projection") of a pre-generated data file, comparing
 * ORC's include-mask projection against Parquet's column selection.
 * Input files are expected at generated/&lt;Dataset&gt;-&lt;compression&gt;.&lt;format&gt;.
 */
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations=3, time=10, timeUnit = TimeUnit.SECONDS)
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Fork(1)
public class ColumnProjectionBenchmark {

  // Which pre-generated data set to scan.
  @Param({ "github", "sales", "taxi"})
  public String Dataset;

// @Param({"none", "snappy", "zlib"})
  @Param({"zlib"})
  public String compression;

  /**
   * Auxiliary JMH counters gathered from the tracking file system.
   * Reset at the start of each iteration so the printed totals are
   * per-iteration; kilobytes()/records() are reported as extra metrics.
   */
  @AuxCounters
  @State(Scope.Thread)
  public static class ExtraCounters {
    long bytesRead;    // bytes read through the tracked file system
    long reads;        // read operations issued
    long records;      // rows decoded
    long invocations;  // benchmark method invocations this iteration

    @Setup(Level.Iteration)
    public void clean() {
      bytesRead = 0;
      reads = 0;
      records = 0;
      invocations = 0;
    }

    @TearDown(Level.Iteration)
    public void print() {
      System.out.println();
      System.out.println("Reads: " + reads);
      System.out.println("Bytes: " + bytesRead);
      System.out.println("Records: " + records);
      System.out.println("Invocations: " + invocations);
    }

    public long kilobytes() {
      return bytesRead / 1024;
    }

    public long records() {
      return records;
    }
  }

  /**
   * Read the first two top-level columns of the ORC file, counting bytes
   * and read operations via a TrackingLocalFileSystem.
   */
  @Benchmark
  public void orc(ExtraCounters counters) throws Exception{
    Configuration conf = new Configuration();
    // Wrap the local file system so read bytes/ops can be measured.
    TrackingLocalFileSystem fs = new TrackingLocalFileSystem();
    fs.initialize(new URI("file:///"), conf);
    FileSystem.Statistics statistics = fs.getLocalStatistics();
    statistics.reset();
    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs);
    Path path = new Path("generated/" + Dataset + "-" + compression + ".orc");
    Reader reader = OrcFile.createReader(path, options);
    TypeDescription schema = reader.getSchema();
    boolean[] include = new boolean[schema.getMaximumId() + 1];
    // select first two columns
    // NOTE(review): the mask spans the first child's id through the second
    // child's maximum id, so nested children of column 2 are included too.
    List<TypeDescription> children = schema.getChildren();
    for(int c= children.get(0).getId(); c <= children.get(1).getMaximumId(); ++c) {
      include[c] = true;
    }
    RecordReader rows = reader.rows(new Reader.Options()
        .include(include));
    VectorizedRowBatch batch = schema.createRowBatch();
    while (rows.nextBatch(batch)) {
      counters.records += batch.size;
    }
    rows.close();
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /**
   * Read two columns of the Parquet file through Hive's Parquet wrapper;
   * the projected columns are passed via the "columns"/"columns.types"
   * job configuration properties.
   */
  @Benchmark
  public void parquet(ExtraCounters counters) throws Exception {
    JobConf conf = new JobConf();
    // Route all I/O through the tracking file system via the track:// scheme.
    conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName());
    conf.set("fs.defaultFS", "track:///");
    if ("taxi".equals(Dataset)) {
      conf.set("columns", "vendor_id,pickup_time");
      conf.set("columns.types", "int,timestamp");
    } else if ("sales".equals(Dataset)) {
      conf.set("columns", "sales_id,customer_id");
      conf.set("columns.types", "bigint,bigint");
    } else if ("github".equals(Dataset)) {
      conf.set("columns", "actor,created_at");
      conf.set("columns.types", "struct<avatar_url:string,gravatar_id:string," +
          "id:int,login:string,url:string>,timestamp");
    } else {
      throw new IllegalArgumentException("Unknown data set " + Dataset);
    }
    Path path = new Path("generated/" + Dataset + "-" + compression + ".parquet");
    FileSystem.Statistics statistics = FileSystem.getStatistics("track:///",
        TrackingLocalFileSystem.class);
    statistics.reset();
    ParquetInputFormat<ArrayWritable> inputFormat =
        new ParquetInputFormat<>(DataWritableReadSupport.class);

    NullWritable nada = NullWritable.get();
    FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{});
    org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader =
        new ParquetRecordReaderWrapper(inputFormat, split, conf,
            Reporter.NULL);
    ArrayWritable value = recordReader.createValue();
    while (recordReader.next(nada, value)) {
      counters.records += 1;
    }
    recordReader.close();
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /** Command line entry point to run just this benchmark class. */
  public static void main(String[] args) throws Exception {
    new Runner(new OptionsBuilder()
        .include(ColumnProjectionBenchmark.class.getSimpleName())
        .jvmArgs("-server", "-Xms256m", "-Xmx2g").build()
        ).run();
  }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/CsvReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/CsvReader.java b/java/bench/src/java/org/apache/orc/bench/CsvReader.java new file mode 100644 index 0000000..5c86a89 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/CsvReader.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.orc.bench; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.sql.Timestamp; +import java.util.Iterator; +import java.util.List; +import java.util.zip.GZIPInputStream; + +public class CsvReader { + private final Iterator<CSVRecord> parser; + private final ColumnReader[] readers; + + interface ColumnReader { + void read(String value, ColumnVector vect, int row); + } + + static class LongColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + LongColumnVector vector = (LongColumnVector) vect; + vector.vector[row] = Long.parseLong(value); + } + } + } + + static class DoubleColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DoubleColumnVector vector = (DoubleColumnVector) vect; + vector.vector[row] = Double.parseDouble(value); + } + } + } + + static 
class StringColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + BytesColumnVector vector = (BytesColumnVector) vect; + byte[] bytes = value.getBytes(); + vector.setRef(row, bytes, 0, bytes.length); + } + } + } + + static class TimestampColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + TimestampColumnVector vector = (TimestampColumnVector) vect; + vector.set(row, Timestamp.valueOf(value)); + } + } + } + + static class DecimalColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DecimalColumnVector vector = (DecimalColumnVector) vect; + vector.vector[row].set(HiveDecimal.create(value)); + } + } + } + + ColumnReader createReader(TypeDescription schema) { + switch (schema.getCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + return new LongColumnReader(); + case FLOAT: + case DOUBLE: + return new DoubleColumnReader(); + case CHAR: + case VARCHAR: + case STRING: + return new StringColumnReader(); + case DECIMAL: + return new DecimalColumnReader(); + case TIMESTAMP: + return new TimestampColumnReader(); + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } + + public CsvReader(Path path, + Configuration conf, + TypeDescription schema) throws IOException { + FileSystem fs = path.getFileSystem(conf); + FSDataInputStream raw = fs.open(path); + String name = path.getName(); + int lastDot = name.lastIndexOf("."); + InputStream input = raw; + if (lastDot >= 0) { + if (".gz".equals(name.substring(lastDot))) { + input = new DataInputStream(new GZIPInputStream(raw)); + } + } + parser = new CSVParser(new 
InputStreamReader(input), + CSVFormat.RFC4180.withHeader()).iterator(); + List<TypeDescription> columnTypes = schema.getChildren(); + readers = new ColumnReader[columnTypes.size()]; + int c = 0; + for(TypeDescription columnType: columnTypes) { + readers[c++] = createReader(columnType); + } + } + + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + while (parser.hasNext() && batch.size < maxSize) { + CSVRecord record = parser.next(); + int c = 0; + for(String val: record) { + readers[c].read(val, batch.cols[c], batch.size); + c += 1; + } + batch.size++; + } + return batch.size != 0; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/CsvScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/CsvScan.java b/java/bench/src/java/org/apache/orc/bench/CsvScan.java new file mode 100644 index 0000000..f2ec61a --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/CsvScan.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; + +public class CsvScan { + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + long rowCount = 0; + TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema"); + for(String filename: args) { + CsvReader reader = new CsvReader(new Path(filename), conf, schema); + VectorizedRowBatch batch = schema.createRowBatch(); + while (reader.nextBatch(batch)) { + rowCount += batch.size; + } + } + System.out.println("Rows read: " + rowCount); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java new file mode 100644 index 0000000..2610328 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.bench;

import com.google.gson.JsonStreamParser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.TrackingLocalFileSystem;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.iq80.snappy.SnappyInputStream;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

/**
 * JMH benchmark that reads every column of a pre-generated data file in
 * each format (ORC, Avro, Parquet, JSON), counting bytes read, read
 * operations, and records via a tracking file system.
 * Input files are expected at generated/&lt;Dataset&gt;-&lt;compression&gt;.&lt;format&gt;.
 */
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations=3, time=10, timeUnit = TimeUnit.SECONDS)
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Fork(1)
public class FullReadBenchmark {

  // Which pre-generated data set to scan.
  @Param({"taxi", "sales", "github"})
  public String Dataset;

  // Compression codec baked into the generated file name.
  @Param({"none", "zlib", "snappy"})
  public String compression;

  /**
   * Auxiliary JMH counters gathered from the tracking file system.
   * Reset per iteration; kilobytes()/records() are reported as metrics.
   */
  @AuxCounters
  @State(Scope.Thread)
  public static class ExtraCounters {
    long bytesRead;    // bytes read through the tracked file system
    long reads;        // read operations issued
    long records;      // rows/records decoded
    long invocations;  // benchmark method invocations this iteration

    @Setup(Level.Iteration)
    public void clean() {
      bytesRead = 0;
      reads = 0;
      records = 0;
      invocations = 0;
    }

    @TearDown(Level.Iteration)
    public void print() {
      System.out.println();
      System.out.println("Reads: " + reads);
      System.out.println("Bytes: " + bytesRead);
      System.out.println("Records: " + records);
      System.out.println("Invocations: " + invocations);
    }

    public long kilobytes() {
      return bytesRead / 1024;
    }

    public long records() {
      return records;
    }
  }

  /** Full scan of the ORC file (no column projection, no predicates). */
  @Benchmark
  public void orc(ExtraCounters counters) throws Exception{
    Configuration conf = new Configuration();
    // Wrap the local file system so read bytes/ops can be measured.
    TrackingLocalFileSystem fs = new TrackingLocalFileSystem();
    fs.initialize(new URI("file:///"), conf);
    FileSystem.Statistics statistics = fs.getLocalStatistics();
    statistics.reset();
    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs);
    Path path = new Path("generated/" + Dataset + "-" + compression + ".orc");
    Reader reader = OrcFile.createReader(path, options);
    TypeDescription schema = reader.getSchema();
    RecordReader rows = reader.rows();
    VectorizedRowBatch batch = schema.createRowBatch();
    while (rows.nextBatch(batch)) {
      counters.records += batch.size;
    }
    rows.close();
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /** Full scan of the Avro container file with the generic reader. */
  @Benchmark
  public void avro(ExtraCounters counters) throws Exception {
    Configuration conf = new Configuration();
    // Route all I/O through the tracking file system via the track:// scheme.
    conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName());
    conf.set("fs.defaultFS", "track:///");
    Path path = new Path("generated/" + Dataset + "-" + compression + ".avro");
    FileSystem.Statistics statistics = FileSystem.getStatistics("track:///",
        TrackingLocalFileSystem.class);
    statistics.reset();
    FsInput file = new FsInput(path, conf);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    DataFileReader<GenericRecord> dataFileReader =
        new DataFileReader<>(file, datumReader);
    // Reuse the same record object to avoid per-row allocation.
    GenericRecord record = null;
    while (dataFileReader.hasNext()) {
      record = dataFileReader.next(record);
      counters.records += 1;
    }
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /** Full scan of the Parquet file through Hive's Parquet reader wrapper. */
  @Benchmark
  public void parquet(ExtraCounters counters) throws Exception {
    JobConf conf = new JobConf();
    conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName());
    conf.set("fs.defaultFS", "track:///");
    Path path = new Path("generated/" + Dataset + "-" + compression + ".parquet");
    FileSystem.Statistics statistics = FileSystem.getStatistics("track:///",
        TrackingLocalFileSystem.class);
    statistics.reset();
    ParquetInputFormat<ArrayWritable> inputFormat =
        new ParquetInputFormat<>(DataWritableReadSupport.class);

    NullWritable nada = NullWritable.get();
    FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{});
    org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader =
        new ParquetRecordReaderWrapper(inputFormat, split, conf,
            Reporter.NULL);
    ArrayWritable value = recordReader.createValue();
    while (recordReader.next(nada, value)) {
      counters.records += 1;
    }
    recordReader.close();
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /**
   * Full scan of the newline-delimited JSON file; the compression param
   * selects the decompression stream wrapped around the raw input.
   */
  @Benchmark
  public void json(ExtraCounters counters) throws Exception {
    Configuration conf = new Configuration();
    TrackingLocalFileSystem fs = new TrackingLocalFileSystem();
    fs.initialize(new URI("file:///"), conf);
    FileSystem.Statistics statistics = fs.getLocalStatistics();
    statistics.reset();
    Path path = new Path("generated/" + Dataset + "-" + compression + ".json");
    InputStream input = fs.open(path);
    if ("zlib".equals(compression)) {
      input = new GZIPInputStream(input);
    } else if ("snappy".equals(compression)) {
      input = new SnappyInputStream(input);
    } else if (!"none".equals(compression)) {
      throw new IllegalArgumentException("Unknown compression " + compression);
    }
    JsonStreamParser parser =
        new JsonStreamParser(new InputStreamReader(input));
    while (parser.hasNext()) {
      parser.next();
      counters.records += 1;
    }
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /** Command line entry point; also attaches the GC profiler. */
  public static void main(String[] args) throws Exception {
    new Runner(new OptionsBuilder()
        .include(FullReadBenchmark.class.getSimpleName())
        .addProfiler("hs_gc")
        .jvmArgs("-server", "-Xms256m", "-Xmx2g").build()
        ).run();
  }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java new file mode 100644 index 0000000..982db64 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; + +public class GithubToAvro { + + public static void main(String[] args) throws Exception { + TypeDescription schema = TaxiToOrc.loadSchema("github.schema"); + Configuration conf = new Configuration(); + AvroWriter writer = new AvroWriter(new Path(args[0]), schema, conf, + TaxiToAvro.getCodec(args[1])); + VectorizedRowBatch batch = schema.createRowBatch(); + for(String inFile: TaxiToOrc.sliceArray(args, 2)) { + JsonReader reader = new JsonReader(new Path(inFile), conf, schema); + while (reader.nextBatch(batch)) { + writer.writeBatch(batch); + } + } + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/GithubToJson.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToJson.java b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java new file mode 100644 index 0000000..f5ae6b1 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.tools.FileDump; + +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.zip.GZIPOutputStream; + +public class GithubToJson { + + public static void main(String[] args) throws Exception { + TypeDescription schema = TaxiToOrc.loadSchema("github.schema"); + Path path = new Path(args[0]); + VectorizedRowBatch batch = schema.createRowBatch(); + Configuration conf = new Configuration(); + Writer output = new OutputStreamWriter(TaxiToJson.getCodec(args[1]) + .create(path.getFileSystem(conf).create(path))); + for(String inFile: TaxiToOrc.sliceArray(args, 2)) { + JsonReader reader = new JsonReader(new Path(inFile), conf, schema); + while (reader.nextBatch(batch)) { + for(int r=0; r < batch.size; ++r) { + FileDump.printRow(output, batch, schema, r); + output.write("\n"); + } + } + } + output.close(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java 
---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java new file mode 100644 index 0000000..59c758f --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java @@ -0,0 +1,48 @@ +/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.bench;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.IOException;
import java.io.InputStream;

/**
 * Converts the GitHub JSON data set into an ORC file.
 * Usage: GithubToOrc &lt;output&gt; &lt;codec&gt; &lt;input json files...&gt;
 */
public class GithubToOrc {

  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    TypeDescription githubSchema = TaxiToOrc.loadSchema("github.schema");
    VectorizedRowBatch rowBatch = githubSchema.createRowBatch();
    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(configuration)
        .setSchema(githubSchema)
        .compress(TaxiToOrc.getCodec(args[1]));
    Writer orcWriter = OrcFile.createWriter(new Path(args[0]), writerOptions);
    for (String input : TaxiToOrc.sliceArray(args, 2)) {
      JsonReader jsonReader = new JsonReader(new Path(input), configuration,
          githubSchema);
      while (jsonReader.nextBatch(rowBatch)) {
        orcWriter.addRowBatch(rowBatch);
      }
    }
    orcWriter.close();
  }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java new file mode 100644 index 0000000..e1fafdc --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.bench;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.VectorToWritable;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;

import java.util.Properties;

/**
 * Converts the GitHub JSON data set into a Parquet file by feeding rows
 * through Hive's MapredParquetOutputFormat.
 * Usage: GithubToParquet &lt;output&gt; &lt;codec&gt; &lt;input json files...&gt;
 */
public class GithubToParquet {

  public static void main(String[] args) throws Exception {
    TypeDescription schema = TaxiToOrc.loadSchema("github.schema");
    VectorizedRowBatch batch = schema.createRowBatch();
    JobConf conf = new JobConf();
    // The Hive record writer requires a MapReduce task id to be present.
    conf.set("mapred.task.id", "attempt_0_0_m_0_0");
    conf.set("parquet.compression", TaxiToParquet.getCodec(args[1]));
    Path path = new Path(args[0]);
    // Describe the schema in Hive table-property form for the writer.
    Properties properties = AvroWriter.setHiveSchema(schema);
    MapredParquetOutputFormat format = new MapredParquetOutputFormat();
    // The boolean argument enables compression for any codec except "none".
    FileSinkOperator.RecordWriter writer = format.getHiveRecordWriter(conf,
        path, ParquetHiveRecord.class, !"none".equals(args[1]), properties,
        Reporter.NULL);
    ParquetHiveRecord record = new ParquetHiveRecord();
    record.inspector =
        (StructObjectInspector) VectorToWritable.createObjectInspector(schema);
    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
      JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
      while (reader.nextBatch(batch)) {
        for(int r=0; r < batch.size; ++r) {
          // Reuse record.value across rows to avoid per-row allocation.
          record.value = VectorToWritable.createValue(batch, r, schema,
              record.value);
          writer.write(record);
        }
      }
    }
    writer.close(true);
  }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/JsonReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/JsonReader.java b/java/bench/src/java/org/apache/orc/bench/JsonReader.java new file mode 100644 index 0000000..599c872 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/JsonReader.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.orc.bench; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonStreamParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.sql.Timestamp; +import java.util.List; +import java.util.zip.GZIPInputStream; + +public class JsonReader { + private final TypeDescription schema; + private final JsonStreamParser parser; + private final JsonConverter[] converters; + + interface JsonConverter { + void convert(JsonElement value, ColumnVector vect, int row); + } + + static class BooleanColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + LongColumnVector vector = (LongColumnVector) vect; + vector.vector[row] = value.getAsBoolean() ? 
1 : 0; + } + } + } + + static class LongColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + LongColumnVector vector = (LongColumnVector) vect; + vector.vector[row] = value.getAsLong(); + } + } + } + + static class DoubleColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DoubleColumnVector vector = (DoubleColumnVector) vect; + vector.vector[row] = value.getAsDouble(); + } + } + } + + static class StringColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + BytesColumnVector vector = (BytesColumnVector) vect; + byte[] bytes = value.getAsString().getBytes(); + vector.setRef(row, bytes, 0, bytes.length); + } + } + } + + static class BinaryColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + BytesColumnVector vector = (BytesColumnVector) vect; + String binStr = value.getAsString(); + byte[] bytes = new byte[binStr.length()/2]; + for(int i=0; i < bytes.length; ++i) { + bytes[i] = (byte) Integer.parseInt(binStr.substring(i*2, i*2+2), 16); + } + vector.setRef(row, bytes, 0, bytes.length); + } + } + } + + static class TimestampColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + TimestampColumnVector vector = (TimestampColumnVector) vect; 
+ vector.set(row, Timestamp.valueOf(value.getAsString() + .replaceAll("[TZ]", " "))); + } + } + } + + static class DecimalColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DecimalColumnVector vector = (DecimalColumnVector) vect; + vector.vector[row].set(HiveDecimal.create(value.getAsString())); + } + } + } + + static class StructColumnConverter implements JsonConverter { + private JsonConverter[] childrenConverters; + private List<String> fieldNames; + + public StructColumnConverter(TypeDescription schema) { + List<TypeDescription> kids = schema.getChildren(); + childrenConverters = new JsonConverter[kids.size()]; + for(int c=0; c < childrenConverters.length; ++c) { + childrenConverters[c] = createConverter(kids.get(c)); + } + fieldNames = schema.getFieldNames(); + } + + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + StructColumnVector vector = (StructColumnVector) vect; + JsonObject obj = value.getAsJsonObject(); + for(int c=0; c < childrenConverters.length; ++c) { + JsonElement elem = obj.get(fieldNames.get(c)); + childrenConverters[c].convert(elem, vector.fields[c], row); + } + } + } + } + + static class ListColumnConverter implements JsonConverter { + private JsonConverter childrenConverter; + + public ListColumnConverter(TypeDescription schema) { + childrenConverter = createConverter(schema.getChildren().get(0)); + } + + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + ListColumnVector vector = (ListColumnVector) vect; + JsonArray obj = value.getAsJsonArray(); + vector.lengths[row] = obj.size(); + vector.offsets[row] = 
vector.childCount; + vector.childCount += vector.lengths[row]; + vector.child.ensureSize(vector.childCount, true); + for(int c=0; c < obj.size(); ++c) { + childrenConverter.convert(obj.get(c), vector.child, + (int) vector.offsets[row] + c); + } + } + } + } + + static JsonConverter createConverter(TypeDescription schema) { + switch (schema.getCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + return new LongColumnConverter(); + case FLOAT: + case DOUBLE: + return new DoubleColumnConverter(); + case CHAR: + case VARCHAR: + case STRING: + return new StringColumnConverter(); + case DECIMAL: + return new DecimalColumnConverter(); + case TIMESTAMP: + return new TimestampColumnConverter(); + case BINARY: + return new BinaryColumnConverter(); + case BOOLEAN: + return new BooleanColumnConverter(); + case STRUCT: + return new StructColumnConverter(schema); + case LIST: + return new ListColumnConverter(schema); + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } + + public JsonReader(Path path, + Configuration conf, + TypeDescription schema) throws IOException { + this.schema = schema; + FileSystem fs = path.getFileSystem(conf); + FSDataInputStream raw = fs.open(path); + String name = path.getName(); + int lastDot = name.lastIndexOf("."); + InputStream input = raw; + if (lastDot >= 0) { + if (".gz".equals(name.substring(lastDot))) { + input = new GZIPInputStream(raw); + } + } + parser = new JsonStreamParser(new InputStreamReader(input)); + if (schema.getCategory() != TypeDescription.Category.STRUCT) { + throw new IllegalArgumentException("Root must be struct - " + schema); + } + List<TypeDescription> fieldTypes = schema.getChildren(); + converters = new JsonConverter[fieldTypes.size()]; + for(int c = 0; c < converters.length; ++c) { + converters[c] = createConverter(fieldTypes.get(c)); + } + } + + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + 
List<String> fieldNames = schema.getFieldNames(); + while (parser.hasNext() && batch.size < maxSize) { + JsonObject elem = parser.next().getAsJsonObject(); + for(int c=0; c < converters.length; ++c) { + // look up each field to see if it is in the input, otherwise + // set it to null. + JsonElement field = elem.get(fieldNames.get(c)); + if (field == null) { + batch.cols[c].noNulls = false; + batch.cols[c].isNull[batch.size] = true; + } else { + converters[c].convert(field, batch.cols[c], batch.size); + } + } + batch.size++; + } + return batch.size != 0; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/JsonScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/JsonScan.java b/java/bench/src/java/org/apache/orc/bench/JsonScan.java new file mode 100644 index 0000000..1115ae6 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/JsonScan.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench; + +import com.google.gson.JsonStreamParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.zip.GZIPInputStream; + +public class JsonScan { + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + OrcFile.ReaderOptions options = OrcFile.readerOptions(conf); + long rowCount = 0; + for(String filename: args) { + Path path = new Path(filename); + FileSystem fs = path.getFileSystem(conf); + FSDataInputStream raw = fs.open(path); + int lastDot = filename.lastIndexOf("."); + InputStream input = raw; + if (lastDot >= 0) { + if (".gz".equals(filename.substring(lastDot))) { + input = new GZIPInputStream(raw); + } + } + JsonStreamParser parser = + new JsonStreamParser(new InputStreamReader(input)); + while (parser.hasNext()) { + parser.next(); + rowCount += 1; + } + } + System.out.println("Rows read: " + rowCount); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/OrcScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/OrcScan.java b/java/bench/src/java/org/apache/orc/bench/OrcScan.java new file mode 100644 index 0000000..096f3fa --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/OrcScan.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; + +public class OrcScan { + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + OrcFile.ReaderOptions options = OrcFile.readerOptions(conf); + long rowCount = 0; + for(String filename: args) { + Reader reader = OrcFile.createReader(new Path(filename), options); + TypeDescription schema = reader.getSchema(); + RecordReader rows = reader.rows(); + VectorizedRowBatch batch = schema.createRowBatch(); + while (rows.nextBatch(batch)) { + rowCount += batch.size; + } + rows.close(); + } + System.out.println("Rows read: " + rowCount); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/825a9441/java/bench/src/java/org/apache/orc/bench/ParquetScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/ParquetScan.java b/java/bench/src/java/org/apache/orc/bench/ParquetScan.java new file mode 100644 index 0000000..ccaaa2a --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/ParquetScan.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.bench;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.parquet.hadoop.ParquetInputFormat;

/**
 * Scans one or more Parquet files through Hive's mapred record-reader
 * wrapper and prints the total row count.
 */
public class ParquetScan {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    long rowCount = 0;
    ParquetInputFormat<ArrayWritable> inputFormat =
        new ParquetInputFormat<>(DataWritableReadSupport.class);

    NullWritable nada = NullWritable.get();
    for(String filename: args) {
      // One split covering the entire file, with no location hints.
      FileSplit split = new FileSplit(new Path(filename), 0, Long.MAX_VALUE,
          new String[]{});
      RecordReader<NullWritable,ArrayWritable> recordReader =
          new ParquetRecordReaderWrapper(inputFormat, split, conf,
              Reporter.NULL);
      try {
        ArrayWritable value = recordReader.createValue();
        while (recordReader.next(nada, value)) {
          rowCount += 1;
        }
      } finally {
        // Guarantee the reader is released even if a read fails.
        recordReader.close();
      }
    }
    System.out.println("Rows read: " + rowCount);
  }
}