This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d79cd82 [BEAM-7929] Support column projection for Parquet Tables
new 2447679 Merge pull request #14117: [BEAM-7929] Support column projection for Parquet Tables
d79cd82 is described below
commit d79cd82943c90dad518b705b7e81bcd2d2fc0f21
Author: Ismaël Mejía <[email protected]>
AuthorDate: Mon Mar 1 10:05:32 2021 +0100
[BEAM-7929] Support column projection for Parquet Tables
---
sdks/java/extensions/sql/build.gradle | 1 +
.../sql/meta/provider/parquet/ParquetTable.java | 132 +++++++++++++++++++++
.../provider/parquet/ParquetTableProvider.java | 22 ++--
.../provider/parquet/ParquetTableProviderTest.java | 35 +++++-
.../sdk/io/parquet/ParquetSchemaIOProvider.java | 127 --------------------
5 files changed, 172 insertions(+), 145 deletions(-)
diff --git a/sdks/java/extensions/sql/build.gradle
b/sdks/java/extensions/sql/build.gradle
index 6de73f2..6758e4b 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -79,6 +79,7 @@ dependencies {
provided project(":sdks:java:io:kafka")
provided project(":sdks:java:io:google-cloud-platform")
compile project(":sdks:java:io:mongodb")
+ compile library.java.avro
provided project(":sdks:java:io:parquet")
provided library.java.jackson_dataformat_xml
provided library.java.hadoop_client
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
new file mode 100644
index 0000000..b2282ff
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Beam SQL table backed by Parquet files stored under the table's {@code LOCATION}.
+ *
+ * <p>Reads go through {@link ParquetIO} with Beam schemas enabled and support column projection
+ * (including field reordering). Writes convert rows to Avro {@link GenericRecord}s and write them
+ * with {@link FileIO} using {@link ParquetIO#sink}.
+ */
+@Internal
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetTable.class);
+
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  /** Reads every column of the files under the table location. */
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    return readRows(begin, newRead(schema));
+  }
+
+  /**
+   * Reads only the columns named in {@code fieldNames}, in that order; an empty list reads every
+   * column.
+   *
+   * <p>{@code filters} is not pushed down into the Parquet read; this method ignores it.
+   *
+   * @throws IllegalArgumentException if a name in {@code fieldNames} is not a column of the table
+   */
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = newRead(schema);
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      // Parameterized so the schema is only rendered when INFO logging is enabled.
+      LOG.info("Projecting fields schema : {}", projectionSchema);
+      // The projected schema doubles as the encoder schema for the emitted records.
+      read = read.withProjection(projectionSchema, projectionSchema);
+    }
+    return readRows(begin, read);
+  }
+
+  /** Returns a {@link Read} over the table's files that decodes them with {@code schema}. */
+  private Read newRead(Schema schema) {
+    return ParquetIO.read(schema).withBeamSchemas(true).from(filePattern());
+  }
+
+  /**
+   * Returns the glob matching the files under the table location. A location that already ends
+   * with {@code /} (e.g. {@code LOCATION '/home/admin/orders/'}) is not given a second separator,
+   * which would otherwise produce a pattern like {@code orders//*}.
+   */
+  private String filePattern() {
+    String location = table.getLocation();
+    return location.endsWith("/") ? location + "*" : location + "/*";
+  }
+
+  /** Applies {@code read} to {@code begin} and converts the resulting records to {@link Row}s. */
+  private static PCollection<Row> readRows(PBegin begin, Read read) {
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  /**
+   * Returns a record schema holding copies of the fields of {@code schema} named in {@code
+   * fieldNames}, in the order given. The record is named {@code <original name>_projected}.
+   *
+   * @throws IllegalArgumentException if {@code schema} has no field with one of the given names
+   */
+  private static Schema projectSchema(Schema schema, List<String> fieldNames) {
+    List<Field> selectedFields = new ArrayList<>(fieldNames.size());
+    for (String fieldName : fieldNames) {
+      Field field = schema.getField(fieldName);
+      if (field == null) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Field '%s' not found in schema %s", fieldName, schema.getFullName()));
+      }
+      selectedFields.add(deepCopyField(field));
+    }
+    return Schema.createRecord(
+        schema.getName() + "_projected",
+        schema.getDoc(),
+        schema.getNamespace(),
+        schema.isError(),
+        selectedFields);
+  }
+
+  /**
+   * Returns a new {@link Field} with the same name, schema, doc, default value, sort order,
+   * properties and aliases as {@code field}. A fresh instance is required because Avro refuses to
+   * add a {@link Field} that already belongs to one record schema to another record schema.
+   */
+  private static Field deepCopyField(Field field) {
+    Field newField =
+        new Field(field.name(), field.schema(), field.doc(), field.defaultVal(), field.order());
+    for (Map.Entry<String, Object> kv : field.getObjectProps().entrySet()) {
+      newField.addProp(kv.getKey(), kv.getValue());
+    }
+    if (field.aliases() != null) {
+      for (String alias : field.aliases()) {
+        newField.addAlias(alias);
+      }
+    }
+    return newField;
+  }
+
+  /** Converts {@code input} to Avro records and writes them as Parquet under the table location. */
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    final Schema schema = AvroUtils.toAvroSchema(input.getSchema());
+    return input
+        .apply("ToGenericRecords", Convert.to(GenericRecord.class))
+        .apply(
+            "ParquetIOWrite",
+            FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(table.getLocation()));
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return IsBounded.BOUNDED;
+  }
+
+  @Override
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    return BeamTableStatistics.BOUNDED_UNKNOWN;
+  }
+
+  /** Projection is supported, including fields requested in a different order than the table. */
+  @Override
+  public ProjectSupport supportsProjects() {
+    return ProjectSupport.WITH_FIELD_REORDERING;
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
index b8a55f5..f24e226 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
@@ -18,18 +18,15 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
import com.google.auto.service.AutoService;
-import
org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.io.parquet.ParquetIO;
-import org.apache.beam.sdk.io.parquet.ParquetSchemaIOProvider;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
/**
* {@link TableProvider} for {@link ParquetIO} for consumption by Beam SQL.
*
- * <p>Passes the {@link ParquetSchemaIOProvider} to the generalized table
provider wrapper, {@link
- * SchemaIOTableProviderWrapper}, for Parquet specific behavior.
- *
* <p>A sample of parquet table is:
*
* <pre>{@code
@@ -39,19 +36,18 @@ import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
* favorite_numbers ARRAY<INTEGER>
* )
* TYPE 'parquet'
- * LOCATION '/home/admin/users.parquet'
+ * LOCATION '/home/admin/orders/'
* }</pre>
*/
@AutoService(TableProvider.class)
-public class ParquetTableProvider extends SchemaIOTableProviderWrapper {
+public class ParquetTableProvider extends InMemoryMetaTableProvider {
+  /** Returns {@code "parquet"}, the table type matched by {@code TYPE 'parquet'} in DDL. */
@Override
- public SchemaIOProvider getSchemaIOProvider() {
- return new ParquetSchemaIOProvider();
+ public String getTableType() {
+ return "parquet";
}
- // TODO[BEAM-10516]: remove this override after TableProvider problem is
fixed
+  /** Creates a {@link ParquetTable} for the given table definition. */
@Override
- public String getTableType() {
- return "parquet";
+ public BeamSqlTable buildBeamSqlTable(Table table) {
+ return new ParquetTable(table);
}
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
index 49d7b6e..71680f7 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
@@ -44,7 +44,13 @@ public class ParquetTableProviderTest {
private static final String FIELD_NAMES = "(name VARCHAR, age BIGINT,
country VARCHAR)";
- private static final Schema OUTPUT_ROW_SCHEMA =
+ private static final Schema TABLE_SCHEMA =
+ Schema.builder()
+ .addStringField("name")
+ .addInt64Field("age")
+ .addStringField("country")
+ .build();
+ private static final Schema PROJECTED_SCHEMA =
Schema.builder().addInt64Field("age").addStringField("country").build();
@Test
@@ -61,15 +67,34 @@ public class ParquetTableProviderTest {
writePipeline,
env.parseQuery(
"INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John',
42, 'USA')"));
-
writePipeline.run().waitUntilFinish();
PCollection<Row> rows =
+ BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT *
FROM PersonInfo"));
+ PAssert.that(rows)
+ .containsInAnyOrder(
+ Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L,
"England").build(),
+ Row.withSchema(TABLE_SCHEMA).addValues("John", 42L,
"USA").build());
+
+ PCollection<Row> filtered =
BeamSqlRelUtils.toPCollection(
- readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo
WHERE age > 25"));
+ readPipeline, env.parseQuery("SELECT * FROM PersonInfo WHERE age >
25"));
+ PAssert.that(filtered)
+ .containsInAnyOrder(Row.withSchema(TABLE_SCHEMA).addValues("John",
42L, "USA").build());
- PAssert.that(rows)
- .containsInAnyOrder(Row.withSchema(OUTPUT_ROW_SCHEMA).addValues(42L,
"USA").build());
+ PCollection<Row> projected =
+ BeamSqlRelUtils.toPCollection(
+ readPipeline, env.parseQuery("SELECT age, country FROM
PersonInfo"));
+ PAssert.that(projected)
+ .containsInAnyOrder(
+ Row.withSchema(PROJECTED_SCHEMA).addValues(22L, "England").build(),
+ Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
+
+ PCollection<Row> filteredAndProjected =
+ BeamSqlRelUtils.toPCollection(
+ readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo
WHERE age > 25"));
+ PAssert.that(filteredAndProjected)
+ .containsInAnyOrder(Row.withSchema(PROJECTED_SCHEMA).addValues(42L,
"USA").build());
PipelineResult.State state = readPipeline.run().waitUntilFinish();
assertEquals(State.DONE, state);
diff --git
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaIOProvider.java
b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaIOProvider.java
deleted file mode 100644
index 71ef5e2..0000000
---
a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaIOProvider.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.parquet;
-
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.FileIO;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
-
-/**
- * An implementation of {@link SchemaIOProvider} for reading and writing
parquet files with {@link
- * ParquetIO}.
- */
-@Internal
-@AutoService(SchemaIOProvider.class)
-@SuppressWarnings({
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public class ParquetSchemaIOProvider implements SchemaIOProvider {
- /** Returns an id that uniquely represents this IO. */
- @Override
- public String identifier() {
- return "parquet";
- }
-
- /**
- * Returns the expected schema of the configuration object. Note this is
distinct from the schema
- * of the data source itself. No configuration expected for parquet.
- */
- @Override
- public Schema configurationSchema() {
- return Schema.builder().build();
- }
-
- /**
- * Produce a SchemaIO given a String representing the data's location, the
schema of the data that
- * resides there, and some IO-specific configuration object.
- */
- @Override
- public ParquetSchemaIO from(String location, Row configuration, Schema
dataSchema) {
- return new ParquetSchemaIO(location, dataSchema);
- }
-
- @Override
- public boolean requiresDataSchema() {
- return true;
- }
-
- @Override
- public PCollection.IsBounded isBounded() {
- return PCollection.IsBounded.BOUNDED;
- }
-
- /** An abstraction to create schema aware IOs. */
- private static class ParquetSchemaIO implements SchemaIO, Serializable {
- protected final Schema dataSchema;
- protected final String location;
-
- private ParquetSchemaIO(String location, Schema dataSchema) {
- this.dataSchema = dataSchema;
- this.location = location;
- }
-
- @Override
- public Schema schema() {
- return dataSchema;
- }
-
- @Override
- public PTransform<PBegin, PCollection<Row>> buildReader() {
- return new PTransform<PBegin, PCollection<Row>>() {
- @Override
- public PCollection<Row> expand(PBegin begin) {
- org.apache.avro.Schema schema = AvroUtils.toAvroSchema(dataSchema);
- return begin
- .apply(
- "ParquetIORead",
- ParquetIO.read(schema).withBeamSchemas(true).from(location +
"/*"))
- .apply("ToRows", Convert.toRows());
- }
- };
- }
-
- @Override
- public PTransform<PCollection<Row>, POutput> buildWriter() {
- return new PTransform<PCollection<Row>, POutput>() {
- @Override
- public PDone expand(PCollection<Row> input) {
- final org.apache.avro.Schema schema =
AvroUtils.toAvroSchema(input.getSchema());
- input
- .apply("ToGenericRecords", Convert.to(GenericRecord.class))
- .apply(
- "ParquetIOWrite",
-
FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(location));
- return PDone.in(input.getPipeline());
- }
- };
- }
- }
-}