This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3364271e2c9 Pass runtime configs & variables to BeamSqlSeekableTable
(#28253)
3364271e2c9 is described below
commit 3364271e2c95be267e4e106415a010d46ea35f0b
Author: gabry.wu <[email protected]>
AuthorDate: Tue Sep 5 11:17:32 2023 +0800
Pass runtime configs & variables to BeamSqlSeekableTable (#28253)
* closing https://github.com/apache/beam/issues/28145
* remove unsupported parameter & add parameter type to context
* remove unnecessary semicolon
---
.../beam/sdk/extensions/sql/BeamSqlSeekableTable.java | 12 ++++++++++--
.../extensions/sql/impl/transform/BeamJoinTransforms.java | 15 +++++++++++++++
2 files changed, 25 insertions(+), 2 deletions(-)
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
index 95f4b7f47f1..7b924cf6b6d 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.extensions.sql;
import java.io.Serializable;
import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;
/**
@@ -27,11 +29,17 @@ import org.apache.beam.sdk.values.Row;
*/
public interface BeamSqlSeekableTable extends Serializable {
/** prepare the instance. */
- default void setUp() {};
+ default void setUp() {}
+
+ default void startBundle(
+ DoFn<Row, Row>.StartBundleContext context, PipelineOptions
pipelineOptions) {}
+
+ default void finishBundle(
+ DoFn<Row, Row>.FinishBundleContext context, PipelineOptions
pipelineOptions) {}
/** return a list of {@code Row} with given key set. */
List<Row> seekRow(Row lookupSubRow);
/** cleanup resources of the instance. */
- default void tearDown() {};
+ default void tearDown() {}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index a30822de151..e4d62c2b5de 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -26,6 +26,7 @@ import
org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import
org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexFieldAccess;
import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexInputRef;
import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexNode;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
@@ -155,6 +156,20 @@ public class BeamJoinTransforms {
seekableTable.setUp();
}
+ @StartBundle
+ public void startBundle(
+ DoFn<Row, Row>.StartBundleContext context,
+ PipelineOptions pipelineOptions) {
+ seekableTable.startBundle(context, pipelineOptions);
+ }
+
+ @FinishBundle
+ public void finishBundle(
+ DoFn<Row, Row>.FinishBundleContext context,
+ PipelineOptions pipelineOptions) {
+ seekableTable.finishBundle(context, pipelineOptions);
+ }
+
@ProcessElement
public void processElement(ProcessContext context) {
Row factRow = context.element();