Repository: reef Updated Branches: refs/heads/master 9349e99b5 -> 267140543
[REEF-1765] Add a Parquet-On-HDFS to Avro-on-Local-Disk tool This adds a new tool in the new module reef-experimental to download a parquet file from HDFS and store it locally in Avro format. This file is then meant to be picked up by a REEF.NET Task. JIRA: [REEF-1765](https://issues.apache.org/jira/browse/REEF-1765) Pull Request: This closes #1283 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/26714054 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/26714054 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/26714054 Branch: refs/heads/master Commit: 267140543782487269f9f5fdb54317f536bd7a91 Parents: 9349e99 Author: Shouheng Yi <[email protected]> Authored: Tue Apr 4 09:49:34 2017 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue Apr 11 10:23:41 2017 -0700 ---------------------------------------------------------------------- lang/java/reef-experimental/pom.xml | 98 +++++++++++ .../apache/reef/experimental/package-info.java | 22 +++ .../experimental/parquet/ParquetReader.java | 173 +++++++++++++++++++ .../reef/experimental/parquet/PathString.java | 25 +++ .../reef/experimental/parquet/package-info.java | 22 +++ .../apache/reef/experimental/package-info.java | 22 +++ .../experimental/parquet/ParquetReaderTest.java | 81 +++++++++ .../reef/experimental/parquet/package-info.java | 22 +++ .../src/test/resources/file.parquet | Bin 0 -> 905 bytes pom.xml | 2 + 10 files changed, 467 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/lang/java/reef-experimental/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-experimental/pom.xml b/lang/java/reef-experimental/pom.xml new file mode 100644 index 0000000..9fef1c3 --- /dev/null +++ b/lang/java/reef-experimental/pom.xml @@ -0,0 +1,98 @@ +<?xml version="1.0"?> +<!-- +Licensed to the 
Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.reef</groupId> + <artifactId>reef-project</artifactId> + <version>0.16.0-SNAPSHOT</version> + <relativePath>../../..</relativePath> + </parent> + + <properties> + <rootPath>${basedir}/../../..</rootPath> + </properties> + + <artifactId>reef-experimental</artifactId> + <name>REEF Experimental</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <!-- REEF --> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-local</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-standalone</artifactId> + <version>${project.version}</version> + </dependency> + <!-- END OF REEF --> + <!-- HADOOP --> + <dependency> + 
<groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> + <!-- END OF HADOOP --> + <!-- PARQUET --> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet.version}</version> + </dependency> + <!-- END OF PARQUET --> + <!-- AVRO --> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <version>${avro.version}</version> + <classifier>hadoop2</classifier> + </dependency> + <!-- END OF AVRO --> + </dependencies> + + +</project> http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/package-info.java b/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/package-info.java new file mode 100644 index 0000000..f555016 --- /dev/null +++ b/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Experimental projects for REEF. + */ +package org.apache.reef.experimental; http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/ParquetReader.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/ParquetReader.java b/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/ParquetReader.java new file mode 100644 index 0000000..d1e042f --- /dev/null +++ b/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/ParquetReader.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.experimental.parquet; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; + +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; + +/** + * A reader for Parquet files that can serialize data to local disk and return Avro schema and Avro reader. + * The intent is not to build a general parquet reader, but to consume data with table-like property. + */ +public final class ParquetReader { + /** + * Standard java logger. 
+ */ + private static final Logger LOG = Logger.getLogger(ParquetReader.class.getName()); + + private Path parquetFilePath; + + @Inject + private ParquetReader(@Parameter(PathString.class) final String path) throws IOException { + parquetFilePath = new Path(new File(path).getAbsolutePath()); + final Schema schema = createAvroSchema(); + if (schema.getType() != Schema.Type.RECORD) { + LOG.log(Level.SEVERE, "ParquetReader only support Avro record type that can be consumed as a table."); + throw new IOException("ParquetReader only support Avro record type that can be consumed as a table."); + } + for (final Schema.Field f : schema.getFields()) { + if (f.schema().getType() == Schema.Type.RECORD) { + LOG.log(Level.SEVERE, "ParquetReader doesn't support nested record type for its elements."); + throw new IOException("ParquetReader doesn't support nested record type for its elements."); + } + } + } + + /** + * Retrieve avro schema from parquet file. + * @return avro schema from parquet file. + * @throws IOException if the Avro schema couldn't be parsed from the parquet file. + */ + public Schema createAvroSchema() throws IOException { + return createAvroSchema(new Configuration(true), NO_FILTER); + } + + /** + * Retrieve avro schema from parquet file. + * @param configuration Hadoop configuration. + * @param filter Filter for Avro metadata. + * @return avro schema from parquet file. + * @throws IOException if the Avro schema couldn't be parsed from the parquet file. + */ + private Schema createAvroSchema(final Configuration configuration, final MetadataFilter filter) throws IOException { + final ParquetMetadata footer = ParquetFileReader.readFooter(configuration, parquetFilePath, filter); + final AvroSchemaConverter converter = new AvroSchemaConverter(); + final MessageType schema = footer.getFileMetaData().getSchema(); + return converter.convert(schema); + } + + /** + * Construct an avro reader from parquet file. 
+ * @return avro reader based on the provided parquet file. + * @throws IOException if the parquet file couldn't be parsed correctly. + */ + private AvroParquetReader<GenericRecord> createAvroReader() throws IOException { + return new AvroParquetReader<GenericRecord>(parquetFilePath); + } + + /** + * Serialize Avro data to a local file. + * @param file Local destination file for serialization. + * @throws IOException if the parquet file couldn't be parsed correctly. + */ + public void serializeToDisk(final File file) throws IOException { + final DatumWriter datumWriter = new GenericDatumWriter<GenericRecord>(); + final DataFileWriter fileWriter = new DataFileWriter<GenericRecord>(datumWriter); + final AvroParquetReader<GenericRecord> reader = createAvroReader(); + fileWriter.create(createAvroSchema(), file); + + GenericRecord record = reader.read(); + while (record != null) { + fileWriter.append(record); + record = reader.read(); + } + + try { + reader.close(); + } catch (IOException ex){ + LOG.log(Level.SEVERE, ex.getMessage()); + throw ex; + } + + try { + fileWriter.close(); + } catch (IOException ex){ + LOG.log(Level.SEVERE, ex.getMessage()); + throw ex; + } + } + + /** + * Serialize Avro data to a in-memory ByteBuffer. + * @return A ByteBuffer that contains avro data. + * @throws IOException if the parquet file couldn't be parsed correctly. 
+ */ + public ByteBuffer serializeToByteBuffer() throws IOException { + final ByteArrayOutputStream stream = new ByteArrayOutputStream(); + final Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null); + final DatumWriter writer = new GenericDatumWriter<GenericRecord>(); + writer.setSchema(createAvroSchema()); + final AvroParquetReader<GenericRecord> reader = createAvroReader(); + + GenericRecord record = reader.read(); + while (record != null) { + writer.write(record, encoder); + record = reader.read(); + } + + try { + reader.close(); + } catch (IOException ex){ + LOG.log(Level.SEVERE, ex.getMessage()); + throw ex; + } + + encoder.flush(); + final ByteBuffer buf = ByteBuffer.wrap(stream.toByteArray()); + buf.order(ByteOrder.LITTLE_ENDIAN); + return buf; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/PathString.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/PathString.java b/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/PathString.java new file mode 100644 index 0000000..40cdc12 --- /dev/null +++ b/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/PathString.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.experimental.parquet; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +@NamedParameter(default_value="", doc="Path to parquet file", short_name="path") +class PathString implements Name<String> { } http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/package-info.java b/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/package-info.java new file mode 100644 index 0000000..342e1a8 --- /dev/null +++ b/lang/java/reef-experimental/src/main/java/org/apache/reef/experimental/parquet/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Experimental projects for REEF. + */ +package org.apache.reef.experimental.parquet; http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/package-info.java b/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/package-info.java new file mode 100644 index 0000000..f555016 --- /dev/null +++ b/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Experimental projects for REEF. 
+ */ +package org.apache.reef.experimental; http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/parquet/ParquetReaderTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/parquet/ParquetReaderTest.java b/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/parquet/ParquetReaderTest.java new file mode 100644 index 0000000..02f4721 --- /dev/null +++ b/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/parquet/ParquetReaderTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.experimental.parquet; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.hadoop.io.AvroKeyDeserializer; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.reef.tang.*; +import org.apache.reef.tang.exceptions.InjectionException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; + +public class ParquetReaderTest { + + private final File file = new File(getClass().getClassLoader().getResource("file.parquet").getFile()); + + @Test + public void testSchema() throws IOException, InjectionException { + final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder(); + builder.bindNamedParameter(PathString.class, file.getAbsolutePath()); + final Configuration conf = builder.build(); + final Injector injector = Tang.Factory.getTang().newInjector(conf); + + final ParquetReader reader = injector.getInstance(ParquetReader.class); + final Schema schema = reader.createAvroSchema(); + + Assert.assertEquals("User", schema.getName()); + Assert.assertEquals(Schema.Type.RECORD, schema.getType()); + } + + @Test + public void testDataEntries() throws IOException, InjectionException { + final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder(); + builder.bindNamedParameter(PathString.class, file.getAbsolutePath()); + final Configuration conf = builder.build(); + final Injector injector = Tang.Factory.getTang().newInjector(conf); + + final ParquetReader reader = injector.getInstance(ParquetReader.class); + + final byte[] byteArr = reader.serializeToByteBuffer().array(); + final ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArr); + final DatumReader datumReader = new GenericDatumReader<GenericRecord>(); + 
datumReader.setSchema(reader.createAvroSchema()); + + final AvroKeyDeserializer deserializer + = new AvroKeyDeserializer<GenericRecord>(reader.createAvroSchema(), reader.createAvroSchema(), datumReader); + deserializer.open(inputStream); + + AvroWrapper<GenericRecord> record = null; + + for (int i = 0; i < 10; i = i + 1) { + record = deserializer.deserialize(record); + Assert.assertEquals("User_" + i, record.datum().get("name").toString()); + Assert.assertEquals(i, record.datum().get("age")); + Assert.assertEquals("blue", record.datum().get("favorite_color").toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/parquet/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/parquet/package-info.java b/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/parquet/package-info.java new file mode 100644 index 0000000..342e1a8 --- /dev/null +++ b/lang/java/reef-experimental/src/test/java/org/apache/reef/experimental/parquet/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Experimental projects for REEF. + */ +package org.apache.reef.experimental.parquet; http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/lang/java/reef-experimental/src/test/resources/file.parquet ---------------------------------------------------------------------- diff --git a/lang/java/reef-experimental/src/test/resources/file.parquet b/lang/java/reef-experimental/src/test/resources/file.parquet new file mode 100644 index 0000000..c70ee4c Binary files /dev/null and b/lang/java/reef-experimental/src/test/resources/file.parquet differ http://git-wip-us.apache.org/repos/asf/reef/blob/26714054/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ed39250..6e41a3e 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ under the License. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <hadoop.version>2.6.0</hadoop.version> <avro.version>1.8.1</avro.version> + <parquet.version>1.9.0</parquet.version> <jetty.version>6.1.26</jetty.version> <jackson.version>1.9.13</jackson.version> <protobuf.version>2.5.0</protobuf.version> @@ -735,6 +736,7 @@ under the License. <module>lang/java/reef-examples</module> <module>lang/java/reef-examples-clr</module> <module>lang/java/reef-examples-hdinsight</module> + <module>lang/java/reef-experimental</module> <module>lang/java/reef-io</module> <module>lang/java/reef-poison</module> <module>lang/java/reef-runtime-hdinsight</module>
