This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d79cd82  [BEAM-7929] Support column projection for Parquet Tables
     new 2447679  Merge pull request #14117: [BEAM-7929] Support column projection for Parquet Tables
d79cd82 is described below

commit d79cd82943c90dad518b705b7e81bcd2d2fc0f21
Author: Ismaël Mejía <ieme...@gmail.com>
AuthorDate: Mon Mar 1 10:05:32 2021 +0100

    [BEAM-7929] Support column projection for Parquet Tables
---
 sdks/java/extensions/sql/build.gradle              |   1 +
 .../sql/meta/provider/parquet/ParquetTable.java    | 132 +++++++++++++++++++++
 .../provider/parquet/ParquetTableProvider.java     |  22 ++--
 .../provider/parquet/ParquetTableProviderTest.java |  35 +++++-
 .../sdk/io/parquet/ParquetSchemaIOProvider.java    | 127 --------------------
 5 files changed, 172 insertions(+), 145 deletions(-)

diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle
index 6de73f2..6758e4b 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -79,6 +79,7 @@ dependencies {
   provided project(":sdks:java:io:kafka")
   provided project(":sdks:java:io:google-cloud-platform")
   compile project(":sdks:java:io:mongodb")
+  compile library.java.avro
   provided project(":sdks:java:io:parquet")
   provided library.java.jackson_dataformat_xml
   provided library.java.hadoop_client
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
new file mode 100644
index 0000000..b2282ff
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
+import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.io.parquet.ParquetIO.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@SuppressWarnings({"nullness"})
+class ParquetTable extends SchemaBaseBeamTable implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetTable.class);
+
+  private final Table table;
+
+  ParquetTable(Table table) {
+    super(table.getSchema());
+    this.table = table;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(PBegin begin) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(
+      PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
+    final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    if (!fieldNames.isEmpty()) {
+      Schema projectionSchema = projectSchema(schema, fieldNames);
+      LOG.info("Projecting fields schema : " + projectionSchema.toString());
+      read = read.withProjection(projectionSchema, projectionSchema);
+    }
+    return begin.apply("ParquetIORead", read).apply("ToRows", Convert.toRows());
+  }
+
+  /** Returns a copy of the {@link Schema} with only the fieldNames fields. */
+  private static Schema projectSchema(Schema schema, List<String> fieldNames) {
+    List<Field> selectedFields = new ArrayList<>();
+    for (String fieldName : fieldNames) {
+      selectedFields.add(deepCopyField(schema.getField(fieldName)));
+    }
+    return Schema.createRecord(
+        schema.getName() + "_projected",
+        schema.getDoc(),
+        schema.getNamespace(),
+        schema.isError(),
+        selectedFields);
+  }
+
+  private static Field deepCopyField(Field field) {
+    Schema.Field newField =
+        new Schema.Field(
+            field.name(), field.schema(), field.doc(), field.defaultVal(), field.order());
+    for (Map.Entry<String, Object> kv : field.getObjectProps().entrySet()) {
+      newField.addProp(kv.getKey(), kv.getValue());
+    }
+    if (field.aliases() != null) {
+      for (String alias : field.aliases()) {
+        newField.addAlias(alias);
+      }
+    }
+    return newField;
+  }
+
+  @Override
+  public POutput buildIOWriter(PCollection<Row> input) {
+    final org.apache.avro.Schema schema = AvroUtils.toAvroSchema(input.getSchema());
+    return input
+        .apply("ToGenericRecords", Convert.to(GenericRecord.class))
+        .apply(
+            "ParquetIOWrite",
+            FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(table.getLocation()));
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  @Override
+  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+    return BeamTableStatistics.BOUNDED_UNKNOWN;
+  }
+
+  @Override
+  public ProjectSupport supportsProjects() {
+    return ProjectSupport.WITH_FIELD_REORDERING;
+  }
+}
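
For context, the projection helpers above rebuild the record schema rather than reuse its fields: an Avro Field keeps the position it was assigned in its original schema, so each selected field has to be recreated before it can join a new record. A minimal standalone sketch of the same idea, mirroring the PersonInfo table from the tests below (class and field names here are illustrative only, not part of this commit):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class ProjectionSketch {
      public static void main(String[] args) {
        // Full table schema, mirroring the PersonInfo table in the tests.
        Schema full =
            SchemaBuilder.record("PersonInfo").fields()
                .requiredString("name")
                .requiredLong("age")
                .requiredString("country")
                .endRecord();

        // Recreate each selected field: an Avro Field cannot be added to a
        // second schema once it has a position in its original one.
        List<Schema.Field> selected = new ArrayList<>();
        for (String name : Arrays.asList("age", "country")) {
          Schema.Field f = full.getField(name);
          selected.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()));
        }
        Schema projected =
            Schema.createRecord(
                full.getName() + "_projected",
                full.getDoc(),
                full.getNamespace(),
                full.isError(),
                selected);
        System.out.println(projected.toString(true));
      }
    }
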
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
index b8a55f5..f24e226 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProvider.java
@@ -18,18 +18,15 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
 
 import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.io.parquet.ParquetIO;
-import org.apache.beam.sdk.io.parquet.ParquetSchemaIOProvider;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 
 /**
  * {@link TableProvider} for {@link ParquetIO} for consumption by Beam SQL.
  *
- * <p>Passes the {@link ParquetSchemaIOProvider} to the generalized table provider wrapper, {@link
- * SchemaIOTableProviderWrapper}, for Parquet specific behavior.
- *
  * <p>A sample of parquet table is:
  *
  * <pre>{@code
@@ -39,19 +36,18 @@ import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
  *   favorite_numbers ARRAY<INTEGER>
  * )
  * TYPE 'parquet'
- * LOCATION '/home/admin/users.parquet'
+ * LOCATION '/home/admin/orders/'
  * }</pre>
  */
 @AutoService(TableProvider.class)
-public class ParquetTableProvider extends SchemaIOTableProviderWrapper {
+public class ParquetTableProvider extends InMemoryMetaTableProvider {
   @Override
-  public SchemaIOProvider getSchemaIOProvider() {
-    return new ParquetSchemaIOProvider();
+  public String getTableType() {
+    return "parquet";
   }
 
-  // TODO[BEAM-10516]: remove this override after TableProvider problem is fixed
   @Override
-  public String getTableType() {
-    return "parquet";
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    return new ParquetTable(table);
   }
 }
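
Once registered through @AutoService(TableProvider.class), the provider is discovered by Beam SQL and buildBeamSqlTable is invoked for each table declared with TYPE 'parquet'. A minimal usage sketch, assuming an illustrative table name and location (not part of this commit); the SELECT list is what ParquetTable pushes down to ParquetIO as a column projection:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    public class ParquetTableSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Declare the table with DDL, then query it; only age and country
        // should reach ParquetIO thanks to the projection push-down.
        PCollection<Row> rows =
            p.apply(
                SqlTransform.query("SELECT age, country FROM PersonInfo")
                    .withDdlString(
                        "CREATE EXTERNAL TABLE PersonInfo "
                            + "(name VARCHAR, age BIGINT, country VARCHAR) "
                            + "TYPE 'parquet' LOCATION '/home/admin/orders/'"));
        p.run().waitUntilFinish();
      }
    }
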
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
index 49d7b6e..71680f7 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
@@ -44,7 +44,13 @@ public class ParquetTableProviderTest {
 
  private static final String FIELD_NAMES = "(name VARCHAR, age BIGINT, country VARCHAR)";
 
-  private static final Schema OUTPUT_ROW_SCHEMA =
+  private static final Schema TABLE_SCHEMA =
+      Schema.builder()
+          .addStringField("name")
+          .addInt64Field("age")
+          .addStringField("country")
+          .build();
+  private static final Schema PROJECTED_SCHEMA =
       Schema.builder().addInt64Field("age").addStringField("country").build();
 
   @Test
@@ -61,15 +67,34 @@ public class ParquetTableProviderTest {
         writePipeline,
         env.parseQuery(
             "INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John', 
42, 'USA')"));
-
     writePipeline.run().waitUntilFinish();
 
     PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * FROM PersonInfo"));
+    PAssert.that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, 
"England").build(),
+            Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, 
"USA").build());
+
+    PCollection<Row> filtered =
         BeamSqlRelUtils.toPCollection(
-            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo 
WHERE age > 25"));
+            readPipeline, env.parseQuery("SELECT * FROM PersonInfo WHERE age > 
25"));
+    PAssert.that(filtered)
+        .containsInAnyOrder(Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build());
 
-    PAssert.that(rows)
-        .containsInAnyOrder(Row.withSchema(OUTPUT_ROW_SCHEMA).addValues(42L, "USA").build());
+    PCollection<Row> projected =
+        BeamSqlRelUtils.toPCollection(
+            readPipeline, env.parseQuery("SELECT age, country FROM 
PersonInfo"));
+    PAssert.that(projected)
+        .containsInAnyOrder(
+            Row.withSchema(PROJECTED_SCHEMA).addValues(22L, "England").build(),
+            Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
+
+    PCollection<Row> filteredAndProjected =
+        BeamSqlRelUtils.toPCollection(
+            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo 
WHERE age > 25"));
+    PAssert.that(filteredAndProjected)
+        .containsInAnyOrder(Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
 
     PipelineResult.State state = readPipeline.run().waitUntilFinish();
     assertEquals(State.DONE, state);
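
The projected queries above exercise ParquetTable's push-down path, which passes the same Avro schema to ParquetIO's withProjection twice: once as the projection to request from the files and once as the encoder schema for the records that come back. A rough sketch of the equivalent direct ParquetIO read, under the assumption of an illustrative file location (the schemas mirror the test's PersonInfo table):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class DirectProjectionSketch {
      public static void main(String[] args) {
        Schema full =
            SchemaBuilder.record("PersonInfo").fields()
                .requiredString("name")
                .requiredLong("age")
                .requiredString("country")
                .endRecord();
        Schema projected =
            SchemaBuilder.record("PersonInfo_projected").fields()
                .requiredLong("age")
                .requiredString("country")
                .endRecord();

        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // First argument: the columns to request from the files; second:
        // the schema used to encode the emitted GenericRecords (identical
        // here, matching what ParquetTable does).
        p.apply(
            "ReadProjected",
            ParquetIO.read(full)
                .withProjection(projected, projected)
                .from("/home/admin/orders/*"));
        p.run().waitUntilFinish();
      }
    }
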
diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaIOProvider.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaIOProvider.java
deleted file mode 100644
index 71ef5e2..0000000
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetSchemaIOProvider.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.parquet;
-
-import com.google.auto.service.AutoService;
-import java.io.Serializable;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.FileIO;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
-import org.apache.beam.sdk.schemas.transforms.Convert;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
-
-/**
- * An implementation of {@link SchemaIOProvider} for reading and writing parquet files with {@link
- * ParquetIO}.
- */
-@Internal
-@AutoService(SchemaIOProvider.class)
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public class ParquetSchemaIOProvider implements SchemaIOProvider {
-  /** Returns an id that uniquely represents this IO. */
-  @Override
-  public String identifier() {
-    return "parquet";
-  }
-
-  /**
-   * Returns the expected schema of the configuration object. Note this is distinct from the schema
-   * of the data source itself. No configuration expected for parquet.
-   */
-  @Override
-  public Schema configurationSchema() {
-    return Schema.builder().build();
-  }
-
-  /**
-   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
-   * resides there, and some IO-specific configuration object.
-   */
-  @Override
-  public ParquetSchemaIO from(String location, Row configuration, Schema dataSchema) {
-    return new ParquetSchemaIO(location, dataSchema);
-  }
-
-  @Override
-  public boolean requiresDataSchema() {
-    return true;
-  }
-
-  @Override
-  public PCollection.IsBounded isBounded() {
-    return PCollection.IsBounded.BOUNDED;
-  }
-
-  /** An abstraction to create schema aware IOs. */
-  private static class ParquetSchemaIO implements SchemaIO, Serializable {
-    protected final Schema dataSchema;
-    protected final String location;
-
-    private ParquetSchemaIO(String location, Schema dataSchema) {
-      this.dataSchema = dataSchema;
-      this.location = location;
-    }
-
-    @Override
-    public Schema schema() {
-      return dataSchema;
-    }
-
-    @Override
-    public PTransform<PBegin, PCollection<Row>> buildReader() {
-      return new PTransform<PBegin, PCollection<Row>>() {
-        @Override
-        public PCollection<Row> expand(PBegin begin) {
-          org.apache.avro.Schema schema = AvroUtils.toAvroSchema(dataSchema);
-          return begin
-              .apply(
-                  "ParquetIORead",
-                  ParquetIO.read(schema).withBeamSchemas(true).from(location + "/*"))
-              .apply("ToRows", Convert.toRows());
-        }
-      };
-    }
-
-    @Override
-    public PTransform<PCollection<Row>, POutput> buildWriter() {
-      return new PTransform<PCollection<Row>, POutput>() {
-        @Override
-        public PDone expand(PCollection<Row> input) {
-          final org.apache.avro.Schema schema = AvroUtils.toAvroSchema(input.getSchema());
-          input
-              .apply("ToGenericRecords", Convert.to(GenericRecord.class))
-              .apply(
-                  "ParquetIOWrite",
-                  FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to(location));
-          return PDone.in(input.getPipeline());
-        }
-      };
-    }
-  }
-}
