Repository: orc Updated Branches: refs/heads/master 545fe3712 -> 3bb5ce532
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java new file mode 100644 index 0000000..f686e05 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.mapreduce; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcMapredRecordReader; +import org.apache.orc.mapred.OrcStruct; + +import java.io.IOException; +import java.util.List; + +/** + * This record reader implements the org.apache.hadoop.mapreduce API. + * It lives in the org.apache.orc.mapreduce package and shares its + * implementation with the mapred API record reader via org.apache.orc.mapred.OrcMapredRecordReader. + * @param <V> the root type of the file + */ +public class OrcMapreduceRecordReader<V extends WritableComparable> + extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, V> { + private final TypeDescription schema; + private final RecordReader batchReader; + private final VectorizedRowBatch batch; + private int rowInBatch; + private final V row; + + public OrcMapreduceRecordReader(Reader fileReader, + Reader.Options options) throws IOException { + this.batchReader = fileReader.rows(options); + if (options.getSchema() == null) { + schema = fileReader.getSchema(); + } else { + schema = options.getSchema(); + } + this.batch = schema.createRowBatch(); + rowInBatch = 0; + this.row = (V) OrcStruct.createValue(schema); + } + + /** + * If the current batch is empty, get a new one. + * @return true if we have rows available.
+ * @throws IOException + */ + boolean ensureBatch() throws IOException { + if (rowInBatch >= batch.size) { + rowInBatch = 0; + return batchReader.nextBatch(batch); + } + return true; + } + + @Override + public void close() throws IOException { + batchReader.close(); + } + + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) { + // nothing required + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (!ensureBatch()) { + return false; + } + if (schema.getCategory() == TypeDescription.Category.STRUCT) { + OrcStruct result = (OrcStruct) row; + List<TypeDescription> children = schema.getChildren(); + int numberOfChildren = children.size(); + for(int i=0; i < numberOfChildren; ++i) { + result.setFieldValue(i, OrcMapredRecordReader.nextValue(batch.cols[i], rowInBatch, + children.get(i), result.getFieldValue(i))); + } + } else { + OrcMapredRecordReader.nextValue(batch.cols[0], rowInBatch, schema, row); + } + rowInBatch += 1; + return true; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public V getCurrentValue() throws IOException, InterruptedException { + return row; + } + + @Override + public float getProgress() throws IOException { + return batchReader.getProgress(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java new file mode 100644 index 0000000..9379584 --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.mapreduce; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.mapred.OrcKey; +import org.apache.orc.mapred.OrcMapredRecordWriter; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; + +import java.io.IOException; + +public class OrcMapreduceRecordWriter<V extends Writable> + extends RecordWriter<NullWritable, V> { + + private final Writer writer; + private final VectorizedRowBatch batch; + private final TypeDescription schema; + private final boolean isTopStruct; + + public OrcMapreduceRecordWriter(Writer writer) { + this.writer = writer; + schema = writer.getSchema(); + this.batch = schema.createRowBatch(); + isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT; + } + + @Override + public void write(NullWritable nullWritable, V v) throws IOException { + // if the batch is full, write it out. + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + } + + // add the new row + int row = batch.size++; + // skip over the OrcKey or OrcValue + if (v instanceof OrcKey) { + v = (V)((OrcKey) v).key; + } else if (v instanceof OrcValue) { + v = (V)((OrcValue) v).value; + } + if (isTopStruct) { + for(int f=0; f < schema.getChildren().size(); ++f) { + OrcMapredRecordWriter.setColumn(schema.getChildren().get(f), + batch.cols[f], row, ((OrcStruct) v).getFieldValue(f)); + } + } else { + OrcMapredRecordWriter.setColumn(schema, batch.cols[0], row, v); + } + } + + @Override + public void close(TaskAttemptContext taskAttemptContext) throws IOException { + if (batch.size != 0) { + writer.addRowBatch(batch); + } + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java new file mode 100644 index 0000000..797998c --- /dev/null +++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; + +import java.io.IOException; + +/** + * An ORC output format that satisfies the org.apache.hadoop.mapreduce API. + */ +public class OrcOutputFormat<V extends Writable> + extends FileOutputFormat<NullWritable, V> { + private static final String EXTENSION = ".orc"; + // This is useful for unit tests or local runs where you don't need the + // output committer. + public static final String SKIP_TEMP_DIRECTORY = + "orc.mapreduce.output.skip-temporary-directory"; + + @Override + public RecordWriter<NullWritable, V> + getRecordWriter(TaskAttemptContext taskAttemptContext + ) throws IOException { + Configuration conf = taskAttemptContext.getConfiguration(); + Path filename = getDefaultWorkFile(taskAttemptContext, EXTENSION); + Writer writer = OrcFile.createWriter(filename, + org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf)); + return new OrcMapreduceRecordWriter<V>(writer); + } + + @Override + public Path getDefaultWorkFile(TaskAttemptContext context, + String extension) throws IOException { + if (context.getConfiguration().getBoolean(SKIP_TEMP_DIRECTORY, false)) { + return new Path(getOutputPath(context), + getUniqueFile(context, getOutputName(context), extension)); + } else { + return super.getDefaultWorkFile(context, extension); + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java new file mode 100644 index 0000000..cd11603 --- /dev/null +++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.mapred; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mrunit.MapReduceDriver; +import org.apache.orc.OrcConf; +import org.apache.orc.TypeDescription; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Iterator; + +public class TestMrUnit { + JobConf conf = new JobConf(); + + /** + * Split the input struct into its two parts. + */ + public static class MyMapper + implements Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> { + private OrcKey keyWrapper = new OrcKey(); + private OrcValue valueWrapper = new OrcValue(); + + @Override + public void map(NullWritable key, OrcStruct value, + OutputCollector<OrcKey, OrcValue> outputCollector, + Reporter reporter) throws IOException { + keyWrapper.key = value.getFieldValue(0); + valueWrapper.value = value.getFieldValue(1); + outputCollector.collect(keyWrapper, valueWrapper); + } + + @Override + public void close() throws IOException { + // PASS + } + + @Override + public void configure(JobConf jobConf) { + // PASS + } + } + + /** + * Glue the key and values back together. + */ + public static class MyReducer + implements Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> { + private OrcStruct output = new OrcStruct(TypeDescription.fromString + ("struct<first:struct<x:int,y:int>,second:struct<z:string>>")); + private final NullWritable nada = NullWritable.get(); + + @Override + public void reduce(OrcKey key, Iterator<OrcValue> iterator, + OutputCollector<NullWritable, OrcStruct> collector, + Reporter reporter) throws IOException { + output.setFieldValue(0, key.key); + while (iterator.hasNext()) { + OrcValue value = iterator.next(); + output.setFieldValue(1, value.value); + collector.collect(nada, output); + } + } + + @Override + public void close() throws IOException { + // PASS + } + + @Override + public void configure(JobConf jobConf) { + // PASS + } + } + + /** + * This class is intended to support MRUnit's object copying for input and + * output objects. + * + * Real mapreduce contexts should NEVER use this class. + * + * The type string is serialized before each value. 
+ */ + public static class OrcStructSerialization + implements Serialization<OrcStruct> { + + @Override + public boolean accept(Class<?> cls) { + return OrcStruct.class.isAssignableFrom(cls); + } + + @Override + public Serializer<OrcStruct> getSerializer(Class<OrcStruct> aClass) { + return new Serializer<OrcStruct>() { + DataOutputStream dataOut; + + public void open(OutputStream out) { + if(out instanceof DataOutputStream) { + dataOut = (DataOutputStream)out; + } else { + dataOut = new DataOutputStream(out); + } + } + + public void serialize(OrcStruct w) throws IOException { + Text.writeString(dataOut, w.getSchema().toString()); + w.write(dataOut); + } + + public void close() throws IOException { + dataOut.close(); + } + }; + } + + @Override + public Deserializer<OrcStruct> getDeserializer(Class<OrcStruct> aClass) { + return new Deserializer<OrcStruct>() { + DataInputStream input; + + @Override + public void open(InputStream inputStream) throws IOException { + if(inputStream instanceof DataInputStream) { + input = (DataInputStream)inputStream; + } else { + input = new DataInputStream(inputStream); + } + } + + @Override + public OrcStruct deserialize(OrcStruct orcStruct) throws IOException { + String typeStr = Text.readString(input); + OrcStruct result = new OrcStruct(TypeDescription.fromString(typeStr)); + result.readFields(input); + return result; + } + + @Override + public void close() throws IOException { + // PASS + } + }; + } + } + + @Test + public void testMapred() throws IOException { + conf.set("io.serializations", + OrcStructSerialization.class.getName() + "," + + WritableSerialization.class.getName()); + OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>"); + OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>"); + MyMapper mapper = new MyMapper(); + mapper.configure(conf); + MyReducer reducer = new MyReducer(); + reducer.configure(conf); + MapReduceDriver<NullWritable, OrcStruct, + OrcKey, OrcValue, + NullWritable, OrcStruct> driver = + new MapReduceDriver<>(mapper, reducer); + driver.setConfiguration(conf); + NullWritable nada = NullWritable.get(); + OrcStruct input = (OrcStruct) OrcStruct.createValue( + TypeDescription.fromString("struct<one:struct<x:int,y:int>,two:struct<z:string>>")); + IntWritable x = + (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(0); + IntWritable y = + (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(1); + Text z = (Text) ((OrcStruct) input.getFieldValue(1)).getFieldValue(0); + + // generate the input stream + for(int r=0; r < 20; ++r) { + x.set(100 - (r / 4)); + y.set(r*2); + z.set(Integer.toHexString(r)); + driver.withInput(nada, input); + } + + // generate the expected outputs + for(int g=4; g >= 0; --g) { + x.set(100 - g); + for(int i=0; i < 4; ++i) { + int r = g * 4 + i; + y.set(r * 2); + z.set(Integer.toHexString(r)); + driver.withOutput(nada, input); + } + } + driver.runTest(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java new file mode 100644 index 0000000..a915ed3 --- /dev/null +++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.mapred; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; + +import static org.junit.Assert.assertEquals; + +public class TestOrcOutputFormat { + + Path workDir = new Path(System.getProperty("test.tmp.dir", + "target" + File.separator + "test" + File.separator + "tmp")); + JobConf conf = new JobConf(); + FileSystem fs; + + { + try { + fs = FileSystem.getLocal(conf).getRaw(); + fs.delete(workDir, true); + fs.mkdirs(workDir); + } catch (IOException e) { + throw new IllegalStateException("bad fs init", e); + } + } + + static class NullOutputCommitter extends OutputCommitter { + + @Override + public void setupJob(JobContext jobContext) { + // PASS + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) { + + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) { + // PASS + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) { + // PASS + } + } + + @Test + public void testAllTypes() throws Exception { + conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0"); + conf.setOutputCommitter(NullOutputCommitter.class); + final String typeStr = "struct<b1:binary,b2:boolean,b3:tinyint," + + "c:char(10),d1:date,d2:decimal(20,5),d3:double,fff:float,int:int," + + "l:array<bigint>,map:map<smallint,string>," + + 
"str:struct<u:uniontype<timestamp,varchar(100)>>,ts:timestamp>"; + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr); + FileOutputFormat.setOutputPath(conf, workDir); + TypeDescription type = TypeDescription.fromString(typeStr); + + // build a row object + OrcStruct row = (OrcStruct) OrcStruct.createValue(type); + ((BytesWritable) row.getFieldValue(0)).set(new byte[]{1,2,3,4}, 0, 4); + ((BooleanWritable) row.getFieldValue(1)).set(true); + ((ByteWritable) row.getFieldValue(2)).set((byte) 23); + ((Text) row.getFieldValue(3)).set("aaabbbcccddd"); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); + ((DateWritable) row.getFieldValue(4)).set(DateWritable.millisToDays + (format.parse("2016-04-01").getTime())); + ((HiveDecimalWritable) row.getFieldValue(5)).set(new HiveDecimalWritable("1.23")); + ((DoubleWritable) row.getFieldValue(6)).set(1.5); + ((FloatWritable) row.getFieldValue(7)).set(4.5f); + ((IntWritable) row.getFieldValue(8)).set(31415); + OrcList<LongWritable> longList = (OrcList<LongWritable>) row.getFieldValue(9); + longList.add(new LongWritable(123)); + longList.add(new LongWritable(456)); + OrcMap<ShortWritable,Text> map = (OrcMap<ShortWritable,Text>) row.getFieldValue(10); + map.put(new ShortWritable((short) 1000), new Text("aaaa")); + map.put(new ShortWritable((short) 123), new Text("bbbb")); + OrcStruct struct = (OrcStruct) row.getFieldValue(11); + OrcUnion union = (OrcUnion) struct.getFieldValue(0); + union.set((byte) 1, new Text("abcde")); + ((OrcTimestamp) row.getFieldValue(12)).set("1996-12-11 15:00:00"); + NullWritable nada = NullWritable.get(); + RecordWriter<NullWritable, OrcStruct> writer = + new OrcOutputFormat<OrcStruct>().getRecordWriter(fs, conf, "all.orc", + Reporter.NULL); + for(int r=0; r < 10; ++r) { + row.setFieldValue(8, new IntWritable(r * 10)); + writer.write(nada, row); + } + union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56")); + for(int r=0; r < 10; ++r) { + row.setFieldValue(8, new IntWritable(r * 10 + 100)); + writer.write(nada, row); + } + OrcStruct row2 = new OrcStruct(type); + writer.write(nada, row2); + row.setFieldValue(8, new IntWritable(210)); + writer.write(nada, row); + writer.close(Reporter.NULL); + + FileSplit split = new FileSplit(new Path(workDir, "all.orc"), 0, 100000, + new String[0]); + RecordReader<NullWritable, OrcStruct> reader = + new OrcInputFormat<OrcStruct>().getRecordReader(split, conf, + Reporter.NULL); + nada = reader.createKey(); + row = reader.createValue(); + for(int r=0; r < 22; ++r) { + assertEquals(true, reader.next(nada, row)); + if (r == 20) { + for(int c=0; c < 12; ++c) { + assertEquals(null, row.getFieldValue(c)); + } + } else { + assertEquals(new BytesWritable(new byte[]{1, 2, 3, 4}), row.getFieldValue(0)); + assertEquals(new BooleanWritable(true), row.getFieldValue(1)); + assertEquals(new ByteWritable((byte) 23), row.getFieldValue(2)); + assertEquals(new Text("aaabbbcccd"), row.getFieldValue(3)); + assertEquals(new DateWritable(DateWritable.millisToDays + (format.parse("2016-04-01").getTime())), row.getFieldValue(4)); + assertEquals(new HiveDecimalWritable("1.23"), row.getFieldValue(5)); + assertEquals(new DoubleWritable(1.5), row.getFieldValue(6)); + assertEquals(new FloatWritable(4.5f), row.getFieldValue(7)); + assertEquals(new IntWritable(r * 10), row.getFieldValue(8)); + assertEquals(longList, row.getFieldValue(9)); + assertEquals(map, row.getFieldValue(10)); + if (r < 10) { + union.set((byte) 1, new Text("abcde")); + } else { + union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56")); 
+ } + assertEquals("row " + r, struct, row.getFieldValue(11)); + assertEquals("row " + r, new OrcTimestamp("1996-12-11 15:00:00"), + row.getFieldValue(12)); + } + } + assertEquals(false, reader.next(nada, row)); + } + + /** + * Test the case where the top level isn't a struct, but a long. + */ + @Test + public void testLongRoot() throws Exception { + conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0"); + conf.setOutputCommitter(NullOutputCommitter.class); + conf.set(OrcConf.COMPRESS.getAttribute(), "SNAPPY"); + conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000); + conf.setInt(OrcConf.BUFFER_SIZE.getAttribute(), 64 * 1024); + conf.set(OrcConf.WRITE_FORMAT.getAttribute(), "0.11"); + final String typeStr = "bigint"; + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr); + FileOutputFormat.setOutputPath(conf, workDir); + TypeDescription type = TypeDescription.fromString(typeStr); + LongWritable value = new LongWritable(); + NullWritable nada = NullWritable.get(); + RecordWriter<NullWritable, LongWritable> writer = + new OrcOutputFormat<LongWritable>().getRecordWriter(fs, conf, + "long.orc", Reporter.NULL); + for(long lo=0; lo < 2000; ++lo) { + value.set(lo); + writer.write(nada, value); + } + writer.close(Reporter.NULL); + + Path path = new Path(workDir, "long.orc"); + Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + assertEquals(CompressionKind.SNAPPY, file.getCompressionKind()); + assertEquals(2000, file.getNumberOfRows()); + assertEquals(1000, file.getRowIndexStride()); + assertEquals(64 * 1024, file.getCompressionSize()); + assertEquals(OrcFile.Version.V_0_11, file.getFileVersion()); + FileSplit split = new FileSplit(path, 0, 100000, + new String[0]); + RecordReader<NullWritable, LongWritable> reader = + new OrcInputFormat<LongWritable>().getRecordReader(split, conf, + Reporter.NULL); + nada = reader.createKey(); + value = reader.createValue(); + for(long lo=0; lo < 2000; ++lo) { + assertEquals(true, reader.next(nada, value)); + assertEquals(lo, value.get()); + } + assertEquals(false, reader.next(nada, value)); + } + + /** + * Make sure that the writer ignores the OrcKey + * @throws Exception + */ + @Test + public void testOrcKey() throws Exception { + conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString()); + conf.set("mapreduce.task.attempt.id", "attempt_jt0_0_m_0_0"); + String TYPE_STRING = "struct<i:int,s:string>"; + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING); + conf.setOutputCommitter(NullOutputCommitter.class); + TypeDescription schema = TypeDescription.fromString(TYPE_STRING); + OrcKey key = new OrcKey(new OrcStruct(schema)); + RecordWriter<NullWritable, Writable> writer = + new OrcOutputFormat<>().getRecordWriter(fs, conf, "key.orc", + Reporter.NULL); + NullWritable nada = NullWritable.get(); + for(int r=0; r < 2000; ++r) { + ((OrcStruct) key.key).setAllFields(new IntWritable(r), + new Text(Integer.toString(r))); + writer.write(nada, key); + } + writer.close(Reporter.NULL); + Path path = new Path(workDir, "key.orc"); + Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + assertEquals(2000, file.getNumberOfRows()); + assertEquals(TYPE_STRING, file.getSchema().toString()); + } + + /** + * Make sure that the writer ignores the OrcValue + * @throws Exception + */ + @Test + public void testOrcValue() throws Exception { + conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString()); + conf.set("mapreduce.task.attempt.id", "attempt_jt0_0_m_0_0"); + String 
TYPE_STRING = "struct<i:int>"; + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING); + conf.setOutputCommitter(NullOutputCommitter.class); + TypeDescription schema = TypeDescription.fromString(TYPE_STRING); + OrcValue value = new OrcValue(new OrcStruct(schema)); + RecordWriter<NullWritable, Writable> writer = + new OrcOutputFormat<>().getRecordWriter(fs, conf, "value.orc", + Reporter.NULL); + NullWritable nada = NullWritable.get(); + for(int r=0; r < 3000; ++r) { + ((OrcStruct) value.value).setAllFields(new IntWritable(r)); + writer.write(nada, value); + } + writer.close(Reporter.NULL); + Path path = new Path(workDir, "value.orc"); + Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + assertEquals(3000, file.getNumberOfRows()); + assertEquals(TYPE_STRING, file.getSchema().toString()); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java index d32ce94..82699ed 100644 --- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java +++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java @@ -76,6 +76,11 @@ public class TestOrcStruct { assertEquals(new IntWritable(42), struct.getFieldValue("i")); assertEquals(new DoubleWritable(1.5), struct.getFieldValue(1)); assertEquals(new Text("Moria"), struct.getFieldValue("k")); + struct.setAllFields(new IntWritable(123), new DoubleWritable(4.5), + new Text("ok")); + assertEquals("123", struct.getFieldValue(0).toString()); + assertEquals("4.5", struct.getFieldValue(1).toString()); + assertEquals("ok", struct.getFieldValue(2).toString()); } @Test http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java deleted file mode 100644 index ce5523f..0000000 --- a/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.orc.mapred.other; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.io.DateWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.OutputCommitter; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TaskAttemptContext; -import org.apache.orc.CompressionKind; -import org.apache.orc.OrcConf; -import org.apache.orc.OrcFile; -import org.apache.orc.Reader; -import org.apache.orc.TypeDescription; -import org.apache.orc.mapred.OrcInputFormat; -import org.apache.orc.mapred.OrcList; -import org.apache.orc.mapred.OrcMap; -import org.apache.orc.mapred.OrcOutputFormat; -import org.apache.orc.mapred.OrcStruct; -import org.apache.orc.mapred.OrcTimestamp; -import org.apache.orc.mapred.OrcUnion; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -public class TestOrcOutputFormat { - - Path workDir = new Path(System.getProperty("test.tmp.dir", - "target" + File.separator + "test" + File.separator + "tmp")); - JobConf conf = new JobConf(); - FileSystem fs; - - { - try { - fs = FileSystem.getLocal(conf).getRaw(); - fs.delete(workDir, true); - fs.mkdirs(workDir); - } catch (IOException e) { - throw new IllegalStateException("bad fs init", e); - } - } - - static class NullOutputCommitter extends OutputCommitter { - - @Override - public void setupJob(JobContext jobContext) { - // PASS - } - - @Override - public void setupTask(TaskAttemptContext taskAttemptContext) { - - } - - @Override - public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) { - return false; - } - - @Override - public void commitTask(TaskAttemptContext taskAttemptContext) { - // PASS - } - - @Override - public void abortTask(TaskAttemptContext taskAttemptContext) { - // PASS - } - } - - @Test - public void testAllTypes() throws Exception { - conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0"); - conf.setOutputCommitter(NullOutputCommitter.class); - final String typeStr = "struct<b1:binary,b2:boolean,b3:tinyint," + - "c:char(10),d1:date,d2:decimal(20,5),d3:double,fff:float,int:int," + - "l:array<bigint>,map:map<smallint,string>," + - "str:struct<u:uniontype<timestamp,varchar(100)>>,ts:timestamp>"; - conf.set(OrcConf.SCHEMA.getAttribute(), typeStr); - FileOutputFormat.setOutputPath(conf, workDir); - TypeDescription type = TypeDescription.fromString(typeStr); - - // build a row object - OrcStruct row = (OrcStruct) OrcStruct.createValue(type); - ((BytesWritable) row.getFieldValue(0)).set(new byte[]{1,2,3,4}, 0, 4); - ((BooleanWritable) row.getFieldValue(1)).set(true); - ((ByteWritable) row.getFieldValue(2)).set((byte) 23); 
- ((Text) row.getFieldValue(3)).set("aaabbbcccddd"); - SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); - ((DateWritable) row.getFieldValue(4)).set(DateWritable.millisToDays - (format.parse("2016-04-01").getTime())); - ((HiveDecimalWritable) row.getFieldValue(5)).set(new HiveDecimalWritable("1.23")); - ((DoubleWritable) row.getFieldValue(6)).set(1.5); - ((FloatWritable) row.getFieldValue(7)).set(4.5f); - ((IntWritable) row.getFieldValue(8)).set(31415); - OrcList<LongWritable> longList = (OrcList<LongWritable>) row.getFieldValue(9); - longList.add(new LongWritable(123)); - longList.add(new LongWritable(456)); - OrcMap<ShortWritable,Text> map = (OrcMap<ShortWritable,Text>) row.getFieldValue(10); - map.put(new ShortWritable((short) 1000), new Text("aaaa")); - map.put(new ShortWritable((short) 123), new Text("bbbb")); - OrcStruct struct = (OrcStruct) row.getFieldValue(11); - OrcUnion union = (OrcUnion) struct.getFieldValue(0); - union.set((byte) 1, new Text("abcde")); - ((OrcTimestamp) row.getFieldValue(12)).set("1996-12-11 15:00:00"); - NullWritable nada = NullWritable.get(); - RecordWriter<NullWritable, OrcStruct> writer = - new OrcOutputFormat<OrcStruct>().getRecordWriter(fs, conf, "all.orc", - Reporter.NULL); - for(int r=0; r < 10; ++r) { - row.setFieldValue(8, new IntWritable(r * 10)); - writer.write(nada, row); - } - union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56")); - for(int r=0; r < 10; ++r) { - row.setFieldValue(8, new IntWritable(r * 10 + 100)); - writer.write(nada, row); - } - OrcStruct row2 = new OrcStruct(type); - writer.write(nada, row2); - row.setFieldValue(8, new IntWritable(210)); - writer.write(nada, row); - writer.close(Reporter.NULL); - - FileSplit split = new FileSplit(new Path(workDir, "all.orc"), 0, 100000, - new String[0]); - RecordReader<NullWritable, OrcStruct> reader = - new OrcInputFormat<OrcStruct>().getRecordReader(split, conf, - Reporter.NULL); - nada = reader.createKey(); - row = reader.createValue(); - for(int r=0; r < 22; ++r) { - assertEquals(true, reader.next(nada, row)); - if (r == 20) { - for(int c=0; c < 12; ++c) { - assertEquals(null, row.getFieldValue(c)); - } - } else { - assertEquals(new BytesWritable(new byte[]{1, 2, 3, 4}), row.getFieldValue(0)); - assertEquals(new BooleanWritable(true), row.getFieldValue(1)); - assertEquals(new ByteWritable((byte) 23), row.getFieldValue(2)); - assertEquals(new Text("aaabbbcccd"), row.getFieldValue(3)); - assertEquals(new DateWritable(DateWritable.millisToDays - (format.parse("2016-04-01").getTime())), row.getFieldValue(4)); - assertEquals(new HiveDecimalWritable("1.23"), row.getFieldValue(5)); - assertEquals(new DoubleWritable(1.5), row.getFieldValue(6)); - assertEquals(new FloatWritable(4.5f), row.getFieldValue(7)); - assertEquals(new IntWritable(r * 10), row.getFieldValue(8)); - assertEquals(longList, row.getFieldValue(9)); - assertEquals(map, row.getFieldValue(10)); - if (r < 10) { - union.set((byte) 1, new Text("abcde")); - } else { - union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56")); - } - assertEquals("row " + r, struct, row.getFieldValue(11)); - assertEquals("row " + r, new OrcTimestamp("1996-12-11 15:00:00"), - row.getFieldValue(12)); - } - } - assertEquals(false, reader.next(nada, row)); - } - - /** - * Test the case where the top level isn't a struct, but a long. 
- */ - @Test - public void testLongRoot() throws Exception { - conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0"); - conf.setOutputCommitter(NullOutputCommitter.class); - conf.set(OrcConf.COMPRESS.getAttribute(), "SNAPPY"); - conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000); - conf.setInt(OrcConf.BUFFER_SIZE.getAttribute(), 64 * 1024); - conf.set(OrcConf.WRITE_FORMAT.getAttribute(), "0.11"); - final String typeStr = "bigint"; - conf.set(OrcConf.SCHEMA.getAttribute(), typeStr); - FileOutputFormat.setOutputPath(conf, workDir); - TypeDescription type = TypeDescription.fromString(typeStr); - LongWritable value = new LongWritable(); - NullWritable nada = NullWritable.get(); - RecordWriter<NullWritable, LongWritable> writer = - new OrcOutputFormat<LongWritable>().getRecordWriter(fs, conf, - "long.orc", Reporter.NULL); - for(long lo=0; lo < 2000; ++lo) { - value.set(lo); - writer.write(nada, value); - } - writer.close(Reporter.NULL); - - Path path = new Path(workDir, "long.orc"); - Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf)); - assertEquals(CompressionKind.SNAPPY, file.getCompressionKind()); - assertEquals(2000, file.getNumberOfRows()); - assertEquals(1000, file.getRowIndexStride()); - assertEquals(64 * 1024, file.getCompressionSize()); - assertEquals(OrcFile.Version.V_0_11, file.getFileVersion()); - FileSplit split = new FileSplit(path, 0, 100000, - new String[0]); - RecordReader<NullWritable, LongWritable> reader = - new OrcInputFormat<LongWritable>().getRecordReader(split, conf, - Reporter.NULL); - nada = reader.createKey(); - value = reader.createValue(); - for(long lo=0; lo < 2000; ++lo) { - assertEquals(true, reader.next(nada, value)); - assertEquals(lo, value.get()); - } - assertEquals(false, reader.next(nada, value)); - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java new file mode 100644 index 0000000..27543c1 --- /dev/null +++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.mapreduce; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcKey; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class TestMapreduceOrcOutputFormat { + + Path workDir = new Path(System.getProperty("test.tmp.dir", + "target" + File.separator + "test" + File.separator + "tmp")); + JobConf conf = new JobConf(); + FileSystem fs; + + { + try { + fs = FileSystem.getLocal(conf).getRaw(); + fs.delete(workDir, true); + fs.mkdirs(workDir); + } catch (IOException e) { + throw new IllegalStateException("bad fs init", e); + } + } + + @Test + public void testPredicatePushdown() throws Exception { + TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0); + TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id); + final String typeStr = "struct<i:int,s:string>"; + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr); + conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString()); + conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000); + conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true); + OutputFormat<NullWritable, OrcStruct> outputFormat = + new OrcOutputFormat<OrcStruct>(); + RecordWriter<NullWritable, OrcStruct> writer = + outputFormat.getRecordWriter(attemptContext); + + // write 4000 rows with the integer and the binary string + TypeDescription type = TypeDescription.fromString(typeStr); + OrcStruct row = (OrcStruct) OrcStruct.createValue(type); + NullWritable nada = NullWritable.get(); + for(int r=0; r < 4000; ++r) { + row.setFieldValue(0, new IntWritable(r)); + row.setFieldValue(1, new Text(Integer.toBinaryString(r))); + writer.write(nada, row); + } + writer.close(attemptContext); + + OrcInputFormat.setSearchArgument(conf, + SearchArgumentFactory.newBuilder() + .between("i", PredicateLeaf.Type.LONG, new Long(1500), new Long(1999)) + .build(), new String[]{null, "i", "s"}); + FileSplit split = new FileSplit(new Path(workDir, "part-m-00000.orc"), + 0, 1000000, new String[0]); + RecordReader<NullWritable, OrcStruct> reader = + new OrcInputFormat<OrcStruct>().createRecordReader(split, + attemptContext); + // the sarg should cause it to skip over the rows except 1000 to 2000 + for(int r=1000; r < 2000; ++r) { + assertEquals(true, reader.nextKeyValue()); + row = reader.getCurrentValue(); + assertEquals(r, ((IntWritable) row.getFieldValue(0)).get()); + assertEquals(Integer.toBinaryString(r), row.getFieldValue(1).toString()); 
+ } + assertEquals(false, reader.nextKeyValue()); + } + + @Test + public void testColumnSelection() throws Exception { + String typeStr = "struct<i:int,j:int,k:int>"; + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr); + conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString()); + conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000); + conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true); + TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1); + TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id); + OutputFormat<NullWritable, OrcStruct> outputFormat = + new OrcOutputFormat<OrcStruct>(); + RecordWriter<NullWritable, OrcStruct> writer = + outputFormat.getRecordWriter(attemptContext); + + // write 3000 rows with three integer columns + TypeDescription type = TypeDescription.fromString(typeStr); + OrcStruct row = (OrcStruct) OrcStruct.createValue(type); + NullWritable nada = NullWritable.get(); + for(int r=0; r < 3000; ++r) { + row.setFieldValue(0, new IntWritable(r)); + row.setFieldValue(1, new IntWritable(r * 2)); + row.setFieldValue(2, new IntWritable(r * 3)); + writer.write(nada, row); + } + writer.close(attemptContext); + + conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute(), "0,2"); + FileSplit split = new FileSplit(new Path(workDir, "part-m-00000.orc"), + 0, 1000000, new String[0]); + RecordReader<NullWritable, OrcStruct> reader = + new OrcInputFormat<OrcStruct>().createRecordReader(split, + attemptContext); + // with only columns i and k selected, the unread column j comes back as null + for(int r=0; r < 3000; ++r) { + assertEquals(true, reader.nextKeyValue()); + row = reader.getCurrentValue(); + assertEquals(r, ((IntWritable) row.getFieldValue(0)).get()); + assertEquals(null, row.getFieldValue(1)); + assertEquals(r * 3, ((IntWritable) row.getFieldValue(2)).get()); + } + assertEquals(false, reader.nextKeyValue()); + } + + + /** + * Make sure that the writer ignores the OrcKey + * @throws Exception + */ + @Test + public void testOrcKey() throws Exception { + conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString()); + String TYPE_STRING = "struct<i:int,s:string>"; + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING); + conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true); + TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1); + TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id); + TypeDescription schema = TypeDescription.fromString(TYPE_STRING); + OrcKey key = new OrcKey(new OrcStruct(schema)); + RecordWriter<NullWritable, Writable> writer = + new OrcOutputFormat<>().getRecordWriter(attemptContext); + NullWritable nada = NullWritable.get(); + for(int r=0; r < 2000; ++r) { + ((OrcStruct) key.key).setAllFields(new IntWritable(r), + new Text(Integer.toString(r))); + writer.write(nada, key); + } + writer.close(attemptContext); + Path path = new Path(workDir, "part-m-00000.orc"); + Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + assertEquals(2000, file.getNumberOfRows()); + assertEquals(TYPE_STRING, file.getSchema().toString()); + } + + /** + * Make sure that the writer ignores the OrcValue + * @throws Exception + */ + @Test + public void testOrcValue() throws Exception { + conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString()); + String TYPE_STRING = "struct<i:int>"; + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING); + conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true); + TaskAttemptID id = new
TaskAttemptID("jt", 0, TaskType.MAP, 0, 1); + TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id); + + TypeDescription schema = TypeDescription.fromString(TYPE_STRING); + OrcValue value = new OrcValue(new OrcStruct(schema)); + RecordWriter<NullWritable, Writable> writer = + new OrcOutputFormat<>().getRecordWriter(attemptContext); + NullWritable nada = NullWritable.get(); + for(int r=0; r < 3000; ++r) { + ((OrcStruct) value.value).setAllFields(new IntWritable(r)); + writer.write(nada, value); + } + writer.close(attemptContext); + Path path = new Path(workDir, "part-m-00000.orc"); + Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + assertEquals(3000, file.getNumberOfRows()); + assertEquals(TYPE_STRING, file.getSchema().toString()); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java ---------------------------------------------------------------------- diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java new file mode 100644 index 0000000..01208e1 --- /dev/null +++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.mapreduce; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; +import org.apache.orc.OrcConf; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcKey; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Iterator; + +public class TestMrUnit { + JobConf conf = new JobConf(); + + /** + * Split the input struct into its two parts. 
+ */ + public static class MyMapper + extends Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> { + private OrcKey keyWrapper = new OrcKey(); + private OrcValue valueWrapper = new OrcValue(); + + @Override + protected void map(NullWritable key, + OrcStruct value, + Context context + ) throws IOException, InterruptedException { + keyWrapper.key = value.getFieldValue(0); + valueWrapper.value = value.getFieldValue(1); + context.write(keyWrapper, valueWrapper); + } + } + + /** + * Glue the key and values back together. + */ + public static class MyReducer + extends Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> { + private OrcStruct output = new OrcStruct(TypeDescription.fromString + ("struct<first:struct<x:int,y:int>,second:struct<z:string>>")); + private final NullWritable nada = NullWritable.get(); + + @Override + protected void reduce(OrcKey key, + Iterable<OrcValue> values, + Context context + ) throws IOException, InterruptedException { + output.setFieldValue(0, key.key); + for(OrcValue value: values) { + output.setFieldValue(1, value.value); + context.write(nada, output); + } + } + } + + /** + * This class is intended to support MRUnit's object copying for input and + * output objects. + * + * Real mapreduce contexts should NEVER use this class. + * + * The type string is serialized before each value. + */ + public static class OrcStructSerialization + implements Serialization<OrcStruct> { + + @Override + public boolean accept(Class<?> cls) { + return OrcStruct.class.isAssignableFrom(cls); + } + + @Override + public Serializer<OrcStruct> getSerializer(Class<OrcStruct> aClass) { + return new Serializer<OrcStruct>() { + DataOutputStream dataOut; + + public void open(OutputStream out) { + if(out instanceof DataOutputStream) { + dataOut = (DataOutputStream)out; + } else { + dataOut = new DataOutputStream(out); + } + } + + public void serialize(OrcStruct w) throws IOException { + Text.writeString(dataOut, w.getSchema().toString()); + w.write(dataOut); + } + + public void close() throws IOException { + dataOut.close(); + } + }; + } + + @Override + public Deserializer<OrcStruct> getDeserializer(Class<OrcStruct> aClass) { + return new Deserializer<OrcStruct>() { + DataInputStream input; + + @Override + public void open(InputStream inputStream) throws IOException { + if(inputStream instanceof DataInputStream) { + input = (DataInputStream)inputStream; + } else { + input = new DataInputStream(inputStream); + } + } + + @Override + public OrcStruct deserialize(OrcStruct orcStruct) throws IOException { + String typeStr = Text.readString(input); + OrcStruct result = new OrcStruct(TypeDescription.fromString(typeStr)); + result.readFields(input); + return result; + } + + @Override + public void close() throws IOException { + // PASS + } + }; + } + } + + @Test + public void testMapred() throws IOException { + conf.set("io.serializations", + OrcStructSerialization.class.getName() + "," + + WritableSerialization.class.getName()); + OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>"); + OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>"); + MyMapper mapper = new MyMapper(); + MyReducer reducer = new MyReducer(); + MapReduceDriver<NullWritable, OrcStruct, + OrcKey, OrcValue, + NullWritable, OrcStruct> driver = + new MapReduceDriver<>(mapper, reducer); + driver.setConfiguration(conf); + NullWritable nada = NullWritable.get(); + OrcStruct input = (OrcStruct) OrcStruct.createValue( + TypeDescription.fromString("struct<one:struct<x:int,y:int>,two:struct<z:string>>")); 
+ IntWritable x = + (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(0); + IntWritable y = + (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(1); + Text z = (Text) ((OrcStruct) input.getFieldValue(1)).getFieldValue(0); + + // generate the input stream + for(int r=0; r < 20; ++r) { + x.set(100 - (r / 4)); + y.set(r*2); + z.set(Integer.toHexString(r)); + driver.withInput(nada, input); + } + + // generate the expected outputs + for(int g=4; g >= 0; --g) { + x.set(100 - g); + for(int i=0; i < 4; ++i) { + int r = g * 4 + i; + y.set(r * 2); + z.set(Integer.toHexString(r)); + driver.withOutput(nada, input); + } + } + driver.runTest(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/pom.xml ---------------------------------------------------------------------- diff --git a/java/pom.xml b/java/pom.xml index 9941dee..2eacd7a 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -47,6 +47,7 @@ <junit.version>4.11</junit.version> <kryo.version>3.0.3</kryo.version> <mockito.version>1.9.5</mockito.version> + <mrunit.version>1.1.0</mrunit.version> <protobuf.version>2.5.0</protobuf.version> <slf4j.version>1.7.5</slf4j.version> <snappy.version>0.2</snappy.version>
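For anyone trying the new mapreduce API from this commit, here is a minimal job driver sketch. The OrcPassThrough class name, the identity map-only topology, and the example schema are illustrative assumptions on my part; OrcInputFormat, OrcOutputFormat, OrcStruct, and OrcConf.MAPRED_OUTPUT_SCHEMA are the classes and knobs exercised by the tests above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.orc.OrcConf;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapreduce.OrcInputFormat;
    import org.apache.orc.mapreduce.OrcOutputFormat;

    public class OrcPassThrough {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The writer needs the output schema up front; the reader discovers
        // the schema from the file footer instead.
        OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<i:int,s:string>");
        Job job = Job.getInstance(conf, "orc pass-through");
        job.setJarByClass(OrcPassThrough.class);
        // Map-only identity job: OrcInputFormat produces NullWritable keys
        // and OrcStruct values, which the default Mapper passes through.
        job.setInputFormatClass(OrcInputFormat.class);
        job.setOutputFormatClass(OrcOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }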

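When records must cross the shuffle, as in the MRUnit tests above, the structs travel inside OrcKey/OrcValue wrappers, and their schemas have to be declared in the configuration because the shuffle has no file footer to read them from. A sketch of the extra driver lines, continuing the assumptions of the previous example (add imports for org.apache.orc.mapred.OrcKey and org.apache.orc.mapred.OrcValue):

    // Schemas for the wrapped shuffle key and value; the values here mirror
    // the ones set in TestMrUnit.
    OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>");
    // The map output classes are the wrappers, not OrcStruct itself; the
    // record writer unwraps them again on the way out.
    job.setMapOutputKeyClass(OrcKey.class);
    job.setMapOutputValueClass(OrcValue.class);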