This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new db3058a6afb Add glob and single file support to BeamSQL ParquetTable
LOCATION (#35582)
db3058a6afb is described below
commit db3058a6afb73ca389898dff4f5a75c1785edd99
Author: Talat UYARER <[email protected]>
AuthorDate: Mon Jul 14 17:07:51 2025 -0700
Add glob and single file support to BeamSQL ParquetTable LOCATION (#35582)
* Add glob and single file support to ParquetTable LOCATION
* Do not rely on a FileSystems.match call in resolveFilePattern. Added tests and
documentation
* CHANGES.md update for behaviour change
---
CHANGES.md | 1 +
.../sql/meta/provider/parquet/ParquetTable.java | 13 ++-
.../provider/parquet/ParquetTableProviderTest.java | 110 +++++++++++++++------
.../dsls/sql/extensions/create-external-table.md | 99 +++++++++++++++++++
4 files changed, 192 insertions(+), 31 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 08e7ccfc75f..0c3c7817523 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -84,6 +84,7 @@
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Go: The pubsubio.Read transform now accepts ReadOptions as a value type
instead of a pointer, and requires exactly one of Topic or Subscription to be
set (they are mutually exclusive). Additionally, the ReadOptions struct now
includes a Topic field for specifying the topic directly, replacing the
previous topic parameter in the Read function signature
([#35369])(https://github.com/apache/beam/pull/35369).
+* SQL: The `ParquetTable` external table provider has changed its handling of
the `LOCATION` property. To read from a directory, the path must now end with a
trailing slash (e.g., `LOCATION '/path/to/data/'`). Previously, `/*` was always
appended to `LOCATION`, so it was always treated as a directory and no trailing
slash was required. A `LOCATION` without a trailing slash is now used as-is,
which enables support for glob patterns and single-file paths
([#35582](https://github.com/apache/beam/pull/35582)).
## Deprecations
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
index 88d162c0237..bdbb48bf1b7 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTable.java
@@ -57,7 +57,8 @@ class ParquetTable extends SchemaBaseBeamTable implements
Serializable {
@Override
public PCollection<Row> buildIOReader(PBegin begin) {
final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
- Read read =
ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+ // A LOCATION ending in "/" is expanded to "<dir>*"; any other value is used as a file path or glob.
+ String filePattern = resolveFilePattern(table.getLocation());
+ Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern);
return begin.apply("ParquetIORead", read).apply("ToRows",
Convert.toRows());
}
@@ -65,7 +66,8 @@ class ParquetTable extends SchemaBaseBeamTable implements
Serializable {
public PCollection<Row> buildIOReader(
PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
final Schema schema = AvroUtils.toAvroSchema(table.getSchema());
- Read read =
ParquetIO.read(schema).withBeamSchemas(true).from(table.getLocation() + "/*");
+ String filePattern = resolveFilePattern(table.getLocation());
+ Read read = ParquetIO.read(schema).withBeamSchemas(true).from(filePattern);
if (!fieldNames.isEmpty()) {
Schema projectionSchema = projectSchema(schema, fieldNames);
LOG.info("Projecting fields schema: {}", projectionSchema);
@@ -122,4 +124,11 @@ class ParquetTable extends SchemaBaseBeamTable implements
Serializable {
public ProjectSupport supportsProjects() {
return ProjectSupport.WITH_FIELD_REORDERING;
}
+
+ /**
+ * Translates a table {@code LOCATION} into the file pattern handed to {@code ParquetIO.read(...).from}.
+ *
+ * <p>A location ending in {@code "/"} is treated as a directory, and {@code "*"} is appended so that
+ * the files inside it are matched. Any other location (a single file path or a glob pattern) is
+ * returned unchanged.
+ *
+ * <p>NOTE(review): only a forward slash is recognized as a directory marker. A location ending in a
+ * backslash (e.g. built with {@code File.separator} on Windows) is passed through unchanged. A null
+ * location throws NullPointerException here. Previously it produced the pattern "null/*".
+ *
+ * @param location the table's LOCATION property
+ * @return the file pattern to read from
+ */
+ private String resolveFilePattern(String location) {
+ if (location.endsWith("/")) {
+ return location + "*";
+ }
+ return location;
+ }
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
index 71680f706fb..63197be0a45 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/parquet/ParquetTableProviderTest.java
@@ -18,8 +18,11 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.parquet;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
@@ -27,8 +30,10 @@ import
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -43,25 +48,35 @@ public class ParquetTableProviderTest {
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
private static final String FIELD_NAMES = "(name VARCHAR, age BIGINT,
country VARCHAR)";
-
private static final Schema TABLE_SCHEMA =
Schema.builder()
.addStringField("name")
.addInt64Field("age")
.addStringField("country")
.build();
- private static final Schema PROJECTED_SCHEMA =
- Schema.builder().addInt64Field("age").addStringField("country").build();
+
+ private static final Row ROW_1 =
+ Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L, "England").build();
+ private static final Row ROW_2 =
+ Row.withSchema(TABLE_SCHEMA).addValues("John", 42L, "USA").build();
+ private static final List<Row> ALL_ROWS = Arrays.asList(ROW_1, ROW_2);
+
+ private BeamSqlEnv env;
+
+ @Before
+ public void setUp() {
+ env = BeamSqlEnv.inMemory(new ParquetTableProvider());
+ }
@Test
- public void testWriteAndReadTable() {
- File destinationFile = new File(tempFolder.getRoot(), "person-info/");
+ public void testReadAndFilter() {
+ File destinationDir = new File(tempFolder.getRoot(), "person-info");
+ String locationPath = destinationDir.getAbsolutePath() + File.separator;
- BeamSqlEnv env = BeamSqlEnv.inMemory(new ParquetTableProvider());
env.executeDdl(
String.format(
"CREATE EXTERNAL TABLE PersonInfo %s TYPE parquet LOCATION '%s'",
- FIELD_NAMES, destinationFile.getAbsolutePath()));
+ FIELD_NAMES, locationPath));
BeamSqlRelUtils.toPCollection(
writePipeline,
@@ -69,32 +84,69 @@ public class ParquetTableProviderTest {
"INSERT INTO PersonInfo VALUES ('Alan', 22, 'England'), ('John',
42, 'USA')"));
writePipeline.run().waitUntilFinish();
- PCollection<Row> rows =
- BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT *
FROM PersonInfo"));
- PAssert.that(rows)
- .containsInAnyOrder(
- Row.withSchema(TABLE_SCHEMA).addValues("Alan", 22L,
"England").build(),
- Row.withSchema(TABLE_SCHEMA).addValues("John", 42L,
"USA").build());
-
- PCollection<Row> filtered =
+ Schema projectedSchema =
Schema.builder().addStringField("name").addInt64Field("age").build();
+ PCollection<Row> filteredAndProjected =
BeamSqlRelUtils.toPCollection(
- readPipeline, env.parseQuery("SELECT * FROM PersonInfo WHERE age >
25"));
- PAssert.that(filtered)
- .containsInAnyOrder(Row.withSchema(TABLE_SCHEMA).addValues("John",
42L, "USA").build());
+ readPipeline, env.parseQuery("SELECT name, age FROM PersonInfo
WHERE age > 25"));
- PCollection<Row> projected =
- BeamSqlRelUtils.toPCollection(
- readPipeline, env.parseQuery("SELECT age, country FROM
PersonInfo"));
- PAssert.that(projected)
- .containsInAnyOrder(
- Row.withSchema(PROJECTED_SCHEMA).addValues(22L, "England").build(),
- Row.withSchema(PROJECTED_SCHEMA).addValues(42L, "USA").build());
+ PAssert.that(filteredAndProjected)
+ .containsInAnyOrder(Row.withSchema(projectedSchema).addValues("John",
42L).build());
- PCollection<Row> filteredAndProjected =
+ readPipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testLocationPathConventions() {
+ File destinationDir = new File(tempFolder.getRoot(), "path-test-data");
+ String writeLocation = destinationDir.getAbsolutePath() + File.separator;
+ env.executeDdl(
+ String.format(
+ "CREATE EXTERNAL TABLE TmpWriteTable %s TYPE parquet LOCATION
'%s'",
+ FIELD_NAMES, writeLocation));
+ BeamSqlRelUtils.toPCollection(
+ writePipeline,
+ env.parseQuery(
+ "INSERT INTO TmpWriteTable VALUES ('Alan', 22, 'England'),
('John', 42, 'USA')"));
+ writePipeline.run().waitUntilFinish();
+
+ env.executeDdl(
+ String.format(
+ "CREATE EXTERNAL TABLE DirTable %s TYPE parquet LOCATION '%s'",
+ FIELD_NAMES, writeLocation));
+ PCollection<Row> dirResult =
+ BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT *
FROM DirTable"));
+ PAssert.that("Directory with '/' reads all files",
dirResult).containsInAnyOrder(ALL_ROWS);
+
+ String globPath = new File(destinationDir, "output-*").getAbsolutePath();
+ env.executeDdl(
+ String.format(
+ "CREATE EXTERNAL TABLE GlobTable %s TYPE parquet LOCATION '%s'",
+ FIELD_NAMES, globPath));
+ PCollection<Row> globResult =
+ BeamSqlRelUtils.toPCollection(readPipeline, env.parseQuery("SELECT *
FROM GlobTable"));
+ PAssert.that("Glob 'output-*' reads all files",
globResult).containsInAnyOrder(ALL_ROWS);
+
+ File[] writtenFiles = destinationDir.listFiles((dir, name) ->
name.startsWith("output-"));
+ assertTrue(
+ "Test setup failed: No output files found",
+ writtenFiles != null && writtenFiles.length > 0);
+ String singleFilePath = writtenFiles[0].getAbsolutePath();
+
+ env.executeDdl(
+ String.format(
+ "CREATE EXTERNAL TABLE SingleFileTable %s TYPE parquet LOCATION
'%s'",
+ FIELD_NAMES, singleFilePath));
+ PCollection<Row> singleFileResult =
BeamSqlRelUtils.toPCollection(
- readPipeline, env.parseQuery("SELECT age, country FROM PersonInfo
WHERE age > 25"));
- PAssert.that(filteredAndProjected)
- .containsInAnyOrder(Row.withSchema(PROJECTED_SCHEMA).addValues(42L,
"USA").build());
+ readPipeline, env.parseQuery("SELECT * FROM SingleFileTable"));
+
+ PCollection<Long> count = singleFileResult.apply(Count.globally());
+ PAssert.thatSingleton(count)
+ .satisfies(
+ actualCount -> {
+ assertTrue("Count should be greater than 0", actualCount > 0L);
+ return null;
+ });
PipelineResult.State state = readPipeline.run().waitUntilFinish();
assertEquals(State.DONE, state);
diff --git
a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
index 40e75906826..65d7d6dab41 100644
---
a/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
+++
b/website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md
@@ -70,6 +70,7 @@ tableElement: columnName fieldType [ NOT NULL ]
* `bigtable`
* `pubsub`
* `kafka`
+ * `parquet`
* `text`
* `location`: The I/O specific location of the underlying table, specified as
a [String
@@ -554,6 +555,104 @@ Write Mode supports writing to a topic.
For CSV only simple types are supported.
+## Parquet
+
+### Syntax
+
+```
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [,
tableElement ]*)
+TYPE parquet
+LOCATION '/path/to/files/'
+```
+
+* `LOCATION`: The path to the Parquet file(s). The interpretation of the path
is based on a convention:
+ * **Directory:** A path ending with a forward slash (`/`) is treated as a
directory. Beam expands it to `<path>*`, which matches the files directly
within that directory (it does not recurse into subdirectories). Example:
`'gs://my-bucket/orders/'`.
+ * **Glob Pattern:** A path containing wildcard characters (`*`, `?`, `[]`)
is treated as a glob pattern that the underlying file system expands. Example:
`'gs://my-bucket/orders/date=2025-*-??/*.parquet'`.
+ * **Single File:** A full path that does not end in a slash and contains
no wildcards is treated as a path to a single file. Example:
`'gs://my-bucket/orders/data.parquet'`.
+
+### Read Mode
+
+Supports reading from Parquet files specified by the `LOCATION`. Predicate and
projection push-down are supported to improve performance.
+
+### Write Mode
+
+Supports writing to a set of sharded Parquet files in a specified directory.
+
+### Schema
+
+The specified schema is used to read and write Parquet files. The schema is
converted to an Avro schema internally for `ParquetIO`. Beam SQL types map to
Avro types as follows:
+
+<table>
+ <tr>
+ <td><b>Beam SQL Type</b>
+ </td>
+ <td><b>Avro Type</b>
+ </td>
+ </tr>
+ <tr>
+ <td>TINYINT, SMALLINT, INTEGER, BIGINT
+ </td>
+ <td>long
+ </td>
+ </tr>
+ <tr>
+ <td>FLOAT, DOUBLE
+ </td>
+ <td>double
+ </td>
+ </tr>
+ <tr>
+ <td>DECIMAL
+ </td>
+ <td>bytes (with logical type)
+ </td>
+ </tr>
+ <tr>
+ <td>BOOLEAN
+ </td>
+ <td>boolean
+ </td>
+ </tr>
+ <tr>
+ <td>DATE, TIME, TIMESTAMP
+ </td>
+ <td>long (with logical type)
+ </td>
+ </tr>
+ <tr>
+ <td>CHAR, VARCHAR
+ </td>
+ <td>string
+ </td>
+ </tr>
+ <tr>
+ <td>ARRAY
+ </td>
+ <td>array
+ </td>
+ </tr>
+ <tr>
+ <td>ROW
+ </td>
+ <td>record
+ </td>
+ </tr>
+</table>
+
+### Example
+
+```
+CREATE EXTERNAL TABLE daily_orders (
+ order_id BIGINT,
+ product_name VARCHAR,
+ purchase_ts TIMESTAMP
+)
+TYPE parquet
+LOCATION '/gcs/my-data/orders/2025-07-14/*';
+```
+
+---
+
## MongoDB
### Syntax