This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new db3058a6afb Add glob and single file support to BeamSQL ParquetTable 
LOCATION (#35582)
db3058a6afb is described below

commit db3058a6afb73ca389898dff4f5a75c1785edd99
Author: Talat UYARER <[email protected]>
AuthorDate: Mon Jul 14 17:07:51 2025 -0700

    Add glob and single file support to BeamSQL ParquetTable LOCATION (#35582)
    
    * Add glob and single file support to ParquetTable LOCATION
    
    * not rely on FileSystems.match call to resolveFilePattern. Added test and 
documentation
    
    * CHANGES.md update for behaviour change
---
 CHANGES.md                                         |   1 +
 .../sql/meta/provider/parquet/ParquetTable.java    |  13 ++-
 .../provider/parquet/ParquetTableProviderTest.java | 110 +++++++++++++++------
 .../dsls/sql/extensions/create-external-table.md   |  99 +++++++++++++++++++
 4 files changed, 192 insertions(+), 31 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 08e7ccfc75f..0c3c7817523 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -84,6 +84,7 @@
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
 * Go: The pubsubio.Read transform now accepts ReadOptions as a value type 
instead of a pointer, and requires exactly one of Topic or Subscription to be 
set (they are mutually exclusive). Additionally, the ReadOptions struct now 
includes a Topic field for specifying the topic directly, replacing the 
previous topic parameter in the Read function signature 
([#35369])(https://github.com/apache/beam/pull/35369).
+* SQL: The `ParquetTable` external table provider has changed its handling of 
the `LOCATION` property. To read from a directory, the path must now end with a 
trailing slash (e.g., `LOCATION '/path/to/data/'`). Previously, a trailing 
slash was not required. This change was made to enable support for glob 
patterns and single-file paths 
([#35582](https://github.com/apache/beam/pull/35582)).
 
 ## Deprecations
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
index 88d162c0237..bdbb48bf1b7 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
@@ -57,7 +57,8 @@ class ParquetTable extends SchemaBaseBeamTable implements 
Serializable {
   @Override
   public PCollection<Row> buildIOReader(PBegin begin) {
     final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
-    Read read = 
ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    String filePattern = resolveFilePattern(table.getLocation());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern);
     return begin.apply("ParquetIORead", read).apply("ToRows", 
Convert.toRows());
   }
 
@@ -65,7 +66,8 @@ class ParquetTable extends SchemaBaseBeamTable implements 
Serializable {
   public PCollection<Row> buildIOReader(
       PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
     final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
-    Read read = 
ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+    String filePattern = resolveFilePattern(table.getLocation());
+    Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern);
     if (!fieldNames.isEmpty()) {
       Schema projectionSchema = projectSchema(schema, fieldNames);
       LOG.info("Projecting fields schema: {}", projectionSchema);
@@ -122,4 +124,11 @@ class ParquetTable extends SchemaBaseBeamTable implements 
Serializable {
   public ProjectSupport supportsProjects() {
     return ProjectSupport.WITH_FIELD_REORDERING;
   }
+
+  private String resolveFilePattern(String location) {
+    if (location.endsWith("/")) {
+      return location + "*";
+    }
+    return location;
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
index 71680f706fb..63197be0a45 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
@@ -18,8 +18,11 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
@@ -27,8 +30,10 @@ import 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -43,25 +48,35 @@ public class ParquetTableProviderTest {
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
   private static final String FIELD_NAMES = "(name VARCHAR, age BIGINT, 
country VARCHAR)";
-
   private static final Schema TABLE_SCHEMA =
       Schema.builder()
           .addStringField("name")
           .addInt64Field("age")
           .addStringField("country")
           .build();
-  private static final Schema PROJECTED_SCHEMA =
-      Schema.builder().addInt64Field("age").addStringField("country").build();
+
+  private static final Row ROW_1 =
+      Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, "England").build();
+  private static final Row ROW_2 =
+      Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build();
+  private static final List<Row> ALL_ROWS = Arrays.asList(ROW_1, ROW_2);
+
+  private BeamSqlEnv env;
+
+  @Before
+  public void setUp() {
+    env = BeamSqlEnv.inMemory(new ParquetTableProvider());
+  }
 
   @Test
-  public void testWriteAndReadTable() {
-    File destinationFile = new File(tempFolder.getRoot(), "person-info/");
+  public void testReadAndFilter() {
+    File destinationDir = new File(tempFolder.getRoot(), "person-info");
+    String locationPath = destinationDir.getAbsolutePath() + File.separator;
 
-    BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider());
     env.executeDdl(
         String.format(
             "CREATE EXTERNAL TABLE PersonInfo %s TYPE parquet LOCATION '%s'",
-            FIELD_NAMES, destinationFile.getAbsolutePath()));
+            FIELD_NAMES, locationPath));
 
     BeamSqlRelUtils.toPCollection(
         writePipeline,
@@ -69,32 +84,69 @@ public class ParquetTableProviderTest {
             "INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John', 
42, 'USA')"));
     writePipeline.run().waitUntilFinish();
 
-    PCollection<Row> rows =
-        BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * 
FROM PersonInfo"));
-    PAssert.that(rows)
-        .containsInAnyOrder(
-            Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, 
"England").build(),
-            Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, 
"USA").build());
-
-    PCollection<Row> filtered =
+    Schema projectedSchema = 
Schema.builder().addStringField("name").addInt64Field("age").build();
+    PCollection<Row> filteredAndProjected =
         BeamSqlRelUtils.toPCollection(
-            readPipeline, env.parseQuery("SELECT * FROM PersonInfo WHERE age > 
25"));
-    PAssert.that(filtered)
-        .containsInAnyOrder(Row.withSchema(TABLE_SCHEMA).addValues("John", 
42L, "USA").build());
+            readPipeline, env.parseQuery("SELECT name, age FROM PersonInfo 
WHERE age > 25"));
 
-    PCollection<Row> projected =
-        BeamSqlRelUtils.toPCollection(
-            readPipeline, env.parseQuery("SELECT age, country FROM 
PersonInfo"));
-    PAssert.that(projected)
-        .containsInAnyOrder(
-            Row.withSchema(PROJECTED_SCHEMA).addValues(22L, "England").build(),
-            Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
+    PAssert.that(filteredAndProjected)
+        .containsInAnyOrder(Row.withSchema(projectedSchema).addValues("John", 
42L).build());
 
-    PCollection<Row> filteredAndProjected =
+    readPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testLocationPathConventions() {
+    File destinationDir = new File(tempFolder.getRoot(), "path-test-data");
+    String writeLocation = destinationDir.getAbsolutePath() + File.separator;
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE TmpWriteTable %s TYPE parquet LOCATION 
'%s'",
+            FIELD_NAMES, writeLocation));
+    BeamSqlRelUtils.toPCollection(
+        writePipeline,
+        env.parseQuery(
+            "INSERT INTO TmpWriteTable VALUES ('Alan', 22, 'England'), 
('John', 42, 'USA')"));
+    writePipeline.run().waitUntilFinish();
+
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE DirTable %s TYPE parquet LOCATION '%s'",
+            FIELD_NAMES, writeLocation));
+    PCollection<Row> dirResult =
+        BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * 
FROM DirTable"));
+    PAssert.that("Directory with '/' reads all files", 
dirResult).containsInAnyOrder(ALL_ROWS);
+
+    String globPath = new File(destinationDir, "output-*").getAbsolutePath();
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE GlobTable %s TYPE parquet LOCATION '%s'",
+            FIELD_NAMES, globPath));
+    PCollection<Row> globResult =
+        BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT * 
FROM GlobTable"));
+    PAssert.that("Glob 'output-*' reads all files", 
globResult).containsInAnyOrder(ALL_ROWS);
+
+    File[] writtenFiles = destinationDir.listFiles((dir, name) -> 
name.startsWith("output-"));
+    assertTrue(
+        "Test setup failed: No output files found",
+        writtenFiles != null && writtenFiles.length > 0);
+    String singleFilePath = writtenFiles[0].getAbsolutePath();
+
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE SingleFileTable %s TYPE parquet LOCATION 
'%s'",
+            FIELD_NAMES, singleFilePath));
+    PCollection<Row> singleFileResult =
         BeamSqlRelUtils.toPCollection(
-            readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo 
WHERE age > 25"));
-    PAssert.that(filteredAndProjected)
-        .containsInAnyOrder(Row.withSchema(PROJECTED_SCHEMA).addValues(42L, 
"USA").build());
+            readPipeline, env.parseQuery("SELECT * FROM SingleFileTable"));
+
+    PCollection<Long> count = singleFileResult.apply(Count.globally());
+    PAssert.thatSingleton(count)
+        .satisfies(
+            actualCount -> {
+              assertTrue("Count should be greater than 0", actualCount > 0L);
+              return null;
+            });
 
     PipelineResult.State state = readPipeline.run().waitUntilFinish();
     assertEquals(State.DONE, state);
diff --git 
a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
 
b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
index 40e75906826..65d7d6dab41 100644
--- 
a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
+++ 
b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
@@ -70,6 +70,7 @@ tableElement: columnName fieldType [ NOT NULL ]
     *   `bigtable`
     *   `pubsub`
     *   `kafka`
+    *   `parquet`
     *   `text`
 *   `location`: The I/O specific location of the underlying table, specified as
     a [String
@@ -554,6 +555,104 @@ Write Mode supports writing to a topic.
 
 For CSV only simple types are supported.
 
+## Parquet
+
+### Syntax
+
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, 
tableElement ]*)
+TYPE parquet
+LOCATION '/path/to/files/'
+```
+
+* `LOCATION`: The path to the Parquet file(s). The interpretation of the path 
is based on a convention:
+    * **Directory:** A path ending with a forward slash (`/`) is treated as a 
directory. Beam reads all files within that directory. Example: 
`'gs://my-bucket/orders/'`.
+    * **Glob Pattern:** A path containing wildcard characters (`*`, `?`, `[]`) 
is treated as a glob pattern that the underlying file system expands. Example: 
`'gs://my-bucket/orders/date=2025-*-??/*.parquet'`.
+    * **Single File:** A full path that does not end in a slash and contains 
no wildcards is treated as a path to a single file. Example: 
`'gs://my-bucket/orders/data.parquet'`.
+
+### Read Mode
+
+Supports reading from Parquet files specified by the `LOCATION`. Predicate and 
projection push-down are supported to improve performance.
+
+### Write Mode
+
+Supports writing to a set of sharded Parquet files in a specified directory.
+
+### Schema
+
+The specified schema is used to read and write Parquet files. The schema is 
converted to an Avro schema internally for `ParquetIO`. Beam SQL types map to 
Avro types as follows:
+
+<table>
+  <tr>
+   <td><b>Beam SQL Type</b>
+   </td>
+   <td><b>Avro Type</b>
+   </td>
+  </tr>
+  <tr>
+   <td>TINYINT, SMALLINT, INTEGER, BIGINT &nbsp;
+   </td>
+   <td>long
+   </td>
+  </tr>
+  <tr>
+   <td>FLOAT, DOUBLE
+   </td>
+   <td>double
+   </td>
+  </tr>
+  <tr>
+   <td>DECIMAL
+   </td>
+   <td>bytes (with logical type)
+   </td>
+  </tr>
+  <tr>
+   <td>BOOLEAN
+   </td>
+   <td>boolean
+   </td>
+  </tr>
+  <tr>
+   <td>DATE, TIME, TIMESTAMP
+   </td>
+   <td>long (with logical type)
+   </td>
+  </tr>
+  <tr>
+   <td>CHAR, VARCHAR
+   </td>
+   <td>string
+   </td>
+  </tr>
+  <tr>
+   <td>ARRAY
+   </td>
+   <td>array
+   </td>
+  </tr>
+  <tr>
+   <td>ROW
+   </td>
+   <td>record
+   </td>
+  </tr>
+</table>
+
+### Example
+
+```
+CREATE EXTERNAL TABLE daily_orders (
+  order_id BIGINT,
+  product_name VARCHAR,
+  purchase_ts TIMESTAMP
+)
+TYPE parquet
+LOCATION '/gcs/my-data/orders/2025-07-14/*';
+```
+
+---
+
 ## MongoDB
 
 ### Syntax

Reply via email to