Repository: parquet-mr Updated Branches: refs/heads/master af9fd052d -> 57694790f
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java ---------------------------------------------------------------------- diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java new file mode 100644 index 0000000..28f7f32 --- /dev/null +++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.cascading; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.Tap; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.mapred.Container; +import org.apache.parquet.hadoop.thrift.ParquetThriftInputFormat; +import org.apache.parquet.hadoop.thrift.ThriftReadSupport; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * A Cascading Scheme that returns a simple Tuple with a single value, the "value" object + * coming out of the underlying InputFormat. + * + * This is an abstract class; implementations are expected to set up their Input/Output Formats + * correctly in the respective Init methods. 
+ */ +public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{ + + public static final class Config<T> implements Serializable { + private final FilterPredicate filterPredicate; + private final String deprecatedProjectionString; + private final String strictProjectionString; + private final Class<T> klass; + + private Config(Class<T> klass, FilterPredicate filterPredicate, String deprecatedProjectionString, String strictProjectionString) { + this.filterPredicate = filterPredicate; + this.deprecatedProjectionString = deprecatedProjectionString; + this.strictProjectionString = strictProjectionString; + this.klass = klass; + } + + public Config() { + filterPredicate = null; + deprecatedProjectionString = null; + strictProjectionString = null; + klass = null; + } + + public FilterPredicate getFilterPredicate() { + return filterPredicate; + } + + @Deprecated + public String getProjectionString() { + return deprecatedProjectionString; + } + + public String getStrictProjectionString() { + return strictProjectionString; + } + + public Class<T> getKlass() { + return klass; + } + + public Config<T> withFilterPredicate(FilterPredicate f) { + return new Config<T>(this.klass, checkNotNull(f, "filterPredicate"), this.deprecatedProjectionString, this.strictProjectionString); + } + + @Deprecated + public Config<T> withProjectionString(String p) { + return new Config<T>(this.klass, this.filterPredicate, checkNotNull(p, "projectionString"), this.strictProjectionString); + } + + public Config<T> withStrictProjectionString(String p) { + return new Config<T>(this.klass, this.filterPredicate, this.deprecatedProjectionString, checkNotNull(p, "projectionString")); + } + + public Config<T> withRecordClass(Class<T> klass) { + return new Config<T>(checkNotNull(klass, "recordClass"), this.filterPredicate, this.deprecatedProjectionString, this.strictProjectionString); + } + } + + private static final long serialVersionUID = 
157560846420730043L; + protected final Config<T> config; + + public ParquetValueScheme() { + this(new Config<T>()); + } + + public ParquetValueScheme(FilterPredicate filterPredicate) { + this(new Config<T>().withFilterPredicate(filterPredicate)); + } + + public ParquetValueScheme(Config<T> config) { + this.config = config; + } + + @Deprecated + private void setProjectionPushdown(JobConf jobConf) { + if (this.config.deprecatedProjectionString != null) { + ThriftReadSupport.setProjectionPushdown(jobConf, this.config.deprecatedProjectionString); + } + } + + private void setStrictProjectionPushdown(JobConf jobConf) { + if (this.config.strictProjectionString != null) { + ThriftReadSupport.setStrictFieldProjectionFilter(jobConf, this.config.strictProjectionString); + } + } + + private void setPredicatePushdown(JobConf jobConf) { + if (this.config.filterPredicate != null) { + ParquetInputFormat.setFilterPredicate(jobConf, this.config.filterPredicate); + } + } + @Override + public void sourceConfInit(FlowProcess<? extends JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, JobConf jobConf) { + setPredicatePushdown(jobConf); + setProjectionPushdown(jobConf); + setStrictProjectionPushdown(jobConf); + setRecordClass(jobConf); + } + + private void setRecordClass(JobConf jobConf) { + if (config.klass != null) { + ParquetThriftInputFormat.setThriftClass(jobConf, config.klass); + } + } + + @SuppressWarnings("unchecked") + @Override + public boolean source(FlowProcess<? extends JobConf> fp, SourceCall<Object[], RecordReader> sc) + throws IOException { + Container<T> value = (Container<T>) sc.getInput().createValue(); + boolean hasNext = sc.getInput().next(null, value); + if (!hasNext) { return false; } + + // Skip nulls + if (value == null) { return true; } + + sc.getIncomingEntry().setTuple(new Tuple(value.get())); + return true; + } + + @SuppressWarnings("unchecked") + @Override + public void sink(FlowProcess<? 
extends JobConf> fp, SinkCall<Object[], OutputCollector> sc) + throws IOException { + TupleEntry tuple = sc.getOutgoingEntry(); + + if (tuple.size() != 1) { + throw new RuntimeException("ParquetValueScheme expects tuples with an arity of exactly 1, but found " + tuple.getFields()); + } + + T value = (T) tuple.getObject(0); + OutputCollector output = sc.getOutput(); + output.collect(null, value); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java ---------------------------------------------------------------------- diff --git a/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java new file mode 100644 index 0000000..7b9f817 --- /dev/null +++ b/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.cascading; + +import cascading.flow.Flow; +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.HadoopFlowConnector; +import cascading.operation.BaseOperation; +import cascading.operation.Function; +import cascading.operation.FunctionCall; +import cascading.pipe.Each; +import cascading.pipe.Pipe; +import cascading.scheme.Scheme; +import cascading.scheme.hadoop.TextLine; +import cascading.tap.Tap; +import cascading.tap.hadoop.Hfs; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TIOStreamTransport; +import org.junit.Test; +import static org.junit.Assert.*; + +import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter; +import org.apache.parquet.hadoop.util.ContextUtil; +import org.apache.parquet.thrift.test.Name; + +import java.io.File; +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; + +public class TestParquetTBaseScheme { + final String txtInputPath = "target/test-classes/names.txt"; + final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in"; + final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out"; + final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out"; + + @Test + public void testWrite() throws Exception { + Path path = new Path(parquetOutputPath); + JobConf jobConf = new JobConf(); + final FileSystem fs = 
path.getFileSystem(jobConf); + if (fs.exists(path)) fs.delete(path, true); + + Scheme sourceScheme = new TextLine( new Fields( "first", "last" ) ); + Tap source = new Hfs(sourceScheme, txtInputPath); + + Scheme sinkScheme = new ParquetTBaseScheme(Name.class); + Tap sink = new Hfs(sinkScheme, parquetOutputPath); + + Pipe assembly = new Pipe( "namecp" ); + assembly = new Each(assembly, new PackThriftFunction()); + HadoopFlowConnector hadoopFlowConnector = new HadoopFlowConnector(); + Flow flow = hadoopFlowConnector.connect("namecp", source, sink, assembly); + + flow.complete(); + + assertTrue(fs.exists(new Path(parquetOutputPath))); + assertTrue(fs.exists(new Path(parquetOutputPath + "/_metadata"))); + assertTrue(fs.exists(new Path(parquetOutputPath + "/_common_metadata"))); + } + + @Test + public void testRead() throws Exception { + doRead(new ParquetTBaseScheme(Name.class)); + } + + @Test + public void testReadWithoutClass() throws Exception { + doRead(new ParquetTBaseScheme()); + } + + private void doRead(Scheme sourceScheme) throws Exception { + createFileForRead(); + + Path path = new Path(txtOutputPath); + final FileSystem fs = path.getFileSystem(new Configuration()); + if (fs.exists(path)) fs.delete(path, true); + + Tap source = new Hfs(sourceScheme, parquetInputPath); + + Scheme sinkScheme = new TextLine(new Fields("first", "last")); + Tap sink = new Hfs(sinkScheme, txtOutputPath); + + Pipe assembly = new Pipe( "namecp" ); + assembly = new Each(assembly, new UnpackThriftFunction()); + Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly); + + flow.complete(); + String result = FileUtils.readFileToString(new File(txtOutputPath+"/part-00000")); + assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result); + } + + + private void createFileForRead() throws Exception { + final Path fileToCreate = new Path(parquetInputPath+"/names.parquet"); + + final Configuration conf = new Configuration(); + final FileSystem fs = 
fileToCreate.getFileSystem(conf); + if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true); + + TProtocolFactory protocolFactory = new TCompactProtocol.Factory(); + TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0); + ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos)); + + Name n1 = new Name(); + n1.setFirst_name("Alice"); + n1.setLast_name("Practice"); + Name n2 = new Name(); + n2.setFirst_name("Bob"); + n2.setLast_name("Hope"); + Name n3 = new Name(); + n3.setFirst_name("Charlie"); + n3.setLast_name("Horse"); + + n1.write(protocol); + w.write(new BytesWritable(baos.toByteArray())); + baos.reset(); + n2.write(protocol); + w.write(new BytesWritable(baos.toByteArray())); + baos.reset(); + n3.write(protocol); + w.write(new BytesWritable(baos.toByteArray())); + w.close(); + } + + private static class PackThriftFunction extends BaseOperation implements Function { + @Override + public void operate(FlowProcess flowProcess, FunctionCall functionCall) { + TupleEntry arguments = functionCall.getArguments(); + Tuple result = new Tuple(); + + Name name = new Name(); + name.setFirst_name(arguments.getString(0)); + name.setLast_name(arguments.getString(1)); + + result.add(name); + functionCall.getOutputCollector().add(result); + } + } + + private static class UnpackThriftFunction extends BaseOperation implements Function { + @Override + public void operate(FlowProcess flowProcess, FunctionCall functionCall) { + TupleEntry arguments = functionCall.getArguments(); + Tuple result = new Tuple(); + + Name name = (Name) arguments.getObject(0); + result.add(name.getFirst_name()); + result.add(name.getLast_name()); + functionCall.getOutputCollector().add(result); + } + } +} 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java index 6f86062..55a6326 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java @@ -202,14 +202,20 @@ public class TestMergeMetadataFiles { ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.metaPath1, info.metaPath2), mergedOut, info.conf); fail("this should throw"); } catch (RuntimeException e) { - assertEquals("could not merge metadata: key schema_num has conflicting values: [2, 1]", e.getMessage()); + boolean eq1 = e.getMessage().equals("could not merge metadata: key schema_num has conflicting values: [2, 1]"); + boolean eq2 = e.getMessage().equals("could not merge metadata: key schema_num has conflicting values: [1, 2]"); + + assertEquals(eq1 || eq2, true); } try { ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.commonMetaPath1, info.commonMetaPath2), mergedCommonOut, info.conf); fail("this should throw"); } catch (RuntimeException e) { - assertEquals("could not merge metadata: key schema_num has conflicting values: [2, 1]", e.getMessage()); + boolean eq1 = e.getMessage().equals("could not merge metadata: key schema_num has conflicting values: [2, 1]"); + boolean eq2 = e.getMessage().equals("could not merge metadata: key schema_num has conflicting values: [1, 2]"); + + assertEquals(eq1 || eq2, true); } } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet_cascading.md ---------------------------------------------------------------------- diff --git a/parquet_cascading.md b/parquet_cascading.md index 
a1b0a68..0eeaceb 100644 --- a/parquet_cascading.md +++ b/parquet_cascading.md @@ -147,4 +147,17 @@ scheme builder classes. ### 2.2 Projection Pushdown with Tuples When using ParquetTupleScheme, specifying projection pushdown is as simple as specifying fields as the parameter of the constructor of ParquetTupleScheme: + +3. Cascading 2.0 & Cascading 3.0 +================================ +Cascading 3.0 introduced a breaking interface change in the Scheme abstract class, which causes a breaking change in all scheme implementations. +The parquet-cascading3 directory contains a separate library for use with Cascading 3.0. + +A significant part of the code remains identical; this shared part is in the parquet-cascading-common23 directory, which is not a Maven module. + +You cannot use both parquet-cascading and parquet-cascading3 in the same Classloader, which should be fine as you cannot use both cascading-core 2.x and cascading-core 3.x in the same Classloader either. + + + + `Scheme sourceScheme = new ParquetTupleScheme(new Fields("age"));` http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 94d7a02..98fd862 100644 --- a/pom.xml +++ b/pom.xml @@ -79,6 +79,7 @@ <shade.prefix>shaded.parquet</shade.prefix> <hadoop.version>1.1.0</hadoop.version> <cascading.version>2.5.3</cascading.version> + <cascading3.version>3.0.3</cascading3.version> <parquet.format.version>2.3.1</parquet.format.version> <previous.version>1.7.0</previous.version> <thrift.executable>thrift</thrift.executable> @@ -98,6 +99,7 @@ <module>parquet-avro</module> <module>parquet-benchmarks</module> <module>parquet-cascading</module> + <module>parquet-cascading3</module> <module>parquet-column</module> <module>parquet-common</module> <module>parquet-encoding</module> @@ -197,6 +199,13 @@ </execution> </executions> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-resources-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> <artifactId>maven-enforcer-plugin</artifactId> <version>1.3.1</version> @@ -322,6 +331,23 @@ </execution> </executions--> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-resources-plugin</artifactId> + <version>2.7</version> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-remote-resources-plugin</artifactId> + <version>1.5</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> <!-- Override source and target from the ASF parent --> <groupId>org.apache.maven.plugins</groupId>
