http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java b/java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java
new file mode 100644
index 0000000..ae76238
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.convert;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.CompressionKind;
+import org.apache.orc.bench.Utilities;
+
+/**
+ * A tool to scan the different variants that we need to benchmark against.
+ */ +public class ScanVariants { + + + static CommandLine parseCommandLine(String[] args) throws ParseException { + Options options = new Options() + .addOption("h", "help", false, "Provide help") + .addOption("c", "compress", true, "List of compression") + .addOption("d", "data", true, "List of data sets") + .addOption("f", "format", true, "List of formats"); + CommandLine result = new DefaultParser().parse(options, args); + if (result.hasOption("help") || result.getArgs().length == 0) { + new HelpFormatter().printHelp("scan <root>", options); + System.exit(1); + } + return result; + } + + public static void main(String[] args) throws Exception { + CommandLine cli = parseCommandLine(args); + String[] compressList = + cli.getOptionValue("compress", "none,snappy,zlib").split(","); + String[] dataList = + cli.getOptionValue("data", "taxi,sales,github").split(","); + String[] formatList = + cli.getOptionValue("format", "avro,json,orc,parquet").split(","); + Configuration conf = new Configuration(); + Path root = new Path(cli.getArgs()[0]); + for(String data: dataList) { + TypeDescription schema = Utilities.loadSchema(data + ".schema"); + VectorizedRowBatch batch = schema.createRowBatch(); + for (String compress : compressList) { + CompressionKind compressKind = + CompressionKind.valueOf(compress.toUpperCase()); + for (String format : formatList) { + Path filename = Utilities.getVariant(root, data, format, + compress); + BatchReader reader = GenerateVariants.createFileReader(filename, + format, schema, conf, compressKind); + long rows = 0; + long batches = 0; + while (reader.nextBatch(batch)) { + batches += 1; + rows += batch.size; + } + System.out.println(filename + " rows: " + rows + " batches: " + + batches); + reader.close(); + } + } + } + } +}
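For readers trying the patch out, the class above is a plain command-line tool. Below is a minimal sketch of driving it programmatically; the root directory is only an example and assumes the variant files were already produced by GenerateVariants:

    // Hypothetical driver: scan the taxi data set as zlib-compressed
    // orc and parquet variants under an example root directory.
    ScanVariants.main(new String[]{
        "-d", "taxi",           // data sets
        "-c", "zlib",           // compression codecs
        "-f", "orc,parquet",    // file formats
        "/tmp/bench"            // root holding the generated variants
    });

The option names and their comma-separated defaults ("none,snappy,zlib", "taxi,sales,github", "avro,json,orc,parquet") come straight from parseCommandLine above.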
http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java new file mode 100644 index 0000000..fc354d6 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.convert.avro; + +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.avro.util.Utf8; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchReader; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.List; + +public class AvroReader implements BatchReader { + private final DataFileReader<GenericRecord> dataFileReader; + private GenericRecord record = null; + private final AvroConverter[] converters; + + public AvroReader(Path path, + TypeDescription schema, + Configuration conf) throws IOException { + FsInput file = new FsInput(path, conf); + DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + dataFileReader = new DataFileReader<>(file, datumReader); + List<TypeDescription> children = schema.getChildren(); + converters = new AvroConverter[children.size()]; + for(int c=0; c < converters.length; ++c) { + converters[c] = createConverter(children.get(c)); + } + } + + @Override + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + while (dataFileReader.hasNext() && batch.size < maxSize) { + record = dataFileReader.next(record); + int row = batch.size++; 
+ for(int c=0; c < converters.length; ++c) { + converters[c].convert(batch.cols[c], row, record.get(c)); + } + } + return batch.size != 0; + } + + @Override + public void close() throws IOException { + dataFileReader.close(); + } + + interface AvroConverter { + void convert(ColumnVector vector, int row, Object value); + } + + private static class BooleanConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((LongColumnVector) cv).vector[row] = + ((Boolean) value).booleanValue() ? 1 : 0; + } + } + } + + private static class IntConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((LongColumnVector) cv).vector[row] = + ((Integer) value).intValue(); + } + } + } + + private static class LongConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((LongColumnVector) cv).vector[row] = + ((Long) value).longValue(); + } + } + } + + private static class FloatConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((DoubleColumnVector) cv).vector[row] = + ((Float) value).floatValue(); + } + } + } + + private static class DoubleConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ((DoubleColumnVector) cv).vector[row] = + ((Double) value).doubleValue(); + } + } + } + + private static class StringConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + byte[] bytes = ((Utf8) value).getBytes(); + ((BytesColumnVector) cv).setRef(row, bytes, 0, bytes.length); + } + } + } + + private static class BinaryConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + ByteBuffer buf = (ByteBuffer) value; + ((BytesColumnVector) cv).setVal(row, buf.array(), buf.arrayOffset(), + buf.remaining()); + } + } + } + + private static class TimestampConverter implements AvroConverter { + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + TimestampColumnVector tc = (TimestampColumnVector) cv; + tc.time[row] = ((Long) value).longValue(); + tc.nanos[row] = 0; + } + } + } + + private static class DecimalConverter implements AvroConverter { + final int scale; + DecimalConverter(int scale) { + this.scale = scale; + } + public void convert(ColumnVector cv, int row, Object value) { + if (value == null) { + cv.noNulls = false; + cv.isNull[row] = true; + } else { + DecimalColumnVector tc = (DecimalColumnVector) cv; + tc.vector[row].set(getHiveDecimalFromByteBuffer((ByteBuffer) value, scale)); + } + } + } + + private static class ListConverter implements AvroConverter { + final AvroConverter childConverter; + + ListConverter(TypeDescription schema) { + childConverter = createConverter(schema.getChildren().get(0)); + } + + public void convert(ColumnVector cv, 
int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        ListColumnVector tc = (ListColumnVector) cv;
+        GenericData.Array array = (GenericData.Array) value;
+        int start = tc.childCount;
+        int len = array.size();
+        tc.childCount += len;
+        tc.child.ensureSize(tc.childCount, true);
+        // record where this row's elements live in the child vector;
+        // without these two writes every list reads back as empty
+        tc.offsets[row] = start;
+        tc.lengths[row] = len;
+        for(int i=0; i < len; ++i) {
+          childConverter.convert(tc.child, start + i, array.get(i));
+        }
+      }
+    }
+  }
+
+  private static class StructConverter implements AvroConverter {
+    final AvroConverter[] childConverters;
+
+    StructConverter(TypeDescription schema) {
+      List<TypeDescription> children = schema.getChildren();
+      childConverters = new AvroConverter[children.size()];
+      for(int i=0; i < childConverters.length; ++i) {
+        childConverters[i] = createConverter(children.get(i));
+      }
+    }
+
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        StructColumnVector tc = (StructColumnVector) cv;
+        GenericData.Record record = (GenericData.Record) value;
+        for(int c=0; c < tc.fields.length; ++c) {
+          childConverters[c].convert(tc.fields[c], row, record.get(c));
+        }
+      }
+    }
+  }
+
+  static AvroConverter createConverter(TypeDescription types) {
+    switch (types.getCategory()) {
+      case BINARY:
+        return new BinaryConverter();
+      case BOOLEAN:
+        return new BooleanConverter();
+      case BYTE:
+      case SHORT:
+      case INT:
+        return new IntConverter();
+      case LONG:
+        return new LongConverter();
+      case FLOAT:
+        return new FloatConverter();
+      case DOUBLE:
+        return new DoubleConverter();
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return new StringConverter();
+      case TIMESTAMP:
+        return new TimestampConverter();
+      case DECIMAL:
+        return new DecimalConverter(types.getScale());
+      case LIST:
+        return new ListConverter(types);
+      case STRUCT:
+        return new StructConverter(types);
+      default:
+        throw new IllegalArgumentException("Unhandled type " + types);
+    }
+  }
+
+  static byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) {
+    byteBuffer.rewind();
+    byte[] result = new byte[byteBuffer.limit()];
+    byteBuffer.get(result);
+    return result;
+  }
+
+  static HiveDecimal getHiveDecimalFromByteBuffer(ByteBuffer byteBuffer,
+                                                  int scale) {
+    byte[] result = getBytesFromByteBuffer(byteBuffer);
+    HiveDecimal dec = HiveDecimal.create(new BigInteger(result), scale);
+    return dec;
+  }
+}
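The reader above implements the shared BatchReader interface, so consuming an Avro variant is a plain batch loop. A minimal sketch, with an example path and schema name (note the batch object is reused between calls, exactly as in ScanVariants):

    // Hypothetical read loop over an Avro variant file.
    TypeDescription schema = Utilities.loadSchema("taxi.schema");
    Configuration conf = new Configuration();
    BatchReader reader =
        new AvroReader(new Path("/tmp/bench/taxi.avro"), schema, conf);
    VectorizedRowBatch batch = schema.createRowBatch();
    long rows = 0;
    while (reader.nextBatch(batch)) {
      rows += batch.size;   // the batch is overwritten by the next call
    }
    reader.close();
    System.out.println("rows: " + rows);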
http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java
new file mode 100644
index 0000000..6c72a0e
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.convert.avro;
+
+import org.apache.avro.Schema;
+import org.apache.orc.TypeDescription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Converts a Hive TypeInfo to an Avro Schema.
+ */
+public class AvroSchemaUtils {
+
+  private AvroSchemaUtils() {
+    // No instances
+  }
+
+  public static Schema createAvroSchema(TypeDescription typeInfo) {
+    Schema schema;
+    switch (typeInfo.getCategory()) {
+      case STRING:
+        schema = Schema.create(Schema.Type.STRING);
+        break;
+      case CHAR:
+        schema = getSchemaFor("{" +
+            "\"type\":\"string\"," +
+            "\"logicalType\":\"char\"," +
+            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+        break;
+      case VARCHAR:
+        schema = getSchemaFor("{" +
+            "\"type\":\"string\"," +
+            "\"logicalType\":\"varchar\"," +
+            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+        break;
+      case BINARY:
+        schema = Schema.create(Schema.Type.BYTES);
+        break;
+      case BYTE:
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case SHORT:
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case INT:
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case LONG:
+        schema = Schema.create(Schema.Type.LONG);
+        break;
+      case FLOAT:
+        schema = Schema.create(Schema.Type.FLOAT);
+        break;
+      case DOUBLE:
+        schema = Schema.create(Schema.Type.DOUBLE);
+        break;
+      case BOOLEAN:
+        schema = Schema.create(Schema.Type.BOOLEAN);
+        break;
+      case DECIMAL:
+        String precision = String.valueOf(typeInfo.getPrecision());
+        String scale = String.valueOf(typeInfo.getScale());
+        schema = getSchemaFor("{" +
+            "\"type\":\"bytes\"," +
+            "\"logicalType\":\"decimal\"," +
+            "\"precision\":" + precision + "," +
+            "\"scale\":" + scale + "}");
+        break;
+      case DATE:
+        schema = getSchemaFor("{" +
+            "\"type\":\"int\"," +
+            "\"logicalType\":\"date\"}");
+        break;
+      case TIMESTAMP:
+        schema = getSchemaFor("{" +
+            "\"type\":\"long\"," +
+            "\"logicalType\":\"timestamp-millis\"}");
+        break;
+      case LIST:
+        schema = createAvroArray(typeInfo);
+        break;
+      case MAP:
+        schema = createAvroMap(typeInfo);
+        break;
+      case STRUCT:
+        schema = createAvroRecord(typeInfo);
+        break;
+      case UNION:
+        schema = createAvroUnion(typeInfo);
+        break;
+      default:
+        throw new UnsupportedOperationException(typeInfo + " is not supported.");
+    }
+
+    return schema;
+  }
+
+  private static Schema createAvroUnion(TypeDescription typeInfo) {
+    List<Schema> childSchemas = new ArrayList<>();
+    for (TypeDescription childTypeInfo : typeInfo.getChildren()) {
+      Schema childSchema = createAvroSchema(childTypeInfo);
+      if (childSchema.getType() == Schema.Type.UNION) {
+        for (Schema grandkid: childSchema.getTypes()) {
+          // test the grandkid, not the (always-union) child, so that
+          // nested null branches are actually filtered out
+          if (grandkid.getType() != Schema.Type.NULL) {
+            childSchemas.add(grandkid);
+          }
+        }
+      } else {
+        childSchemas.add(childSchema);
+      }
+    }
+
+    return wrapInUnionWithNull(Schema.createUnion(childSchemas));
+  }
+
+  private static Schema createAvroRecord(TypeDescription typeInfo) {
+    List<Schema.Field> childFields = new ArrayList<>();
+
+    List<String> fieldNames = typeInfo.getFieldNames();
+    List<TypeDescription> fieldTypes = typeInfo.getChildren();
+
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      TypeDescription childTypeInfo = fieldTypes.get(i);
+      Schema.Field field = new Schema.Field(fieldNames.get(i),
+          wrapInUnionWithNull(createAvroSchema(childTypeInfo)),
+          childTypeInfo.toString(),
+          (Object) null);
+      childFields.add(field);
+    }
+
+    Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(),
+        typeInfo.toString(), null, false);
+    recordSchema.setFields(childFields);
+    return recordSchema;
+  }
+
+  private static Schema createAvroMap(TypeDescription typeInfo) {
+    TypeDescription keyTypeInfo = typeInfo.getChildren().get(0);
+    if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) {
+      throw new UnsupportedOperationException("Avro only supports maps with string keys " +
+          typeInfo);
+    }
+
+    Schema valueSchema = wrapInUnionWithNull(createAvroSchema
+        (typeInfo.getChildren().get(1)));
+
+    return Schema.createMap(valueSchema);
+  }
+
+  private static Schema createAvroArray(TypeDescription typeInfo) {
+    Schema child = createAvroSchema(typeInfo.getChildren().get(0));
+    return Schema.createArray(wrapInUnionWithNull(child));
+  }
+
+  private static Schema wrapInUnionWithNull(Schema schema) {
+    Schema NULL = Schema.create(Schema.Type.NULL);
+    switch (schema.getType()) {
+      case NULL:
+        return schema;
+      case UNION:
+        List<Schema> kids = schema.getTypes();
+        List<Schema> newKids = new ArrayList<>(kids.size() + 1);
+        newKids.add(NULL);
+        // keep the original branches; otherwise the rebuilt union
+        // would contain nothing but null
+        newKids.addAll(kids);
+        return Schema.createUnion(newKids);
+      default:
+        return Schema.createUnion(Arrays.asList(NULL, schema));
+    }
+  }
+
+  private static Schema getSchemaFor(String str) {
+    Schema.Parser parser = new Schema.Parser();
+    return parser.parse(str);
+  }
+}
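As a concrete illustration of the mapping, here is a hypothetical round trip through the utility above; the schema string is an example, and the comment reflects the union(null, ...) wrapping performed by wrapInUnionWithNull:

    // Convert an ORC schema into the equivalent Avro schema.
    TypeDescription orc = TypeDescription.fromString(
        "struct<fare:decimal(8,2),tags:array<string>>");
    Schema avro = AvroSchemaUtils.createAvroSchema(orc);
    // Every field and element type comes back nullable, e.g. fare maps to
    // ["null", {"type":"bytes","logicalType":"decimal","precision":8,"scale":2}].
    System.out.println(avro.toString(true));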
http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java
new file mode 100644
index 0000000..44defbf
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.convert.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.convert.BatchWriter;
+import org.apache.orc.bench.CompressionKind;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class AvroWriter implements BatchWriter {
+
+  interface AvroConverter {
+    Object convert(ColumnVector vector, int row);
+  }
+
+  private static class BooleanConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        LongColumnVector vector = (LongColumnVector) cv;
+        return vector.vector[row] != 0;
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class IntConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        LongColumnVector vector = (LongColumnVector) cv;
+        return (int) vector.vector[row];
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class LongConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        LongColumnVector vector = (LongColumnVector) cv;
+        return vector.vector[row];
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class FloatConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        DoubleColumnVector vector = (DoubleColumnVector) cv;
+        return (float) vector.vector[row];
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class DoubleConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        DoubleColumnVector vector = (DoubleColumnVector) cv;
+        return vector.vector[row];
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class StringConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        BytesColumnVector vector = (BytesColumnVector) cv;
+        return new String(vector.vector[row], vector.start[row],
+            vector.length[row], StandardCharsets.UTF_8);
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class BinaryConverter implements AvroConverter {
+    public Object
convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + BytesColumnVector vector = (BytesColumnVector) cv; + return ByteBuffer.wrap(vector.vector[row], vector.start[row], + vector.length[row]); + } else { + return null; + } + } + } + + private static class TimestampConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + TimestampColumnVector vector = (TimestampColumnVector) cv; + return vector.time[row]; + } else { + return null; + } + } + } + + private static class DecimalConverter implements AvroConverter { + final int scale; + DecimalConverter(int scale) { + this.scale = scale; + } + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + DecimalColumnVector vector = (DecimalColumnVector) cv; + return getBufferFromDecimal( + vector.vector[row].getHiveDecimal(), scale); + } else { + return null; + } + } + } + + private static class ListConverter implements AvroConverter { + final Schema avroSchema; + final AvroConverter childConverter; + + ListConverter(TypeDescription schema, Schema avroSchema) { + this.avroSchema = avroSchema; + childConverter = createConverter(schema.getChildren().get(0), + removeNullable(avroSchema.getElementType())); + } + + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + ListColumnVector vector = (ListColumnVector) cv; + int offset = (int) vector.offsets[row]; + int length = (int) vector.lengths[row]; + GenericData.Array result = new GenericData.Array(length, avroSchema); + for(int i=0; i < length; ++i) { + result.add(childConverter.convert(vector.child, offset + i)); + } + return result; + } else { + return null; + } + } + } + + private static class StructConverter implements AvroConverter { + final Schema avroSchema; + final AvroConverter[] childConverters; + + StructConverter(TypeDescription schema, Schema avroSchema) { + this.avroSchema = avroSchema; + List<TypeDescription> childrenTypes = schema.getChildren(); + childConverters = new AvroConverter[childrenTypes.size()]; + List<Schema.Field> fields = avroSchema.getFields(); + for(int f=0; f < childConverters.length; ++f) { + childConverters[f] = createConverter(childrenTypes.get(f), + removeNullable(fields.get(f).schema())); + } + } + + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + StructColumnVector vector = (StructColumnVector) cv; + GenericData.Record result = new GenericData.Record(avroSchema); + for(int f=0; f < childConverters.length; ++f) { + result.put(f, childConverters[f].convert(vector.fields[f], row)); + } + return result; + } else { + return null; + } + } + } + + static AvroConverter createConverter(TypeDescription types, + Schema avroSchema) { + switch (types.getCategory()) { + case BINARY: + return new BinaryConverter(); + case BOOLEAN: + return new BooleanConverter(); + case BYTE: + case SHORT: + case INT: + return new IntConverter(); + case LONG: + return new LongConverter(); + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case CHAR: + case VARCHAR: + case STRING: + return new StringConverter(); + case TIMESTAMP: + return new TimestampConverter(); + case DECIMAL: + return new DecimalConverter(types.getScale()); + case LIST: + return new 
ListConverter(types, avroSchema); + case STRUCT: + return new StructConverter(types, avroSchema); + default: + throw new IllegalArgumentException("Unhandled type " + types); + } + } + + /** + * Remove the union(null, ...) wrapper around the schema. + * + * All of the types in Hive are nullable and in Avro those are represented + * by wrapping each type in a union type with the void type. + * @param avro The avro type + * @return The avro type with the nullable layer removed + */ + static Schema removeNullable(Schema avro) { + while (avro.getType() == Schema.Type.UNION) { + List<Schema> children = avro.getTypes(); + if (children.size() == 2 && + children.get(0).getType() == Schema.Type.NULL) { + avro = children.get(1); + } else { + break; + } + } + return avro; + } + + private final AvroConverter[] converters; + private final DataFileWriter writer; + private final GenericRecord record; + + public AvroWriter(Path path, TypeDescription schema, + Configuration conf, + CompressionKind compression) throws IOException { + List<TypeDescription> childTypes = schema.getChildren(); + Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema); + List<Schema.Field> avroFields = avroSchema.getFields(); + converters = new AvroConverter[childTypes.size()]; + for(int c=0; c < converters.length; ++c) { + converters[c] = createConverter(childTypes.get(c), + removeNullable(avroFields.get(c).schema())); + } + GenericDatumWriter gdw = new GenericDatumWriter(avroSchema); + writer = new DataFileWriter(gdw); + switch (compression) { + case NONE: + break; + case ZLIB: + writer.setCodec(CodecFactory.deflateCodec(-1)); + break; + case SNAPPY: + writer.setCodec(CodecFactory.snappyCodec()); + break; + default: + throw new IllegalArgumentException("Compression unsupported " + compression); + } + writer.create(avroSchema, path.getFileSystem(conf).create(path)); + record = new GenericData.Record(avroSchema); + } + + public void writeBatch(VectorizedRowBatch batch) throws IOException { + for(int r=0; r < batch.size; ++r) { + for(int f=0; f < batch.cols.length; ++f) { + record.put(f, converters[f].convert(batch.cols[f], r)); + } + writer.append(record); + } + } + + public void close() throws IOException { + writer.close(); + } + + static Buffer getBufferFromBytes(byte[] input) { + ByteBuffer bb = ByteBuffer.wrap(input); + return bb.rewind(); + } + + public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) { + if (dec == null) { + return null; + } + + dec = dec.setScale(scale); + return getBufferFromBytes(dec.unscaledValue().toByteArray()); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java b/java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java new file mode 100644 index 0000000..3246e69 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.bench.convert.csv; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.convert.BatchReader; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.util.Iterator; +import java.util.List; +import java.util.zip.GZIPInputStream; + +public class CsvReader implements BatchReader { + private final Iterator<CSVRecord> parser; + private final ColumnReader[] readers; + + interface ColumnReader { + void read(String value, ColumnVector vect, int row); + } + + static class LongColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + LongColumnVector vector = (LongColumnVector) vect; + vector.vector[row] = Long.parseLong(value); + } + } + } + + static class DoubleColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DoubleColumnVector vector = (DoubleColumnVector) vect; + vector.vector[row] = Double.parseDouble(value); + } + } + } + + static class StringColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + BytesColumnVector vector = (BytesColumnVector) vect; + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + vector.setRef(row, bytes, 0, bytes.length); + } + } + } + + static class TimestampColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + TimestampColumnVector vector = (TimestampColumnVector) vect; + vector.set(row, Timestamp.valueOf(value)); + } + } + } + + static class DecimalColumnReader implements ColumnReader { + public void read(String value, ColumnVector vect, int row) { + if ("".equals(value)) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DecimalColumnVector vector = 
(DecimalColumnVector) vect; + vector.vector[row].set(HiveDecimal.create(value)); + } + } + } + + ColumnReader createReader(TypeDescription schema) { + switch (schema.getCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + return new LongColumnReader(); + case FLOAT: + case DOUBLE: + return new DoubleColumnReader(); + case CHAR: + case VARCHAR: + case STRING: + return new StringColumnReader(); + case DECIMAL: + return new DecimalColumnReader(); + case TIMESTAMP: + return new TimestampColumnReader(); + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } + + public CsvReader(Path path, + TypeDescription schema, + Configuration conf, + CompressionKind compress) throws IOException { + FileSystem fs = path.getFileSystem(conf); + InputStream input = compress.read(fs.open(path)); + parser = new CSVParser(new InputStreamReader(input, StandardCharsets.UTF_8), + CSVFormat.RFC4180.withHeader()).iterator(); + List<TypeDescription> columnTypes = schema.getChildren(); + readers = new ColumnReader[columnTypes.size()]; + int c = 0; + for(TypeDescription columnType: columnTypes) { + readers[c++] = createReader(columnType); + } + } + + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + while (parser.hasNext() && batch.size < maxSize) { + CSVRecord record = parser.next(); + int c = 0; + for(String val: record) { + readers[c].read(val, batch.cols[c], batch.size); + c += 1; + } + batch.size++; + } + return batch.size != 0; + } + + public void close() { + // PASS + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java b/java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java new file mode 100644 index 0000000..b4ff3122 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.orc.bench.convert.json; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonStreamParser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.convert.BatchReader; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.util.List; +import java.util.zip.GZIPInputStream; + +public class JsonReader implements BatchReader { + private final TypeDescription schema; + private final JsonStreamParser parser; + private final JsonConverter[] converters; + + public JsonReader(Path path, + TypeDescription schema, + Configuration conf, + CompressionKind compressionKind) throws IOException { + this.schema = schema; + FileSystem fs = path.getFileSystem(conf); + InputStream input = compressionKind.read(fs.open(path)); + parser = new JsonStreamParser(new InputStreamReader(input, + StandardCharsets.UTF_8)); + if (schema.getCategory() != TypeDescription.Category.STRUCT) { + throw new IllegalArgumentException("Root must be struct - " + schema); + } + List<TypeDescription> fieldTypes = schema.getChildren(); + converters = new JsonConverter[fieldTypes.size()]; + for(int c = 0; c < converters.length; ++c) { + converters[c] = createConverter(fieldTypes.get(c)); + } + } + + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + List<String> fieldNames = schema.getFieldNames(); + while (parser.hasNext() && batch.size < maxSize) { + JsonObject elem = parser.next().getAsJsonObject(); + for(int c=0; c < converters.length; ++c) { + // look up each field to see if it is in the input, otherwise + // set it to null. + JsonElement field = elem.get(fieldNames.get(c)); + if (field == null) { + batch.cols[c].noNulls = false; + batch.cols[c].isNull[batch.size] = true; + } else { + converters[c].convert(field, batch.cols[c], batch.size); + } + } + batch.size++; + } + return batch.size != 0; + } + + public void close() { + // PASS + } + + interface JsonConverter { + void convert(JsonElement value, ColumnVector vect, int row); + } + + static class BooleanColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + LongColumnVector vector = (LongColumnVector) vect; + vector.vector[row] = value.getAsBoolean() ? 
1 : 0; + } + } + } + + static class LongColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + LongColumnVector vector = (LongColumnVector) vect; + vector.vector[row] = value.getAsLong(); + } + } + } + + static class DoubleColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DoubleColumnVector vector = (DoubleColumnVector) vect; + vector.vector[row] = value.getAsDouble(); + } + } + } + + static class StringColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + BytesColumnVector vector = (BytesColumnVector) vect; + byte[] bytes = value.getAsString().getBytes(StandardCharsets.UTF_8); + vector.setRef(row, bytes, 0, bytes.length); + } + } + } + + static class BinaryColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + BytesColumnVector vector = (BytesColumnVector) vect; + String binStr = value.getAsString(); + byte[] bytes = new byte[binStr.length()/2]; + for(int i=0; i < bytes.length; ++i) { + bytes[i] = (byte) Integer.parseInt(binStr.substring(i*2, i*2+2), 16); + } + vector.setRef(row, bytes, 0, bytes.length); + } + } + } + + static class TimestampColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + TimestampColumnVector vector = (TimestampColumnVector) vect; + vector.set(row, Timestamp.valueOf(value.getAsString() + .replaceAll("[TZ]", " "))); + } + } + } + + static class DecimalColumnConverter implements JsonConverter { + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + DecimalColumnVector vector = (DecimalColumnVector) vect; + vector.vector[row].set(HiveDecimal.create(value.getAsString())); + } + } + } + + static class StructColumnConverter implements JsonConverter { + private JsonConverter[] childrenConverters; + private List<String> fieldNames; + + public StructColumnConverter(TypeDescription schema) { + List<TypeDescription> kids = schema.getChildren(); + childrenConverters = new JsonConverter[kids.size()]; + for(int c=0; c < childrenConverters.length; ++c) { + childrenConverters[c] = createConverter(kids.get(c)); + } + fieldNames = schema.getFieldNames(); + } + + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + StructColumnVector vector = (StructColumnVector) vect; + JsonObject obj = value.getAsJsonObject(); + for(int c=0; c < childrenConverters.length; ++c) { + JsonElement elem = obj.get(fieldNames.get(c)); + childrenConverters[c].convert(elem, vector.fields[c], row); + } + } + } + } + + static class ListColumnConverter implements JsonConverter { + private JsonConverter childrenConverter; + + public 
ListColumnConverter(TypeDescription schema) { + childrenConverter = createConverter(schema.getChildren().get(0)); + } + + public void convert(JsonElement value, ColumnVector vect, int row) { + if (value == null || value.isJsonNull()) { + vect.noNulls = false; + vect.isNull[row] = true; + } else { + ListColumnVector vector = (ListColumnVector) vect; + JsonArray obj = value.getAsJsonArray(); + vector.lengths[row] = obj.size(); + vector.offsets[row] = vector.childCount; + vector.childCount += vector.lengths[row]; + vector.child.ensureSize(vector.childCount, true); + for(int c=0; c < obj.size(); ++c) { + childrenConverter.convert(obj.get(c), vector.child, + (int) vector.offsets[row] + c); + } + } + } + } + + static JsonConverter createConverter(TypeDescription schema) { + switch (schema.getCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + return new LongColumnConverter(); + case FLOAT: + case DOUBLE: + return new DoubleColumnConverter(); + case CHAR: + case VARCHAR: + case STRING: + return new StringColumnConverter(); + case DECIMAL: + return new DecimalColumnConverter(); + case TIMESTAMP: + return new TimestampColumnConverter(); + case BINARY: + return new BinaryColumnConverter(); + case BOOLEAN: + return new BooleanColumnConverter(); + case STRUCT: + return new StructColumnConverter(schema); + case LIST: + return new ListColumnConverter(schema); + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java new file mode 100644 index 0000000..bd41115 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.convert.json; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchWriter; +import org.apache.orc.bench.CompressionKind; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class JsonWriter implements BatchWriter { + private final Writer outStream; + private final JsonGenerator writer; + private final TypeDescription schema; + + public JsonWriter(Path path, TypeDescription schema, + Configuration conf, + CompressionKind compression) throws IOException { + OutputStream file = path.getFileSystem(conf).create(path, true); + outStream = new OutputStreamWriter(compression.create(file), + StandardCharsets.UTF_8); + JsonFactory factory = new JsonFactory(); + factory.setRootValueSeparator("\n"); + writer = factory.createGenerator(outStream); + this.schema = schema; + } + + private static void printMap(JsonGenerator writer, + MapColumnVector vector, + TypeDescription schema, + int row) throws IOException { + writer.writeStartArray(); + TypeDescription keyType = schema.getChildren().get(0); + TypeDescription valueType = schema.getChildren().get(1); + int offset = (int) vector.offsets[row]; + for (int i = 0; i < vector.lengths[row]; ++i) { + writer.writeStartObject(); + writer.writeFieldName("_key"); + printValue(writer, vector.keys, keyType, offset + i); + writer.writeFieldName("_value"); + printValue(writer, vector.values, valueType, offset + i); + writer.writeEndObject(); + } + writer.writeEndArray(); + } + + private static void printList(JsonGenerator writer, + ListColumnVector vector, + TypeDescription schema, + int row) throws IOException { + writer.writeStartArray(); + int offset = (int) vector.offsets[row]; + TypeDescription childType = schema.getChildren().get(0); + for (int i = 0; i < vector.lengths[row]; ++i) { + printValue(writer, vector.child, childType, offset + i); + } + writer.writeEndArray(); + } + + private static void printUnion(JsonGenerator writer, + UnionColumnVector vector, + TypeDescription schema, + int row) throws IOException { + int tag = vector.tags[row]; + printValue(writer, vector.fields[tag], schema.getChildren().get(tag), row); + } + + static void printStruct(JsonGenerator writer, + StructColumnVector batch, + TypeDescription schema, + int row) throws IOException { + writer.writeStartObject(); + List<String> fieldNames = schema.getFieldNames(); + List<TypeDescription> fieldTypes = schema.getChildren(); + for (int i = 0; i < 
fieldTypes.size(); ++i) { + writer.writeFieldName(fieldNames.get(i)); + printValue(writer, batch.fields[i], fieldTypes.get(i), row); + } + writer.writeEndObject(); + } + + static void printBinary(JsonGenerator writer, BytesColumnVector vector, + int row) throws IOException { + StringBuilder buffer = new StringBuilder(); + int offset = vector.start[row]; + for(int i=0; i < vector.length[row]; ++i) { + int value = 0xff & (int) vector.vector[row][offset + i]; + buffer.append(String.format("%02x", value)); + } + writer.writeString(buffer.toString()); + } + + static void printValue(JsonGenerator writer, ColumnVector vector, + TypeDescription schema, int row) throws IOException { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + switch (schema.getCategory()) { + case BOOLEAN: + writer.writeBoolean(((LongColumnVector) vector).vector[row] != 0); + break; + case BYTE: + case SHORT: + case INT: + case LONG: + writer.writeNumber(((LongColumnVector) vector).vector[row]); + break; + case FLOAT: + case DOUBLE: + writer.writeNumber(((DoubleColumnVector) vector).vector[row]); + break; + case STRING: + case CHAR: + case VARCHAR: + writer.writeString(((BytesColumnVector) vector).toString(row)); + break; + case BINARY: + printBinary(writer, (BytesColumnVector) vector, row); + break; + case DECIMAL: + writer.writeString(((DecimalColumnVector) vector).vector[row].toString()); + break; + case DATE: + writer.writeString(new DateWritable( + (int) ((LongColumnVector) vector).vector[row]).toString()); + break; + case TIMESTAMP: + writer.writeString(((TimestampColumnVector) vector) + .asScratchTimestamp(row).toString()); + break; + case LIST: + printList(writer, (ListColumnVector) vector, schema, row); + break; + case MAP: + printMap(writer, (MapColumnVector) vector, schema, row); + break; + case STRUCT: + printStruct(writer, (StructColumnVector) vector, schema, row); + break; + case UNION: + printUnion(writer, (UnionColumnVector) vector, schema, row); + break; + default: + throw new IllegalArgumentException("Unknown type " + + schema.toString()); + } + } else { + writer.writeNull(); + } + } + + static void printRow(JsonGenerator writer, + VectorizedRowBatch batch, + TypeDescription schema, + int row) throws IOException { + if (schema.getCategory() == TypeDescription.Category.STRUCT) { + List<TypeDescription> fieldTypes = schema.getChildren(); + List<String> fieldNames = schema.getFieldNames(); + writer.writeStartObject(); + for (int c = 0; c < batch.cols.length; ++c) { + writer.writeFieldName(fieldNames.get(c)); + printValue(writer, batch.cols[c], fieldTypes.get(c), row); + } + writer.writeEndObject(); + } else { + printValue(writer, batch.cols[0], schema, row); + } + } + + public void writeBatch(VectorizedRowBatch batch) throws IOException { + for (int r = 0; r < batch.size; ++r) { + printRow(writer, batch, schema, r); + } + } + + public void close() throws IOException { + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java new file mode 100644 index 0000000..e648856 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.convert.orc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.convert.BatchReader; + +import java.io.IOException; + +public class OrcReader implements BatchReader { + private final RecordReader reader; + + public OrcReader(Path path, + TypeDescription schema, + Configuration conf + ) throws IOException { + Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + reader = file.rows(file.options().schema(schema)); + } + + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + return reader.nextBatch(batch); + } + + public void close() throws IOException { + reader.close(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java new file mode 100644 index 0000000..af5de9b --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 

http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java
new file mode 100644
index 0000000..af5de9b
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.convert.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.bench.convert.BatchWriter;
+import org.apache.orc.bench.CompressionKind;
+import org.apache.orc.bench.Utilities;
+
+import java.io.IOException;
+
+public class OrcWriter implements BatchWriter {
+  private final Writer writer;
+
+  public OrcWriter(Path path,
+                   TypeDescription schema,
+                   Configuration conf,
+                   CompressionKind compression
+                   ) throws IOException {
+    writer = OrcFile.createWriter(path,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .compress(Utilities.getCodec(compression)));
+  }
+
+  public void writeBatch(VectorizedRowBatch batch) throws IOException {
+    writer.addRowBatch(batch);
+  }
+
+  public void close() throws IOException {
+    writer.close();
+  }
+}
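
The writer side is symmetric: fill a batch, hand it off whenever it reaches capacity, and flush any partial batch before closing. A minimal sketch of driving the OrcWriter above (path, schema, and row values are illustrative assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.CompressionKind;
import org.apache.orc.bench.convert.BatchWriter;
import org.apache.orc.bench.convert.orc.OrcWriter;

public class OrcWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    TypeDescription schema = TypeDescription.fromString("struct<x:bigint>");
    BatchWriter writer = new OrcWriter(new Path("/tmp/sample.orc"), schema,
        conf, CompressionKind.ZLIB);
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    for (long i = 0; i < 100000; ++i) {
      if (batch.size == batch.getMaxSize()) {  // batch is full; hand it off
        writer.writeBatch(batch);
        batch.reset();
      }
      x.vector[batch.size++] = i;
    }
    if (batch.size > 0) {  // flush the final partial batch
      writer.writeBatch(batch);
    }
    writer.close();
  }
}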

http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java b/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java
new file mode 100644
index 0000000..83f70f4
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.convert.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.convert.BatchReader;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ParquetReader implements BatchReader {
+
+  private final NullWritable nada = NullWritable.get();
+  private final RecordReader<NullWritable,ArrayWritable> reader;
+  private final ArrayWritable value;
+  private final Converter[] converters;
+
+  public ParquetReader(Path path,
+                       TypeDescription schema,
+                       Configuration conf) throws IOException {
+    FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{});
+    JobConf jobConf = new JobConf(conf);
+    reader = new MapredParquetInputFormat().getRecordReader(split, jobConf,
+        Reporter.NULL);
+    value = reader.createValue();
+    converters = new Converter[schema.getChildren().size()];
+    List<TypeDescription> children = schema.getChildren();
+    for(int c = 0; c < converters.length; ++c) {
+      converters[c] = createConverter(children.get(c));
+    }
+  }
+
+  @Override
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+    batch.reset();
+    int maxSize = batch.getMaxSize();
+    while (batch.size < maxSize && reader.next(nada, value)) {
+      Writable[] values = value.get();
+      int row = batch.size++;
+      for(int c=0; c < batch.cols.length; ++c) {
+        converters[c].convert(batch.cols[c], row, values[c]);
+      }
+    }
+    return batch.size != 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  interface Converter {
+    void convert(ColumnVector vector, int row, Object value);
+  }
+
+  private static class BooleanConverter implements Converter {
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        ((LongColumnVector) cv).vector[row] =
+            ((BooleanWritable) value).get() ? 1 : 0;
+      }
+    }
+  }
+
+  private static class IntConverter implements Converter {
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        ((LongColumnVector) cv).vector[row] =
+            ((IntWritable) value).get();
+      }
+    }
+  }
+
+  private static class LongConverter implements Converter {
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        ((LongColumnVector) cv).vector[row] =
+            ((LongWritable) value).get();
+      }
+    }
+  }
+
+  private static class FloatConverter implements Converter {
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        ((DoubleColumnVector) cv).vector[row] =
+            ((FloatWritable) value).get();
+      }
+    }
+  }
+
+  private static class DoubleConverter implements Converter {
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        ((DoubleColumnVector) cv).vector[row] =
+            ((DoubleWritable) value).get();
+      }
+    }
+  }
+
+  private static class StringConverter implements Converter {
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        Text castValue = (Text) value;
+        ((BytesColumnVector) cv).setVal(row, castValue.getBytes(), 0,
+            castValue.getLength());
+      }
+    }
+  }
+
+  private static class BinaryConverter implements Converter {
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        BytesWritable buf = (BytesWritable) value;
+        ((BytesColumnVector) cv).setVal(row, buf.getBytes(), 0,
+            buf.getLength());
+      }
+    }
+  }
+
+  private static class TimestampConverter implements Converter {
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        TimestampColumnVector tc = (TimestampColumnVector) cv;
+        // TimestampColumnVector.time holds milliseconds since the epoch,
+        // so copy the whole Timestamp rather than just its seconds.
+        tc.set(row, ((TimestampWritable) value).getTimestamp());
+      }
+    }
+  }
+
+  private static class DecimalConverter implements Converter {
+    final int scale;
+    DecimalConverter(int scale) {
+      this.scale = scale;
+    }
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        DecimalColumnVector tc = (DecimalColumnVector) cv;
+        tc.vector[row].set((HiveDecimalWritable) value);
+      }
+    }
+  }
+
+  private static class ListConverter implements Converter {
+    final Converter childConverter;
+
+    ListConverter(TypeDescription schema) {
+      childConverter = createConverter(schema.getChildren().get(0));
+    }
+
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        ListColumnVector tc = (ListColumnVector) cv;
+        Writable[] array = ((ArrayWritable) value).get();
+        int start = tc.childCount;
+        int len = array.length;
+        tc.childCount += len;
+        tc.child.ensureSize(tc.childCount, true);
+        // Record where this row's elements live in the child vector;
+        // without the offset and length the list values are unreachable.
+        tc.offsets[row] = start;
+        tc.lengths[row] = len;
+        for(int i=0; i < len; ++i) {
+          childConverter.convert(tc.child, start + i, array[i]);
+        }
+      }
+    }
+  }
+
+  private static class StructConverter implements Converter {
+    final Converter[] childConverters;
+
+    StructConverter(TypeDescription schema) {
+      List<TypeDescription> children = schema.getChildren();
+      childConverters = new Converter[children.size()];
+      for(int i=0; i < childConverters.length; ++i) {
+        childConverters[i] = createConverter(children.get(i));
+      }
+    }
+
+    public void convert(ColumnVector cv, int row, Object value) {
+      if (value == null) {
+        cv.noNulls = false;
+        cv.isNull[row] = true;
+      } else {
+        StructColumnVector tc = (StructColumnVector) cv;
+        Writable[] record = ((ArrayWritable) value).get();
+        for(int c=0; c < tc.fields.length; ++c) {
+          childConverters[c].convert(tc.fields[c], row, record[c]);
+        }
+      }
+    }
+  }
+
+  static Converter createConverter(TypeDescription types) {
+    switch (types.getCategory()) {
+      case BINARY:
+        return new BinaryConverter();
+      case BOOLEAN:
+        return new BooleanConverter();
+      case BYTE:
+      case SHORT:
+      case INT:
+        return new IntConverter();
+      case LONG:
+        return new LongConverter();
+      case FLOAT:
+        return new FloatConverter();
+      case DOUBLE:
+        return new DoubleConverter();
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return new StringConverter();
+      case TIMESTAMP:
+        return new TimestampConverter();
+      case DECIMAL:
+        return new DecimalConverter(types.getScale());
+      case LIST:
+        return new ListConverter(types);
+      case STRUCT:
+        return new StructConverter(types);
+      default:
+        throw new IllegalArgumentException("Unhandled type " + types);
+    }
+  }
+}
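
Note that createConverter recurses through the compound categories, so the converter tree ends up parallel to both the schema and the ColumnVector tree inside the row batch, while DATE, MAP, and UNION fall through to the "Unhandled type" error. A hypothetical same-package probe (Converter and createConverter are package-private, so this only compiles alongside ParquetReader; the schema string is an assumption):

package org.apache.orc.bench.convert.parquet;

import org.apache.orc.TypeDescription;

public class ConverterTreeSketch {
  public static void main(String[] args) {
    // struct<name:string,scores:array<double>> yields a StructConverter whose
    // children are a StringConverter and a ListConverter over a DoubleConverter.
    TypeDescription type = TypeDescription.fromString(
        "struct<name:string,scores:array<double>>");
    ParquetReader.Converter root = ParquetReader.createConverter(type);
    System.out.println(root.getClass().getSimpleName());
  }
}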

http://git-wip-us.apache.org/repos/asf/orc/blob/48ba9241/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java b/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java
new file mode 100644
index 0000000..075060e
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.convert.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcBenchmarkUtilities;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.convert.BatchWriter;
+import org.apache.orc.bench.CompressionKind;
+import org.apache.orc.bench.Utilities;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class ParquetWriter implements BatchWriter {
+  private final FileSinkOperator.RecordWriter writer;
+  private final TypeDescription schema;
+  private final ParquetHiveRecord record;
+
+  public ParquetWriter(Path path,
+                       TypeDescription schema,
+                       Configuration conf,
+                       CompressionKind compression
+                       ) throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    Properties tableProperties = Utilities.convertSchemaToHiveConfig(schema);
+    this.schema = schema;
+    jobConf.set(ParquetOutputFormat.COMPRESSION, getCodec(compression).name());
+    writer = new MapredParquetOutputFormat().getHiveRecordWriter(jobConf, path,
+        ParquetHiveRecord.class, compression != CompressionKind.NONE,
+        tableProperties, Reporter.NULL);
+    record = new ParquetHiveRecord(null,
+        OrcBenchmarkUtilities.createObjectInspector(schema));
+  }
+
+  public void writeBatch(VectorizedRowBatch batch) throws IOException {
+    for(int r=0; r < batch.size; ++r) {
+      record.value = OrcBenchmarkUtilities.nextObject(batch, schema, r,
+          (Writable) record.value);
+      writer.write(record);
+    }
+  }
+
+  public void close() throws IOException {
+    writer.close(false);
+  }
+
+  public static CompressionCodecName getCodec(CompressionKind kind) {
+    switch (kind) {
+      case NONE:
+        return CompressionCodecName.UNCOMPRESSED;
+      case ZLIB:
+        return CompressionCodecName.GZIP;
+      case SNAPPY:
+        return CompressionCodecName.SNAPPY;
+      default:
+        throw new IllegalArgumentException("Unsupported codec " + kind);
+    }
+  }
+}
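
Finally, getCodec maps the benchmark's ZLIB to Parquet's GZIP, since CompressionCodecName has no ZLIB entry and GZIP is Parquet's deflate-based codec. Driving the ParquetWriter looks the same as the ORC case; a minimal sketch (path, schema, and the row value are illustrative assumptions):

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.apache.orc.bench.CompressionKind;
import org.apache.orc.bench.convert.BatchWriter;
import org.apache.orc.bench.convert.parquet.ParquetWriter;

public class ParquetWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    TypeDescription schema = TypeDescription.fromString("struct<name:string>");
    BatchWriter writer = new ParquetWriter(new Path("/tmp/sample.parquet"),
        schema, conf, CompressionKind.SNAPPY);
    VectorizedRowBatch batch = schema.createRowBatch();
    BytesColumnVector name = (BytesColumnVector) batch.cols[0];
    byte[] bytes = "alice".getBytes(StandardCharsets.UTF_8);
    name.setVal(batch.size++, bytes, 0, bytes.length);  // one illustrative row
    writer.writeBatch(batch);
    writer.close();
  }
}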