PARQUET-480: Update for Cascading 3.0. The code in parquet-cascading is adapted to the API as of Cascading 2.5.3.
Some incompatible changes were introduced in Cascading 3.0. This patch forks the parquet-cascading module to also provide a parquet-cascading3 module, which is nearly identical except for overloads that changed from requiring a Foo<JobConf> to requiring a Foo<? extends JobConf>. Author: Cyrille Chépélov (TP12) <[email protected]> Closes #284 from cchepelov/try_cascading3 and squashes the following commits: e7d1304 [Cyrille Chépélov (TP12)] Adding a @Deprecated notice on parquet-cascading's remaining classes 05a417d [Cyrille Chépélov (TP12)] cascading2/3: share back TupleWriteSupport.java (accidentally unmerged) 7fff2d4 [Cyrille Chépélov (TP12)] cascading/cascading3: remove duplicates, push common files into parquet-cascading-common23 338a416 [Cyrille Chépélov (TP12)] Removing unwanted file (what?!) + .gitignoring this kind of files d9f0455 [Cyrille Chépélov (TP12)] TupleEntry#get is now TupleEntry#getObject a7f490a [Cyrille Chépélov (TP12)] Revert "Missing test conversion to Cascading 3.0" cc8b870 [Cyrille Chépélov (TP12)] Missing test conversion to Cascading 3.0 2d73512 [Cyrille Chépélov (TP12)] conflicting values can come in one order or the other. Accept both. 33355d5 [Cyrille Chépélov (TP12)] Fix version mismatch (duh!) 
7128639 [Cyrille Chépélov (TP12)] non-C locale can break tests implementation (decimal formats) 53aa2f9 [Cyrille Chépélov (TP12)] Adding a parquet-cascading3 module (forking the parquet-cascading module and accounting for API changes) Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/57694790 Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/57694790 Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/57694790 Branch: refs/heads/master Commit: 57694790f8ca0e1a4f3ac76fbd25a6dd13041e03 Parents: af9fd05 Author: Cyrille Chépélov (TP12) <[email protected]> Authored: Sun Jan 31 19:21:48 2016 -0800 Committer: Julien Le Dem <[email protected]> Committed: Sun Jan 31 19:21:48 2016 -0800 ---------------------------------------------------------------------- .gitignore | 3 + README.md | 2 +- .../parquet/cascading/SchemaIntersection.java | 63 ++++++ .../parquet/cascading/TupleReadSupport.java | 80 ++++++++ .../parquet/cascading/TupleWriteSupport.java | 111 +++++++++++ .../cascading/convert/TupleConverter.java | 115 +++++++++++ .../convert/TupleRecordMaterializer.java | 46 +++++ .../cascading/TestParquetTupleScheme.java | 182 ++++++++++++++++++ .../src/test/resources/names.txt | 3 + .../src/test/thrift/test.thrift | 25 +++ parquet-cascading/pom.xml | 47 +++++ .../parquet/cascading/ParquetTBaseScheme.java | 1 + .../parquet/cascading/ParquetTupleScheme.java | 1 + .../parquet/cascading/ParquetValueScheme.java | 1 + .../parquet/cascading/SchemaIntersection.java | 63 ------ .../parquet/cascading/TupleReadSupport.java | 80 -------- .../parquet/cascading/TupleWriteSupport.java | 111 ----------- .../cascading/convert/TupleConverter.java | 115 ----------- .../convert/TupleRecordMaterializer.java | 46 ----- .../cascading/TestParquetTBaseScheme.java | 3 +- .../cascading/TestParquetTupleScheme.java | 182 ------------------ parquet-cascading/src/test/resources/names.txt | 3 - 
parquet-cascading/src/test/thrift/test.thrift | 25 --- parquet-cascading3/REVIEWERS.md | 27 +++ parquet-cascading3/pom.xml | 178 +++++++++++++++++ .../parquet/cascading/ParquetTBaseScheme.java | 80 ++++++++ .../parquet/cascading/ParquetTupleScheme.java | 191 +++++++++++++++++++ .../parquet/cascading/ParquetValueScheme.java | 184 ++++++++++++++++++ .../cascading/TestParquetTBaseScheme.java | 186 ++++++++++++++++++ .../parquet/hadoop/TestMergeMetadataFiles.java | 10 +- parquet_cascading.md | 13 ++ pom.xml | 26 +++ 32 files changed, 1574 insertions(+), 629 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index cd3c066..aa67d3d 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,6 @@ dependency-reduced-pom.xml parquet-scrooge/.cache .idea/* target/ +.cache +*~ +mvn_install.log http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 2d9a50a..9bb0be6 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ sudo make install Once protobuf and thrift are available in your path, you can build the project by running: ``` -mvn clean install +LC_ALL=C mvn clean install ``` ## Features http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java ---------------------------------------------------------------------- diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java new file mode 100644 index 0000000..e3fc3f7 --- /dev/null +++ 
b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cascading; + +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; + +import cascading.tuple.Fields; + +import java.util.List; +import java.util.ArrayList; + +public class SchemaIntersection { + + private final MessageType requestedSchema; + private final Fields sourceFields; + + public SchemaIntersection(MessageType fileSchema, Fields requestedFields) { + if(requestedFields == Fields.UNKNOWN) + requestedFields = Fields.ALL; + + Fields newFields = Fields.NONE; + List<Type> newSchemaFields = new ArrayList<Type>(); + int schemaSize = fileSchema.getFieldCount(); + + for (int i = 0; i < schemaSize; i++) { + Type type = fileSchema.getType(i); + Fields name = new Fields(type.getName()); + + if(requestedFields.contains(name)) { + newFields = newFields.append(name); + newSchemaFields.add(type); + } + } + + this.sourceFields = newFields; + this.requestedSchema = new MessageType(fileSchema.getName(), newSchemaFields); + } + + public MessageType getRequestedSchema() { + return requestedSchema; + } + + 
public Fields getSourceFields() { + return sourceFields; + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java ---------------------------------------------------------------------- diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java new file mode 100644 index 0000000..42a5926 --- /dev/null +++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.cascading; + +import java.util.Map; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.commons.lang.StringUtils; + +import cascading.tuple.Tuple; +import cascading.tuple.Fields; +import cascading.flow.hadoop.util.HadoopUtil; + +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.cascading.convert.TupleRecordMaterializer; + + +public class TupleReadSupport extends ReadSupport<Tuple> { + static final String PARQUET_CASCADING_REQUESTED_FIELDS = "parquet.cascading.requested.fields"; + + static protected Fields getRequestedFields(Configuration configuration) { + String fieldsString = configuration.get(PARQUET_CASCADING_REQUESTED_FIELDS); + + if(fieldsString == null) + return Fields.ALL; + + String[] parts = StringUtils.split(fieldsString, ":"); + if(parts.length == 0) + return Fields.ALL; + else + return new Fields(parts); + } + + static protected void setRequestedFields(JobConf configuration, Fields fields) { + String fieldsString = StringUtils.join(fields.iterator(), ":"); + configuration.set(PARQUET_CASCADING_REQUESTED_FIELDS, fieldsString); + } + + @Override + public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) { + Fields requestedFields = getRequestedFields(configuration); + if (requestedFields == null) { + return new ReadContext(fileSchema); + } else { + SchemaIntersection intersection = new SchemaIntersection(fileSchema, requestedFields); + return new ReadContext(intersection.getRequestedSchema()); + } + } + + @Override + public RecordMaterializer<Tuple> prepareForRead( + Configuration configuration, + Map<String, String> keyValueMetaData, + MessageType fileSchema, + ReadContext readContext) { + MessageType requestedSchema = readContext.getRequestedSchema(); + 
return new TupleRecordMaterializer(requestedSchema); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java ---------------------------------------------------------------------- diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java new file mode 100644 index 0000000..032f534 --- /dev/null +++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.cascading; + +import cascading.tuple.TupleEntry; +import java.util.HashMap; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +/** + * + * + * @author Mickaël Lacour <[email protected]> + */ +public class TupleWriteSupport extends WriteSupport<TupleEntry> { + + private RecordConsumer recordConsumer; + private MessageType rootSchema; + public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema"; + + @Override + public String getName() { + return "cascading"; + } + + @Override + public WriteContext init(Configuration configuration) { + String schema = configuration.get(PARQUET_CASCADING_SCHEMA); + rootSchema = MessageTypeParser.parseMessageType(schema); + return new WriteContext(rootSchema, new HashMap<String, String>()); + } + + @Override + public void prepareForWrite(RecordConsumer recordConsumer) { + this.recordConsumer = recordConsumer; + } + + @Override + public void write(TupleEntry record) { + recordConsumer.startMessage(); + final List<Type> fields = rootSchema.getFields(); + + for (int i = 0; i < fields.size(); i++) { + Type field = fields.get(i); + + if (record == null || record.getObject(field.getName()) == null) { + continue; + } + recordConsumer.startField(field.getName(), i); + if (field.isPrimitive()) { + writePrimitive(record, field.asPrimitiveType()); + } else { + throw new UnsupportedOperationException("Complex type not implemented"); + } + recordConsumer.endField(field.getName(), i); + } + recordConsumer.endMessage(); + } + + private void writePrimitive(TupleEntry record, PrimitiveType field) { + switch (field.getPrimitiveTypeName()) { + case 
BINARY: + recordConsumer.addBinary(Binary.fromString(record.getString(field.getName()))); + break; + case BOOLEAN: + recordConsumer.addBoolean(record.getBoolean(field.getName())); + break; + case INT32: + recordConsumer.addInteger(record.getInteger(field.getName())); + break; + case INT64: + recordConsumer.addLong(record.getLong(field.getName())); + break; + case DOUBLE: + recordConsumer.addDouble(record.getDouble(field.getName())); + break; + case FLOAT: + recordConsumer.addFloat(record.getFloat(field.getName())); + break; + case FIXED_LEN_BYTE_ARRAY: + throw new UnsupportedOperationException("Fixed len byte array type not implemented"); + case INT96: + throw new UnsupportedOperationException("Int96 type not implemented"); + default: + throw new UnsupportedOperationException(field.getName() + " type not implemented"); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java ---------------------------------------------------------------------- diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java new file mode 100644 index 0000000..3741165 --- /dev/null +++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cascading.convert; + +import cascading.tuple.Tuple; + +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.pig.TupleConversionException; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; + +public class TupleConverter extends GroupConverter { + + protected Tuple currentTuple; + private final Converter[] converters; + + public TupleConverter(GroupType parquetSchema) { + int schemaSize = parquetSchema.getFieldCount(); + + this.converters = new Converter[schemaSize]; + for (int i = 0; i < schemaSize; i++) { + Type type = parquetSchema.getType(i); + converters[i] = newConverter(type, i); + } + } + + private Converter newConverter(Type type, int i) { + if(!type.isPrimitive()) { + throw new IllegalArgumentException("cascading can only build tuples from primitive types"); + } else { + return new TuplePrimitiveConverter(this, i); + } + } + + @Override + public Converter getConverter(int fieldIndex) { + return converters[fieldIndex]; + } + + @Override + final public void start() { + currentTuple = Tuple.size(converters.length); + } + + @Override + public void end() { + } + + final public Tuple getCurrentTuple() { + return 
currentTuple; + } + + static final class TuplePrimitiveConverter extends PrimitiveConverter { + private final TupleConverter parent; + private final int index; + + public TuplePrimitiveConverter(TupleConverter parent, int index) { + this.parent = parent; + this.index = index; + } + + @Override + public void addBinary(Binary value) { + parent.getCurrentTuple().setString(index, value.toStringUsingUTF8()); + } + + @Override + public void addBoolean(boolean value) { + parent.getCurrentTuple().setBoolean(index, value); + } + + @Override + public void addDouble(double value) { + parent.getCurrentTuple().setDouble(index, value); + } + + @Override + public void addFloat(float value) { + parent.getCurrentTuple().setFloat(index, value); + } + + @Override + public void addInt(int value) { + parent.getCurrentTuple().setInteger(index, value); + } + + @Override + public void addLong(long value) { + parent.getCurrentTuple().setLong(index, value); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java new file mode 100644 index 0000000..275e17b --- /dev/null +++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cascading.convert; + +import cascading.tuple.Tuple; +import cascading.tuple.Fields; + +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.GroupType; + +public class TupleRecordMaterializer extends RecordMaterializer<Tuple> { + + private TupleConverter root; + + public TupleRecordMaterializer(GroupType parquetSchema) { + this.root = new TupleConverter(parquetSchema); + } + + @Override + public Tuple getCurrentRecord() { + return root.getCurrentTuple(); + } + + @Override + public GroupConverter getRootConverter() { + return root; + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java ---------------------------------------------------------------------- diff --git a/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java new file mode 100644 index 0000000..de350dd --- /dev/null +++ b/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cascading; + +import cascading.flow.Flow; +import cascading.flow.FlowProcess; +import cascading.flow.hadoop.HadoopFlowConnector; +import cascading.operation.BaseOperation; +import cascading.operation.Function; +import cascading.operation.FunctionCall; +import cascading.pipe.Each; +import cascading.pipe.Pipe; +import cascading.scheme.Scheme; +import cascading.scheme.hadoop.TextLine; +import cascading.tap.Tap; +import cascading.tap.hadoop.Hfs; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TIOStreamTransport; +import org.junit.Test; +import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter; +import org.apache.parquet.hadoop.util.ContextUtil; +import org.apache.parquet.thrift.test.Name; + 
+import java.io.ByteArrayOutputStream; +import java.io.File; + +import static org.junit.Assert.assertEquals; + +public class TestParquetTupleScheme { + final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in"; + final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out"; + + @Test + public void testReadPattern() throws Exception { + String sourceFolder = parquetInputPath; + testReadWrite(sourceFolder); + + String sourceGlobPattern = parquetInputPath + "/*"; + testReadWrite(sourceGlobPattern); + + String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*"; + testReadWrite(multiLevelGlobPattern); + } + + @Test + public void testFieldProjection() throws Exception { + createFileForRead(); + + Path path = new Path(txtOutputPath); + final FileSystem fs = path.getFileSystem(new Configuration()); + if (fs.exists(path)) fs.delete(path, true); + + Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name")); + Tap source = new Hfs(sourceScheme, parquetInputPath); + + Scheme sinkScheme = new TextLine(new Fields("last_name")); + Tap sink = new Hfs(sinkScheme, txtOutputPath); + + Pipe assembly = new Pipe("namecp"); + assembly = new Each(assembly, new ProjectedTupleFunction()); + Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly); + + flow.complete(); + String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000")); + assertEquals("Practice\nHope\nHorse\n", result); + } + + public void testReadWrite(String inputPath) throws Exception { + createFileForRead(); + + Path path = new Path(txtOutputPath); + final FileSystem fs = path.getFileSystem(new Configuration()); + if (fs.exists(path)) fs.delete(path, true); + + Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name")); + Tap source = new Hfs(sourceScheme, inputPath); + + Scheme sinkScheme = new TextLine(new Fields("first", "last")); + Tap sink = new Hfs(sinkScheme, txtOutputPath); + + Pipe assembly = new 
Pipe("namecp"); + assembly = new Each(assembly, new UnpackTupleFunction()); + Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly); + + flow.complete(); + String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000")); + assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result); + } + + private void createFileForRead() throws Exception { + final Path fileToCreate = new Path(parquetInputPath + "/names.parquet"); + + final Configuration conf = new Configuration(); + final FileSystem fs = fileToCreate.getFileSystem(conf); + if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true); + + TProtocolFactory protocolFactory = new TCompactProtocol.Factory(); + TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0); + ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos)); + + Name n1 = new Name(); + n1.setFirst_name("Alice"); + n1.setLast_name("Practice"); + Name n2 = new Name(); + n2.setFirst_name("Bob"); + n2.setLast_name("Hope"); + Name n3 = new Name(); + n3.setFirst_name("Charlie"); + n3.setLast_name("Horse"); + + n1.write(protocol); + w.write(new BytesWritable(baos.toByteArray())); + baos.reset(); + n2.write(protocol); + w.write(new BytesWritable(baos.toByteArray())); + baos.reset(); + n3.write(protocol); + w.write(new BytesWritable(baos.toByteArray())); + w.close(); + } + + private static class UnpackTupleFunction extends BaseOperation implements Function { + @Override + public void operate(FlowProcess flowProcess, FunctionCall functionCall) { + TupleEntry arguments = functionCall.getArguments(); + Tuple result = new Tuple(); + + Tuple name = new Tuple(); + name.addString(arguments.getString(0)); + name.addString(arguments.getString(1)); + + 
result.add(name); + functionCall.getOutputCollector().add(result); + } + } + + private static class ProjectedTupleFunction extends BaseOperation implements Function { + @Override + public void operate(FlowProcess flowProcess, FunctionCall functionCall) { + TupleEntry arguments = functionCall.getArguments(); + Tuple result = new Tuple(); + + Tuple name = new Tuple(); + name.addString(arguments.getString(0)); +// name.addString(arguments.getString(1)); + + result.add(name); + functionCall.getOutputCollector().add(result); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/test/resources/names.txt ---------------------------------------------------------------------- diff --git a/parquet-cascading-common23/src/test/resources/names.txt b/parquet-cascading-common23/src/test/resources/names.txt new file mode 100644 index 0000000..e2d0408 --- /dev/null +++ b/parquet-cascading-common23/src/test/resources/names.txt @@ -0,0 +1,3 @@ +Alice Practive +Bob Hope +Charlie Horse http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/test/thrift/test.thrift ---------------------------------------------------------------------- diff --git a/parquet-cascading-common23/src/test/thrift/test.thrift b/parquet-cascading-common23/src/test/thrift/test.thrift new file mode 100644 index 0000000..c58843d --- /dev/null +++ b/parquet-cascading-common23/src/test/thrift/test.thrift @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace java org.apache.parquet.thrift.test + +struct Name { + 1: required string first_name, + 2: optional string last_name +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/pom.xml ---------------------------------------------------------------------- diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml index 0cd8588..cabb003 100644 --- a/parquet-cascading/pom.xml +++ b/parquet-cascading/pom.xml @@ -103,6 +103,51 @@ <build> <plugins> <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>../parquet-cascading-common23/src/main/java</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>../parquet-cascading-common23/src/test/java</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-test-resource</id> + <phase>generate-test-resources</phase> + <goals> + <goal>add-test-resource</goal> + </goals> + <configuration> + <resources> + <resource> + <directory>../parquet-cascading-common23/src/test/resources</directory> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> <artifactId>maven-enforcer-plugin</artifactId> 
</plugin> <plugin> @@ -115,6 +160,8 @@ <version>0.1.10</version> <configuration> <thriftExecutable>${thrift.executable}</thriftExecutable> + <thriftSourceRoot>../parquet-cascading-common23/src/main/thrift</thriftSourceRoot> + <thriftTestSourceRoot>../parquet-cascading-common23/src/test/thrift</thriftTestSourceRoot> </configuration> <executions> <execution> http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java index ea70d43..b34ee7d 100644 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java +++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java @@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.thrift.ThriftReadSupport; import org.apache.parquet.hadoop.thrift.TBaseWriteSupport; import org.apache.parquet.thrift.TBaseRecordConverter; +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> { // In the case of reads, we can read the thrift class from the file metadata http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java index 41b56d0..3b7d715 100644 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java +++ 
b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java @@ -59,6 +59,7 @@ import static org.apache.parquet.Preconditions.checkNotNull; * @author Avi Bryant */ +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java index 9549ef4..6c34a84 100644 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java +++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java @@ -47,6 +47,7 @@ import static org.apache.parquet.Preconditions.checkNotNull; * This is an abstract class; implementations are expected to set up their Input/Output Formats * correctly in the respective Init methods. 
*/ +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{ public static final class Config<T> implements Serializable { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java deleted file mode 100644 index e3fc3f7..0000000 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.parquet.cascading; - -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; - -import cascading.tuple.Fields; - -import java.util.List; -import java.util.ArrayList; - -public class SchemaIntersection { - - private final MessageType requestedSchema; - private final Fields sourceFields; - - public SchemaIntersection(MessageType fileSchema, Fields requestedFields) { - if(requestedFields == Fields.UNKNOWN) - requestedFields = Fields.ALL; - - Fields newFields = Fields.NONE; - List<Type> newSchemaFields = new ArrayList<Type>(); - int schemaSize = fileSchema.getFieldCount(); - - for (int i = 0; i < schemaSize; i++) { - Type type = fileSchema.getType(i); - Fields name = new Fields(type.getName()); - - if(requestedFields.contains(name)) { - newFields = newFields.append(name); - newSchemaFields.add(type); - } - } - - this.sourceFields = newFields; - this.requestedSchema = new MessageType(fileSchema.getName(), newSchemaFields); - } - - public MessageType getRequestedSchema() { - return requestedSchema; - } - - public Fields getSourceFields() { - return sourceFields; - } -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java deleted file mode 100644 index 42a5926..0000000 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading; - -import java.util.Map; -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; -import org.apache.commons.lang.StringUtils; - -import cascading.tuple.Tuple; -import cascading.tuple.Fields; -import cascading.flow.hadoop.util.HadoopUtil; - -import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.cascading.convert.TupleRecordMaterializer; - - -public class TupleReadSupport extends ReadSupport<Tuple> { - static final String PARQUET_CASCADING_REQUESTED_FIELDS = "parquet.cascading.requested.fields"; - - static protected Fields getRequestedFields(Configuration configuration) { - String fieldsString = configuration.get(PARQUET_CASCADING_REQUESTED_FIELDS); - - if(fieldsString == null) - return Fields.ALL; - - String[] parts = StringUtils.split(fieldsString, ":"); - if(parts.length == 0) - return Fields.ALL; - else - return new Fields(parts); - } - - static protected void setRequestedFields(JobConf configuration, Fields fields) { - String fieldsString = StringUtils.join(fields.iterator(), ":"); - configuration.set(PARQUET_CASCADING_REQUESTED_FIELDS, fieldsString); - } - - @Override - public ReadContext init(Configuration configuration, Map<String, String> 
keyValueMetaData, MessageType fileSchema) { - Fields requestedFields = getRequestedFields(configuration); - if (requestedFields == null) { - return new ReadContext(fileSchema); - } else { - SchemaIntersection intersection = new SchemaIntersection(fileSchema, requestedFields); - return new ReadContext(intersection.getRequestedSchema()); - } - } - - @Override - public RecordMaterializer<Tuple> prepareForRead( - Configuration configuration, - Map<String, String> keyValueMetaData, - MessageType fileSchema, - ReadContext readContext) { - MessageType requestedSchema = readContext.getRequestedSchema(); - return new TupleRecordMaterializer(requestedSchema); - } - -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java deleted file mode 100644 index 032f534..0000000 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading; - -import cascading.tuple.TupleEntry; -import java.util.HashMap; -import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.api.WriteSupport; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; - -/** - * - * - * @author Mickaël Lacour <[email protected]> - */ -public class TupleWriteSupport extends WriteSupport<TupleEntry> { - - private RecordConsumer recordConsumer; - private MessageType rootSchema; - public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema"; - - @Override - public String getName() { - return "cascading"; - } - - @Override - public WriteContext init(Configuration configuration) { - String schema = configuration.get(PARQUET_CASCADING_SCHEMA); - rootSchema = MessageTypeParser.parseMessageType(schema); - return new WriteContext(rootSchema, new HashMap<String, String>()); - } - - @Override - public void prepareForWrite(RecordConsumer recordConsumer) { - this.recordConsumer = recordConsumer; - } - - @Override - public void write(TupleEntry record) { - recordConsumer.startMessage(); - final List<Type> fields = rootSchema.getFields(); - - for (int i = 0; i < fields.size(); i++) { - Type field = fields.get(i); - - if (record == null || record.getObject(field.getName()) == null) { - continue; - } - recordConsumer.startField(field.getName(), i); - if (field.isPrimitive()) { - writePrimitive(record, field.asPrimitiveType()); - } else { - throw new UnsupportedOperationException("Complex type not implemented"); - } - recordConsumer.endField(field.getName(), i); - } - recordConsumer.endMessage(); - } - - private void 
writePrimitive(TupleEntry record, PrimitiveType field) { - switch (field.getPrimitiveTypeName()) { - case BINARY: - recordConsumer.addBinary(Binary.fromString(record.getString(field.getName()))); - break; - case BOOLEAN: - recordConsumer.addBoolean(record.getBoolean(field.getName())); - break; - case INT32: - recordConsumer.addInteger(record.getInteger(field.getName())); - break; - case INT64: - recordConsumer.addLong(record.getLong(field.getName())); - break; - case DOUBLE: - recordConsumer.addDouble(record.getDouble(field.getName())); - break; - case FLOAT: - recordConsumer.addFloat(record.getFloat(field.getName())); - break; - case FIXED_LEN_BYTE_ARRAY: - throw new UnsupportedOperationException("Fixed len byte array type not implemented"); - case INT96: - throw new UnsupportedOperationException("Int96 type not implemented"); - default: - throw new UnsupportedOperationException(field.getName() + " type not implemented"); - } - } -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java deleted file mode 100644 index 3741165..0000000 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading.convert; - -import cascading.tuple.Tuple; - -import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.Converter; -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.PrimitiveConverter; -import org.apache.parquet.pig.TupleConversionException; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; -import org.apache.parquet.schema.Type.Repetition; - -public class TupleConverter extends GroupConverter { - - protected Tuple currentTuple; - private final Converter[] converters; - - public TupleConverter(GroupType parquetSchema) { - int schemaSize = parquetSchema.getFieldCount(); - - this.converters = new Converter[schemaSize]; - for (int i = 0; i < schemaSize; i++) { - Type type = parquetSchema.getType(i); - converters[i] = newConverter(type, i); - } - } - - private Converter newConverter(Type type, int i) { - if(!type.isPrimitive()) { - throw new IllegalArgumentException("cascading can only build tuples from primitive types"); - } else { - return new TuplePrimitiveConverter(this, i); - } - } - - @Override - public Converter getConverter(int fieldIndex) { - return converters[fieldIndex]; - } - - @Override - final public void start() { - currentTuple = Tuple.size(converters.length); - } - - @Override - public void end() { - } - - final public Tuple getCurrentTuple() { - return 
currentTuple; - } - - static final class TuplePrimitiveConverter extends PrimitiveConverter { - private final TupleConverter parent; - private final int index; - - public TuplePrimitiveConverter(TupleConverter parent, int index) { - this.parent = parent; - this.index = index; - } - - @Override - public void addBinary(Binary value) { - parent.getCurrentTuple().setString(index, value.toStringUsingUTF8()); - } - - @Override - public void addBoolean(boolean value) { - parent.getCurrentTuple().setBoolean(index, value); - } - - @Override - public void addDouble(double value) { - parent.getCurrentTuple().setDouble(index, value); - } - - @Override - public void addFloat(float value) { - parent.getCurrentTuple().setFloat(index, value); - } - - @Override - public void addInt(int value) { - parent.getCurrentTuple().setInteger(index, value); - } - - @Override - public void addLong(long value) { - parent.getCurrentTuple().setLong(index, value); - } - } -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java deleted file mode 100644 index 275e17b..0000000 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.cascading.convert; - -import cascading.tuple.Tuple; -import cascading.tuple.Fields; - -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.GroupType; - -public class TupleRecordMaterializer extends RecordMaterializer<Tuple> { - - private TupleConverter root; - - public TupleRecordMaterializer(GroupType parquetSchema) { - this.root = new TupleConverter(parquetSchema); - } - - @Override - public Tuple getCurrentRecord() { - return root.getCurrentTuple(); - } - - @Override - public GroupConverter getRootConverter() { - return root; - } - -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java index 841314c..e0f33e1 100644 --- a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java +++ b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java @@ -58,8 +58,9 @@ import java.io.ByteArrayOutputStream; import java.util.HashMap; import java.util.Map; +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public class 
TestParquetTBaseScheme { - final String txtInputPath = "src/test/resources/names.txt"; + final String txtInputPath = "target/test-classes/names.txt"; final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in"; final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out"; final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out"; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java deleted file mode 100644 index de350dd..0000000 --- a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.parquet.cascading; - -import cascading.flow.Flow; -import cascading.flow.FlowProcess; -import cascading.flow.hadoop.HadoopFlowConnector; -import cascading.operation.BaseOperation; -import cascading.operation.Function; -import cascading.operation.FunctionCall; -import cascading.pipe.Each; -import cascading.pipe.Pipe; -import cascading.scheme.Scheme; -import cascading.scheme.hadoop.TextLine; -import cascading.tap.Tap; -import cascading.tap.hadoop.Hfs; -import cascading.tuple.Fields; -import cascading.tuple.Tuple; -import cascading.tuple.TupleEntry; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TIOStreamTransport; -import org.junit.Test; -import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter; -import org.apache.parquet.hadoop.util.ContextUtil; -import org.apache.parquet.thrift.test.Name; - -import java.io.ByteArrayOutputStream; -import java.io.File; - -import static org.junit.Assert.assertEquals; - -public class TestParquetTupleScheme { - final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in"; - final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out"; - - @Test - public void testReadPattern() throws Exception { - String sourceFolder = parquetInputPath; - testReadWrite(sourceFolder); - - String sourceGlobPattern = parquetInputPath + "/*"; - testReadWrite(sourceGlobPattern); - - String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*"; - testReadWrite(multiLevelGlobPattern); - } - - @Test - public void testFieldProjection() throws 
Exception { - createFileForRead(); - - Path path = new Path(txtOutputPath); - final FileSystem fs = path.getFileSystem(new Configuration()); - if (fs.exists(path)) fs.delete(path, true); - - Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name")); - Tap source = new Hfs(sourceScheme, parquetInputPath); - - Scheme sinkScheme = new TextLine(new Fields("last_name")); - Tap sink = new Hfs(sinkScheme, txtOutputPath); - - Pipe assembly = new Pipe("namecp"); - assembly = new Each(assembly, new ProjectedTupleFunction()); - Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly); - - flow.complete(); - String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000")); - assertEquals("Practice\nHope\nHorse\n", result); - } - - public void testReadWrite(String inputPath) throws Exception { - createFileForRead(); - - Path path = new Path(txtOutputPath); - final FileSystem fs = path.getFileSystem(new Configuration()); - if (fs.exists(path)) fs.delete(path, true); - - Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name")); - Tap source = new Hfs(sourceScheme, inputPath); - - Scheme sinkScheme = new TextLine(new Fields("first", "last")); - Tap sink = new Hfs(sinkScheme, txtOutputPath); - - Pipe assembly = new Pipe("namecp"); - assembly = new Each(assembly, new UnpackTupleFunction()); - Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly); - - flow.complete(); - String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000")); - assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result); - } - - private void createFileForRead() throws Exception { - final Path fileToCreate = new Path(parquetInputPath + "/names.parquet"); - - final Configuration conf = new Configuration(); - final FileSystem fs = fileToCreate.getFileSystem(conf); - if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true); - - TProtocolFactory protocolFactory = new 
TCompactProtocol.Factory(); - TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0); - ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos)); - - Name n1 = new Name(); - n1.setFirst_name("Alice"); - n1.setLast_name("Practice"); - Name n2 = new Name(); - n2.setFirst_name("Bob"); - n2.setLast_name("Hope"); - Name n3 = new Name(); - n3.setFirst_name("Charlie"); - n3.setLast_name("Horse"); - - n1.write(protocol); - w.write(new BytesWritable(baos.toByteArray())); - baos.reset(); - n2.write(protocol); - w.write(new BytesWritable(baos.toByteArray())); - baos.reset(); - n3.write(protocol); - w.write(new BytesWritable(baos.toByteArray())); - w.close(); - } - - private static class UnpackTupleFunction extends BaseOperation implements Function { - @Override - public void operate(FlowProcess flowProcess, FunctionCall functionCall) { - TupleEntry arguments = functionCall.getArguments(); - Tuple result = new Tuple(); - - Tuple name = new Tuple(); - name.addString(arguments.getString(0)); - name.addString(arguments.getString(1)); - - result.add(name); - functionCall.getOutputCollector().add(result); - } - } - - private static class ProjectedTupleFunction extends BaseOperation implements Function { - @Override - public void operate(FlowProcess flowProcess, FunctionCall functionCall) { - TupleEntry arguments = functionCall.getArguments(); - Tuple result = new Tuple(); - - Tuple name = new Tuple(); - name.addString(arguments.getString(0)); -// name.addString(arguments.getString(1)); - - result.add(name); - functionCall.getOutputCollector().add(result); - } - } -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/resources/names.txt 
---------------------------------------------------------------------- diff --git a/parquet-cascading/src/test/resources/names.txt b/parquet-cascading/src/test/resources/names.txt deleted file mode 100644 index e2d0408..0000000 --- a/parquet-cascading/src/test/resources/names.txt +++ /dev/null @@ -1,3 +0,0 @@ -Alice Practive -Bob Hope -Charlie Horse http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/thrift/test.thrift ---------------------------------------------------------------------- diff --git a/parquet-cascading/src/test/thrift/test.thrift b/parquet-cascading/src/test/thrift/test.thrift deleted file mode 100644 index c58843d..0000000 --- a/parquet-cascading/src/test/thrift/test.thrift +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -namespace java org.apache.parquet.thrift.test - -struct Name { - 1: required string first_name, - 2: optional string last_name -} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/REVIEWERS.md ---------------------------------------------------------------------- diff --git a/parquet-cascading3/REVIEWERS.md b/parquet-cascading3/REVIEWERS.md new file mode 100644 index 0000000..f797235 --- /dev/null +++ b/parquet-cascading3/REVIEWERS.md @@ -0,0 +1,27 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> + +The following reviewers had reviewed the parquet-cascading (pre-Cascading 3.0) project: + +| Name | Apache Id | github id | +|--------------------|------------|-------------| +| Dmitriy Ryaboy | dvryaboy | dvryaboy | +| Tianshuo Deng | tianshuo | tsdeng | + + http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/pom.xml ---------------------------------------------------------------------- diff --git a/parquet-cascading3/pom.xml b/parquet-cascading3/pom.xml new file mode 100644 index 0000000..ea552ad --- /dev/null +++ b/parquet-cascading3/pom.xml @@ -0,0 +1,178 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet</artifactId>
+    <relativePath>../pom.xml</relativePath>
+    <version>1.8.2-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>parquet-cascading3</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Parquet Cascading (for Cascading 3.0 onwards)</name>
+  <url>https://parquet.apache.org</url>
+
+  <repositories>
+    <repository>
+      <id>conjars.org</id>
+      <url>http://conjars.org/repo</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-thrift</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>cascading</groupId>
+      <artifactId>cascading-hadoop</artifactId> <!-- building against cascading-hadoop for Hadoop1, but will use against any backend -->
+      <version>${cascading3.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- TEMPORARY UNTIL AFTER previous.version >= 1.8.2
+
+      (enforcer checks against the API in 1.7.0, this module did not exist back then, therefore it can't succeed)
+      -->
+      <!-- This is the only maven-enforcer-plugin entry in this block: a plugin
+           should be declared once per <plugins> section, so no second (bare)
+           declaration is listed further down. -->
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>none</phase>
+          </execution>
+        </executions>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+      <!-- /TEMPORARY -->
+
+      <!-- Pulls the sources, tests and test resources shared with the Cascading 2
+           module in from ../parquet-cascading-common23. -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.7</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>../parquet-cascading-common23/src/main/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-source</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>add-test-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>../parquet-cascading-common23/src/test/java</source>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>add-test-resource</id>
+            <phase>generate-test-resources</phase>
+            <goals>
+              <goal>add-test-resource</goal>
+            </goals>
+            <configuration>
+              <resources>
+                <resource>
+                  <directory>../parquet-cascading-common23/src/test/resources</directory>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <!-- Compiles the shared Thrift test definitions during generate-test-sources. -->
+      <plugin>
+        <groupId>org.apache.thrift.tools</groupId>
+        <artifactId>maven-thrift-plugin</artifactId>
+        <version>0.1.10</version>
+        <configuration>
+          <thriftExecutable>${thrift.executable}</thriftExecutable>
+          <thriftSourceRoot>../parquet-cascading-common23/src/main/thrift</thriftSourceRoot>
+          <thriftTestSourceRoot>../parquet-cascading-common23/src/test/thrift</thriftTestSourceRoot>
+        </configuration>
+        <executions>
+          <execution>
+            <id>thrift-sources</id>
+            <phase>generate-test-sources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
new file mode 100644
index 0000000..af04b47
--- /dev/null
+++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.parquet.cascading; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.thrift.TBase; + +import cascading.flow.FlowProcess; +import cascading.tap.Tap; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.hadoop.thrift.ThriftReadSupport; +import org.apache.parquet.hadoop.thrift.TBaseWriteSupport; +import org.apache.parquet.thrift.TBaseRecordConverter; + +public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> { + + // In the case of reads, we can read the thrift class from the file metadata + public ParquetTBaseScheme() { + this(new Config<T>()); + } + + public ParquetTBaseScheme(Class<T> thriftClass) { + this(new Config<T>().withRecordClass(thriftClass)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate) { + this(new Config<T>().withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate, Class<T> thriftClass) { + this(new Config<T>().withRecordClass(thriftClass).withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(Config<T> config) { + super(config); + } + + @Override + public void sourceConfInit(FlowProcess<? extends JobConf> fp, + Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) { + super.sourceConfInit(fp, tap, jobConf); + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class); + ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class); + } + + @Override + public void sinkConfInit(FlowProcess<? 
extends JobConf> fp, + Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) { + + if (this.config.getKlass() == null) { + throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor"); + } + + DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf); + DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class); + TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass()); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java ---------------------------------------------------------------------- diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java new file mode 100644 index 0000000..4532d3b --- /dev/null +++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.parquet.cascading; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.CompositeTap; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tap.hadoop.Hfs; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.mapred.Container; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.schema.MessageType; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * A Cascading Scheme that converts Parquet groups into Cascading tuples. + * If you provide it with sourceFields, it will selectively materialize only the columns for those fields. + * The names must match the names in the Parquet schema. + * If you do not provide sourceFields, or use Fields.ALL or Fields.UNKNOWN, it will create one from the + * Parquet schema. + * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be + * flattened to a top-level field in the Cascading tuple. 
+ * + * @author Avi Bryant + */ + +public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{ + + private static final long serialVersionUID = 0L; + private String parquetSchema; + private final FilterPredicate filterPredicate; + + public ParquetTupleScheme() { + super(); + this.filterPredicate = null; + } + + public ParquetTupleScheme(Fields sourceFields) { + super(sourceFields); + this.filterPredicate = null; + } + + public ParquetTupleScheme(FilterPredicate filterPredicate) { + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) { + super(sourceFields); + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + /** + * ParquetTupleScheme constructor used a sink need to be implemented + * + * @param sourceFields used for the reading step + * @param sinkFields used for the writing step + * @param schema is mandatory if you add sinkFields and needs to be the + * toString() from a MessageType. This value is going to be parsed when the + * parquet file will be created. + */ + public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) { + super(sourceFields, sinkFields); + parquetSchema = schema; + this.filterPredicate = null; + } + + @SuppressWarnings("rawtypes") + @Override + public void sourceConfInit(FlowProcess<? extends JobConf> fp, + Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) { + + if (filterPredicate != null) { + ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate); + } + + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class); + TupleReadSupport.setRequestedFields(jobConf, getSourceFields()); + } + + @Override + public Fields retrieveSourceFields(FlowProcess<? 
extends JobConf> flowProcess, Tap tap) { + MessageType schema = readSchema(flowProcess, tap); + SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields()); + + setSourceFields(intersection.getSourceFields()); + + return getSourceFields(); + } + + private MessageType readSchema(FlowProcess<? extends JobConf> flowProcess, Tap tap) { + try { + Hfs hfs; + + if( tap instanceof CompositeTap ) + hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next(); + else + hfs = (Hfs) tap; + + List<Footer> footers = getFooters(flowProcess, hfs); + + if(footers.isEmpty()) { + throw new TapException("Could not read Parquet metadata at " + hfs.getPath()); + } else { + return footers.get(0).getParquetMetadata().getFileMetaData().getSchema(); + } + } catch (IOException e) { + throw new TapException(e); + } + } + + private List<Footer> getFooters(FlowProcess<? extends JobConf> flowProcess, Hfs hfs) throws IOException { + JobConf jobConf = flowProcess.getConfigCopy(); + DeprecatedParquetInputFormat format = new DeprecatedParquetInputFormat(); + format.addInputPath(jobConf, hfs.getPath()); + return format.getFooters(jobConf); + } + + @SuppressWarnings("unchecked") + @Override + public boolean source(FlowProcess<? extends JobConf> fp, SourceCall<Object[], RecordReader> sc) + throws IOException { + Container<Tuple> value = (Container<Tuple>) sc.getInput().createValue(); + boolean hasNext = sc.getInput().next(null, value); + if (!hasNext) { return false; } + + // Skip nulls + if (value == null) { return true; } + + sc.getIncomingEntry().setTuple(value.get()); + return true; + } + + + @SuppressWarnings("rawtypes") + @Override + public void sinkConfInit(FlowProcess<? 
extends JobConf> fp, + Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) { + DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf); + jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema); + ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class); + } + + @Override + public boolean isSink() { + return parquetSchema != null; + } + + @Override + public void sink(FlowProcess<? extends JobConf> fp, SinkCall<Object[], OutputCollector> sink) + throws IOException { + TupleEntry tuple = sink.getOutgoingEntry(); + OutputCollector outputCollector = sink.getOutput(); + outputCollector.collect(null, tuple); + } +}
