CRUNCH-450: Adding crunch-hive module w/ORC file support. Contributed by Zhong Wang.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/363c8243 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/363c8243 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/363c8243 Branch: refs/heads/master Commit: 363c8243b90e5df0394e8edb547f268b2408f250 Parents: dee0fcf Author: Josh Wills <[email protected]> Authored: Mon Aug 11 15:31:14 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Aug 12 09:25:06 2014 -0700 ---------------------------------------------------------------------- crunch-hive/pom.xml | 75 +++++ .../crunch/io/orc/OrcFileSourceTargetIT.java | 161 +++++++++++ .../crunch/io/orc/OrcCrunchInputFormat.java | 98 +++++++ .../crunch/io/orc/OrcCrunchOutputFormat.java | 68 +++++ .../crunch/io/orc/OrcFileReaderFactory.java | 120 ++++++++ .../org/apache/crunch/io/orc/OrcFileSource.java | 102 +++++++ .../crunch/io/orc/OrcFileSourceTarget.java | 48 ++++ .../org/apache/crunch/io/orc/OrcFileTarget.java | 51 ++++ .../org/apache/crunch/io/orc/OrcFileWriter.java | 78 ++++++ .../apache/crunch/io/orc/OrcReadableData.java | 47 ++++ .../org/apache/crunch/io/orc/OrcWritable.java | 124 +++++++++ .../org/apache/crunch/types/orc/OrcUtils.java | 262 ++++++++++++++++++ .../java/org/apache/crunch/types/orc/Orcs.java | 274 +++++++++++++++++++ .../crunch/types/orc/TupleObjectInspector.java | 233 ++++++++++++++++ .../crunch/io/orc/OrcFileReaderFactoryTest.java | 65 +++++ .../crunch/io/orc/OrcFileReaderWriterTest.java | 55 ++++ .../org/apache/crunch/io/orc/OrcFileTest.java | 48 ++++ .../apache/crunch/io/orc/OrcWritableTest.java | 125 +++++++++ .../crunch/test/orc/pojos/AddressBook.java | 141 ++++++++++ .../apache/crunch/test/orc/pojos/Person.java | 101 +++++++ .../org/apache/crunch/types/orc/OrcsTest.java | 141 ++++++++++ .../types/orc/TupleObjectInspectorTest.java | 73 +++++ pom.xml | 8 + 23 files changed, 2498 insertions(+) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-hive/pom.xml b/crunch-hive/pom.xml new file mode 100644 index 0000000..aef85c3 --- /dev/null +++ b/crunch-hive/pom.xml @@ -0,0 +1,75 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-parent</artifactId> + <version>0.11.0-SNAPSHOT</version> + </parent> + + <artifactId>crunch-hive</artifactId> + <name>Apache Crunch Hive</name> + + <dependencies> + + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + <scope>test</scope> <!-- only needed for LocalJobRunner --> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/it/java/org/apache/crunch/io/orc/OrcFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/it/java/org/apache/crunch/io/orc/OrcFileSourceTargetIT.java b/crunch-hive/src/it/java/org/apache/crunch/io/orc/OrcFileSourceTargetIT.java new file mode 100644 index 0000000..8cf467c --- /dev/null +++ b/crunch-hive/src/it/java/org/apache/crunch/io/orc/OrcFileSourceTargetIT.java @@ -0,0 +1,161 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.orc; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.TupleN; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.orc.OrcFileSource; +import org.apache.crunch.io.orc.OrcFileTarget; +import org.apache.crunch.io.orc.OrcFileWriter; +import org.apache.crunch.test.orc.pojos.Person; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.orc.OrcUtils; +import org.apache.crunch.types.orc.Orcs; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class OrcFileSourceTargetIT extends 
OrcFileTest implements Serializable { + + private void generateInputData() throws IOException { + String typeStr = "struct<name:string,age:int,numbers:array<string>>"; + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr); + OrcStruct s = OrcUtils.createOrcStruct(typeInfo, new Text("Alice"), new IntWritable(23), + Arrays.asList(new Text("919-342-5555"), new Text("650-333-2913"))); + + OrcFileWriter<OrcStruct> writer = new OrcFileWriter<OrcStruct>(conf, new Path(tempPath, "input.orc"), Orcs.orcs(typeInfo)); + writer.write(s); + writer.close(); + } + + private <T> void testSourceTarget(PType<T> ptype, T expected) { + Path inputPath = new Path(tempPath, "input.orc"); + Path outputPath = new Path(tempPath, "output"); + + Pipeline pipeline = new MRPipeline(OrcFileSourceTargetIT.class, conf); + OrcFileSource<T> source = new OrcFileSource<T>(inputPath, ptype); + PCollection<T> rows = pipeline.read(source); + List<T> result = Lists.newArrayList(rows.materialize()); + + assertEquals(Lists.newArrayList(expected), result); + + OrcFileTarget target = new OrcFileTarget(outputPath); + pipeline.write(rows, target); + + assertTrue(pipeline.done().succeeded()); + + OrcFileReaderFactory<T> reader = new OrcFileReaderFactory<T>(ptype); + List<T> newResult = Lists.newArrayList(reader.read(fs, inputPath)); + + assertEquals(Lists.newArrayList(expected), newResult); + } + + @Test + public void testOrcs() throws IOException { + generateInputData(); + + String typeStr = "struct<name:string,age:int,numbers:array<string>>"; + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr); + OrcStruct expected = OrcUtils.createOrcStruct(typeInfo, new Text("Alice"), new IntWritable(23), + Arrays.asList(new Text("919-342-5555"), new Text("650-333-2913"))); + + testSourceTarget(Orcs.orcs(typeInfo), expected); + } + + @Test + public void testReflects() throws IOException { + generateInputData(); + Person expected = new Person("Alice", 23, Arrays.asList("919-342-5555", 
"650-333-2913")); + testSourceTarget(Orcs.reflects(Person.class), expected); + } + + @Test + public void testTuples() throws IOException { + generateInputData(); + TupleN expected = new TupleN("Alice", 23, Arrays.asList("919-342-5555", "650-333-2913")); + testSourceTarget(Orcs.tuples(Writables.strings(), Writables.ints(), Writables.collections(Writables.strings())), + expected); + } + + @Test + public void testColumnPruning() throws IOException { + generateInputData(); + + Pipeline pipeline = new MRPipeline(OrcFileSourceTargetIT.class, conf); + int[] readColumns = {0, 1}; + OrcFileSource<Person> source = new OrcFileSource<Person>(new Path(tempPath, "input.orc"), + Orcs.reflects(Person.class), readColumns); + PCollection<Person> rows = pipeline.read(source); + List<Person> result = Lists.newArrayList(rows.materialize()); + + Person expected = new Person("Alice", 23, null); + assertEquals(Lists.newArrayList(expected), result); + } + + @Test + public void testGrouping() throws IOException { + String typeStr = "struct<name:string,age:int,numbers:array<string>>"; + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr); + OrcStruct s1 = OrcUtils.createOrcStruct(typeInfo, new Text("Bob"), new IntWritable(28), null); + OrcStruct s2 = OrcUtils.createOrcStruct(typeInfo, new Text("Bob"), new IntWritable(28), null); + OrcStruct s3 = OrcUtils.createOrcStruct(typeInfo, new Text("Alice"), new IntWritable(23), + Arrays.asList(new Text("444-333-9999"))); + OrcStruct s4 = OrcUtils.createOrcStruct(typeInfo, new Text("Alice"), new IntWritable(36), + Arrays.asList(new Text("919-342-5555"), new Text("650-333-2913"))); + + Path inputPath = new Path(tempPath, "input.orc"); + OrcFileWriter<OrcStruct> writer = new OrcFileWriter<OrcStruct>(conf, inputPath, Orcs.orcs(typeInfo)); + writer.write(s1); + writer.write(s2); + writer.write(s3); + writer.write(s4); + writer.close(); + + Pipeline pipeline = new MRPipeline(OrcFileSourceTargetIT.class, conf); + OrcFileSource<Person> 
source = new OrcFileSource<Person>(inputPath, Orcs.reflects(Person.class)); + PCollection<Person> rows = pipeline.read(source); + PTable<Person, Long> count = rows.count(); + + List<Pair<Person, Long>> result = Lists.newArrayList(count.materialize()); + List<Pair<Person, Long>> expected = Lists.newArrayList( + Pair.of(new Person("Alice", 23, Arrays.asList("444-333-9999")), 1L), + Pair.of(new Person("Alice", 36, Arrays.asList("919-342-5555", "650-333-2913")), 1L), + Pair.of(new Person("Bob", 28, null), 2L)); + + assertEquals(expected, result); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchInputFormat.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchInputFormat.java new file mode 100644 index 0000000..3ffc88b --- /dev/null +++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchInputFormat.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.orc; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +public class OrcCrunchInputFormat extends InputFormat<NullWritable, OrcWritable> { + + private OrcNewInputFormat inputFormat = new OrcNewInputFormat(); + + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException, + InterruptedException { + return inputFormat.getSplits(context); + } + + @Override + public RecordReader<NullWritable, OrcWritable> createRecordReader( + InputSplit split, TaskAttemptContext context) throws IOException, + InterruptedException { + RecordReader<NullWritable, OrcStruct> reader = inputFormat.createRecordReader( + split, context); + return new OrcCrunchRecordReader(reader); + } + + static class OrcCrunchRecordReader extends RecordReader<NullWritable, OrcWritable> { + + private final RecordReader<NullWritable, OrcStruct> reader; + private OrcWritable value = new OrcWritable(); + + OrcCrunchRecordReader(RecordReader<NullWritable, OrcStruct> reader) { + this.reader = reader; + } + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, + InterruptedException { + return NullWritable.get(); + } + + @Override + public OrcWritable getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return reader.getProgress(); + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, 
InterruptedException { + + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + boolean hasNext = reader.nextKeyValue(); + if (hasNext) { + value.set(reader.getCurrentValue()); + } + return hasNext; + } + + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchOutputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchOutputFormat.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchOutputFormat.java new file mode 100644 index 0000000..2d56503 --- /dev/null +++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchOutputFormat.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.orc; + +import java.io.IOException; + +import org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +public class OrcCrunchOutputFormat extends FileOutputFormat<NullWritable, OrcWritable> { + + private OrcNewOutputFormat outputFormat = new OrcNewOutputFormat(); + + @Override + public RecordWriter<NullWritable, OrcWritable> getRecordWriter( + TaskAttemptContext job) throws IOException, InterruptedException { + RecordWriter<NullWritable, Writable> writer = outputFormat.getRecordWriter(job); + return new OrcCrunchRecordWriter(writer); + } + + static class OrcCrunchRecordWriter extends RecordWriter<NullWritable, OrcWritable> { + + private final RecordWriter<NullWritable, Writable> writer; + private final OrcSerde orcSerde; + + OrcCrunchRecordWriter(RecordWriter<NullWritable, Writable> writer) { + this.writer = writer; + this.orcSerde = new OrcSerde(); + } + + @Override + public void write(NullWritable key, OrcWritable value) throws IOException, + InterruptedException { + if (value.get() == null) { + throw new NullPointerException("Cannot write null records to orc file"); + } + writer.write(key, orcSerde.serialize(value.get(), value.getObjectInspector())); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + writer.close(context); + } + + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileReaderFactory.java 
b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileReaderFactory.java new file mode 100644 index 0000000..abc0ec8 --- /dev/null +++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileReaderFactory.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.orc; + +import java.util.Iterator; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.MapFn; +import org.apache.crunch.io.FileReaderFactory; +import org.apache.crunch.types.PType; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import com.google.common.collect.UnmodifiableIterator; + +public class OrcFileReaderFactory<T> implements FileReaderFactory<T> { + + private MapFn<Object, T> inputFn; + private OrcInputFormat inputFormat = new OrcInputFormat(); + private int[] readColumns; + + public OrcFileReaderFactory(PType<T> ptype) { + this(ptype, null); + } + + public OrcFileReaderFactory(PType<T> ptype, int[] readColumns) { + inputFn = ptype.getInputMapFn(); + this.readColumns = readColumns; + } + + @Override + public Iterator<T> read(FileSystem fs, final Path path) { + try { + if (!fs.isFile(path)) { + throw new CrunchRuntimeException("Not a file: " + path); + } + + inputFn.initialize(); + + FileStatus status = fs.getFileStatus(path); + FileSplit split = new FileSplit(path, 0, status.getLen(), new String[0]); + + JobConf conf = new JobConf(); + if (readColumns != null) { + conf.setBoolean(OrcFileSource.HIVE_READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, OrcFileSource.getColumnIdsStr(readColumns)); + } + final RecordReader<NullWritable, OrcStruct> reader = inputFormat.getRecordReader(split, conf, Reporter.NULL); + + return new UnmodifiableIterator<T>() { + + private boolean checked = false; + private boolean hasNext; 
+ private OrcStruct value; + private OrcWritable writable = new OrcWritable(); + + @Override + public boolean hasNext() { + try { + if (value == null) { + value = reader.createValue(); + } + if (!checked) { + hasNext = reader.next(NullWritable.get(), value); + checked = true; + } + return hasNext; + } catch (Exception e) { + throw new CrunchRuntimeException("Error while reading local file: " + path, e); + } + } + + @Override + public T next() { + try { + if (value == null) { + value = reader.createValue(); + } + if (!checked) { + reader.next(NullWritable.get(), value); + } + checked = false; + writable.set(value); + return inputFn.map(writable); + } catch (Exception e) { + throw new CrunchRuntimeException("Error while reading local file: " + path, e); + } + } + + }; + } catch (Exception e) { + throw new CrunchRuntimeException("Error while reading local file: " + path, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSource.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSource.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSource.java new file mode 100644 index 0000000..8689b6c --- /dev/null +++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSource.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.orc; + +import java.io.IOException; +import java.util.List; + +import org.apache.crunch.ReadableData; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.io.impl.FileSourceImpl; +import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; + +public class OrcFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> { + + private int[] readColumns; + + public static final String HIVE_READ_ALL_COLUMNS = "hive.io.file.read.all.columns"; + + private static <S> FormatBundle<OrcCrunchInputFormat> getBundle(int[] readColumns) { + FormatBundle<OrcCrunchInputFormat> fb = FormatBundle.forInput(OrcCrunchInputFormat.class); + if (readColumns != null) { // setting configurations for column pruning + fb.set(HIVE_READ_ALL_COLUMNS, "false"); + fb.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, getColumnIdsStr(readColumns)); + } + return fb; + } + + static String getColumnIdsStr(int[] columns) { + StringBuilder sb = new StringBuilder(); + for (int c : columns) { + sb.append(c); + sb.append(','); + } + return sb.length() > 0 ? 
sb.substring(0, sb.length() - 1) : ""; + } + + public OrcFileSource(Path path, PType<T> ptype) { + this(path, ptype, null); + } + + /** + * Constructor for column pruning optimization + * + * @param path + * @param ptype + * @param readColumns columns which will be read + */ + public OrcFileSource(Path path, PType<T> ptype, int[] readColumns) { + super(path, ptype, getBundle(readColumns)); + this.readColumns = readColumns; + } + + public OrcFileSource(List<Path> paths, PType<T> ptype) { + this(paths, ptype, null); + } + + /** + * Constructor for column pruning optimization + * + * @param paths + * @param ptype + * @param columns columns which will be reserved + */ + public OrcFileSource(List<Path> paths, PType<T> ptype, int[] columns) { + super(paths, ptype, getBundle(columns)); + } + + @Override + public String toString() { + return "Orc(" + pathsAsString() + ")"; + } + + @Override + public Iterable<T> read(Configuration conf) throws IOException { + return read(conf, new OrcFileReaderFactory<T>(ptype, readColumns)); + } + + @Override + public ReadableData<T> asReadable() { + return new OrcReadableData<T>(this.paths, ptype, readColumns); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSourceTarget.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSourceTarget.java new file mode 100644 index 0000000..0226864 --- /dev/null +++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSourceTarget.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.orc; + +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.PathTarget; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.io.SequentialFileNamingScheme; +import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl; +import org.apache.crunch.types.PType; +import org.apache.hadoop.fs.Path; + +public class OrcFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> { + + public OrcFileSourceTarget(Path path, PType<T> ptype) { + this(path, ptype, SequentialFileNamingScheme.getInstance()); + } + + public OrcFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNameScheme) { + this(new OrcFileSource<T>(path, ptype), new OrcFileTarget(path), fileNameScheme); + } + + public OrcFileSourceTarget(ReadableSource<T> source, PathTarget target, + FileNamingScheme fileNamingScheme) { + super(source, target, fileNamingScheme); + } + + @Override + public String toString() { + return target.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java new file mode 100644 index 0000000..0ad0434 --- 
/dev/null +++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.orc; + +import org.apache.crunch.SourceTarget; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.SequentialFileNamingScheme; +import org.apache.crunch.io.impl.FileTargetImpl; +import org.apache.crunch.types.PType; +import org.apache.hadoop.fs.Path; + +public class OrcFileTarget extends FileTargetImpl { + + public OrcFileTarget(String path) { + this(new Path(path)); + } + + public OrcFileTarget(Path path) { + this(path, SequentialFileNamingScheme.getInstance()); + } + + public OrcFileTarget(Path path, FileNamingScheme fileNamingScheme) { + super(path, OrcCrunchOutputFormat.class, fileNamingScheme); + } + + @Override + public String toString() { + return "Orc(" + path.toString() + ")"; + } + + @Override + public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) { + return new OrcFileSourceTarget<T>(path, ptype); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileWriter.java 
package org.apache.crunch.io.orc;

import java.io.Closeable;
import java.io.IOException;

import org.apache.crunch.MapFn;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

/**
 * A writer class which is corresponding to OrcFileReaderFactory. Mainly used
 * for test purpose
 *
 * @param <T>
 */
public class OrcFileWriter<T> implements Closeable {

  private RecordWriter<NullWritable, Object> writer;
  private MapFn<T, Object> mapFn;
  private final OrcSerde serde;

  /** A {@link Progressable} that ignores all progress reports. */
  static class NullProgress implements Progressable {
    @Override
    public void progress() {
      // intentionally a no-op; nothing to report progress to
    }
  }

  @SuppressWarnings({"rawtypes", "unchecked"})
  public OrcFileWriter(Configuration conf, Path path, PType<T> pType) throws IOException {
    // The raw OutputFormat is deliberate: OrcOutputFormat's record writer is
    // parameterized on a package-private Hive row type, so it can only be
    // assigned to the <NullWritable, Object> field through a raw reference.
    OutputFormat format = new OrcOutputFormat();
    writer = format.getRecordWriter(null, new JobConf(conf), path.toString(), new NullProgress());

    mapFn = pType.getOutputMapFn();
    mapFn.initialize();

    serde = new OrcSerde();
  }

  /**
   * Serializes one record and appends it to the ORC file.
   *
   * @throws NullPointerException if the mapped record holds no OrcStruct
   */
  public void write(T record) throws IOException {
    OrcWritable writable = (OrcWritable) mapFn.map(record);
    if (writable.get() == null) {
      throw new NullPointerException("Cannot write null records to orc file");
    }
    Object row = serde.serialize(writable.get(), writable.getObjectInspector());
    writer.write(NullWritable.get(), row);
  }

  @Override
  public void close() throws IOException {
    writer.close(Reporter.NULL);
  }

}
package org.apache.crunch.io.orc;

import java.util.List;

import org.apache.crunch.io.FileReaderFactory;
import org.apache.crunch.io.impl.ReadableDataImpl;
import org.apache.crunch.types.PType;
import org.apache.hadoop.fs.Path;

/**
 * ReadableData implementation for ORC files: produces an
 * {@link OrcFileReaderFactory} for the configured paths, optionally
 * restricted to a subset of columns.
 */
public class OrcReadableData<T> extends ReadableDataImpl<T> {

  private final PType<T> ptype;
  // Column indices to read; null means read every column.
  private final int[] readColumns;

  /** Reads all columns of the given paths. */
  public OrcReadableData(List<Path> paths, PType<T> ptype) {
    this(paths, ptype, null);
  }

  /**
   * @param readColumns indices of the columns to read, or null for all
   */
  public OrcReadableData(List<Path> paths, PType<T> ptype, int[] readColumns) {
    super(paths);
    this.ptype = ptype;
    this.readColumns = readColumns;
  }

  @Override
  public FileReaderFactory<T> getFileReaderFactory() {
    return new OrcFileReaderFactory<T>(ptype, readColumns);
  }

}
package org.apache.crunch.io.orc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.types.orc.OrcUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;

/**
 * A Writable wrapper around an ORC row ({@link OrcStruct}). The struct and
 * its binary-sortable serialized form are materialized lazily from each
 * other, so only one of them may be up to date at a time. Callers must
 * inject the serde and the struct's ObjectInspector (via
 * {@link #setSerde} / {@link #setObjectInspector}) before any
 * serialization, deserialization or comparison is attempted.
 */
public class OrcWritable implements WritableComparable<OrcWritable> {

  private OrcStruct orc;      // deserialized form; stale when null
  private ObjectInspector oi; // object inspector for orc struct

  private BytesWritable blob; // serialized form of the struct; stale when null
  private BinarySortableSerDe serde;

  @Override
  public void write(DataOutput out) throws IOException {
    serialize();
    blob.write(out);
  }

  /** Materializes {@link #blob} from {@link #orc} if it is not already cached. */
  private void serialize() {
    try {
      if (blob == null) {
        // Copy the serde's output: BinarySortableSerDe reuses its internal
        // byte buffer, so a later serialize() call would clobber this blob.
        // Copy only the valid prefix -- getBytes() returns the backing array,
        // which may be longer than getLength() and contain stale padding that
        // must not take part in byte-wise comparisons.
        BytesWritable bw = (BytesWritable) serde.serialize(orc, oi);
        blob = new BytesWritable(Arrays.copyOf(bw.getBytes(), bw.getLength()));
      }
    } catch (SerDeException e) {
      // Preserve the cause so the underlying serde failure is diagnosable.
      throw new CrunchRuntimeException("Unable to serialize object: " + orc, e);
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    blob = new BytesWritable();
    blob.readFields(in);
    orc = null; // the orc struct is stale
  }

  @Override
  public int compareTo(OrcWritable other) {
    // Comparison is defined on the binary-sortable encoding, so both sides
    // must be serialized first.
    serialize();
    other.serialize();
    return blob.compareTo(other.blob);
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    return compareTo((OrcWritable) obj) == 0;
  }

  @Override
  public int hashCode() {
    // Must be consistent with equals(), which compares the serialized form.
    serialize();
    return blob.hashCode();
  }

  public void setSerde(BinarySortableSerDe serde) {
    this.serde = serde;
  }

  public void setObjectInspector(ObjectInspector oi) {
    this.oi = oi;
  }

  public ObjectInspector getObjectInspector() {
    return oi;
  }

  public void set(OrcStruct orcStruct) {
    this.orc = orcStruct;
    blob = null; // the blob is stale
  }

  /** Returns the struct, deserializing it from the blob if necessary. */
  public OrcStruct get() {
    if (orc == null && blob != null) {
      makeOrcStruct();
    }
    return orc;
  }

  /** Materializes {@link #orc} from {@link #blob} via the binary serde. */
  private void makeOrcStruct() {
    try {
      Object row = serde.deserialize(blob);
      StructObjectInspector rowOi = (StructObjectInspector) serde.getObjectInspector();
      orc = (OrcStruct) OrcUtils.convert(row, rowOi, oi);
    } catch (SerDeException e) {
      throw new CrunchRuntimeException("Unable to deserialize blob: " + blob, e);
    }
  }

}
package org.apache.crunch.types.orc;

import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableBinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableBooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableByteObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableDateObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableDoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableFloatObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveCharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableLongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableTimestampObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

/**
 * Static helpers for building Hive type metadata and converting values
 * to/from ORC's object-inspected representation.
 */
public class OrcUtils {

  private OrcUtils() {
    // static utility class; not instantiable
  }

  /**
   * Generate TypeInfo for a given java class based on reflection
   *
   * @param typeClass the POJO class to inspect
   * @return the Hive TypeInfo describing the class
   */
  public static TypeInfo getTypeInfo(Class<?> typeClass) {
    ObjectInspector oi = ObjectInspectorFactory
        .getReflectionObjectInspector(typeClass, ObjectInspectorOptions.JAVA);
    return TypeInfoUtils.getTypeInfoFromObjectInspector(oi);
  }

  /**
   * Create an OrcStruct whose fields are populated, in order, from the given
   * values. Callers must supply one value per field of the struct type.
   *
   * @param typeInfo the struct type describing the field layout
   * @param objs one value per struct field, in field order
   * @return the populated OrcStruct
   */
  public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
    SettableStructObjectInspector oi = (SettableStructObjectInspector) OrcStruct
        .createObjectInspector(typeInfo);
    List<StructField> fields = (List<StructField>) oi.getAllStructFieldRefs();
    OrcStruct result = (OrcStruct) oi.create();
    result.setNumFields(fields.size());
    for (int i = 0; i < fields.size(); i++) {
      oi.setStructFieldData(result, fields.get(i), objs[i]);
    }
    return result;
  }

  /**
   * Create a binary serde for OrcStruct serialization/deserialization
   *
   * @param typeInfo the struct type whose columns the serde should handle
   * @return an initialized BinarySortableSerDe
   */
  public static BinarySortableSerDe createBinarySerde(TypeInfo typeInfo) {
    BinarySortableSerDe serde = new BinarySortableSerDe();

    // Build the comma-separated column-name and column-type lists that
    // BinarySortableSerDe expects in its table properties.
    StringBuilder nameSb = new StringBuilder();
    StringBuilder typeSb = new StringBuilder();

    StructTypeInfo sti = (StructTypeInfo) typeInfo;
    for (String name : sti.getAllStructFieldNames()) {
      nameSb.append(name);
      nameSb.append(',');
    }
    for (TypeInfo info : sti.getAllStructFieldTypeInfos()) {
      typeSb.append(info.toString());
      typeSb.append(',');
    }

    Properties tbl = new Properties();
    // Drop the trailing comma; an empty struct yields empty property values.
    String names = nameSb.length() > 0 ? nameSb.substring(0,
        nameSb.length() - 1) : "";
    String types = typeSb.length() > 0 ? typeSb.substring(0,
        typeSb.length() - 1) : "";
    tbl.setProperty(serdeConstants.LIST_COLUMNS, names);
    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);

    try {
      serde.initialize(null, tbl);
    } catch (SerDeException e) {
      // Chain the cause so the underlying serde failure is diagnosable.
      throw new CrunchRuntimeException("Unable to initialize binary serde", e);
    }

    return serde;
  }

  /**
   * Recursively convert a value between two object-inspected representations
   * (e.g. a reflection-inspected POJO from/to an OrcStruct).
   *
   * @param from the source value; null maps to null
   * @param fromOi the inspector describing {@code from}
   * @param toOi the settable inspector describing the target representation
   * @return the converted value
   * @throws IllegalArgumentException for void/union or unknown categories
   */
  public static Object convert(Object from, ObjectInspector fromOi, ObjectInspector toOi) {
    if (from == null) {
      return null;
    }
    Object to;
    switch (fromOi.getCategory()) {
    case PRIMITIVE:
      PrimitiveObjectInspector fromPoi = (PrimitiveObjectInspector) fromOi;
      switch (fromPoi.getPrimitiveCategory()) {
      case FLOAT:
        SettableFloatObjectInspector floatOi = (SettableFloatObjectInspector) toOi;
        return floatOi.create((Float) fromPoi.getPrimitiveJavaObject(from));
      case DOUBLE:
        SettableDoubleObjectInspector doubleOi = (SettableDoubleObjectInspector) toOi;
        return doubleOi.create((Double) fromPoi.getPrimitiveJavaObject(from));
      case BOOLEAN:
        SettableBooleanObjectInspector boolOi = (SettableBooleanObjectInspector) toOi;
        return boolOi.create((Boolean) fromPoi.getPrimitiveJavaObject(from));
      case INT:
        SettableIntObjectInspector intOi = (SettableIntObjectInspector) toOi;
        return intOi.create((Integer) fromPoi.getPrimitiveJavaObject(from));
      case LONG:
        SettableLongObjectInspector longOi = (SettableLongObjectInspector) toOi;
        return longOi.create((Long) fromPoi.getPrimitiveJavaObject(from));
      case STRING:
        SettableStringObjectInspector strOi = (SettableStringObjectInspector) toOi;
        return strOi.create((String) fromPoi.getPrimitiveJavaObject(from));
      case BYTE:
        SettableByteObjectInspector byteOi = (SettableByteObjectInspector) toOi;
        return byteOi.create((Byte) fromPoi.getPrimitiveJavaObject(from));
      case SHORT:
        SettableShortObjectInspector shortOi = (SettableShortObjectInspector) toOi;
        return shortOi.create((Short) fromPoi.getPrimitiveJavaObject(from));
      case BINARY:
        SettableBinaryObjectInspector binOi = (SettableBinaryObjectInspector) toOi;
        return binOi.create((byte[]) fromPoi.getPrimitiveJavaObject(from));
      case TIMESTAMP:
        SettableTimestampObjectInspector timeOi = (SettableTimestampObjectInspector) toOi;
        return timeOi.create((Timestamp) fromPoi.getPrimitiveJavaObject(from));
      case DATE:
        SettableDateObjectInspector dateOi = (SettableDateObjectInspector) toOi;
        return dateOi.create((Date) fromPoi.getPrimitiveJavaObject(from));
      case DECIMAL:
        SettableHiveDecimalObjectInspector decimalOi = (SettableHiveDecimalObjectInspector) toOi;
        return decimalOi.create((HiveDecimal) fromPoi.getPrimitiveJavaObject(from));
      case CHAR:
        SettableHiveCharObjectInspector charOi = (SettableHiveCharObjectInspector) toOi;
        return charOi.create((HiveChar) fromPoi.getPrimitiveJavaObject(from));
      case VARCHAR:
        SettableHiveVarcharObjectInspector varcharOi = (SettableHiveVarcharObjectInspector) toOi;
        return varcharOi.create((HiveVarchar) fromPoi.getPrimitiveJavaObject(from));
      case VOID:
        throw new IllegalArgumentException("Void type is not supported yet");
      default:
        throw new IllegalArgumentException("Unknown primitive type "
            + (fromPoi).getPrimitiveCategory());
      }
    case STRUCT:
      StructObjectInspector fromSoi = (StructObjectInspector) fromOi;
      List<StructField> fromFields = (List<StructField>) fromSoi.getAllStructFieldRefs();
      List<Object> fromItems = fromSoi.getStructFieldsDataAsList(from);

      // this is a tuple. use TupleObjectInspector to construct the result
      if (toOi instanceof TupleObjectInspector) {
        TupleObjectInspector toToi = (TupleObjectInspector) toOi;
        List<StructField> toFields = (List<StructField>) toToi.getAllStructFieldRefs();
        Object[] values = new Object[fromItems.size()];
        for (int i = 0; i < fromItems.size(); i++) {
          values[i] = convert(fromItems.get(i),
              fromFields.get(i).getFieldObjectInspector(),
              toFields.get(i).getFieldObjectInspector());
        }
        return toToi.create(values);
      }

      SettableStructObjectInspector toSoi = (SettableStructObjectInspector) toOi;
      List<StructField> toFields = (List<StructField>) toSoi.getAllStructFieldRefs();
      to = toSoi.create();
      for (int i = 0; i < fromItems.size(); i++) {
        Object converted = convert(fromItems.get(i),
            fromFields.get(i).getFieldObjectInspector(),
            toFields.get(i).getFieldObjectInspector());
        toSoi.setStructFieldData(to, toFields.get(i), converted);
      }
      return to;
    case MAP:
      MapObjectInspector fromMoi = (MapObjectInspector) fromOi;
      SettableMapObjectInspector toMoi = (SettableMapObjectInspector) toOi;
      to = toMoi.create(); // do not reuse
      for (Map.Entry<?, ?> entry : fromMoi.getMap(from).entrySet()) {
        Object convertedKey = convert(entry.getKey(),
            fromMoi.getMapKeyObjectInspector(),
            toMoi.getMapKeyObjectInspector());
        Object convertedValue = convert(entry.getValue(),
            fromMoi.getMapValueObjectInspector(),
            toMoi.getMapValueObjectInspector());
        toMoi.put(to, convertedKey, convertedValue);
      }
      return to;
    case LIST:
      ListObjectInspector fromLoi = (ListObjectInspector) fromOi;
      List<?> fromList = fromLoi.getList(from);

      SettableListObjectInspector toLoi = (SettableListObjectInspector) toOi;
      to = toLoi.create(fromList.size()); // do not reuse
      for (int i = 0; i < fromList.size(); i++) {
        Object converted = convert(fromList.get(i),
            fromLoi.getListElementObjectInspector(),
            toLoi.getListElementObjectInspector());
        toLoi.set(to, i, converted);
      }
      return to;
    case UNION:
      throw new IllegalArgumentException("Union type is not supported yet");
    default:
      throw new IllegalArgumentException("Unknown type " + fromOi.getCategory());
    }
  }

}
+ */ +package org.apache.crunch.types.orc; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +import org.apache.crunch.MapFn; +import org.apache.crunch.Tuple; +import org.apache.crunch.TupleN; +import org.apache.crunch.fn.CompositeMapFn; +import org.apache.crunch.io.orc.OrcWritable; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.TupleFactory; +import org.apache.crunch.types.writable.WritableType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +/** + * Utilities to create PTypes for ORC serialization / deserialization + * + */ +public class Orcs { + + /** + * Create a PType to directly use OrcStruct as the deserialized format. This + * is the fastest way for serialization/deserializations. However, users + * need to use ObjectInspectors to handle the OrcStruct. Currently, void and + * union types are not supported. + * + * @param typeInfo + * @return + */ + public static final PType<OrcStruct> orcs(TypeInfo typeInfo) { + return Writables.derived(OrcStruct.class, new OrcInFn(typeInfo), new OrcOutFn(typeInfo), + Writables.writables(OrcWritable.class)); + } + + /** + * Create a PType which uses reflection to serialize/deserialize java POJOs + * to/from ORC. 
There are some restrictions of the POJO: 1) it must have a + * default, no-arg constructor; 2) All of its fields must be Hive primitive + * types or collection types that have Hive equivalents; 3) Void and Union + * are not supported yet. + * + * @param clazz + * @return + */ + public static final <T> PType<T> reflects(Class<T> clazz) { + TypeInfo reflectInfo = createReflectTypeInfo(clazz); + return Writables.derived(clazz, new ReflectInFn<T>(clazz), + new ReflectOutFn<T>(clazz), orcs(reflectInfo)); + } + + private static TypeInfo createReflectTypeInfo(Class<?> clazz) { + ObjectInspector reflectOi = ObjectInspectorFactory + .getReflectionObjectInspector(clazz, ObjectInspectorOptions.JAVA); + return TypeInfoUtils.getTypeInfoFromObjectInspector(reflectOi); + } + + /** + * Create a tuple-based PType. Users can use other Crunch PTypes (such as + * Writables.ints(), Orcs.reflects(), Writables.pairs(), ...) to construct + * the PType. Currently, nulls and unions are not supported. + * + * @param ptypes + * @return + */ + public static final PType<TupleN> tuples(PType... ptypes) { + TypeInfo tupleInfo = createTupleTypeInfo(ptypes); + return derived(TupleN.class, new TupleInFn<TupleN>(TupleFactory.TUPLEN, ptypes), + new TupleOutFn<TupleN>(ptypes), orcs(tupleInfo), ptypes); + } + + // derived, but override subtypes + static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, + PType<S> base, PType[] subTypes) { + WritableType<S, ?> wt = (WritableType<S, ?>) base; + MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn); + MapFn output = new CompositeMapFn(outputFn, wt.getOutputMapFn()); + return new WritableType(clazz, wt.getSerializationClass(), input, output, subTypes); + } + + private static TypeInfo createTupleTypeInfo(PType... 
ptypes) { + ObjectInspector tupleOi = new TupleObjectInspector(null, ptypes); + return TypeInfoUtils.getTypeInfoFromObjectInspector(tupleOi); + } + + private static class OrcInFn extends MapFn<OrcWritable, OrcStruct> { + + private TypeInfo typeInfo; + + private transient ObjectInspector oi; + private transient BinarySortableSerDe serde; + + public OrcInFn(TypeInfo typeInfo) { + this.typeInfo = typeInfo; + } + + @Override + public void initialize() { + oi = OrcStruct.createObjectInspector(typeInfo); + serde = OrcUtils.createBinarySerde(typeInfo); + } + + @Override + public OrcStruct map(OrcWritable input) { + input.setObjectInspector(oi); + input.setSerde(serde); + return input.get(); + } + + } + + private static class OrcOutFn extends MapFn<OrcStruct, OrcWritable> { + + private TypeInfo typeInfo; + + private transient ObjectInspector oi; + private transient BinarySortableSerDe serde; + + public OrcOutFn(TypeInfo typeInfo) { + this.typeInfo = typeInfo; + } + + @Override + public void initialize() { + oi = OrcStruct.createObjectInspector(typeInfo); + serde = OrcUtils.createBinarySerde(typeInfo); + } + + @Override + public OrcWritable map(OrcStruct input) { + OrcWritable output = new OrcWritable(); + output.setObjectInspector(oi); + output.setSerde(serde); + output.set(input); + return output; + } + + } + + private static Map<Class<?>, Field[]> fieldsCache = new HashMap<Class<?>, Field[]>(); + + private static class ReflectInFn<T> extends MapFn<OrcStruct, T> { + + private Class<T> typeClass; + private TypeInfo typeInfo; + + private transient ObjectInspector reflectOi; + private transient ObjectInspector orcOi; + + @Override + public void initialize() { + reflectOi = ObjectInspectorFactory + .getReflectionObjectInspector(typeClass, ObjectInspectorOptions.JAVA); + orcOi = OrcStruct.createObjectInspector(typeInfo); + } + + public ReflectInFn(Class<T> typeClass) { + this.typeClass = typeClass; + typeInfo = createReflectTypeInfo(typeClass); + } + + @Override + public T 
map(OrcStruct input) { + return (T) OrcUtils.convert(input, orcOi, reflectOi); + } + + } + + private static class ReflectOutFn<T> extends MapFn<T, OrcStruct> { + + private Class<T> typeClass; + private TypeInfo typeInfo; + + private transient ObjectInspector reflectOi; + private transient SettableStructObjectInspector orcOi; + + @Override + public void initialize() { + reflectOi = ObjectInspectorFactory.getReflectionObjectInspector(typeClass, + ObjectInspectorOptions.JAVA); + orcOi = (SettableStructObjectInspector) OrcStruct.createObjectInspector(typeInfo); + } + + public ReflectOutFn(Class<T> typeClass) { + this.typeClass = typeClass; + typeInfo = createReflectTypeInfo(typeClass); + } + + @Override + public OrcStruct map(T input) { + return (OrcStruct) OrcUtils.convert(input, reflectOi, orcOi); + } + + } + + private static class TupleInFn<T extends Tuple> extends MapFn<OrcStruct, T> { + + private PType[] ptypes; + private TupleFactory<T> tupleFactory; + + private transient ObjectInspector tupleOi; + private transient ObjectInspector orcOi; + + public TupleInFn(TupleFactory<T> tupleFactory, PType... ptypes) { + this.tupleFactory = tupleFactory; + this.ptypes = ptypes; + } + + @Override + public void initialize() { + tupleOi = new TupleObjectInspector<T>(tupleFactory, ptypes); + TypeInfo info = TypeInfoUtils.getTypeInfoFromObjectInspector(tupleOi); + orcOi = OrcStruct.createObjectInspector(info); + } + + @Override + public T map(OrcStruct input) { + return (T) OrcUtils.convert(input, orcOi, tupleOi); + } + + } + + private static class TupleOutFn<T extends Tuple> extends MapFn<T, OrcStruct> { + + private PType[] ptypes; + + private transient ObjectInspector tupleOi; + private transient ObjectInspector orcOi; + + public TupleOutFn(PType... 
ptypes) { + this.ptypes = ptypes; + } + + @Override + public void initialize() { + tupleOi = new TupleObjectInspector<T>(null, ptypes); + TypeInfo info = TypeInfoUtils.getTypeInfoFromObjectInspector(tupleOi); + orcOi = OrcStruct.createObjectInspector(info); + } + + @Override + public OrcStruct map(T input) { + return (OrcStruct) OrcUtils.convert(input, tupleOi, orcOi); + } + + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/types/orc/TupleObjectInspector.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/main/java/org/apache/crunch/types/orc/TupleObjectInspector.java b/crunch-hive/src/main/java/org/apache/crunch/types/orc/TupleObjectInspector.java new file mode 100644 index 0000000..9f5cd94 --- /dev/null +++ b/crunch-hive/src/main/java/org/apache/crunch/types/orc/TupleObjectInspector.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.crunch.types.orc;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.crunch.Tuple;
import org.apache.crunch.Union;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.TupleFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableBinaryObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BytesWritable;

/**
 * An object inspector to define the structure of Crunch Tuples
 *
 */
public class TupleObjectInspector<T extends Tuple> extends StructObjectInspector {

  private TupleFactory<T> tupleFactory;
  private List<TupleField> fields;

  public TupleObjectInspector(TupleFactory<T> tupleFactory, PType... ptypes) {
    this.tupleFactory = tupleFactory;
    fields = new ArrayList<TupleField>();
    for (int i = 0; i < ptypes.length; i++) {
      TupleField field = new TupleField(i, ptypes[i]);
      fields.add(field);
    }
  }

  /** One positional field of the tuple, named "_col&lt;index&gt;". */
  static class TupleField implements StructField {

    private int index;
    private ObjectInspector oi;

    public TupleField(int index, PType<?> ptype) {
      this.index = index;
      oi = createObjectInspector(ptype);
    }

    // Maps a Crunch PType to the Hive ObjectInspector for its values.
    private ObjectInspector createObjectInspector(PType<?> ptype) {
      Class typeClass = ptype.getTypeClass();
      if (typeClass == Union.class || typeClass == Void.class) {
        throw new IllegalArgumentException(typeClass.getName() + " is not supported yet");
      }

      ObjectInspector result;
      if (typeClass == ByteBuffer.class) {
        result = new ByteBufferObjectInspector();
      } else if (typeClass == Collection.class) {
        ObjectInspector itemOi = createObjectInspector(ptype.getSubTypes().get(0));
        result = ObjectInspectorFactory.getStandardListObjectInspector(itemOi);
      } else if (typeClass == Map.class) {
        // Crunch map PTypes always key on String; only the value type varies.
        ObjectInspector keyOi = ObjectInspectorFactory
            .getReflectionObjectInspector(String.class, ObjectInspectorOptions.JAVA);
        ObjectInspector valueOi = createObjectInspector(ptype.getSubTypes().get(0));
        result = ObjectInspectorFactory.getStandardMapObjectInspector(keyOi, valueOi);
      } else if (Tuple.class.isAssignableFrom(typeClass)) {
        // Nested tuples recurse into a nested TupleObjectInspector.
        result = new TupleObjectInspector(TupleFactory.getTupleFactory(typeClass),
            ptype.getSubTypes().toArray(new PType[0]));
      } else {
        result = ObjectInspectorFactory.getReflectionObjectInspector(typeClass,
            ObjectInspectorOptions.JAVA);
      }
      return result;
    }

    @Override
    public String getFieldName() {
      return "_col" + index;
    }

    @Override
    public ObjectInspector getFieldObjectInspector() {
      return oi;
    }

    @Override
    public String getFieldComment() {
      return null;
    }

  }

  @Override
  public String getTypeName() {
    // Renders a Hive-style struct type string, e.g. struct<_col0:int,_col1:string>.
    StringBuilder buffer = new StringBuilder();
    buffer.append("struct<");
    for (int i = 0; i < fields.size(); ++i) {
      StructField field = fields.get(i);
      if (i != 0) {
        buffer.append(",");
      }
      buffer.append(field.getFieldName());
      buffer.append(":");
      buffer.append(field.getFieldObjectInspector().getTypeName());
    }
    buffer.append(">");
    return buffer.toString();
  }

  @Override
  public Category getCategory() {
    return Category.STRUCT;
  }

  /** Builds a tuple from positional values via the configured factory. */
  public T create(Object... values) {
    return tupleFactory.makeTuple(values);
  }

  @Override
  public List<? extends StructField> getAllStructFieldRefs() {
    return fields;
  }

  @Override
  public StructField getStructFieldRef(String fieldName) {
    for (StructField field : fields) {
      if (field.getFieldName().equals(fieldName)) {
        return field;
      }
    }
    return null;
  }

  @Override
  public Object getStructFieldData(Object data, StructField fieldRef) {
    TupleField field = (TupleField) fieldRef;
    return ((T) data).get(field.index);
  }

  @Override
  public List<Object> getStructFieldsDataAsList(Object data) {
    T tuple = (T) data;
    List<Object> result = new ArrayList<Object>();
    for (int i = 0; i < tuple.size(); i++) {
      result.add(tuple.get(i));
    }
    return result;
  }

  /**
   * Binary inspector that represents BINARY values as java.nio.ByteBuffer,
   * which is Crunch's native bytes type.
   */
  static class ByteBufferObjectInspector extends AbstractPrimitiveJavaObjectInspector
      implements SettableBinaryObjectInspector {

    ByteBufferObjectInspector() {
      super(TypeInfoFactory.binaryTypeInfo);
    }

    @Override
    public ByteBuffer copyObject(Object o) {
      if (o == null) {
        return null;
      }
      byte[] oldBytes = getPrimitiveJavaObject(o);
      byte[] copiedBytes = new byte[oldBytes.length];
      System.arraycopy(oldBytes, 0, copiedBytes, 0, oldBytes.length);
      ByteBuffer duplicate = ByteBuffer.wrap(copiedBytes);
      return duplicate;
    }

    @Override
    public BytesWritable getPrimitiveWritableObject(Object o) {
      if (o == null) {
        return null;
      }
      // NOTE(review): assumes the buffer's position is 0 (as produced by
      // wrap); a buffer that has been partially read would expose wrong
      // bytes -- confirm against callers.
      ByteBuffer buf = (ByteBuffer) o;
      BytesWritable bw = new BytesWritable();
      bw.set(buf.array(), buf.arrayOffset(), buf.limit());
      return bw;
    }

    @Override
    public byte[] getPrimitiveJavaObject(Object o) {
      if (o == null) {
        return null;
      }
      ByteBuffer buf = (ByteBuffer) o;
      byte[] b = new byte[buf.limit()];
      System.arraycopy(buf.array(), buf.arrayOffset(), b, 0, b.length);
      return b;
    }

    @Override
    public Object set(Object o, byte[] b) {
      // ByteBuffer values are treated as immutable here.
      throw new UnsupportedOperationException("set is not supported");
    }

    @Override
    public Object set(Object o, BytesWritable bw) {
      throw new UnsupportedOperationException("set is not supported");
    }

    @Override
    public ByteBuffer create(byte[] bb) {
      return bb == null ? null : ByteBuffer.wrap(bb);
    }

    @Override
    public ByteBuffer create(BytesWritable bw) {
      if (bw == null) {
        return null;
      }
      // BytesWritable.getBytes() returns the backing array, which may be
      // longer than the valid data; copy only the first getLength() bytes
      // so stale padding never leaks into the buffer.
      return ByteBuffer.wrap(Arrays.copyOf(bw.getBytes(), bw.getLength()));
    }

  }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.orc; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.List; + +import org.apache.crunch.types.PType; +import org.apache.crunch.types.orc.OrcUtils; +import org.apache.crunch.types.orc.Orcs; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class OrcFileReaderFactoryTest extends OrcFileTest { + + @Test + public void testColumnPruning() throws IOException { + Path path = new Path(tempPath, "test.orc"); + + String typeStr = "struct<a:int,b:string,c:float>"; + TypeInfo info = TypeInfoUtils.getTypeInfoFromTypeString(typeStr); + StructObjectInspector soi = (StructObjectInspector) OrcStruct.createObjectInspector(info); + PType<OrcStruct> ptype = Orcs.orcs(info); + + OrcFileWriter<OrcStruct> writer = new OrcFileWriter<OrcStruct>(conf, path, ptype); + writer.write(OrcUtils.createOrcStruct(info, new IntWritable(1), new Text("Alice"), new FloatWritable(167.2f))); + writer.write(OrcUtils.createOrcStruct(info, new IntWritable(2), new Text("Bob"), new FloatWritable(179.7f))); + writer.close(); + + int[] readColumns = {1}; + OrcFileSource<OrcStruct> source = new 
OrcFileSource<OrcStruct>(path, ptype, readColumns); + for (OrcStruct row : source.read(conf)) { + List<Object> list = soi.getStructFieldsDataAsList(row); + assertNull(list.get(0)); + assertNotNull(list.get(1)); + assertNull(list.get(2)); + } + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderWriterTest.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderWriterTest.java b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderWriterTest.java new file mode 100644 index 0000000..f049608 --- /dev/null +++ b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderWriterTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.io.orc; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.crunch.io.orc.OrcFileSource; +import org.apache.crunch.io.orc.OrcFileWriter; +import org.apache.crunch.test.orc.pojos.Person; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.orc.Orcs; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +public class OrcFileReaderWriterTest extends OrcFileTest { + + @Test + public void testReadWrite() throws IOException { + Path path = new Path(tempPath, "test.orc"); + PType<Person> ptype = Orcs.reflects(Person.class); + OrcFileWriter<Person> writer = new OrcFileWriter<Person>(conf, path, ptype); + + Person p1 = new Person("Alice", 23, Arrays.asList("666-677-9999")); + Person p2 = new Person("Bob", 26, null); + + writer.write(p1); + writer.write(p2); + writer.close(); + + OrcFileSource<Person> reader = new OrcFileSource<Person>(path, ptype); + Iterator<Person> iter = reader.read(conf).iterator(); + assertEquals(p1, iter.next()); + assertEquals(p2, iter.next()); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileTest.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileTest.java b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileTest.java new file mode 100644 index 0000000..dcc9cd6 --- /dev/null +++ b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; + +import com.google.common.io.Files; + +public class OrcFileTest { + + protected transient Configuration conf; + protected transient FileSystem fs; + protected transient Path tempPath; + + @Before + public void setUp() throws IOException { + conf = new Configuration(); + tempPath = new Path(Files.createTempDir().getAbsolutePath()); + fs = tempPath.getFileSystem(conf); + } + + @After + public void tearDown() throws IOException { + fs.delete(tempPath, true); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcWritableTest.java ---------------------------------------------------------------------- diff --git a/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcWritableTest.java b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcWritableTest.java new file mode 100644 index 0000000..a281890 --- /dev/null +++ b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcWritableTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.orc; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.apache.crunch.types.orc.OrcUtils; +import org.apache.crunch.types.writable.WritableDeepCopier; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class OrcWritableTest { + + @Test + public void testDeepCopy() { + String typeStr = "struct<a:int,b:string,c:float>"; + TypeInfo info = TypeInfoUtils.getTypeInfoFromTypeString(typeStr); + StructObjectInspector oi = (StructObjectInspector) OrcStruct.createObjectInspector(info); + BinarySortableSerDe serde = OrcUtils.createBinarySerde(info); + + OrcStruct struct = OrcUtils.createOrcStruct(info, + new IntWritable(1), new Text("Alice"), new FloatWritable(165.3f)); + OrcWritable writable = new OrcWritable(); + writable.set(struct); + assertTrue(struct == writable.get()); + + writable.setObjectInspector(oi); 
+ writable.setSerde(serde); + + WritableDeepCopier<OrcWritable> deepCopier = new WritableDeepCopier<OrcWritable>(OrcWritable.class); + OrcWritable copied = deepCopier.deepCopy(writable); + assertTrue(writable != copied); + assertEquals(writable, copied); + + copied.setObjectInspector(oi); + copied.setSerde(serde); + OrcStruct copiedStruct = copied.get(); + assertTrue(struct != copiedStruct); + assertEquals(struct, copiedStruct); + + List<Object> items = oi.getStructFieldsDataAsList(struct); + List<Object> copiedItems = oi.getStructFieldsDataAsList(copiedStruct); + for (int i = 0; i < items.size(); i++) { + assertTrue(items.get(i) != copiedItems.get(i)); + assertEquals(items.get(i), copiedItems.get(i)); + } + + OrcWritable copied2 = deepCopier.deepCopy(copied); + assertTrue(copied2 != copied); + assertEquals(copied2, copied); + + copied2.setObjectInspector(oi); + copied2.setSerde(serde); + OrcStruct copiedStruct2 = copied2.get(); + assertTrue(copiedStruct2 != copiedStruct); + assertEquals(copiedStruct2, copiedStruct); + + List<Object> copiedItems2 = oi.getStructFieldsDataAsList(copiedStruct2); + for (int i = 0; i < items.size(); i++) { + assertTrue(copiedItems2.get(i) != copiedItems.get(i)); + assertEquals(copiedItems2.get(i), copiedItems.get(i)); + } + } + + @Test + public void testCompareTo() { + String typeStr = "struct<a:int,b:string,c:float>"; + TypeInfo info = TypeInfoUtils.getTypeInfoFromTypeString(typeStr); + StructObjectInspector oi = (StructObjectInspector) OrcStruct.createObjectInspector(info); + BinarySortableSerDe serde = OrcUtils.createBinarySerde(info); + + OrcStruct struct1 = OrcUtils.createOrcStruct(info, new IntWritable(1), new Text("AAA"), new FloatWritable(3.2f)); + OrcStruct struct2 = OrcUtils.createOrcStruct(info, new IntWritable(1), new Text("AAB"), null); + OrcStruct struct3 = OrcUtils.createOrcStruct(info, new IntWritable(2), new Text("AAA"), null); + OrcStruct struct4 = OrcUtils.createOrcStruct(info, new IntWritable(2), new Text("AAA"), new 
FloatWritable(3.2f)); + + OrcWritable writable1 = new OrcWritable(); + writable1.set(struct1); + OrcWritable writable2 = new OrcWritable(); + writable2.set(struct2); + OrcWritable writable3 = new OrcWritable(); + writable3.set(struct3); + OrcWritable writable4 = new OrcWritable(); + writable4.set(struct4); + + writable1.setObjectInspector(oi); + writable2.setObjectInspector(oi); + writable3.setObjectInspector(oi); + writable4.setObjectInspector(oi); + writable1.setSerde(serde); + writable2.setSerde(serde); + writable3.setSerde(serde); + writable4.setSerde(serde); + + assertTrue(writable1.compareTo(writable2) < 0); + assertTrue(writable2.compareTo(writable3) < 0); + assertTrue(writable1.compareTo(writable3) < 0); + assertTrue(writable3.compareTo(writable4) < 0); + } + +}
