This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5c4b42f81c6 Phase 1sh - more yaml integration tests (#34692)
5c4b42f81c6 is described below
commit 5c4b42f81c67fefec42e28cc6faae52f21ba8f56
Author: Derrick Williams <[email protected]>
AuthorDate: Tue Apr 22 21:56:08 2025 -0400
Phase 1sh - more yaml integration tests (#34692)
* assign_timestamp yaml test
* create yaml test
* extract window yaml test
* ml transform yaml test
* validate with schema yaml test
* add extra gradle parameter for ml_test
* add ml_test package if parameter is present
---
.../workflows/beam_PreCommit_Yaml_Xlang_Direct.yml | 2 +-
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 8 +-
.../apache_beam/yaml/tests/assign_timestamps.yaml | 84 +++++++++++++
sdks/python/apache_beam/yaml/tests/create.yaml | 66 +++++++++++
.../yaml/tests/extract_windowing_Info.yaml | 131 +++++++++++++++++++++
.../apache_beam/yaml/tests/ml_transform.yaml | 92 +++++++++++++++
.../yaml/tests/validate_with_schema.yaml | 95 +++++++++++++++
7 files changed, 476 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml
b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml
index a65970968b2..92a20e5003e 100644
--- a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml
+++ b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml
@@ -91,7 +91,7 @@ jobs:
- name: run PreCommit Yaml Xlang Direct script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command: :sdks:python:yamlIntegrationTests
+ gradle-command: :sdks:python:yamlIntegrationTests -PbeamPythonExtra=ml_test
- name: Archive Python Test Results
uses: actions/upload-artifact@v4
if: failure()
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 6eb45a109d3..472ece04f9e 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -3021,9 +3021,15 @@ class BeamModulePlugin implements Plugin<Project> {
dependsOn ':sdks:python:sdist'
doLast {
def distTarBall = "${pythonRootDir}/build/apache-beam.tar.gz"
+ def packages = "gcp,test,aws,azure,dataframe"
+ def extra = project.findProperty('beamPythonExtra')
+ if (extra) {
+ packages += ",${extra}"
+ }
+
project.exec {
executable 'sh'
- args '-c', ". ${project.ext.envdir}/bin/activate && pip install --pre --retries 10 ${distTarBall}[gcp,test,aws,azure,dataframe]"
+ args '-c', ". ${project.ext.envdir}/bin/activate && pip install --pre --retries 10 ${distTarBall}[${packages}]"
}
}
}
diff --git a/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
new file mode 100644
index 00000000000..edaa581214e
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+ # Assign timestamp to beam row element
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ name: CreateVisits
+ config:
+ elements:
+ - user: alice
+ new_time_stamp: 1
+ - user: alice
+ new_time_stamp: 3
+ - user: bob
+ new_time_stamp: 7
+ - type: AssignTimestamps
+ config:
+ timestamp: new_time_stamp
+ - type: ExtractWindowingInfo
+ config:
+ fields: [timestamp]
+ - type: MapToFields
+ config:
+ language: python
+ fields:
+ user: user
+ timestamp: timestamp
+ - type: AssertEqual
+ config:
+ elements:
+ - {user: "alice", timestamp: 1}
+ - {user: "alice", timestamp: 3}
+ - {user: "bob", timestamp: 7}
+
+ # Assign timestamp to beam row element with error_handling
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ name: CreateVisits
+ config:
+ elements:
+ - {user: alice, timestamp: "not-valid"}
+ - {user: bob, timestamp: 3}
+ - type: AssignTimestamps
+ input: CreateVisits
+ config:
+ timestamp: timestamp
+ error_handling:
+ output: invalid_rows
+ - type: MapToFields
+ input: AssignTimestamps.invalid_rows
+ config:
+ language: python
+ fields:
+ user: "element.user"
+ timestamp: "element.timestamp"
+ - type: AssertEqual
+ input: MapToFields
+ config:
+ elements:
+ - {user: "alice", timestamp: "not-valid"}
+ - type: AssertEqual
+ input: AssignTimestamps
+ config:
+ elements:
+ - {user: bob, timestamp: 3}
diff --git a/sdks/python/apache_beam/yaml/tests/create.yaml
b/sdks/python/apache_beam/yaml/tests/create.yaml
new file mode 100644
index 00000000000..bed364c1714
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/create.yaml
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+ # Simple Create with element list
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements: [1,2,3,4,5]
+ - type: AssertEqual
+ config:
+ elements:
+ - {element: 1}
+ - {element: 2}
+ - {element: 3}
+ - {element: 4}
+ - {element: 5}
+
+ # Simple Create with more complex beam row
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {first: 0, second: [1,2,3]}
+ - {first: 1, second: [4,5,6]}
+ - type: AssertEqual
+ config:
+ elements:
+ - {first: 0, second: [1,2,3]}
+ - {first: 1, second: [4,5,6]}
+
+ # Simple Create with reshuffle disabled
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {first: 0, second: [1,2,3]}
+ - {first: 1, second: [4,5,6]}
+ - {first: 2, second: [7,8,9]}
+ reshuffle: false
+ - type: AssertEqual
+ config:
+ elements:
+ - {first: 0, second: [1,2,3]}
+ - {first: 1, second: [4,5,6]}
+ - {first: 2, second: [7,8,9]}
diff --git a/sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml
b/sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml
new file mode 100644
index 00000000000..45233e2039e
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/extract_windowing_Info.yaml
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+ # Simply extract windowing info with no fields resulting in default fields
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {label: "11a", rank: 0}
+ - {label: "37a", rank: 1}
+ - {label: "389a", rank: 2}
+ - type: ExtractWindowingInfo
+ - type: MapToFields
+ config:
+ language: python
+ fields:
+ label:
+ callable: "lambda x: x.label"
+ rank:
+ callable: "lambda x: x.rank"
+ timestamp:
+ callable: "lambda x: str(x.timestamp)"
+ window_start:
+ callable: "lambda x: str(x.window_start)"
+ window_end:
+ callable: "lambda x: str(x.window_end)"
+ - type: AssertEqual
+ config:
+ elements:
+ - {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
+ - {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
+ - {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
+
+
+ # Simply extract windowing info with all available fields
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {label: "11a", rank: 0}
+ - {label: "37a", rank: 1}
+ - {label: "389a", rank: 2}
+ - type: ExtractWindowingInfo
+ config:
+ fields: [timestamp, window_start, window_end, window_string, window_type, window_object, pane_info]
+ - type: MapToFields
+ config:
+ language: python
+ fields:
+ label:
+ callable: "lambda x: x.label"
+ rank:
+ callable: "lambda x: x.rank"
+ timestamp:
+ callable: "lambda x: str(x.timestamp)"
+ window_start:
+ callable: "lambda x: str(x.window_start)"
+ window_end:
+ callable: "lambda x: str(x.window_end)"
+ window_string:
+ callable: "lambda x: str(x.window_string)"
+ window_type:
+ callable: "lambda x: str(x.window_type)"
+ window_object:
+ callable: "lambda x: str(x.window_object)"
+ pane_info:
+ callable: "lambda x: str(x.pane_info)"
+ - type: AssertEqual
+ config:
+ elements:
+ - {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+ - {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+ - {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+
+
+ # Simply extract windowing info with a few fields renamed
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {label: "11a", rank: 0}
+ - {label: "37a", rank: 1}
+ - {label: "389a", rank: 2}
+ - type: ExtractWindowingInfo
+ config:
+ fields:
+ ts: timestamp
+ ws: window_start
+ pane: pane_info
+ - type: MapToFields
+ config:
+ language: python
+ fields:
+ label:
+ callable: "lambda x: x.label"
+ rank:
+ callable: "lambda x: x.rank"
+ ts:
+ callable: "lambda x: str(x.ts)"
+ ws:
+ callable: "lambda x: str(x.ws)"
+ pane:
+ callable: "lambda x: str(x.pane)"
+ - type: AssertEqual
+ config:
+ elements:
+ - {label: "11a", rank: 0, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+ - {label: "37a", rank: 1, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+ - {label: "389a", rank: 2, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
+
diff --git a/sdks/python/apache_beam/yaml/tests/ml_transform.yaml
b/sdks/python/apache_beam/yaml/tests/ml_transform.yaml
new file mode 100644
index 00000000000..c4881096f0d
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/ml_transform.yaml
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+ - name: TEMP_DIR
+ type: "tempfile.TemporaryDirectory"
+
+pipelines:
+ # MLTransform with write_artifact_location
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {num: 0, text: 'To be or not to be'}
+ - {num: 2, text: 'I think, therefore I am'}
+ - {num: 5, text: 'The only thing we have to fear is fear itself'}
+ - {num: 8, text: 'Be the change you wish to see in the world'}
+ - type: MLTransform
+ config:
+ write_artifact_location: "{TEMP_DIR}"
+ transforms:
+ - type: ScaleTo01
+ config:
+ columns: [num]
+ - type: ScaleByMinMax
+ config:
+ columns: [num]
+ min_value: 0
+ max_value: 100
+ - type: MapToFields
+ config:
+ language: python
+ fields:
+ num_scaled:
+ callable: 'lambda x: x.num[0]'
+ - type: AssertEqual
+ config:
+ elements:
+ - {num_scaled: 0.0}
+ - {num_scaled: 25.0}
+ - {num_scaled: 62.5}
+ - {num_scaled: 100.0}
+ options:
+ yaml_experimental_features: ['ML']
+
+ # MLTransform with read_artifact_location based on previous
+ # write_artifact_location
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {num: 0, text: 'To be or not to be'}
+ - {num: 2, text: 'I think, therefore I am'}
+ - {num: 5, text: 'The only thing we have to fear is fear itself'}
+ - {num: 8, text: 'Be the change you wish to see in the world'}
+ - type: MLTransform
+ config:
+ read_artifact_location: "{TEMP_DIR}"
+ - type: MapToFields
+ config:
+ language: python
+ fields:
+ num_scaled:
+ callable: 'lambda x: x.num[0]'
+ - type: AssertEqual
+ config:
+ elements:
+ - {num_scaled: 0.0}
+ - {num_scaled: 25.0}
+ - {num_scaled: 62.5}
+ - {num_scaled: 100.0}
+ options:
+ yaml_experimental_features: ['ML']
+
diff --git a/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml
b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml
new file mode 100644
index 00000000000..d5ae57a3e8c
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+ # Validate a Beam Row with a predefined schema with no error handling
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ name: InputData
+ config:
+ elements:
+ - {name: "Alice", age: 30, score: 95.5}
+ - {name: "Bob", age: 25, score: 88.0}
+ - type: ValidateWithSchema
+ config:
+ schema:
+ type: object
+ properties:
+ name:
+ type: string
+ age:
+ type: integer
+ score:
+ type: number
+ - type: AssertEqual
+ config:
+ elements:
+ - {name: "Alice", age: 30, score: 95.5}
+ - {name: "Bob", age: 25, score: 88.0}
+
+ # Validate a Beam Row with a predefined schema with error handling
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {name: "Alice", age: 30, score: 95.5}
+ - {name: "Bob", age: 25, score: 88.0}
+ - {name: "Charlie", age: 27, score: "apple"}
+ - {name: "David", age: "twenty", score: 90.0}
+ - {name: 30, age: 40, score: 100.0}
+ - type: ValidateWithSchema
+ input: Create
+ config:
+ schema:
+ type: object
+ properties:
+ name:
+ type: string
+ age:
+ type: integer
+ score:
+ type: number
+ required: [name, age, score]
+ error_handling:
+ output: invalid_rows
+ - type: MapToFields
+ input: ValidateWithSchema.invalid_rows
+ config:
+ language: python
+ fields:
+ name: "element.name"
+ age: "element.age"
+ score: "element.score"
+ - type: AssertEqual
+ input: MapToFields
+ config:
+ elements:
+ - {name: "Charlie", age: 27, score: "apple"}
+ - {name: "David", age: "twenty", score: 90.0}
+ - {name: 30, age: 40, score: 100.0}
+ - type: AssertEqual
+ input: ValidateWithSchema
+ config:
+ elements:
+ - {name: "Alice", age: 30, score: 95.5}
+ - {name: "Bob", age: 25, score: 88.0}
+
+