This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c4b42f81c6 Phase 1sh - more yaml integration tests (#34692)
5c4b42f81c6 is described below

commit 5c4b42f81c67fefec42e28cc6faae52f21ba8f56
Author: Derrick Williams <[email protected]>
AuthorDate: Tue Apr 22 21:56:08 2025 -0400

    Phase 1sh - more yaml integration tests (#34692)
    
    * assign_timestamp yaml test
    
    * create yaml test
    
    * extract window yaml test
    
    * ml transform yaml test
    
    * validate with schema yaml test
    
    * add extra gradle parameter for ml_test
    
    * add ml_test package if parameter is present
---
 .../workflows/beam_PreCommit_Yaml_Xlang_Direct.yml |   2 +-
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   8 +-
 .../apache_beam/yaml/tests/assign_timestamps.yaml  |  84 +++++++++++++
 sdks/python/apache_beam/yaml/tests/create.yaml     |  66 +++++++++++
 .../yaml/tests/extract_windowing_Info.yaml         | 131 +++++++++++++++++++++
 .../apache_beam/yaml/tests/ml_transform.yaml       |  92 +++++++++++++++
 .../yaml/tests/validate_with_schema.yaml           |  95 +++++++++++++++
 7 files changed, 476 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml
index a65970968b2..92a20e5003e 100644
--- a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml
+++ b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml
@@ -91,7 +91,7 @@ jobs:
       - name: run PreCommit Yaml Xlang Direct script
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :sdks:python:yamlIntegrationTests
+          gradle-command: :sdks:python:yamlIntegrationTests -PbeamPythonExtra=ml_test
       - name: Archive Python Test Results
         uses: actions/upload-artifact@v4
         if: failure()
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 6eb45a109d3..472ece04f9e 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -3021,9 +3021,15 @@ class BeamModulePlugin implements Plugin<Project> {
         dependsOn ':sdks:python:sdist'
         doLast {
           def distTarBall = "${pythonRootDir}/build/apache-beam.tar.gz"
+          def packages = "gcp,test,aws,azure,dataframe"
+          def extra = project.findProperty('beamPythonExtra')
+          if (extra) {
+            packages += ",${extra}"
+          }
+
           project.exec {
             executable 'sh'
-            args '-c', ". ${project.ext.envdir}/bin/activate && pip install --pre --retries 10 ${distTarBall}[gcp,test,aws,azure,dataframe]"
+            args '-c', ". ${project.ext.envdir}/bin/activate && pip install --pre --retries 10 ${distTarBall}[${packages}]"
           }
         }
       }
diff --git a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
new file mode 100644
index 00000000000..edaa581214e
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+  # Assign timestamp to beam row element
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          name: CreateVisits
+          config:
+            elements:
+              - user: alice
+                new_time_stamp: 1
+              - user: alice
+                new_time_stamp: 3
+              - user: bob
+                new_time_stamp: 7
+        - type: AssignTimestamps
+          config:
+            timestamp: new_time_stamp
+        - type: ExtractWindowingInfo
+          config:
+            fields: [timestamp]
+        - type: MapToFields
+          config:
+            language: python
+            fields:
+              user: user
+              timestamp: timestamp
+        - type: AssertEqual
+          config:
+            elements:
+              - {user: "alice", timestamp: 1}
+              - {user: "alice", timestamp: 3}
+              - {user: "bob", timestamp: 7}
+
+  # Assign timestamp to beam row element with error_handling
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          name: CreateVisits
+          config:
+            elements:
+              - {user: alice, timestamp: "not-valid"}
+              - {user: bob, timestamp: 3}
+        - type: AssignTimestamps
+          input: CreateVisits
+          config:
+            timestamp: timestamp
+            error_handling:
+              output: invalid_rows
+        - type: MapToFields
+          input: AssignTimestamps.invalid_rows
+          config:
+            language: python
+            fields:
+              user: "element.user"
+              timestamp: "element.timestamp"
+        - type: AssertEqual
+          input: MapToFields
+          config:
+            elements:
+              - {user: "alice", timestamp: "not-valid"}
+        - type: AssertEqual
+          input: AssignTimestamps
+          config:
+            elements:
+              - {user: bob, timestamp: 3}
diff --git a/sdks/python/apache_beam/yaml/tests/create.yaml b/sdks/python/apache_beam/yaml/tests/create.yaml
new file mode 100644
index 00000000000..bed364c1714
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/create.yaml
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+  # Simple Create with element list
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements: [1,2,3,4,5]
+        - type: AssertEqual
+          config:
+            elements:
+              - {element: 1}
+              - {element: 2}
+              - {element: 3}
+              - {element: 4}
+              - {element: 5}  
+
+  # Simple Create with more complex beam row
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {first: 0, second: [1,2,3]}
+              - {first: 1, second: [4,5,6]}
+        - type: AssertEqual
+          config:
+            elements:
+              - {first: 0, second: [1,2,3]}
+              - {first: 1, second: [4,5,6]}
+
+  # Simple Create with reshuffle explicitly disabled
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {first: 0, second: [1,2,3]}
+              - {first: 1, second: [4,5,6]}
+              - {first: 2, second: [7,8,9]}
+            reshuffle: false
+        - type: AssertEqual
+          config:
+            elements:
+              - {first: 0, second: [1,2,3]}
+              - {first: 1, second: [4,5,6]}
+              - {first: 2, second: [7,8,9]}
diff --git a/sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml b/sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml
new file mode 100644
index 00000000000..45233e2039e
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+  # Simply extract windowing info with no fields resulting in default fields
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+              - {label: "389a", rank: 2}
+        - type: ExtractWindowingInfo
+        - type: MapToFields
+          config:
+            language: python
+            fields:
+              label: 
+                callable: "lambda x: x.label"
+              rank:
+                callable: "lambda x: x.rank"
+              timestamp:
+                callable: "lambda x: str(x.timestamp)"
+              window_start:
+                callable: "lambda x: str(x.window_start)"
+              window_end:
+                callable: "lambda x: str(x.window_end)"
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
+              - {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
+              - {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
+
+  
+  # Simply extract windowing info with all available fields
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+              - {label: "389a", rank: 2}
+        - type: ExtractWindowingInfo
+          config:
+            fields: [timestamp, window_start, window_end, window_string, window_type, window_object, pane_info]
+        - type: MapToFields
+          config:
+            language: python
+            fields:
+              label: 
+                callable: "lambda x: x.label"
+              rank:
+                callable: "lambda x: x.rank"
+              timestamp:
+                callable: "lambda x: str(x.timestamp)"
+              window_start:
+                callable: "lambda x: str(x.window_start)"
+              window_end:
+                callable: "lambda x: str(x.window_end)"
+              window_string:
+                callable: "lambda x: str(x.window_string)"
+              window_type:
+                callable: "lambda x: str(x.window_type)"
+              window_object:
+                callable: "lambda x: str(x.window_object)"
+              pane_info:
+                callable: "lambda x: str(x.pane_info)"
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+              - {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+              - {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+
+  
+  # Simply extract windowing info with a few fields renamed
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+              - {label: "389a", rank: 2}
+        - type: ExtractWindowingInfo
+          config:
+            fields: 
+              ts: timestamp
+              ws: window_start
+              pane: pane_info
+        - type: MapToFields
+          config:
+            language: python
+            fields:
+              label: 
+                callable: "lambda x: x.label"
+              rank:
+                callable: "lambda x: x.rank"
+              ts:
+                callable: "lambda x: str(x.ts)"
+              ws:
+                callable: "lambda x: str(x.ws)"
+              pane:
+                callable: "lambda x: str(x.pane)"
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+              - {label: "37a", rank: 1, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+              - {label: "389a", rank: 2, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+
diff --git a/sdks/python/apache_beam/yaml/tests/ml_transform.yaml b/sdks/python/apache_beam/yaml/tests/ml_transform.yaml
new file mode 100644
index 00000000000..c4881096f0d
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/ml_transform.yaml
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: TEMP_DIR
+    type: "tempfile.TemporaryDirectory"
+
+pipelines:
+  # MLTransform with write_artifact_location
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {num: 0, text: 'To be or not to be'}
+              - {num: 2, text: 'I think, therefore I am'}
+              - {num: 5, text: 'The only thing we have to fear is fear itself'}
+              - {num: 8, text: 'Be the change you wish to see in the world'}
+        - type: MLTransform
+          config:
+            write_artifact_location: "{TEMP_DIR}"
+            transforms:
+              - type: ScaleTo01
+                config:
+                  columns: [num]
+              - type: ScaleByMinMax
+                config:
+                  columns: [num]
+                  min_value: 0
+                  max_value: 100
+        - type: MapToFields
+          config:
+            language: python
+            fields:
+              num_scaled:
+                callable: 'lambda x: x.num[0]'
+        - type: AssertEqual
+          config:
+            elements:
+              - {num_scaled: 0.0}
+              - {num_scaled: 25.0}
+              - {num_scaled: 62.5}
+              - {num_scaled: 100.0}
+    options:
+      yaml_experimental_features: ['ML']
+
+  # MLTransform with read_artifact_location based on previous 
+  # write_artifact_location
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {num: 0, text: 'To be or not to be'}
+              - {num: 2, text: 'I think, therefore I am'}
+              - {num: 5, text: 'The only thing we have to fear is fear itself'}
+              - {num: 8, text: 'Be the change you wish to see in the world'}
+        - type: MLTransform
+          config:
+            read_artifact_location: "{TEMP_DIR}"
+        - type: MapToFields
+          config:
+            language: python
+            fields:
+              num_scaled:
+                callable: 'lambda x: x.num[0]'
+        - type: AssertEqual
+          config:
+            elements:
+              - {num_scaled: 0.0}
+              - {num_scaled: 25.0}
+              - {num_scaled: 62.5}
+              - {num_scaled: 100.0}
+    options:
+      yaml_experimental_features: ['ML']
+
diff --git a/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml
new file mode 100644
index 00000000000..d5ae57a3e8c
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+  # Validate a Beam Row with a predefined schema with no error handling
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          name: InputData
+          config:
+            elements:
+              - {name: "Alice", age: 30, score: 95.5}
+              - {name: "Bob", age: 25, score: 88.0}
+        - type: ValidateWithSchema
+          config:
+            schema:
+              type: object
+              properties:
+                name:
+                  type: string
+                age:
+                  type: integer
+                score:
+                  type: number
+        - type: AssertEqual
+          config:
+            elements:
+              - {name: "Alice", age: 30, score: 95.5}
+              - {name: "Bob", age: 25, score: 88.0}
+
+  # Validate a Beam Row with a predefined schema with error handling
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {name: "Alice", age: 30, score: 95.5}
+              - {name: "Bob", age: 25, score: 88.0}
+              - {name: "Charlie", age: 27, score: "apple"} 
+              - {name: "David", age: "twenty", score: 90.0} 
+              - {name: 30, age: 40, score: 100.0}               
+        - type: ValidateWithSchema
+          input: Create
+          config:
+            schema:
+              type: object
+              properties:
+                name:
+                  type: string
+                age:
+                  type: integer
+                score:
+                  type: number
+              required: [name, age, score]
+            error_handling:
+              output: invalid_rows
+        - type: MapToFields
+          input: ValidateWithSchema.invalid_rows
+          config:
+            language: python
+            fields:
+              name: "element.name"
+              age: "element.age"
+              score: "element.score"
+        - type: AssertEqual
+          input: MapToFields
+          config:
+            elements:
+              - {name: "Charlie", age: 27, score: "apple"} 
+              - {name: "David", age: "twenty", score: 90.0} 
+              - {name: 30, age: 40, score: 100.0}   
+        - type: AssertEqual
+          input: ValidateWithSchema
+          config:
+            elements:
+              - {name: "Alice", age: 30, score: 95.5}
+              - {name: "Bob", age: 25, score: 88.0}
+
+

Reply via email to