This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 96acc2cdd5d [YAML] Improved pipeline schema definition. (#29083)
96acc2cdd5d is described below
commit 96acc2cdd5d56d2d74b3147af09a57dfe5d51c58
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Oct 20 15:29:20 2023 -0700
[YAML] Improved pipeline schema definition. (#29083)
Slightly stricter definitions for catching more errors, as well as avoiding
the use of anyOf, which often makes it difficult to deduce what the true
error is.
This does mean a pipeline must have a transforms (or source/sink) block
rather than simply being a list of transforms itself.
---
sdks/python/apache_beam/yaml/README.md | 177 +++++++++++-----------
sdks/python/apache_beam/yaml/pipeline.schema.yaml | 48 ++++--
2 files changed, 128 insertions(+), 97 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/README.md
b/sdks/python/apache_beam/yaml/README.md
index 3ba78784c99..62c0d0eea16 100644
--- a/sdks/python/apache_beam/yaml/README.md
+++ b/sdks/python/apache_beam/yaml/README.md
@@ -166,41 +166,42 @@ Here we read two sources, join them, and write two
outputs.
```
pipeline:
- - type: ReadFromCsv
- name: ReadLeft
- config:
- path: /path/to/left*.csv
+ transforms:
+ - type: ReadFromCsv
+ name: ReadLeft
+ config:
+ path: /path/to/left*.csv
- - type: ReadFromCsv
- name: ReadRight
- config:
- path: /path/to/right*.csv
+ - type: ReadFromCsv
+ name: ReadRight
+ config:
+ path: /path/to/right*.csv
- - type: Sql
- config:
- query: select left.col1, right.col2 from left join right using (col3)
- input:
- left: ReadLeft
- right: ReadRight
-
- - type: WriteToJson
- name: WriteAll
- input: Sql
- config:
- path: /path/to/all.json
+ - type: Sql
+ config:
+ query: select left.col1, right.col2 from left join right using (col3)
+ input:
+ left: ReadLeft
+ right: ReadRight
- - type: Filter
- name: FilterToBig
- input: Sql
- config:
- language: python
- keep: "col2 > 100"
+ - type: WriteToJson
+ name: WriteAll
+ input: Sql
+ config:
+ path: /path/to/all.json
- - type: WriteToCsv
- name: WriteBig
- input: FilterToBig
- config:
- path: /path/to/big.csv
+ - type: Filter
+ name: FilterToBig
+ input: Sql
+ config:
+ language: python
+ keep: "col2 > 100"
+
+ - type: WriteToCsv
+ name: WriteBig
+ input: FilterToBig
+ config:
+ path: /path/to/big.csv
```
One can, however, nest `chains` within a non-linear pipeline.
@@ -209,49 +210,50 @@ that has a single input and contains its own sink.
```
pipeline:
- - type: ReadFromCsv
- name: ReadLeft
- config:
- path: /path/to/left*.csv
+ transforms:
+ - type: ReadFromCsv
+ name: ReadLeft
+ config:
+ path: /path/to/left*.csv
- - type: ReadFromCsv
- name: ReadRight
- config:
- path: /path/to/right*.csv
+ - type: ReadFromCsv
+ name: ReadRight
+ config:
+ path: /path/to/right*.csv
- - type: Sql
- config:
- query: select left.col1, right.col2 from left join right using (col3)
- input:
- left: ReadLeft
- right: ReadRight
-
- - type: WriteToJson
- name: WriteAll
- input: Sql
- config:
- path: /path/to/all.json
+ - type: Sql
+ config:
+ query: select left.col1, right.col2 from left join right using (col3)
+ input:
+ left: ReadLeft
+ right: ReadRight
- - type: chain
- name: ExtraProcessingForBigRows
- input: Sql
- transforms:
- - type: Filter
- config:
- language: python
- keep: "col2 > 100"
- - type: Filter
- config:
- language: python
- keep: "len(col1) > 10"
- - type: Filter
- config:
- language: python
- keep: "col1 > 'z'"
- sink:
- type: WriteToCsv
+ - type: WriteToJson
+ name: WriteAll
+ input: Sql
config:
- path: /path/to/big.csv
+ path: /path/to/all.json
+
+ - type: chain
+ name: ExtraProcessingForBigRows
+ input: Sql
+ transforms:
+ - type: Filter
+ config:
+ language: python
+ keep: "col2 > 100"
+ - type: Filter
+ config:
+ language: python
+ keep: "len(col1) > 10"
+ - type: Filter
+ config:
+ language: python
+ keep: "col1 > 'z'"
+ sink:
+ type: WriteToCsv
+ config:
+ path: /path/to/big.csv
```
## Windowing
@@ -329,25 +331,26 @@ a join per window.
```
pipeline:
- - type: ReadFromPubSub
- name: ReadLeft
- config:
- topic: leftTopic
+ transforms:
+ - type: ReadFromPubSub
+ name: ReadLeft
+ config:
+ topic: leftTopic
- - type: ReadFromPubSub
- name: ReadRight
- config:
- topic: rightTopic
+ - type: ReadFromPubSub
+ name: ReadRight
+ config:
+ topic: rightTopic
- - type: Sql
- config:
- query: select left.col1, right.col2 from left join right using (col3)
- input:
- left: ReadLeft
- right: ReadRight
- windowing:
- type: fixed
- size: 60
+ - type: Sql
+ config:
+ query: select left.col1, right.col2 from left join right using (col3)
+ input:
+ left: ReadLeft
+ right: ReadRight
+ windowing:
+ type: fixed
+ size: 60
```
For a transform with no inputs, the specified windowing is instead applied to
diff --git a/sdks/python/apache_beam/yaml/pipeline.schema.yaml
b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
index ef0d9fe0f26..e784531d9be 100644
--- a/sdks/python/apache_beam/yaml/pipeline.schema.yaml
+++ b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
@@ -15,7 +15,7 @@
# limitations under the License.
#
-$schema: 'http://json-schema.org/schema#'
+$schema: 'http://json-schema.org/draft-07/schema#'
$id:
https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml/pipeline.schema.yaml
$defs:
@@ -115,6 +115,23 @@ $defs:
- $ref: '#/$defs/nestedTransform'
- $ref: '#/$defs/implicitInputOutputs'
+ - if:
+ not:
+ anyOf:
+ - properties: { type: { const: composite }}
+ - properties: { type: { const: chain }}
+ then:
+ properties:
+ type: {}
+ name: {}
+ input: {}
+ output: {}
+ windowing: {}
+ config: { type: object }
+ __line__: {}
+ __uuid__: {}
+ additionalProperties: false
+
windowing: {} # TODO
provider:
@@ -128,27 +145,38 @@ $defs:
properties: { __line__: {}}
additionalProperties:
type: string
+ config: { type: object }
+ additionalProperties: false
required:
- type
- transforms
+ - config
type: object
properties:
pipeline:
- anyOf:
- - type: array
- items:
- $ref: '#/$defs/transform'
- - $ref: '#/$defs/transform'
+ allOf:
+ # These are the only top-level properties defined in pipeline.
- type: object
properties:
- transforms:
- type: array
- items:
- $ref: '#/$defs/transform'
+ type: { const: chain }
+ windowing:
+ $ref: '#/$defs/windowing'
+ transforms: {}
+ extra_transforms: {}
+ sink: {}
+ source: {}
__line__: {}
__uuid__: {}
additionalProperties: false
+ # This defines the allowable contents of the attributes above.
+ - $ref: '#/$defs/nestedTransform'
+ # A chain-type transform, like a chain composite, must have implicit io.
+ - if:
+ properties: { type: { const: chain }}
+ required: [type]
+ then:
+ $ref: '#/$defs/implicitInputOutputs'
providers:
type: array
items: