ORC-354. Restore the benchmark module. This reverts commit b86d70aa73289b86e066cc019ea11e0d83c1e40d.
Signed-off-by: Owen O'Malley <omal...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/48ba9241 Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/48ba9241 Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/48ba9241 Branch: refs/heads/master Commit: 48ba9241cd5984ce4fa32c79f729f7c20623351d Parents: 18083fe Author: Owen O'Malley <omal...@apache.org> Authored: Fri Apr 27 16:58:48 2018 -0700 Committer: Owen O'Malley <omal...@apache.org> Committed: Wed May 2 10:28:40 2018 -0700 ---------------------------------------------------------------------- java/CMakeLists.txt | 2 +- java/bench/.gitignore | 5 + java/bench/README.md | 33 + java/bench/fetch-data.sh | 21 + java/bench/pom.xml | 217 ++++++ java/bench/src/assembly/uber.xml | 33 + java/bench/src/findbugs/exclude.xml | 25 + .../hadoop/fs/TrackingLocalFileSystem.java | 57 ++ .../hive/ql/io/orc/OrcBenchmarkUtilities.java | 54 ++ .../orc/bench/ColumnProjectionBenchmark.java | 188 +++++ .../org/apache/orc/bench/CompressionKind.java | 87 +++ .../src/java/org/apache/orc/bench/Driver.java | 78 +++ .../org/apache/orc/bench/FullReadBenchmark.java | 223 ++++++ .../org/apache/orc/bench/RandomGenerator.java | 524 ++++++++++++++ .../org/apache/orc/bench/SalesGenerator.java | 206 ++++++ .../java/org/apache/orc/bench/Utilities.java | 127 ++++ .../apache/orc/bench/convert/BatchReader.java | 34 + .../apache/orc/bench/convert/BatchWriter.java | 34 + .../orc/bench/convert/GenerateVariants.java | 220 ++++++ .../apache/orc/bench/convert/ScanVariants.java | 87 +++ .../orc/bench/convert/avro/AvroReader.java | 299 ++++++++ .../orc/bench/convert/avro/AvroSchemaUtils.java | 192 +++++ .../orc/bench/convert/avro/AvroWriter.java | 363 ++++++++++ .../apache/orc/bench/convert/csv/CsvReader.java | 175 +++++ .../orc/bench/convert/json/JsonReader.java | 279 ++++++++ .../orc/bench/convert/json/JsonWriter.java | 217 ++++++ .../apache/orc/bench/convert/orc/OrcReader.java | 
50 ++ .../apache/orc/bench/convert/orc/OrcWriter.java | 54 ++ .../bench/convert/parquet/ParquetReader.java | 297 ++++++++ .../bench/convert/parquet/ParquetWriter.java | 86 +++ java/bench/src/main/resources/github.schema | 702 +++++++++++++++++++ java/bench/src/main/resources/log4j.properties | 18 + java/bench/src/main/resources/sales.schema | 56 ++ java/bench/src/main/resources/taxi.schema | 21 + java/pom.xml | 25 +- 35 files changed, 5071 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 6845ae1..e82898c 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -23,7 +23,7 @@ set(ORC_JARS ) if (ANALYZE_JAVA) - set(JAVA_PROFILE "-Pcmake,analyze") + set(JAVA_PROFILE "-Pcmake,analyze,benchmark") else() set(JAVA_PROFILE "-Pcmake") endif() http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/.gitignore ---------------------------------------------------------------------- diff --git a/java/bench/.gitignore b/java/bench/.gitignore new file mode 100644 index 0000000..babcae6 --- /dev/null +++ b/java/bench/.gitignore @@ -0,0 +1,5 @@ +.*.crc +*.json.gz +*.avro +*.parquet +*.orc http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/README.md ---------------------------------------------------------------------- diff --git a/java/bench/README.md b/java/bench/README.md new file mode 100644 index 0000000..12cedea --- /dev/null +++ b/java/bench/README.md @@ -0,0 +1,33 @@ +# File Format Benchmarks + +These big data file format benchmarks, compare: + +* Avro +* Json +* ORC +* Parquet + +To build this library: + +```% mvn clean package``` + +To fetch the source data: + +```% ./fetch-data.sh``` + +To generate the derived data: + +```% java -jar target/orc-benchmarks-*-uber.jar generate data``` 
+ +To run a scan of all of the data: + +```% java -jar target/orc-benchmarks-*-uber.jar scan data``` + +To run full read benchmark: + +```% java -jar target/orc-benchmarks-*-uber.jar read-all data``` + +To run column projection benchmark: + +```% java -jar target/orc-benchmarks-*-uber.jar read-some data``` + http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/fetch-data.sh ---------------------------------------------------------------------- diff --git a/java/bench/fetch-data.sh b/java/bench/fetch-data.sh new file mode 100755 index 0000000..129d83f --- /dev/null +++ b/java/bench/fetch-data.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+mkdir -p data/sources/taxi +(cd data/sources/taxi; wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-{11,12}.csv) +(cd data/sources/taxi; gzip *.csv) +mkdir -p data/sources/github +(cd data/sources/github; wget http://data.githubarchive.org/2015-11-{01..15}-{0..23}.json.gz) http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/pom.xml ---------------------------------------------------------------------- diff --git a/java/bench/pom.xml b/java/bench/pom.xml new file mode 100644 index 0000000..148693a --- /dev/null +++ b/java/bench/pom.xml @@ -0,0 +1,217 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache</groupId> + <artifactId>apache</artifactId> + <version>18</version> + <relativePath></relativePath> + </parent> + + <groupId>org.apache.orc</groupId> + <artifactId>orc-benchmarks</artifactId> + <version>1.5.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>ORC Benchmarks</name> + <description> + Benchmarks for comparing ORC, Parquet, JSON, and Avro performance. 
+ </description> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.useIncrementalCompilation>false</maven.compiler.useIncrementalCompilation> + + <avro.version>1.8.2</avro.version> + <hadoop.version>2.7.3</hadoop.version> + <hive.version>2.3.3</hive.version> + <orc.version>1.5.0-SNAPSHOT</orc.version> + <parquet.version>1.9.0</parquet.version> + <storage-api.version>2.5.0</storage-api.version> + <zookeeper.version>3.4.6</zookeeper.version> + </properties> + + <dependencies> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>2.8.4</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.2.4</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.3.1</version> + </dependency> + <dependency> + <groupId>io.airlift</groupId> + <artifactId>aircompressor</artifactId> + <version>0.10</version> + <exclusions> + <exclusion> + <groupId>io.airlift</groupId> + <artifactId>slice</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <classifier>hadoop2</classifier> + <version>${avro.version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + <version>1.4</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + 
<groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <classifier>core</classifier> + <version>${hive.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + <version>${hive.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + <version>${storage-api.version}</version> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + <version>${orc.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop-bundle</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.jodd</groupId> + <artifactId>jodd-core</artifactId> + <version>3.5.2</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <version>1.18</version> + </dependency> + </dependencies> + + <build> + <sourceDirectory>${basedir}/src/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/test</testSourceDirectory> + <testResources> + <testResource> + <directory>${basedir}/src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>3.0.0-M1</version> + <executions> + <execution> + <id>enforce-maven</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireMavenVersion> + <version>2.2.1</version> + </requireMavenVersion> + </rules> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.orc.bench.Driver</mainClass> + </manifest> + </archive> + <descriptors> + <descriptor>src/assembly/uber.xml</descriptor> + </descriptors> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>cmake</id> + <build> + <directory>${build.dir}/bench</directory> + </build> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/assembly/uber.xml ---------------------------------------------------------------------- diff --git a/java/bench/src/assembly/uber.xml b/java/bench/src/assembly/uber.xml new file mode 100644 index 0000000..014eab9 --- /dev/null +++ b/java/bench/src/assembly/uber.xml @@ -0,0 +1,33 @@ +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<assembly> + <id>uber</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <unpack>true</unpack> + <scope>runtime</scope> + </dependencySet> + </dependencySets> + <containerDescriptorHandlers> + <containerDescriptorHandler> + <handlerName>metaInf-services</handlerName> + </containerDescriptorHandler> + </containerDescriptorHandlers> +</assembly> http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/findbugs/exclude.xml ---------------------------------------------------------------------- diff --git a/java/bench/src/findbugs/exclude.xml b/java/bench/src/findbugs/exclude.xml new file mode 100644 index 0000000..dde1471 --- /dev/null +++ b/java/bench/src/findbugs/exclude.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<FindBugsFilter> + <Match> + <Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2"/> + </Match> + <Match> + <Class name="~org\.openjdk\.jmh\.infra\.generated.*"/> + </Match> + <Match> + <Class name="~org\.apache\.orc\.bench\.generated.*"/> + </Match> +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java b/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java new file mode 100644 index 0000000..0440495 --- /dev/null +++ b/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; + +public class TrackingLocalFileSystem extends RawLocalFileSystem { + + class TrackingFileInputStream extends RawLocalFileSystem.LocalFSFileInputStream { + public TrackingFileInputStream(Path f) throws IOException { + super(f); + } + + public int read() throws IOException { + statistics.incrementReadOps(1); + return super.read(); + } + + public int read(byte[] b, int off, int len) throws IOException { + statistics.incrementReadOps(1); + return super.read(b, off, len); + } + + public int read(long position, byte[] b, int off, int len) throws IOException { + statistics.incrementReadOps(1); + return super.read(position, b, off, len); + } + } + + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + if (!exists(f)) { + throw new FileNotFoundException(f.toString()); + } + return new FSDataInputStream(new BufferedFSInputStream( + new TrackingFileInputStream(f), bufferSize)); + } + + public FileSystem.Statistics getLocalStatistics() { + return statistics; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java b/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java new file mode 100644 index 0000000..18c5d06 --- /dev/null +++ b/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; +import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; +import org.apache.orc.TypeDescription; + +import java.util.List; + +/** + * Utilities that need the non-public methods from Hive. 
+ */ +public class OrcBenchmarkUtilities { + + public static StructObjectInspector createObjectInspector(TypeDescription schema) { + List<OrcProto.Type> types = OrcUtils.getOrcTypes(schema); + return (StructObjectInspector) OrcStruct.createObjectInspector(0, types); + } + + public static Writable nextObject(VectorizedRowBatch batch, + TypeDescription schema, + int rowId, + Writable obj) { + OrcStruct result = (OrcStruct) obj; + if (result == null) { + result = new OrcStruct(batch.cols.length); + } + List<TypeDescription> childrenTypes = schema.getChildren(); + for(int c=0; c < batch.cols.length; ++c) { + result.setFieldValue(c, RecordReaderImpl.nextValue(batch.cols[c], rowId, + childrenTypes.get(c), result.getFieldValue(c))); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java new file mode 100644 index 0000000..4afaaf1 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrackingLocalFileSystem; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.net.URI; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations=3, time=10, timeUnit = TimeUnit.SECONDS) +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.MICROSECONDS) 
+@Fork(1) +public class ColumnProjectionBenchmark { + + private static final String ROOT_ENVIRONMENT_NAME = "bench.root.dir"; + private static final Path root; + static { + String value = System.getProperty(ROOT_ENVIRONMENT_NAME); + root = value == null ? null : new Path(value); + } + + @Param({ "github", "sales", "taxi"}) + public String dataset; + + @Param({"none", "snappy", "zlib"}) + public String compression; + + @AuxCounters + @State(Scope.Thread) + public static class ExtraCounters { + long bytesRead; + long reads; + long records; + long invocations; + + @Setup(Level.Iteration) + public void clean() { + bytesRead = 0; + reads = 0; + records = 0; + invocations = 0; + } + + @TearDown(Level.Iteration) + public void print() { + System.out.println(); + System.out.println("Reads: " + reads); + System.out.println("Bytes: " + bytesRead); + System.out.println("Records: " + records); + System.out.println("Invocations: " + invocations); + } + + public long kilobytes() { + return bytesRead / 1024; + } + + public long records() { + return records; + } + } + + @Benchmark + public void orc(ExtraCounters counters) throws Exception{ + Configuration conf = new Configuration(); + TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); + fs.initialize(new URI("file:///"), conf); + FileSystem.Statistics statistics = fs.getLocalStatistics(); + statistics.reset(); + OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs); + Path path = Utilities.getVariant(root, dataset, "orc", compression); + Reader reader = OrcFile.createReader(path, options); + TypeDescription schema = reader.getSchema(); + boolean[] include = new boolean[schema.getMaximumId() + 1]; + // select first two columns + List<TypeDescription> children = schema.getChildren(); + for(int c= children.get(0).getId(); c <= children.get(1).getMaximumId(); ++c) { + include[c] = true; + } + RecordReader rows = reader.rows(new Reader.Options() + .include(include)); + VectorizedRowBatch batch = 
schema.createRowBatch(); + while (rows.nextBatch(batch)) { + counters.records += batch.size; + } + rows.close(); + counters.bytesRead += statistics.getBytesRead(); + counters.reads += statistics.getReadOps(); + counters.invocations += 1; + } + + @Benchmark + public void parquet(ExtraCounters counters) throws Exception { + JobConf conf = new JobConf(); + conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); + conf.set("fs.defaultFS", "track:///"); + if ("taxi".equals(dataset)) { + conf.set("columns", "vendor_id,pickup_time"); + conf.set("columns.types", "int,timestamp"); + } else if ("sales".equals(dataset)) { + conf.set("columns", "sales_id,customer_id"); + conf.set("columns.types", "bigint,bigint"); + } else if ("github".equals(dataset)) { + conf.set("columns", "actor,created_at"); + conf.set("columns.types", "struct<avatar_url:string,gravatar_id:string," + + "id:int,login:string,url:string>,timestamp"); + } else { + throw new IllegalArgumentException("Unknown data set " + dataset); + } + Path path = Utilities.getVariant(root, dataset, "parquet", compression); + FileSystem.Statistics statistics = FileSystem.getStatistics("track:///", + TrackingLocalFileSystem.class); + statistics.reset(); + ParquetInputFormat<ArrayWritable> inputFormat = + new ParquetInputFormat<>(DataWritableReadSupport.class); + + NullWritable nada = NullWritable.get(); + FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{}); + org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader = + new ParquetRecordReaderWrapper(inputFormat, split, conf, Reporter.NULL); + ArrayWritable value = recordReader.createValue(); + while (recordReader.next(nada, value)) { + counters.records += 1; + } + recordReader.close(); + counters.bytesRead += statistics.getBytesRead(); + counters.reads += statistics.getReadOps(); + counters.invocations += 1; + } + public static void main(String[] args) throws Exception { + new Runner(new OptionsBuilder() + 
.include(ColumnProjectionBenchmark.class.getSimpleName()) + .jvmArgs("-server", "-Xms256m", "-Xmx2g", + "-D" + ROOT_ENVIRONMENT_NAME + "=" + args[0]).build() + ).run(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/CompressionKind.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/CompressionKind.java b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java new file mode 100644 index 0000000..9274de3 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench; + +import io.airlift.compress.snappy.SnappyCodec; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Enum for handling the compression codecs for the benchmark + */ +public enum CompressionKind { + NONE(".none"), + ZLIB(".gz"), + SNAPPY(".snappy"); + + CompressionKind(String extendsion) { + this.extension = extendsion; + } + + private final String extension; + + public String getExtension() { + return extension; + } + + public OutputStream create(OutputStream out) throws IOException { + switch (this) { + case NONE: + return out; + case ZLIB: + return new GZIPOutputStream(out); + case SNAPPY: + return new SnappyCodec().createOutputStream(out); + default: + throw new IllegalArgumentException("Unhandled kind " + this); + } + } + + public InputStream read(InputStream in) throws IOException { + switch (this) { + case NONE: + return in; + case ZLIB: + return new GZIPInputStream(in); + case SNAPPY: + return new SnappyCodec().createInputStream(in); + default: + throw new IllegalArgumentException("Unhandled kind " + this); + } + } + + public static CompressionKind fromPath(Path path) { + String name = path.getName(); + int lastDot = name.lastIndexOf('.'); + if (lastDot >= 0) { + String ext = name.substring(lastDot); + for (CompressionKind value : values()) { + if (ext.equals(value.getExtension())) { + return value; + } + } + } + return NONE; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/Driver.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/Driver.java b/java/bench/src/java/org/apache/orc/bench/Driver.java new file mode 100644 index 0000000..c8f1592 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/Driver.java @@ -0,0 +1,78 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.orc.bench.convert.GenerateVariants; +import org.apache.orc.bench.convert.ScanVariants; + +import java.util.Arrays; + +/** + * A driver tool to call the various benchmark classes. 
+ */ +public class Driver { + + static CommandLine parseCommandLine(String[] args) throws ParseException { + Options options = new Options() + .addOption("h", "help", false, "Provide help") + .addOption("D", "define", true, "Change configuration settings"); + CommandLine result = new DefaultParser().parse(options, args, true); + if (result.hasOption("help") || result.getArgs().length == 0) { + new HelpFormatter().printHelp("benchmark <command>", options); + System.err.println(); + System.err.println("Commands:"); + System.err.println(" generate - Generate data variants"); + System.err.println(" scan - Scan data variants"); + System.err.println(" read-all - Full table scan benchmark"); + System.err.println(" read-some - Column projection benchmark"); + System.exit(1); + } + return result; + } + + public static void main(String[] args) throws Exception { + CommandLine cli = parseCommandLine(args); + args = cli.getArgs(); + String command = args[0]; + args = Arrays.copyOfRange(args, 1, args.length); + switch (command) { + case "generate": + GenerateVariants.main(args); + break; + case "scan": + ScanVariants.main(args); + break; + case "read-all": + FullReadBenchmark.main(args); + break; + case "read-some": + ColumnProjectionBenchmark.main(args); + break; + default: + System.err.println("Unknown command " + command); + System.exit(1); + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java new file mode 100644 index 0000000..952f18d --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.bench;

import com.google.gson.JsonStreamParser;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.TrackingLocalFileSystem;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

/**
 * JMH benchmark that reads every column of a whole data file in each of
 * four formats (ORC, Avro, Parquet, JSON) and records the number of file
 * reads and bytes read via Hadoop {@link FileSystem.Statistics}, using a
 * {@link TrackingLocalFileSystem} so the counters are per-instance.
 *
 * <p>The data directory is passed through the {@code bench.root.dir}
 * system property (set by {@link #main} from args[0]); the dataset and
 * compression variants are JMH {@code @Param}s.
 */
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations=3, time=10, timeUnit = TimeUnit.SECONDS)
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Fork(1)
public class FullReadBenchmark {

  // System property naming the root directory of the generated data files.
  private static final String ROOT_ENVIRONMENT_NAME = "bench.root.dir";
  // Root data directory; null when the property is unset.
  // NOTE(review): a null root will surface later as a failure inside
  // Utilities.getVariant — consider failing fast here. TODO confirm.
  private static final Path root;
  static {
    String value = System.getProperty(ROOT_ENVIRONMENT_NAME);
    root = value == null ? null : new Path(value);
  }

  @Param({"taxi", "sales", "github"})
  public String dataset;

  @Param({"none", "zlib", "snappy"})
  public String compression;

  /**
   * Per-iteration auxiliary counters reported alongside the timing
   * results. With {@code @AuxCounters}, JMH exposes the public no-arg
   * numeric methods ({@link #kilobytes()}, {@link #records()}) as extra
   * metrics; the package-private fields are accumulated directly by the
   * benchmark methods.
   */
  @AuxCounters
  @State(Scope.Thread)
  public static class ExtraCounters {
    long bytesRead;    // bytes read from the tracked filesystem
    long reads;        // number of read operations
    long records;      // rows/records consumed
    long invocations;  // benchmark method invocations this iteration

    /** Reset all counters at the start of each iteration. */
    @Setup(Level.Iteration)
    public void clean() {
      bytesRead = 0;
      reads = 0;
      records = 0;
      invocations = 0;
    }

    /** Dump the raw counters at the end of each iteration. */
    @TearDown(Level.Iteration)
    public void print() {
      System.out.println();
      System.out.println("Reads: " + reads);
      System.out.println("Bytes: " + bytesRead);
      System.out.println("Records: " + records);
      System.out.println("Invocations: " + invocations);
    }

    /** @return bytes read, scaled to KiB for the JMH report. */
    public long kilobytes() {
      return bytesRead / 1024;
    }

    /** @return records consumed, exposed as a JMH aux counter. */
    public long records() {
      return records;
    }
  }

  /**
   * Full scan of the ORC variant using the vectorized reader; counts rows
   * per batch and the I/O recorded by the tracking filesystem.
   */
  @Benchmark
  public void orc(ExtraCounters counters) throws Exception{
    Configuration conf = new Configuration();
    TrackingLocalFileSystem fs = new TrackingLocalFileSystem();
    fs.initialize(new URI("file:///"), conf);
    FileSystem.Statistics statistics = fs.getLocalStatistics();
    statistics.reset();
    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf).filesystem(fs);
    Path path = Utilities.getVariant(root, dataset, "orc", compression);
    Reader reader = OrcFile.createReader(path, options);
    TypeDescription schema = reader.getSchema();
    RecordReader rows = reader.rows();
    VectorizedRowBatch batch = schema.createRowBatch();
    while (rows.nextBatch(batch)) {
      counters.records += batch.size;
    }
    rows.close();
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /**
   * Full scan of the Avro variant, record by record, tracking I/O through
   * the {@code track:///} filesystem scheme registered on the conf.
   */
  @Benchmark
  public void avro(ExtraCounters counters) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName());
    conf.set("fs.defaultFS", "track:///");
    Path path = Utilities.getVariant(root, dataset, "avro", compression);
    FileSystem.Statistics statistics = FileSystem.getStatistics("track:///",
        TrackingLocalFileSystem.class);
    statistics.reset();
    FsInput file = new FsInput(path, conf);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    DataFileReader<GenericRecord> dataFileReader =
        new DataFileReader<>(file, datumReader);
    // Reuse the same record object to avoid per-row allocation.
    GenericRecord record = null;
    while (dataFileReader.hasNext()) {
      record = dataFileReader.next(record);
      counters.records += 1;
    }
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /**
   * Full scan of the Parquet variant through Hive's mapred wrapper,
   * one ArrayWritable row at a time.
   */
  @Benchmark
  public void parquet(ExtraCounters counters) throws Exception {
    JobConf conf = new JobConf();
    conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName());
    conf.set("fs.defaultFS", "track:///");
    Path path = Utilities.getVariant(root, dataset, "parquet", compression);
    FileSystem.Statistics statistics = FileSystem.getStatistics("track:///",
        TrackingLocalFileSystem.class);
    statistics.reset();
    ParquetInputFormat<ArrayWritable> inputFormat =
        new ParquetInputFormat<>(DataWritableReadSupport.class);

    NullWritable nada = NullWritable.get();
    // One split covering the whole file.
    FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{});
    org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader =
        new ParquetRecordReaderWrapper(inputFormat, split, conf, Reporter.NULL);
    ArrayWritable value = recordReader.createValue();
    while (recordReader.next(nada, value)) {
      counters.records += 1;
    }
    recordReader.close();
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /**
   * Full scan of the JSON variant: decompress, then stream-parse one JSON
   * value per record.
   */
  @Benchmark
  public void json(ExtraCounters counters) throws Exception {
    Configuration conf = new Configuration();
    TrackingLocalFileSystem fs = new TrackingLocalFileSystem();
    fs.initialize(new URI("file:///"), conf);
    FileSystem.Statistics statistics = fs.getLocalStatistics();
    statistics.reset();
    Path path = Utilities.getVariant(root, dataset, "json", compression);
    // NOTE(review): valueOf requires an exact enum-constant match; confirm
    // the bench CompressionKind declares constants matching the lowercase
    // @Param values ("none", "zlib", "snappy").
    CompressionKind compress = CompressionKind.valueOf(compression);
    InputStream input = compress.read(fs.open(path));
    JsonStreamParser parser =
        new JsonStreamParser(new InputStreamReader(input,
            StandardCharsets.UTF_8));
    while (parser.hasNext()) {
      parser.next();
      counters.records += 1;
    }
    counters.bytesRead += statistics.getBytesRead();
    counters.reads += statistics.getReadOps();
    counters.invocations += 1;
  }

  /**
   * Launch the benchmark via the JMH runner.
   * @param args args[0] is the data root directory, forwarded to the
   *             forked JVM as the {@code bench.root.dir} system property
   */
  public static void main(String[] args) throws Exception {
    new Runner(new OptionsBuilder()
        .include(FullReadBenchmark.class.getSimpleName())
        .addProfiler("hs_gc")
        .jvmArgs("-server", "-Xms256m", "-Xmx2g",
            "-D" + ROOT_ENVIRONMENT_NAME + "=" + args[0]).build()
    ).run();
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.bench;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;

import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * A deterministic random table generator. The caller declares a schema
 * field by field via {@link #addField}, attaches a value generator to each
 * field (auto-increment, ranges, string patterns, ...), and then fills
 * {@link VectorizedRowBatch}es with {@link #generate}. A fixed seed makes
 * the output reproducible across runs.
 */
public class RandomGenerator {
  private final TypeDescription schema = TypeDescription.createStruct();
  private final List<Field> fields = new ArrayList<>();
  private final Random random;

  /**
   * @param seed seed for the shared PRNG; equal seeds give equal data
   */
  public RandomGenerator(int seed) {
    random = new Random(seed);
  }

  /**
   * Base class for all per-field value generators. Each generator fills
   * the first {@code valueCount} slots of a column vector, marking a slot
   * null with probability {@link #nullProbability}.
   */
  private abstract class ValueGenerator {
    double nullProbability = 0;
    abstract void generate(ColumnVector vector, int valueCount);
  }

  /** Generates uniformly random booleans (0 or 1) into a long vector. */
  private class RandomBoolean extends ValueGenerator {
    public void generate(ColumnVector v, int valueCount) {
      LongColumnVector vector = (LongColumnVector) v;
      for(int r=0; r < valueCount; ++r) {
        if (nullProbability != 0 && random.nextDouble() < nullProbability) {
          v.noNulls = false;
          v.isNull[r] = true;
        } else {
          vector.vector[r] = random.nextInt(2);
        }
      }
    }
  }

  /**
   * Generates lists with uniformly random sizes in [minSize, maxSize],
   * delegating element generation to the child field's generator.
   */
  private class RandomList extends ValueGenerator {
    private final int minSize;
    private final int sizeRange;
    private final Field child;

    public RandomList(int minSize, int maxSize, Field child) {
      this.minSize = minSize;
      this.sizeRange = maxSize - minSize + 1;
      this.child = child;
    }

    public void generate(ColumnVector v, int valueCount) {
      ListColumnVector vector = (ListColumnVector) v;
      for(int r=0; r < valueCount; ++r) {
        if (nullProbability != 0 && random.nextDouble() < nullProbability) {
          v.noNulls = false;
          v.isNull[r] = true;
        } else {
          vector.offsets[r] = vector.childCount;
          vector.lengths[r] = random.nextInt(sizeRange) + minSize;
          vector.childCount += vector.lengths[r];
        }
      }
      // Grow the child vector to hold all elements, then fill it in one pass.
      vector.child.ensureSize(vector.childCount, false);
      child.generator.generate(vector.child, vector.childCount);
    }
  }

  /** Generates structs by running each child field's generator. */
  private class RandomStruct extends ValueGenerator {
    private final Field[] children;

    public RandomStruct(Field[] children) {
      this.children = children;
    }

    public void generate(ColumnVector v, int valueCount) {
      StructColumnVector vector = (StructColumnVector) v;
      // Mark struct-level nulls first; children are generated for every
      // row regardless (their values in null rows are simply unused).
      for(int r=0; r < valueCount; ++r) {
        if (nullProbability != 0 && random.nextDouble() < nullProbability) {
          v.noNulls = false;
          v.isNull[r] = true;
        }
      }
      for(int c=0; c < children.length; ++c) {
        children[c].generator.generate(vector.fields[c], valueCount);
      }
    }
  }

  /**
   * Base for integer generators: keeps generated values within the bit
   * width of the declared type (byte/short/int/long) by sign extension.
   */
  private abstract class IntegerGenerator extends ValueGenerator {
    private final long sign;  // the type's sign bit
    private final long mask;  // bits above the type's width (0 for 64-bit)

    private IntegerGenerator(TypeDescription.Category kind) {
      int bits = getIntegerLength(kind);
      mask = bits == 64 ? 0 : -1L << bits;
      sign = 1L << (bits - 1);
    }

    /** Sign-extend each value so it stays in the type's range. */
    protected void normalize(LongColumnVector vector, int valueCount) {
      for(int r=0; r < valueCount; ++r) {
        if ((vector.vector[r] & sign) == 0) {
          vector.vector[r] &= ~mask;
        } else {
          vector.vector[r] |= mask;
        }
      }
    }
  }

  /**
   * Generates a monotonically increasing sequence (start, start+increment,
   * ...), skipping the increment on null rows.
   */
  private class AutoIncrement extends IntegerGenerator {
    private long value;
    private final long increment;

    private AutoIncrement(TypeDescription.Category kind, long start,
                          long increment) {
      super(kind);
      this.value = start;
      this.increment = increment;
    }

    public void generate(ColumnVector v, int valueCount) {
      LongColumnVector vector = (LongColumnVector) v;
      for(int r=0; r < valueCount; ++r) {
        // FIX: the null test was inverted (>=), which made ~(1-p) of the
        // values null whenever hasNulls(p) was set. Use < like every other
        // generator so nullProbability is the probability of a null.
        if (nullProbability != 0 && random.nextDouble() < nullProbability) {
          v.noNulls = false;
          v.isNull[r] = true;
        } else {
          vector.vector[r] = value;
          value += increment;
        }
      }
      normalize(vector, valueCount);
    }
  }

  /** Generates uniformly random longs, truncated to the type's width. */
  private class RandomInteger extends IntegerGenerator {

    private RandomInteger(TypeDescription.Category kind) {
      super(kind);
    }

    public void generate(ColumnVector v, int valueCount) {
      LongColumnVector vector = (LongColumnVector) v;
      for(int r=0; r < valueCount; ++r) {
        if (nullProbability != 0 && random.nextDouble() < nullProbability) {
          v.noNulls = false;
          v.isNull[r] = true;
        } else {
          vector.vector[r] = random.nextLong();
        }
      }
      normalize(vector, valueCount);
    }
  }

  /**
   * Generates uniform integers in [minimum, maximum] using rejection
   * sampling to avoid modulo bias.
   */
  private class IntegerRange extends IntegerGenerator {
    private final long minimum;
    private final long range;
    private final long limit;  // largest multiple of range below 2^63

    private IntegerRange(TypeDescription.Category kind, long minimum,
                         long maximum) {
      super(kind);
      this.minimum = minimum;
      this.range = maximum - minimum + 1;
      if (this.range < 0) {
        throw new IllegalArgumentException("Can't support a negative range "
            + range);
      }
      limit = (Long.MAX_VALUE / range) * range;
    }

    public void generate(ColumnVector v, int valueCount) {
      LongColumnVector vector = (LongColumnVector) v;
      for(int r=0; r < valueCount; ++r) {
        if (nullProbability != 0 && random.nextDouble() < nullProbability) {
          v.noNulls = false;
          v.isNull[r] = true;
        } else {
          long rand;
          do {
            // clear the sign bit, rejecting values that would bias the modulo
            rand = random.nextLong() & Long.MAX_VALUE;
          } while (rand >= limit);
          vector.vector[r] = (rand % range) + minimum;
        }
      }
      normalize(vector, valueCount);
    }
  }

  /** Picks uniformly among a fixed set of strings (stored as UTF-8). */
  private class StringChooser extends ValueGenerator {
    private final byte[][] choices;
    private StringChooser(String[] values) {
      choices = new byte[values.length][];
      for(int e=0; e < values.length; ++e) {
        choices[e] = values[e].getBytes(StandardCharsets.UTF_8);
      }
    }

    public void generate(ColumnVector v, int valueCount) {
      BytesColumnVector vector = (BytesColumnVector) v;
      for(int r=0; r < valueCount; ++r) {
        if (nullProbability != 0 && random.nextDouble() < nullProbability) {
          v.noNulls = false;
          v.isNull[r] = true;
        } else {
          int val = random.nextInt(choices.length);
          // setRef shares the byte array instead of copying it
          vector.setRef(r, choices[val], 0, choices[val].length);
        }
      }
    }
  }

  private static byte[] concat(byte[] left, byte[] right) {
    byte[] result = new byte[left.length + right.length];
    System.arraycopy(left, 0, result, 0, left.length);
    System.arraycopy(right, 0, result, left.length, right.length);
    return result;
  }

  private static byte pickOne(byte[] choices, Random random) {
    return choices[random.nextInt(choices.length)];
  }

  // Character classes used by StringPattern, keyed by the pattern letters
  // in the switch below.
  private static final byte[] LOWER_CONSONANTS =
      "bcdfghjklmnpqrstvwxyz".getBytes(StandardCharsets.UTF_8);
  private static final byte[] UPPER_CONSONANTS =
      "BCDFGHJKLMNPQRSTVWXYZ".getBytes(StandardCharsets.UTF_8);
  private static final byte[] CONSONANTS =
      concat(LOWER_CONSONANTS, UPPER_CONSONANTS);
  private static final byte[] LOWER_VOWELS = "aeiou".getBytes(StandardCharsets.UTF_8);
  private static final byte[] UPPER_VOWELS = "AEIOU".getBytes(StandardCharsets.UTF_8);
  private static final byte[] VOWELS = concat(LOWER_VOWELS, UPPER_VOWELS);
  private static final byte[] LOWER_LETTERS =
      concat(LOWER_CONSONANTS, LOWER_VOWELS);
  private static final byte[] UPPER_LETTERS =
      concat(UPPER_CONSONANTS, UPPER_VOWELS);
  private static final byte[] LETTERS = concat(LOWER_LETTERS, UPPER_LETTERS);
  private static final byte[] NATURAL_DIGITS = "123456789".getBytes(StandardCharsets.UTF_8);
  private static final byte[] DIGITS = "0123456789".getBytes(StandardCharsets.UTF_8);

  /**
   * Generates strings from a template: pattern letters (C, c, E, V, v, F,
   * l, L, D, x, X) are replaced with a random character from the matching
   * class above; every other character is emitted literally.
   */
  private class StringPattern extends ValueGenerator {
    private final byte[] buffer;      // working copy of the pattern
    private final byte[][] choices;   // character class per substitution slot
    private final int[] locations;    // buffer index per substitution slot

    private StringPattern(String pattern) {
      buffer = pattern.getBytes(StandardCharsets.UTF_8);
      // First pass: count the substitution slots.
      int locs = 0;
      for(int i=0; i < buffer.length; ++i) {
        switch (buffer[i]) {
          case 'C': case 'c': case 'E': case 'V': case 'v': case 'F':
          case 'l': case 'L': case 'D': case 'x': case 'X':
            locs += 1;
            break;
          default:
            break;
        }
      }
      locations = new int[locs];
      choices = new byte[locs][];
      // Second pass: record each slot's position and character class.
      locs = 0;
      for(int i=0; i < buffer.length; ++i) {
        switch (buffer[i]) {
          case 'C':
            locations[locs] = i;
            choices[locs++] = UPPER_CONSONANTS;
            break;
          case 'c':
            locations[locs] = i;
            choices[locs++] = LOWER_CONSONANTS;
            break;
          case 'E':
            locations[locs] = i;
            choices[locs++] = CONSONANTS;
            break;
          case 'V':
            locations[locs] = i;
            choices[locs++] = UPPER_VOWELS;
            break;
          case 'v':
            locations[locs] = i;
            choices[locs++] = LOWER_VOWELS;
            break;
          case 'F':
            locations[locs] = i;
            choices[locs++] = VOWELS;
            break;
          case 'l':
            locations[locs] = i;
            choices[locs++] = LOWER_LETTERS;
            break;
          case 'L':
            locations[locs] = i;
            choices[locs++] = UPPER_LETTERS;
            break;
          case 'D':
            locations[locs] = i;
            choices[locs++] = LETTERS;
            break;
          case 'x':
            locations[locs] = i;
            choices[locs++] = NATURAL_DIGITS;
            break;
          case 'X':
            locations[locs] = i;
            choices[locs++] = DIGITS;
            break;
          default:
            break;
        }
      }
    }

    public void generate(ColumnVector v, int valueCount) {
      BytesColumnVector vector = (BytesColumnVector) v;
      for(int r=0; r < valueCount; ++r) {
        if (nullProbability != 0 && random.nextDouble() < nullProbability) {
          v.noNulls = false;
          v.isNull[r] = true;
        } else {
          for(int m=0; m < locations.length; ++m) {
            buffer[locations[m]] = pickOne(choices[m], random);
          }
          // setVal copies the buffer, so reusing it per row is safe
          vector.setVal(r, buffer, 0, buffer.length);
        }
      }
    }
  }

  /**
   * Generates uniform timestamps between two "yyyy-mm-dd hh:mm:ss" bounds
   * (inclusive, millisecond granularity) with random sub-millisecond nanos.
   */
  private class TimestampRange extends ValueGenerator {
    private final long minimum;
    private final long range;
    private final long limit;

    private TimestampRange(String min, String max) {
      minimum = Timestamp.valueOf(min).getTime();
      range = Timestamp.valueOf(max).getTime() - minimum + 1;
      if (range < 0) {
        throw new IllegalArgumentException("Negative range " + range);
      }
      limit = (Long.MAX_VALUE / range) * range;
    }

    public void generate(ColumnVector v, int valueCount) {
      TimestampColumnVector vector = (TimestampColumnVector) v;
      for(int r=0; r < valueCount; ++r) {
        if (nullProbability != 0 && random.nextDouble() < nullProbability) {
          v.noNulls = false;
          v.isNull[r] = true;
        } else {
          long rand;
          do {
            // clear the sign bit, rejecting values that would bias the modulo
            rand = random.nextLong() & Long.MAX_VALUE;
          } while (rand >= limit);
          vector.time[r] = (rand % range) + minimum;
          vector.nanos[r] = random.nextInt(1000000);
        }
      }
    }
  }

  /**
   * @return the bit width of an integer category
   * @throws IllegalArgumentException for non-integer categories
   */
  private static int getIntegerLength(TypeDescription.Category kind) {
    switch (kind) {
      case BYTE:
        return 8;
      case SHORT:
        return 16;
      case INT:
        return 32;
      case LONG:
        return 64;
      default:
        throw new IllegalArgumentException("Unhandled type " + kind);
    }
  }

  /**
   * A declared column (or nested sub-column). The add* methods attach a
   * value generator and return {@code this} for chaining.
   */
  public class Field {
    private final TypeDescription type;
    private Field[] children;
    private ValueGenerator generator;

    private Field(TypeDescription type) {
      this.type = type;
      if (!type.getCategory().isPrimitive()) {
        List<TypeDescription> childrenTypes = type.getChildren();
        children = new Field[childrenTypes.size()];
        for(int c=0; c < children.length; ++c) {
          children[c] = new Field(childrenTypes.get(c));
        }
      }
    }

    /** Use an auto-incrementing sequence for this integer field. */
    public Field addAutoIncrement(long start, long increment) {
      generator = new AutoIncrement(type.getCategory(), start, increment);
      return this;
    }

    /** Use uniform integers in [min, max] for this integer field. */
    public Field addIntegerRange(long min, long max) {
      generator = new IntegerRange(type.getCategory(), min, max);
      return this;
    }

    /** Use fully random integers for this integer field. */
    public Field addRandomInt() {
      generator = new RandomInteger(type.getCategory());
      return this;
    }

    /** Pick uniformly among the given strings (STRING fields only). */
    public Field addStringChoice(String... choices) {
      if (type.getCategory() != TypeDescription.Category.STRING) {
        throw new IllegalArgumentException("Must be string - " + type);
      }
      generator = new StringChooser(choices);
      return this;
    }

    /** Generate strings from a template (STRING fields only). */
    public Field addStringPattern(String pattern) {
      if (type.getCategory() != TypeDescription.Category.STRING) {
        throw new IllegalArgumentException("Must be string - " + type);
      }
      generator = new StringPattern(pattern);
      return this;
    }

    /** Uniform timestamps in [start, end] (TIMESTAMP fields only). */
    public Field addTimestampRange(String start, String end) {
      if (type.getCategory() != TypeDescription.Category.TIMESTAMP) {
        throw new IllegalArgumentException("Must be timestamp - " + type);
      }
      generator = new TimestampRange(start, end);
      return this;
    }

    /** Random booleans (BOOLEAN fields only). */
    public Field addBoolean() {
      if (type.getCategory() != TypeDescription.Category.BOOLEAN) {
        throw new IllegalArgumentException("Must be boolean - " + type);
      }
      generator = new RandomBoolean();
      return this;
    }

    /**
     * Make the already-attached generator produce nulls with the given
     * probability. Must be called after an add* method.
     */
    public Field hasNulls(double probability) {
      generator.nullProbability = probability;
      return this;
    }

    /** Generate struct values; children must be configured separately. */
    public Field addStruct() {
      generator = new RandomStruct(children);
      return this;
    }

    /** Generate lists of the single child type with sizes in [min, max]. */
    public Field addList(int minSize, int maxSize) {
      generator = new RandomList(minSize, maxSize, children[0]);
      return this;
    }

    /** @return the child field at the given index, for configuration */
    public Field getChildField(int child) {
      return children[child];
    }
  }

  /**
   * Add a primitive column of the given category to the schema.
   */
  public Field addField(String name, TypeDescription.Category kind) {
    TypeDescription type = new TypeDescription(kind);
    return addField(name, type);
  }

  /**
   * Add a column with an explicit type (may be nested) to the schema.
   */
  public Field addField(String name, TypeDescription type) {
    schema.addField(name, type);
    Field result = new Field(type);
    fields.add(result);
    return result;
  }

  /**
   * Fill the batch with rowCount generated rows, one column at a time.
   */
  public void generate(VectorizedRowBatch batch, int rowCount) {
    batch.reset();
    for(int c=0; c < batch.cols.length; ++c) {
      fields.get(c).generator.generate(batch.cols[c], rowCount);
    }
    batch.size = rowCount;
  }

  /**
   * Get the schema for the table that is being generated.
   * @return the struct schema containing every added field
   */
  public TypeDescription getSchema() {
    return schema;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.bench;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.convert.BatchReader;

/**
 * A synthetic "sales" table source for the benchmarks. The constructor
 * declares every column on a {@link RandomGenerator} (types, value ranges,
 * and per-column null probabilities); {@link #nextBatch} then emits up to
 * the requested number of rows, batch by batch.
 *
 * <p>NOTE(review): the specific null-rate constants below look like they
 * were measured from a real dataset — there is no way to confirm that from
 * this file; treat them as fixed benchmark parameters.
 */
public class SalesGenerator implements BatchReader {
  private final RandomGenerator generator;
  // Rows left to emit across subsequent nextBatch calls.
  private long rowsRemaining;
  // Null probability used for columns that are almost always null.
  private final static double MOSTLY = 0.99999;

  /** Build a generator with the default seed (42) for reproducibility. */
  public SalesGenerator(long rows) {
    this(rows, 42);
  }

  /**
   * @param rows total number of rows to produce
   * @param seed PRNG seed; equal seeds give identical data
   */
  public SalesGenerator(long rows, int seed) {
    generator = new RandomGenerator(seed);
    // column 1
    generator.addField("sales_id", TypeDescription.Category.LONG)
        .addAutoIncrement(1000000000, 1);
    generator.addField("customer_id", TypeDescription.Category.LONG)
        .addIntegerRange(1000000000, 2000000000);
    generator.addField("col3", TypeDescription.Category.LONG)
        .addIntegerRange(1, 10000).hasNulls(0.9993100389335173);

    // column 4
    generator.addField("item_category", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000000).hasNulls(0.00014784879996054823);
    generator.addField("item_count", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000);
    generator.addField("change_ts", TypeDescription.Category.TIMESTAMP)
        .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59");

    // column 7
    generator.addField("store_location", TypeDescription.Category.STRING)
        .addStringChoice("Los Angeles", "New York", "Cupertino", "Sunnyvale",
            "Boston", "Chicago", "Seattle", "Jackson",
            "Palo Alto", "San Mateo", "San Jose", "Santa Clara",
            "Irvine", "Torrance", "Gardena", "Hermosa", "Manhattan")
        .hasNulls(0.0004928293332019384);
    generator.addField("associate_id", TypeDescription.Category.STRING)
        .addStringPattern("MR V").hasNulls(0.05026859198659506);
    generator.addField("col9", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000000000).hasNulls(MOSTLY);

    // column 10
    generator.addField("rebate_id", TypeDescription.Category.STRING)
        .addStringPattern("xxxxxx").hasNulls(MOSTLY);
    generator.addField("create_ts", TypeDescription.Category.TIMESTAMP)
        .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59");
    generator.addField("col13", TypeDescription.Category.LONG)
        .addIntegerRange(1, 100000).hasNulls(MOSTLY);

    // column 13
    generator.addField("size", TypeDescription.Category.STRING)
        .addStringChoice("Small", "Medium", "Large", "XL")
        .hasNulls(0.9503720861465674);
    generator.addField("col14", TypeDescription.Category.LONG)
        .addIntegerRange(1, 100000);
    generator.addField("fulfilled", TypeDescription.Category.BOOLEAN)
        .addBoolean();

    // column 16
    generator.addField("global_id", TypeDescription.Category.STRING)
        .addStringPattern("xxxxxxxxxxxxxxxx").hasNulls(0.021388793060962974);
    generator.addField("col17", TypeDescription.Category.STRING)
        .addStringPattern("L-xx").hasNulls(MOSTLY);
    generator.addField("col18", TypeDescription.Category.STRING)
        .addStringPattern("ll").hasNulls(MOSTLY);

    // column 19
    generator.addField("col19", TypeDescription.Category.LONG)
        .addIntegerRange(1, 100000);
    generator.addField("has_rebate", TypeDescription.Category.BOOLEAN)
        .addBoolean();
    // Nested column: a mostly-null list of 0-3 six-field structs. The
    // struct's children are configured individually below.
    RandomGenerator.Field list =
        generator.addField("col21",
            TypeDescription.fromString("array<struct<sub1:bigint,sub2:string,"
                + "sub3:string,sub4:bigint,sub5:bigint,sub6:string>>"))
        .addList(0, 3)
        .hasNulls(MOSTLY);
    RandomGenerator.Field struct = list.getChildField(0).addStruct();
    struct.getChildField(0).addIntegerRange(0, 10000000);
    struct.getChildField(1).addStringPattern("VVVVV");
    struct.getChildField(2).addStringPattern("VVVVVVVV");
    struct.getChildField(3).addIntegerRange(0, 10000000);
    struct.getChildField(4).addIntegerRange(0, 10000000);
    struct.getChildField(5).addStringPattern("VVVVVVVV");

    // column 38
    generator.addField("vendor_id", TypeDescription.Category.STRING)
        .addStringPattern("Lxxxxxx").hasNulls(0.1870780148834459);
    generator.addField("country", TypeDescription.Category.STRING)
        .addStringChoice("USA", "Germany", "Ireland", "Canada", "Mexico",
            "Denmark").hasNulls(0.0004928293332019384);

    // column 40
    generator.addField("backend_version", TypeDescription.Category.STRING)
        .addStringPattern("X.xx").hasNulls(0.0005913951998423039);
    generator.addField("col41", TypeDescription.Category.LONG)
        .addIntegerRange(1000000000, 100000000000L);
    generator.addField("col42", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000000000);

    // column 43
    generator.addField("col43", TypeDescription.Category.LONG)
        .addIntegerRange(1000000000, 10000000000L).hasNulls(0.9763934749396284);
    generator.addField("col44", TypeDescription.Category.LONG)
        .addIntegerRange(1, 100000000);
    generator.addField("col45", TypeDescription.Category.LONG)
        .addIntegerRange(1, 100000000);

    // column 46
    generator.addField("col46", TypeDescription.Category.LONG)
        .addIntegerRange(1, 10000000);
    generator.addField("col47", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000);
    generator.addField("col48", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000000).hasNulls(MOSTLY);

    // column 49
    generator.addField("col49", TypeDescription.Category.STRING)
        .addStringPattern("xxxx").hasNulls(0.0004928293332019384);
    generator.addField("col50", TypeDescription.Category.STRING)
        .addStringPattern("ll").hasNulls(0.9496821250800848);
    generator.addField("col51", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000000).hasNulls(0.9999014341333596);

    // column 52
    generator.addField("col52", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000000).hasNulls(0.9980779656005125);
    generator.addField("col53", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000000000);
    generator.addField("col54", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000000000);

    // column 55
    generator.addField("col55", TypeDescription.Category.STRING)
        .addStringChoice("X");
    generator.addField("col56", TypeDescription.Category.TIMESTAMP)
        .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59");
    generator.addField("col57", TypeDescription.Category.TIMESTAMP)
        .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59");

    // column 58
    generator.addField("md5", TypeDescription.Category.LONG)
        .addRandomInt();
    generator.addField("col59", TypeDescription.Category.LONG)
        .addIntegerRange(1000000000, 10000000000L);
    generator.addField("col69", TypeDescription.Category.TIMESTAMP)
        .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59")
        .hasNulls(MOSTLY);

    // column 61
    generator.addField("col61", TypeDescription.Category.STRING)
        .addStringPattern("X.xx").hasNulls(0.11399142476960233);
    generator.addField("col62", TypeDescription.Category.STRING)
        .addStringPattern("X.xx").hasNulls(0.9986200778670347);
    generator.addField("col63", TypeDescription.Category.TIMESTAMP)
        .addTimestampRange("2003-01-01 00:00:00", "2017-03-14 23:59:59");

    // column 64
    generator.addField("col64", TypeDescription.Category.LONG)
        .addIntegerRange(1, 1000000).hasNulls(MOSTLY);
    rowsRemaining = rows;
  }

  /**
   * Fill the batch with up to batch.getMaxSize() rows.
   * @return true if any rows were produced; false once exhausted
   */
  public boolean nextBatch(VectorizedRowBatch batch) {
    int rows = (int) Math.min(batch.getMaxSize(), rowsRemaining);
    generator.generate(batch, rows);
    rowsRemaining -= rows;
    return rows != 0;
  }

  @Override
  public void close() {
    // PASS
  }

  /** @return the struct schema built by the constructor */
  public TypeDescription getSchema() {
    return generator.getSchema();
  }

  /** Quick manual check: print the generated schema. */
  public static void main(String[] args) throws Exception {
    SalesGenerator sales = new SalesGenerator(10, 42);
    System.out.println("Schema " + sales.getSchema());
  }
}
0000000..7016f5e --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/Utilities.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Properties; + +public class Utilities { + + public static TypeDescription loadSchema(String name) throws IOException { + InputStream in = Utilities.class.getClassLoader().getResourceAsStream(name); + byte[] buffer= new byte[1 * 1024]; + int len = in.read(buffer); + StringBuilder string = new StringBuilder(); + while (len > 0) { + for(int i=0; i < len; ++i) { + // strip out + if (buffer[i] != '\n' && buffer[i] != ' ') { + string.append((char) buffer[i]); + } + } + len = in.read(buffer); + } + return TypeDescription.fromString(string.toString()); + } + + public static org.apache.orc.CompressionKind getCodec(CompressionKind compression) { + switch (compression) { + case NONE: + return 
org.apache.orc.CompressionKind.NONE; + case ZLIB: + return org.apache.orc.CompressionKind.ZLIB; + case SNAPPY: + return org.apache.orc.CompressionKind.SNAPPY; + default: + throw new IllegalArgumentException("Unknown compression " + compression); + } + } + + public static Iterable<String> sliceArray(final String[] array, + final int start) { + return new Iterable<String>() { + String[] values = array; + int posn = start; + + @Override + public Iterator<String> iterator() { + return new Iterator<String>() { + @Override + public boolean hasNext() { + return posn < values.length; + } + + @Override + public String next() { + if (posn >= values.length) { + throw new NoSuchElementException("Index off end of array." + + " index = " + posn + " length = " + values.length); + } else { + return values[posn++]; + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("No remove"); + } + }; + } + }; + } + + public static Properties convertSchemaToHiveConfig(TypeDescription schema) { + Properties result = new Properties(); + if (schema.getCategory() != TypeDescription.Category.STRUCT) { + throw new IllegalArgumentException("Hive requires struct root types" + + " instead of " + schema); + } + StringBuilder columns = new StringBuilder(); + StringBuilder types = new StringBuilder(); + List<String> columnNames = schema.getFieldNames(); + List<TypeDescription> columnTypes = schema.getChildren(); + for(int c=0; c < columnNames.size(); ++c) { + if (c != 0) { + columns.append(","); + types.append(","); + } + columns.append(columnNames.get(c)); + types.append(columnTypes.get(c)); + } + result.setProperty(serdeConstants.LIST_COLUMNS, columns.toString()); + result.setProperty(serdeConstants.LIST_COLUMN_TYPES, types.toString()); + return result; + } + + public static Path getVariant(Path root, + String data, + String format, + String compress) { + return new Path(root, "generated/" + data + "/" + format + "." 
+ compress); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java b/java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java new file mode 100644 index 0000000..b9ea356 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.convert; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import java.io.IOException; + +/** + * Generic interface for reading data. 
 */
public interface BatchReader extends AutoCloseable {

  /**
   * Fills the batch with the next rows from the source.
   *
   * @param batch the batch to populate (reused across calls)
   * @return false once no more rows are available
   */
  boolean nextBatch(VectorizedRowBatch batch) throws IOException;

  /** Narrows AutoCloseable.close() to throw only IOException. */
  @Override
  void close() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java
new file mode 100644
index 0000000..c79d937
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java
@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.orc.bench.convert;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

import java.io.IOException;

/**
 * Generic interface for writing data.
 */
public interface BatchWriter extends AutoCloseable {

  /**
   * Appends one batch of rows to the output.
   *
   * @param batch the rows to write; the caller may reuse the batch afterward
   */
  void writeBatch(VectorizedRowBatch batch) throws IOException;

  /** Narrows AutoCloseable.close() to throw only IOException. */
  @Override
  void close() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java b/java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java
new file mode 100644
index 0000000..7f57468
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java
@@ -0,0 +1,220 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.orc.bench.convert; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.SalesGenerator; +import org.apache.orc.bench.Utilities; +import org.apache.orc.bench.convert.avro.AvroReader; +import org.apache.orc.bench.convert.avro.AvroWriter; +import org.apache.orc.bench.convert.csv.CsvReader; +import org.apache.orc.bench.convert.json.JsonReader; +import org.apache.orc.bench.convert.json.JsonWriter; +import org.apache.orc.bench.convert.orc.OrcReader; +import org.apache.orc.bench.convert.orc.OrcWriter; +import org.apache.orc.bench.convert.parquet.ParquetReader; +import org.apache.orc.bench.convert.parquet.ParquetWriter; + +import java.io.IOException; + +/** + * A tool to create the different variants that we need to benchmark against. 
+ */ +public class GenerateVariants { + + public static BatchWriter createFileWriter(Path file, + String format, + TypeDescription schema, + Configuration conf, + CompressionKind compress + ) throws IOException { + FileSystem fs = file.getFileSystem(conf); + fs.delete(file, false); + fs.mkdirs(file.getParent()); + switch (format) { + case "json": + return new JsonWriter(file, schema, conf, compress); + case "orc": + return new OrcWriter(file, schema, conf, compress); + case "avro": + return new AvroWriter(file, schema, conf, compress); + case "parquet": + return new ParquetWriter(file, schema, conf, compress); + default: + throw new IllegalArgumentException("Unknown format " + format); + } + } + + public static BatchReader createFileReader(Path file, + String format, + TypeDescription schema, + Configuration conf, + CompressionKind compress + ) throws IOException { + switch (format) { + case "csv": + return new CsvReader(file, schema, conf, compress); + case "json": + return new JsonReader(file, schema, conf, compress); + case "orc": + return new OrcReader(file, schema, conf); + case "avro": + return new AvroReader(file, schema, conf); + case "parquet": + return new ParquetReader(file, schema, conf); + default: + throw new IllegalArgumentException("Unknown format " + format); + } + } + + static class RecursiveReader implements BatchReader { + private final RemoteIterator<LocatedFileStatus> filenames; + private final String format; + private final TypeDescription schema; + private final Configuration conf; + private final CompressionKind compress; + private BatchReader current = null; + + RecursiveReader(Path root, + String format, + TypeDescription schema, + Configuration conf, + CompressionKind compress) throws IOException { + FileSystem fs = root.getFileSystem(conf); + filenames = fs.listFiles(root, true); + this.format = format; + this.schema = schema; + this.conf = conf; + this.compress = compress; + } + + @Override + public boolean nextBatch(VectorizedRowBatch 
batch) throws IOException { + while (current == null || !current.nextBatch(batch)) { + if (filenames.hasNext()) { + LocatedFileStatus next = filenames.next(); + if (next.isFile()) { + current = createFileReader(next.getPath(), format, schema, conf, + compress); + } + } else { + return false; + } + } + return true; + } + + @Override + public void close() throws IOException { + if (current != null) { + current.close(); + } + } + } + + public static BatchReader createReader(Path root, + String dataName, + TypeDescription schema, + Configuration conf, + long salesRecords) throws IOException { + switch (dataName) { + case "taxi": + return new RecursiveReader(new Path(root, "sources/" + dataName), "csv", + schema, conf, CompressionKind.ZLIB); + case "sales": + return new SalesGenerator(salesRecords); + case "github": + return new RecursiveReader(new Path(root, "sources/" + dataName), "json", + schema, conf, CompressionKind.ZLIB); + default: + throw new IllegalArgumentException("Unknown data name " + dataName); + } + } + + static CommandLine parseCommandLine(String[] args) throws ParseException { + Options options = new Options() + .addOption("h", "help", false, "Provide help") + .addOption("c", "compress", true, "List of compression") + .addOption("d", "data", true, "List of data sets") + .addOption("f", "format", true, "List of formats") + .addOption("s", "sales", true, "Number of records for sales"); + CommandLine result = new DefaultParser().parse(options, args); + if (result.hasOption("help") || result.getArgs().length == 0) { + new HelpFormatter().printHelp("convert <root>", options); + System.exit(1); + } + return result; + } + + public static void main(String[] args) throws Exception { + CommandLine cli = parseCommandLine(args); + String[] compressList = + cli.getOptionValue("compress", "none,snappy,zlib").split(","); + String[] dataList = + cli.getOptionValue("data", "taxi,sales,github").split(","); + String[] formatList = + cli.getOptionValue("format", 
"avro,json,orc,parquet").split(","); + long records = Long.parseLong(cli.getOptionValue("sales", "25000000")); + Configuration conf = new Configuration(); + Path root = new Path(cli.getArgs()[0]); + for(String data: dataList) { + // Set up the reader + TypeDescription schema = Utilities.loadSchema(data + ".schema"); + BatchReader reader = createReader(root, data, schema, conf, records); + + // Set up the writers for each combination + BatchWriter[] writers = new BatchWriter[compressList.length * formatList.length]; + for(int compress=0; compress < compressList.length; ++compress) { + CompressionKind compressionKind = + CompressionKind.valueOf(compressList[compress].toUpperCase()); + for(int format=0; format < formatList.length; ++format) { + Path outPath = Utilities.getVariant(root, data, formatList[format], + compressList[compress]); + writers[compress * formatList.length + format] = + createFileWriter(outPath, formatList[format], schema, conf, + compressionKind); + } + } + + // Copy the rows + VectorizedRowBatch batch = schema.createRowBatch(); + while (reader.nextBatch(batch)) { + for(BatchWriter writer: writers) { + writer.writeBatch(batch); + } + } + reader.close(); + for(BatchWriter writer: writers) { + writer.close(); + } + } + } +}