This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 96acc2cdd5d [YAML] Improved pipeline schema definition. (#29083)
96acc2cdd5d is described below

commit 96acc2cdd5d56d2d74b3147af09a57dfe5d51c58
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Oct 20 15:29:20 2023 -0700

    [YAML] Improved pipeline schema definition. (#29083)
    
    Slightly stricter definitions for catching more errors, as well as avoiding the use of
    anyOf, which often makes it difficult to deduce what the true error is.
    
    This does mean a pipeline must have a transforms (or source/sink) block rather than
    simply being a list of transforms itself.
---
 sdks/python/apache_beam/yaml/README.md            | 177 +++++++++++-----------
 sdks/python/apache_beam/yaml/pipeline.schema.yaml |  48 ++++--
 2 files changed, 128 insertions(+), 97 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/README.md b/sdks/python/apache_beam/yaml/README.md
index 3ba78784c99..62c0d0eea16 100644
--- a/sdks/python/apache_beam/yaml/README.md
+++ b/sdks/python/apache_beam/yaml/README.md
@@ -166,41 +166,42 @@ Here we read two sources, join them, and write two outputs.
 
 ```
 pipeline:
-  - type: ReadFromCsv
-    name: ReadLeft
-    config:
-      path: /path/to/left*.csv
+  transforms:
+    - type: ReadFromCsv
+      name: ReadLeft
+      config:
+        path: /path/to/left*.csv
 
-  - type: ReadFromCsv
-    name: ReadRight
-    config:
-      path: /path/to/right*.csv
+    - type: ReadFromCsv
+      name: ReadRight
+      config:
+        path: /path/to/right*.csv
 
-  - type: Sql
-    config:
-      query: select left.col1, right.col2 from left join right using (col3)
-    input:
-      left: ReadLeft
-      right: ReadRight
-
-  - type: WriteToJson
-    name: WriteAll
-    input: Sql
-    config:
-      path: /path/to/all.json
+    - type: Sql
+      config:
+        query: select left.col1, right.col2 from left join right using (col3)
+      input:
+        left: ReadLeft
+        right: ReadRight
 
-  - type: Filter
-    name: FilterToBig
-    input: Sql
-    config:
-      language: python
-      keep: "col2 > 100"
+    - type: WriteToJson
+      name: WriteAll
+      input: Sql
+      config:
+        path: /path/to/all.json
 
-  - type: WriteToCsv
-    name: WriteBig
-    input: FilterToBig
-    config:
-      path: /path/to/big.csv
+    - type: Filter
+      name: FilterToBig
+      input: Sql
+      config:
+        language: python
+        keep: "col2 > 100"
+
+    - type: WriteToCsv
+      name: WriteBig
+      input: FilterToBig
+      config:
+        path: /path/to/big.csv
 ```
 
 One can, however, nest `chains` within a non-linear pipeline.
@@ -209,49 +210,50 @@ that has a single input and contains its own sink.
 
 ```
 pipeline:
-  - type: ReadFromCsv
-    name: ReadLeft
-    config:
-      path: /path/to/left*.csv
+  transforms:
+    - type: ReadFromCsv
+      name: ReadLeft
+      config:
+        path: /path/to/left*.csv
 
-  - type: ReadFromCsv
-    name: ReadRight
-    config:
-      path: /path/to/right*.csv
+    - type: ReadFromCsv
+      name: ReadRight
+      config:
+        path: /path/to/right*.csv
 
-  - type: Sql
-    config:
-      query: select left.col1, right.col2 from left join right using (col3)
-    input:
-      left: ReadLeft
-      right: ReadRight
-
-  - type: WriteToJson
-    name: WriteAll
-    input: Sql
-    config:
-      path: /path/to/all.json
+    - type: Sql
+      config:
+        query: select left.col1, right.col2 from left join right using (col3)
+      input:
+        left: ReadLeft
+        right: ReadRight
 
-  - type: chain
-    name: ExtraProcessingForBigRows
-    input: Sql
-    transforms:
-      - type: Filter
-        config:
-          language: python
-          keep: "col2 > 100"
-      - type: Filter
-        config:
-          language: python
-          keep: "len(col1) > 10"
-      - type: Filter
-        config:
-          language: python
-          keep: "col1 > 'z'"
-    sink:
-      type: WriteToCsv
+    - type: WriteToJson
+      name: WriteAll
+      input: Sql
       config:
-        path: /path/to/big.csv
+        path: /path/to/all.json
+
+    - type: chain
+      name: ExtraProcessingForBigRows
+      input: Sql
+      transforms:
+        - type: Filter
+          config:
+            language: python
+            keep: "col2 > 100"
+        - type: Filter
+          config:
+            language: python
+            keep: "len(col1) > 10"
+        - type: Filter
+          config:
+            language: python
+            keep: "col1 > 'z'"
+      sink:
+        type: WriteToCsv
+        config:
+          path: /path/to/big.csv
 ```
 
 ## Windowing
@@ -329,25 +331,26 @@ a join per window.
 
 ```
 pipeline:
-  - type: ReadFromPubSub
-    name: ReadLeft
-    config:
-      topic: leftTopic
+  transforms:
+    - type: ReadFromPubSub
+      name: ReadLeft
+      config:
+        topic: leftTopic
 
-  - type: ReadFromPubSub
-    name: ReadRight
-    config:
-      topic: rightTopic
+    - type: ReadFromPubSub
+      name: ReadRight
+      config:
+        topic: rightTopic
 
-  - type: Sql
-    config:
-      query: select left.col1, right.col2 from left join right using (col3)
-    input:
-      left: ReadLeft
-      right: ReadRight
-    windowing:
-      type: fixed
-      size: 60
+    - type: Sql
+      config:
+        query: select left.col1, right.col2 from left join right using (col3)
+      input:
+        left: ReadLeft
+        right: ReadRight
+      windowing:
+        type: fixed
+        size: 60
 ```
 
 For a transform with no inputs, the specified windowing is instead applied to
diff --git a/sdks/python/apache_beam/yaml/pipeline.schema.yaml b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
index ef0d9fe0f26..e784531d9be 100644
--- a/sdks/python/apache_beam/yaml/pipeline.schema.yaml
+++ b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-$schema: 'http://json-schema.org/schema#'
+$schema: 'http://json-schema.org/draft-07/schema#'
 $id: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml/pipeline.schema.yaml
 
 $defs:
@@ -115,6 +115,23 @@ $defs:
             - $ref: '#/$defs/nestedTransform'
             - $ref: '#/$defs/implicitInputOutputs'
 
+      - if:
+          not:
+            anyOf:
+              - properties: { type: { const: composite }}
+              - properties: { type: { const: chain }}
+        then:
+          properties:
+            type: {}
+            name: {}
+            input: {}
+            output: {}
+            windowing: {}
+            config: { type: object }
+            __line__: {}
+            __uuid__: {}
+          additionalProperties: false
+
   windowing: {}  # TODO
 
   provider:
@@ -128,27 +145,38 @@ $defs:
         properties: { __line__: {}}
         additionalProperties:
           type: string
+      config: { type: object }
+    additionalProperties: false
     required:
       - type
       - transforms
+      - config
 
 type: object
 properties:
   pipeline:
-    anyOf:
-      - type: array
-        items:
-          $ref: '#/$defs/transform'
-      - $ref: '#/$defs/transform'
+    allOf:
+      # These are the only top-level properties defined in pipeline.
       - type: object
         properties:
-          transforms:
-            type: array
-            items:
-              $ref: '#/$defs/transform'
+          type: { const: chain }
+          windowing:
+            $ref: '#/$defs/windowing'
+          transforms: {}
+          extra_transforms: {}
+          sink: {}
+          source: {}
           __line__: {}
           __uuid__: {}
         additionalProperties: false
+      # This defines the allowable contents of the attributes above.
+      - $ref: '#/$defs/nestedTransform'
+      # A chain-type transform, like a chain composite, must have implicit io.
+      - if:
+          properties: { type: { const: chain }}
+          required: [type]
+        then:
+          $ref: '#/$defs/implicitInputOutputs'
   providers:
     type: array
     items:

Reply via email to