This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fb6609  [BEAM-7728] [SQL] Support ParquetTable (#9054)
5fb6609 is described below

commit 5fb6609950b7ee4167eb994c7fc7ece05c276560
Author: Kai Jiang <[email protected]>
AuthorDate: Tue Jul 30 16:40:13 2019 -0700

    [BEAM-7728] [SQL] Support ParquetTable (#9054)
    
    * parquet sql table
---
 sdks/java/extensions/sql/build.gradle              |   5 +-
 .../parquet/GenericRecordReadConverter.java        |  64 +++++++++++++++
 .../sql/meta/provider/parquet/ParquetTable.java    |  60 ++++++++++++++
 .../provider/parquet/ParquetTableProvider.java     |  52 ++++++++++++
 .../sql/meta/provider/parquet/package-info.java    |  20 +++++
 .../provider/parquet/GenericRecordToRowTest.java   |  79 ++++++++++++++++++
 .../provider/parquet/ParquetTableReadTest.java     |  90 +++++++++++++++++++++
 .../sql/src/test/resources/users.parquet           | Bin 0 -> 615 bytes
 8 files changed, 369 insertions(+), 1 deletion(-)

diff --git a/sdks/java/extensions/sql/build.gradle 
b/sdks/java/extensions/sql/build.gradle
index 0bbf0aa..b4a7079 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -96,6 +96,7 @@ dependencies {
   shadow project(path: ":runners:direct-java", configuration: "shadow")
   provided project(":sdks:java:io:kafka")
   provided project(":sdks:java:io:google-cloud-platform")
+  provided project(":sdks:java:io:parquet")
   provided library.java.kafka_clients
   shadowTest library.java.junit
   shadowTest library.java.hamcrest_core
@@ -106,6 +107,7 @@ dependencies {
 
   // Dependencies that we don't directly reference
   permitUnusedDeclared "com.jayway.jsonpath:json-path:2.4.0"
+  permitUnusedDeclared "net.jcip:jcip-annotations:1.0"
   permitUnusedDeclared library.java.jackson_dataformat_yaml
 
   // Dependencies that are bundled in when we bundle Calcite
@@ -115,7 +117,8 @@ dependencies {
   // Dependencies where one or the other appears "used" depending on classpath,
   // but it doesn't matter which is used
   permitUsedUndeclared "com.google.code.findbugs:jsr305:3.0.2"
-  permitUnusedDeclared "net.jcip:jcip-annotations:1.0"
+  permitUsedUndeclared "org.apache.avro:avro:1.8.2"
+
 }
 
 // Copy Caclcite templates and our own template into the build directory
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordReadConverter.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordReadConverter.java
new file mode 100644
index 0000000..d5ba45b
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordReadConverter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} that converts each Avro {@link GenericRecord} into a Beam {@link Row}.
+ *
+ * <p>Each record is converted with {@code AvroUtils.toBeamRowStrict} against {@link
+ * #beamSchema()}, and the resulting {@link PCollection} is tagged with that same schema.
+ */
+@AutoValue
+public abstract class GenericRecordReadConverter
+    extends PTransform<PCollection<GenericRecord>, PCollection<Row>> implements Serializable {
+
+  /** The Beam {@link Schema} that every produced {@link Row} conforms to. */
+  public abstract Schema beamSchema();
+
+  /** Returns a new {@link Builder} for configuring this transform. */
+  public static Builder builder() {
+    return new AutoValue_GenericRecordReadConverter.Builder();
+  }
+
+  @Override
+  public PCollection<Row> expand(PCollection<GenericRecord> input) {
+    Schema schema = beamSchema();
+    return input
+        .apply("GenericRecordsToRows", ParDo.of(new GenericRecordToRowFn(schema)))
+        .setRowSchema(schema);
+  }
+
+  /**
+   * Converts a single {@link GenericRecord} into a {@link Row} of the given schema.
+   *
+   * <p>This is a static nested class rather than an anonymous {@link DoFn} so that only the
+   * schema, and not the enclosing transform, is serialized along with the function.
+   */
+  private static class GenericRecordToRowFn extends DoFn<GenericRecord, Row> {
+    private final Schema schema;
+
+    GenericRecordToRowFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @ProcessElement
+    public void processElement(@Element GenericRecord record, OutputReceiver<Row> out) {
+      out.output(AvroUtils.toBeamRowStrict(record, schema));
+    }
+  }
+
+  /**
+   * Builder for {@link GenericRecordReadConverter}.
+   *
+   * <p>Public because it is the return type of the public {@link #builder()} method.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** Sets the Beam schema used to convert records and to tag the output. */
+    public abstract Builder beamSchema(Schema beamSchema);
+
+    public abstract GenericRecordReadConverter build();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
new file mode 100644
index 0000000..bb27e2b
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A bounded, read-only {@link org.apache.beam.sdk.extensions.sql.BeamSqlTable} backed by Parquet
+ * files.
+ *
+ * <p>Files matching the configured pattern are read with {@link ParquetIO}, using the Avro form of
+ * the table's Beam schema, and each record is then converted into a {@link Row}.
+ */
+public class ParquetTable extends BaseBeamTable implements Serializable {
+  private final String filePattern;
+
+  /**
+   * Creates a table over Parquet files.
+   *
+   * @param beamSchema schema of the rows this table produces
+   * @param filePattern pattern handed to {@code ParquetIO.read(...).from(...)}
+   */
+  public ParquetTable(Schema beamSchema, String filePattern) {
+    super(beamSchema);
+    this.filePattern = filePattern;
+  }
+
+  /** Reads the matching Parquet files and converts each record to a {@link Row}. */
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    PCollection<GenericRecord> records =
+        begin.apply(
+            "ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern));
+    return records.apply(
+        "GenericRecordToRow", GenericRecordReadConverter.builder().beamSchema(schema).build());
+  }
+
+  /** Always fails: this table cannot be written to. */
+  @Override
+  public PDone buildIOWriter(PCollection<Row> input) {
+    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  }
+
+  /** Parquet input is a finite set of files, so the table is always bounded. */
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
new file mode 100644
index 0000000..6ef1545
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+
+/**
+ * {@link TableProvider} for {@link ParquetTable}.
+ *
+ * <p>Tables declared with {@code TYPE parquet} are read-only; their {@code LOCATION} is used as the
+ * file pattern to read from. A sample Parquet table definition is:
+ *
+ * <pre>{@code
+ * CREATE EXTERNAL TABLE users(
+ *   name VARCHAR,
+ *   favorite_color VARCHAR,
+ *   favorite_numbers ARRAY<INTEGER>
+ * )
+ * TYPE 'parquet'
+ * LOCATION '/home/admin/users.parquet'
+ * }</pre>
+ */
+@AutoService(TableProvider.class)
+public class ParquetTableProvider extends InMemoryMetaTableProvider {
+  /** Returns {@code "parquet"}, the table type this provider handles. */
+  @Override
+  public String getTableType() {
+    return "parquet";
+  }
+
+  /** Builds a {@link ParquetTable} from the table's schema, using its location as file pattern. */
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    return new ParquetTable(table.getSchema(), table.getLocation());
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/package-info.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/package-info.java
new file mode 100644
index 0000000..2f8c2f9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Beam SQL table provider and table implementation for reading Parquet files via ParquetIO. */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordToRowTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordToRowTest.java
new file mode 100644
index 0000000..b24e745
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordToRowTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Unit tests for {@link GenericRecordReadConverter}. */
+public class GenericRecordToRowTest implements Serializable {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  // Beam schema matching the Avro "User" record used below, field for field.
+  org.apache.beam.sdk.schemas.Schema payloadSchema =
+      org.apache.beam.sdk.schemas.Schema.builder()
+          .addStringField("name")
+          .addInt32Field("favorite_number")
+          .addStringField("favorite_color")
+          .addDoubleField("price")
+          .build();
+
+  /** A single Avro record is converted into a Row carrying the same field values. */
+  @Test
+  public void testConvertsGenericRecordToRow() {
+    String userSchemaJson =
+        "{\"namespace\": \"example.avro\",\n"
+            + " \"type\": \"record\",\n"
+            + " \"name\": \"User\",\n"
+            + " \"fields\": [\n"
+            + "     {\"name\": \"name\", \"type\": \"string\"},\n"
+            + "     {\"name\": \"favorite_number\", \"type\": \"int\"},\n"
+            + "     {\"name\": \"favorite_color\", \"type\": \"string\"},\n"
+            + "     {\"name\": \"price\", \"type\": \"double\"}\n"
+            + " ]\n"
+            + "}";
+    Schema avroSchema = new Schema.Parser().parse(userSchemaJson);
+
+    GenericRecord bob = new GenericData.Record(avroSchema);
+    bob.put("name", "Bob");
+    bob.put("favorite_number", 256);
+    bob.put("favorite_color", "red");
+    bob.put("price", 2.4);
+
+    Row expected = Row.withSchema(payloadSchema).addValues("Bob", 256, "red", 2.4).build();
+
+    AvroCoder<GenericRecord> recordCoder = AvroCoder.of(avroSchema);
+    PCollection<GenericRecord> records =
+        pipeline.apply("create PCollection<GenericRecord>", Create.of(bob).withCoder(recordCoder));
+    PCollection<Row> rows =
+        records.apply(
+            "convert", GenericRecordReadConverter.builder().beamSchema(payloadSchema).build());
+
+    PAssert.that(rows).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableReadTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableReadTest.java
new file mode 100644
index 0000000..efa70a1
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableReadTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Test for ParquetTable. */
+@RunWith(JUnit4.class)
+public class ParquetTableReadTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetTableReadTest.class);
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static final String SQL_PARQUET_FIELD =
+      "(name VARCHAR, favorite_color VARCHAR, favorite_numbers ARRAY<INTEGER>)";
+
+  private static final Schema PARQUET_SCHEMA =
+      Schema.builder()
+          .addField("name", Schema.FieldType.STRING)
+          .addNullableField("favorite_color", Schema.FieldType.STRING)
+          .addArrayField("favorite_numbers", Schema.FieldType.INT32)
+          .build();
+
+  /**
+   * Copies a classpath resource into the temporary folder so it can be read as a regular file.
+   *
+   * @param fileName name of a resource located at the classpath root
+   * @return path of the copied file inside the temporary folder
+   * @throws IOException if the resource does not exist or the copy fails
+   */
+  private String extractParquetFile(String fileName) throws IOException {
+    Path tempFilePath = new File(temporaryFolder.getRoot(), fileName).toPath();
+    // Close the resource stream once it has been copied.
+    try (InputStream inputStream = getClass().getResourceAsStream("/" + fileName)) {
+      if (inputStream == null) {
+        // getResourceAsStream signals a missing resource with null; fail with a clear message.
+        throw new IOException("Test resource not found on classpath: /" + fileName);
+      }
+      Files.copy(inputStream, tempFilePath);
+    }
+    return tempFilePath.toString();
+  }
+
+  @Test
+  public void testReadParquet() throws IOException {
+    String parquetPath = extractParquetFile("users.parquet");
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE users %s TYPE parquet LOCATION '%s'",
+            SQL_PARQUET_FIELD, parquetPath));
+
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(
+            pipeline, env.parseQuery("SELECT name, favorite_color, favorite_numbers FROM users"));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(PARQUET_SCHEMA)
+                .addValues("Alyssa", null, Arrays.asList(3, 9, 15, 20))
+                .build(),
+            Row.withSchema(PARQUET_SCHEMA).addValues("Ben", "red", Arrays.asList()).build());
+
+    pipeline.run();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/resources/users.parquet 
b/sdks/java/extensions/sql/src/test/resources/users.parquet
new file mode 100644
index 0000000..aa52733
Binary files /dev/null and 
b/sdks/java/extensions/sql/src/test/resources/users.parquet differ

Reply via email to