This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5fb6609 [BEAM-7728] [SQL] Support ParquetTable (#9054)
5fb6609 is described below
commit 5fb6609950b7ee4167eb994c7fc7ece05c276560
Author: Kai Jiang <[email protected]>
AuthorDate: Tue Jul 30 16:40:13 2019 -0700
[BEAM-7728] [SQL] Support ParquetTable (#9054)
* parquet sql table
---
sdks/java/extensions/sql/build.gradle | 5 +-
.../parquet/GenericRecordReadConverter.java | 64 +++++++++++++++
.../sql/meta/provider/parquet/ParquetTable.java | 60 ++++++++++++++
.../provider/parquet/ParquetTableProvider.java | 52 ++++++++++++
.../sql/meta/provider/parquet/package-info.java | 20 +++++
.../provider/parquet/GenericRecordToRowTest.java | 79 ++++++++++++++++++
.../provider/parquet/ParquetTableReadTest.java | 90 +++++++++++++++++++++
.../sql/src/test/resources/users.parquet | Bin 0 -> 615 bytes
8 files changed, 369 insertions(+), 1 deletion(-)
diff --git a/sdks/java/extensions/sql/build.gradle
b/sdks/java/extensions/sql/build.gradle
index 0bbf0aa..b4a7079 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -96,6 +96,7 @@ dependencies {
shadow project(path: ":runners:direct-java", configuration: "shadow")
provided project(":sdks:java:io:kafka")
provided project(":sdks:java:io:google-cloud-platform")
+ provided project(":sdks:java:io:parquet")
provided library.java.kafka_clients
shadowTest library.java.junit
shadowTest library.java.hamcrest_core
@@ -106,6 +107,7 @@ dependencies {
// Dependencies that we don't directly reference
permitUnusedDeclared "com.jayway.jsonpath:json-path:2.4.0"
+ permitUnusedDeclared "net.jcip:jcip-annotations:1.0"
permitUnusedDeclared library.java.jackson_dataformat_yaml
// Dependencies that are bundled in when we bundle Calcite
@@ -115,7 +117,8 @@ dependencies {
// Dependencies where one or the other appears "used" depending on classpath,
// but it doesn't matter which is used
permitUsedUndeclared "com.google.code.findbugs:jsr305:3.0.2"
- permitUnusedDeclared "net.jcip:jcip-annotations:1.0"
+ permitUsedUndeclared "org.apache.avro:avro:1.8.2"
+
}
// Copy Caclcite templates and our own template into the build directory
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordReadConverter.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordReadConverter.java
new file mode 100644
index 0000000..d5ba45b
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordReadConverter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link PTransform} that converts every Avro {@link GenericRecord} of the input collection
+ * into a Beam {@link Row} built against {@link #beamSchema()}.
+ *
+ * <p>Each element is converted with {@link AvroUtils#toBeamRowStrict}, and the row schema of the
+ * output collection is set to the same schema.
+ */
+@AutoValue
+public abstract class GenericRecordReadConverter
+    extends PTransform<PCollection<GenericRecord>, PCollection<Row>> implements Serializable {
+
+  /** Returns the Beam schema used to build every output {@link Row}. */
+  public abstract Schema beamSchema();
+
+  /** Returns a new {@link Builder} for this transform. */
+  public static Builder builder() {
+    return new AutoValue_GenericRecordReadConverter.Builder();
+  }
+
+  @Override
+  public PCollection<Row> expand(PCollection<GenericRecord> input) {
+    // Resolve the schema once; the DoFn is handed the schema only, not this transform.
+    Schema rowSchema = beamSchema();
+    return input
+        .apply("GenericRecordsToRows", ParDo.of(new ToRowFn(rowSchema)))
+        .setRowSchema(rowSchema);
+  }
+
+  /**
+   * Converts one {@link GenericRecord} into a {@link Row} of the configured schema.
+   *
+   * <p>Declared as a static nested class so that it holds the schema alone and no reference to
+   * the enclosing transform.
+   */
+  private static class ToRowFn extends DoFn<GenericRecord, Row> {
+    private final Schema rowSchema;
+
+    ToRowFn(Schema rowSchema) {
+      this.rowSchema = rowSchema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(AvroUtils.toBeamRowStrict(c.element(), rowSchema));
+    }
+  }
+
+  /**
+   * Builder for {@link GenericRecordReadConverter}.
+   *
+   * <p>Public because it is the return type of the public {@link #builder()} factory.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** Sets the Beam schema that output rows are built against. */
+    public abstract Builder beamSchema(Schema beamSchema);
+
+    /** Builds the transform with the configured schema. */
+    public abstract GenericRecordReadConverter build();
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
new file mode 100644
index 0000000..bb27e2b
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * A {@link org.apache.beam.sdk.extensions.sql.BeamSqlTable} that reads rows from the Parquet
+ * files matching a file pattern. Writing is not supported.
+ */
+public class ParquetTable extends BaseBeamTable implements Serializable {
+  private final String filePattern;
+
+  public ParquetTable(Schema beamSchema, String filePattern) {
+    super(beamSchema);
+    this.filePattern = filePattern;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    // ParquetIO produces Avro records, so it is given the Avro form of the table schema.
+    PCollection<GenericRecord> records =
+        begin.apply(
+            "ParquetIORead", ParquetIO.read(AvroUtils.toAvroSchema(schema)).from(filePattern));
+
+    // Convert the Avro records into rows of the table schema.
+    PTransform<PCollection<GenericRecord>, PCollection<Row>> toRows =
+        GenericRecordReadConverter.builder().beamSchema(schema).build();
+    return records.apply("GenericRecordToRow", toRows);
+  }
+
+  @Override
+  public PDone buildIOWriter(PCollection<Row> input) {
+    throw new UnsupportedOperationException("Writing to a Parquet file is not supported");
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
new file mode 100644
index 0000000..6ef1545
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+
+/**
+ * {@link TableProvider} for {@link ParquetTable}.
+ *
+ * <p>A sample of parquet table is:
+ *
+ * <pre>{@code
+ * CREATE EXTERNAL TABLE users(
+ * name VARCHAR,
+ * favorite_color VARCHAR,
+ * favorite_numbers ARRAY<INTEGER>
+ * )
+ * TYPE parquet
+ * LOCATION '/home/admin/users.parquet'
+ * }</pre>
+ */
+@AutoService(TableProvider.class)
+public class ParquetTableProvider extends InMemoryMetaTableProvider {
+ /** Returns the table type name handled by this provider: {@code parquet}. */
+ @Override
+ public String getTableType() {
+ return "parquet";
+ }
+
+ /**
+ * Creates a {@link ParquetTable} from the schema of {@code table} and its location; the
+ * location is passed to {@link ParquetTable} as the file pattern to read.
+ */
+ @Override
+ public BeamSqlTable buildBeamSqlTable(Table table) {
+ return new ParquetTable(table.getSchema(), table.getLocation());
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/package-info.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/package-info.java
new file mode 100644
index 0000000..2f8c2f9
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Table provider and table implementation for reading Parquet files with ParquetIO. */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordToRowTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordToRowTest.java
new file mode 100644
index 0000000..b24e745
--- /dev/null
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/GenericRecordToRowTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Checks that {@link GenericRecordReadConverter} turns Avro records into Beam rows. */
+public class GenericRecordToRowTest implements Serializable {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  // Beam-side schema; its fields mirror the Avro record declared inside the test.
+  org.apache.beam.sdk.schemas.Schema payloadSchema =
+      org.apache.beam.sdk.schemas.Schema.builder()
+          .addStringField("name")
+          .addInt32Field("favorite_number")
+          .addStringField("favorite_color")
+          .addDoubleField("price")
+          .build();
+
+  @Test
+  public void testConvertsGenericRecordToRow() {
+    // Avro-side declaration of the same four fields.
+    String avroSchemaJson =
+        "{\"namespace\": \"example.avro\",\n"
+            + " \"type\": \"record\",\n"
+            + " \"name\": \"User\",\n"
+            + " \"fields\": [\n"
+            + " {\"name\": \"name\", \"type\": \"string\"},\n"
+            + " {\"name\": \"favorite_number\", \"type\": \"int\"},\n"
+            + " {\"name\": \"favorite_color\", \"type\": \"string\"},\n"
+            + " {\"name\": \"price\", \"type\": \"double\"}\n"
+            + " ]\n"
+            + "}";
+    Schema avroSchema = new Schema.Parser().parse(avroSchemaJson);
+
+    GenericRecord bob = new GenericData.Record(avroSchema);
+    bob.put("name", "Bob");
+    bob.put("favorite_number", 256);
+    bob.put("favorite_color", "red");
+    bob.put("price", 2.4);
+
+    PCollection<GenericRecord> records =
+        pipeline.apply(
+            "create PCollection<GenericRecord>",
+            Create.of(bob).withCoder(AvroCoder.of(avroSchema)));
+    PCollection<Row> rows =
+        records.apply(
+            "convert", GenericRecordReadConverter.builder().beamSchema(payloadSchema).build());
+
+    Row expected = Row.withSchema(payloadSchema).addValues("Bob", 256, "red", 2.4).build();
+    PAssert.that(rows).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableReadTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableReadTest.java
new file mode 100644
index 0000000..efa70a1
--- /dev/null
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableReadTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Test for reading a Parquet file through a SQL table backed by {@link ParquetTable}. */
+@RunWith(JUnit4.class)
+public class ParquetTableReadTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetTableReadTest.class);
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static final String SQL_PARQUET_FIELD =
+      "(name VARCHAR, favorite_color VARCHAR, favorite_numbers ARRAY<INTEGER>)";
+
+  private static final Schema PARQUET_SCHEMA =
+      Schema.builder()
+          .addField("name", Schema.FieldType.STRING)
+          .addNullableField("favorite_color", Schema.FieldType.STRING)
+          .addArrayField("favorite_numbers", Schema.FieldType.INT32)
+          .build();
+
+  /**
+   * Copies the classpath resource {@code /fileName} into the test's temporary folder.
+   *
+   * @param fileName name of the resource, relative to the classpath root
+   * @return path of the copied file
+   * @throws IOException if the resource is not on the classpath or the copy fails
+   */
+  private String extractParquetFile(String fileName) throws IOException {
+    Path tempFilePath = new File(temporaryFolder.getRoot(), fileName).toPath();
+    // try-with-resources closes the resource stream once the copy is done.
+    try (InputStream inputStream = getClass().getResourceAsStream("/" + fileName)) {
+      if (inputStream == null) {
+        // Report the missing resource by name instead of a bare NullPointerException.
+        throw new IOException("Test resource not found on classpath: /" + fileName);
+      }
+      Files.copy(inputStream, tempFilePath);
+    }
+    return tempFilePath.toString();
+  }
+
+  @Test
+  public void testReadParquet() throws IOException {
+    String parquetPath = extractParquetFile("users.parquet");
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE users %s TYPE parquet LOCATION '%s'",
+            SQL_PARQUET_FIELD, parquetPath));
+
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(
+            pipeline, env.parseQuery("SELECT name, favorite_color, favorite_numbers FROM users"));
+
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(PARQUET_SCHEMA)
+                .addValues("Alyssa", null, Arrays.asList(3, 9, 15, 20))
+                .build(),
+            Row.withSchema(PARQUET_SCHEMA).addValues("Ben", "red", Arrays.asList()).build());
+
+    pipeline.run();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/resources/users.parquet
b/sdks/java/extensions/sql/src/test/resources/users.parquet
new file mode 100644
index 0000000..aa52733
Binary files /dev/null and
b/sdks/java/extensions/sql/src/test/resources/users.parquet differ