This is an automated email from the ASF dual-hosted git repository. shangxinli pushed a commit to branch travis_dev in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
commit 5bb484690349a84e6210ccc3438f535bfe026cf8 Author: Xinli shang <[email protected]> AuthorDate: Fri Oct 26 14:33:25 2018 -0700 initial commit --- README.md | 2 - dependency-reduced-pom.xml | 168 ++++++++++++ pom.xml | 305 +++++++++++++++++++++ products.json | 3 + src/main/java/com/uber/ParquetHelloWorld.java | 40 +++ .../parquet/CryptoParquetWriteSupport.scala | 20 ++ .../parquet/ParquetMetadataSchemaConverter.scala | 61 +++++ .../execution/datasources/parquet/SchemaUtil.scala | 12 + src/main/resources/log4j.properties | 9 + 9 files changed, 618 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 76e6df5..e69de29 100644 --- a/README.md +++ b/README.md @@ -1,2 +0,0 @@ -# parquet-writesupport-extensions -This repo includes extension of Parquet WriteSupport Extensions. For example, extension for org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml new file mode 100644 index 0000000..984bd21 --- /dev/null +++ b/dependency-reduced-pom.xml @@ -0,0 +1,168 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>com.uber.parquetwritesupports</groupId> + <artifactId>ParquetWriteSupports</artifactId> + <version>2.0.0.1</version> + <build> + <resources> + <resource> + <directory>src/main/java</directory> + <excludes> + <exclude>**/*.java</exclude> + </excludes> + </resource> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>${project.basedir}/src/test/java</directory> + <excludes> + <exclude>**/*.java</exclude> + </excludes> + </testResource> + <testResource> + <directory>${project.basedir}/src/test/resources</directory> + </testResource> + 
</testResources> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.0</version> + <executions> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + <args> + <arg>-target:jvm-${java.version}</arg> + </args> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>${java.version}</source> + <target>${java.version}</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.3</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + <profiles> + <profile> + <id>buildAntlr</id> + <build> + <plugins> + <plugin> + <groupId>org.antlr</groupId> + <artifactId>antlr4-maven-plugin</artifactId> + <version>4.5.3</version> + <executions> + <execution> + <goals> + <goal>antlr4</goal> + </goals> + </execution> + </executions> + <configuration> + <visitor>true</visitor> + <sourceDirectory>src/main/antlr4</sourceDirectory> + </configuration> + </plugin> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>generate-sources</id> + <phase>generate-sources</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <tasks> + <move> + 
<fileset /> + </move> + </tasks> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + <repositories> + <repository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </repository> + <repository> + <releases /> + <snapshots> + <enabled>false</enabled> + </snapshots> + <id>central</id> + <name>Maven Repository</name> + <url>https://repo1.maven.org/maven2</url> + </repository> + </repositories> + <pluginRepositories> + <pluginRepository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </pluginRepository> + </pluginRepositories> + <properties> + <java.version>1.8</java.version> + <scala.version>2.11.8</scala.version> + <encoding>UTF-8</encoding> + <scala.libversion>2.11</scala.libversion> + <spark.version>2.1.3</spark.version> + </properties> +</project> + diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..19422f4 --- /dev/null +++ b/pom.xml @@ -0,0 +1,305 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.uber.parquetwritesupports</groupId> + <artifactId>ParquetWriteSupports</artifactId> + <version>2.0.0.1</version> + <properties> + <scala.libversion>2.11</scala.libversion> + <scala.version>2.11.8</scala.version> + <spark.version>2.1.3</spark.version> + <java.version>1.8</java.version> + <encoding>UTF-8</encoding> + </properties> + + <repositories> + <repository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </repository> + <repository> + <id>central</id> + <!-- This should be at top, it makes maven try the central 
repo first and then others and hence faster dep resolution --> + <name>Maven Repository</name> + <url>https://repo1.maven.org/maven2</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <pluginRepositories> + <pluginRepository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </pluginRepository> + </pluginRepositories> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.libversion}</artifactId> + <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils-core</artifactId> + </exclusion> + <exclusion> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.libversion}</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-format</artifactId> + <version>2.5.1-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-column</artifactId> + <version>1.10.0</version> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>1.10.0</version> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-jackson</artifactId> + <version>1.10.1-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + 
<artifactId>parquet-common</artifactId> + <version>1.10.0</version> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-encoding</artifactId> + <version>1.10.0</version> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-tools</artifactId> + <version>1.10.1-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_${scala.libversion}</artifactId> + <version>${spark.version}</version> + </dependency> + + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + <version>2.2.0</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + <exclusion> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils-core</artifactId> + </exclusion> + <exclusion> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>src/main/java</directory> + <excludes> + <exclude>**/*.java</exclude> + </excludes> + </resource> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + + <testResources> + <testResource> + <directory>${project.basedir}/src/test/java</directory> + <excludes> + <exclude>**/*.java</exclude> + </excludes> + </testResource> + <testResource> + <directory>${project.basedir}/src/test/resources</directory> + </testResource> + </testResources> + 
+ + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.0</version> + <executions> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + <args> + <arg>-target:jvm-${java.version}</arg> + </args> + </configuration> + </plugin> + + <!-- Set a compiler level --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.3.2</version> + <configuration> + <source>${java.version}</source> + <target>${java.version}</target> + </configuration> + </plugin> + + <!--Build assembly jar--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.3</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <profiles> + <!-- Run mvn compile -DskipTests -PbuildAntlr to build antlr if any change is made --> + <profile> + <id>buildAntlr</id> + <build> + <plugins> + <plugin> + <!-- command to generate sources: mvn generate-sources --> + <!-- mvn antlr4:help -Ddetail=true --> + <groupId>org.antlr</groupId> + <artifactId>antlr4-maven-plugin</artifactId> + <version>4.5.3</version> + <executions> + <execution> + <goals> + <goal>antlr4</goal> + </goals> + </execution> + </executions> + <configuration> + 
<visitor>true</visitor> + <sourceDirectory>src/main/antlr4</sourceDirectory> + </configuration> + </plugin> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>generate-sources</id> + <phase>generate-sources</phase> + <configuration> + <tasks> + <move todir="src/main/java/com/uber/hadoopprof/antlr4/generated" overwrite="true"> + <fileset dir="${project.build.directory}/generated-sources/antlr4/" /> + </move> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/products.json b/products.json new file mode 100644 index 0000000..c02d0ba --- /dev/null +++ b/products.json @@ -0,0 +1,3 @@ +{"product":"table", "price":30} +{"product":"chair", "price":10} +{"product":"table", "price":13} diff --git a/src/main/java/com/uber/ParquetHelloWorld.java b/src/main/java/com/uber/ParquetHelloWorld.java new file mode 100644 index 0000000..3a07bff --- /dev/null +++ b/src/main/java/com/uber/ParquetHelloWorld.java @@ -0,0 +1,40 @@ +package com.uber; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.execution.datasources.parquet.SchemaUtil; + +/** + * This app depends on Parquet-1178 and Parquet-1396. 
+ */ +public class ParquetHelloWorld { + + public static void main(String[] args) throws Exception { + SparkSession spark = SparkSession + .builder() + .appName("Java Spark SQL basic example") + .config("spark.master", "local") + .config("spark.sql.parquet.enableVectorizedReader", false) + .config("parquet.crypto.encryptor.decryptor.retriever.class", + "org.apache.parquet.crypto.SampleFileEncDecryptorRetriever") + .config("parquet.write.support.class", + org.apache.spark.sql.execution.datasources.parquet.CryptoParquetWriteSupport.class.getName()) + .getOrCreate(); + + testColumnEncReadWrite(spark); + } + + private static void testColumnEncReadWrite(SparkSession spark) { + String schemaString = "{\"type\":\"struct\",\"fields\":[{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"encrypted\": true,\"columnKeyMetaData\": \"AAA=\"}},{\"name\":\"product\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"encrypted\": false}}]}"; + StructType schema = org.apache.spark.sql.execution.datasources.parquet.SchemaUtil.parseString(schemaString); + JavaRDD<Row> rawData = spark.read().json("products.json").toJavaRDD(); + Dataset<Row> dataFrame = spark.createDataFrame(rawData, schema); + + dataFrame.write().mode("overwrite").parquet("file1"); + + spark.read().parquet("file1").show(); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/spark/sql/execution/datasources/parquet/CryptoParquetWriteSupport.scala b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/CryptoParquetWriteSupport.scala new file mode 100644 index 0000000..e3f2c1d --- /dev/null +++ b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/CryptoParquetWriteSupport.scala @@ -0,0 +1,20 @@ +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.WriteSupport.WriteContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ + 
+class CryptoParquetWriteSupport extends ParquetWriteSupport { + + override def init(configuration: Configuration): WriteContext = { + val converter = new ParquetMetadataSchemaConverter(configuration) + createContext(configuration, converter) + } + + override def writeFields( + row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = { + //Todo add data masking fields + super.writeFields(row, schema, fieldWriters) + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetMetadataSchemaConverter.scala b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetMetadataSchemaConverter.scala new file mode 100644 index 0000000..10deafc --- /dev/null +++ b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetMetadataSchemaConverter.scala @@ -0,0 +1,61 @@ +package org.apache.spark.sql.execution.datasources.parquet + +import java.util +import java.util.Map + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.schema.{ExtType, MessageType, Type, Types} +import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} + +import scala.collection.JavaConversions._ + +/** + * This class passes each field's metadata in the StructType to the matching field of the MessageType, in + * addition to the existing functions of ParquetSchemaConverter. + * + * It depends on the class ExtType defined at the link below. Parquet-1396 has been opened to merge + * ExtType into the Parquet-mr repo. https://github.com/shangxinli/parquet-mr/blob/encryption/parquet-column/ + * src/main/java/org/apache/parquet/schema/ExtType.java + * + */ +class ParquetMetadataSchemaConverter(conf: Configuration) extends ParquetSchemaConverter(conf) { + + /** + * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]]. 
+ */ + override def convert(catalystSchema: StructType): MessageType = { + Types + .buildMessage() + .addFields(catalystSchema.map(convertFieldWithMetadata): _*) + .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) + } + + def convertFieldWithMetadata(field: StructField) : Type = { + val extField = new ExtType[Any](convertField(field)) + val metaBuilder = new ExtMetadataBuilder().withMetadata(field.metadata) + val metaData = metaBuilder.getMap + extField.setMetadata(metaData) + return extField + } + + private def getMetadata(schema : StructType , fieldName : String) : Map[String, Any] = { + schema.fields.foreach{ field => + if (field.name != null && field.name.equals(fieldName)) { + val metaBuilder = new ExtMetadataBuilder().withMetadata(field.metadata) + return metaBuilder.getMap + } + } + return new util.HashMap[String, Any]() + } +} + +/** + * Because of the access modifier of getMap() in Spark, ExtMetadataBuilder is created so that getMap can be + * accessed from the class above. + */ +class ExtMetadataBuilder extends MetadataBuilder { + + override def getMap = { + super.getMap + } +} diff --git a/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SchemaUtil.scala b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SchemaUtil.scala new file mode 100644 index 0000000..7b40f47 --- /dev/null +++ b/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SchemaUtil.scala @@ -0,0 +1,12 @@ +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.types.StructType + +/** + * This object exists so that the method fromString() can be used outside the org.apache.spark.sql.types namespace + */ +object SchemaUtil { + def parseString(s : String) : StructType = { + return StructType.fromString(s) + } +} \ No newline at end of file diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..e4fcb01 --- /dev/null +++ 
b/src/main/resources/log4j.properties @@ -0,0 +1,9 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file
