damccorm commented on code in PR #34411:
URL: https://github.com/apache/beam/pull/34411#discussion_r2035846618
##########  sdks/python/apache_beam/yaml/tests/tfrecord_write.yaml  ##########

```diff
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: TEMP_DIR
+    type: "tempfile.TemporaryDirectory"
+
+pipelines:
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {name: "foo"}
+        - type: MapToFields
+          config:
+            language: python
+            fields:
+              record:
+                callable: |
+                  def process(row):
+                    return row.name.encode('utf-8')
+                output_type: bytes
+        - type: WriteToTFRecord
+          config:
+            file_path_prefix: "{TEMP_DIR}"
+            file_name_suffix: ".tfrecords"
+            compression_type: "UNCOMPRESSED"
+            no_spilling: true
+            num_shards: 1
+
+  - pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromTFRecord
+          config:
+            file_pattern: "{TEMP_DIR}*"
+            compression_type: "AUTO"
+            validate: true
+        - type: MapToFields
+          config:
+            language: python
+            fields:
+              record:
+                callable: |
+                  def process(row):
+                    return row.record.decode('utf-8')
+                output_type: string
+        - type: AssertEqual
+          config:
+            elements:
+              - {record: "foo"}
+
+
+
+
```

Review Comment: Nit: we can reduce the trailing whitespace here.

##########  sdks/standard_external_transforms.yaml  ##########

```diff
@@ -50,3 +50,63 @@
     type: numpy.int64
   identifier: beam:schematransform:org.apache.beam:generate_sequence:v1
   name: GenerateSequence
+- default_service: sdks:java:io:expansion-service:shadowJar
```

Review Comment: Looks like there is a mismatch here:

```
E       AssertionError: Lists differ: [{'de[1750 chars]le': False, 'type': 'str'}, 'validate': {'desc[1383 chars]te'}] != [{'de[1750 chars]le': True, 'type': 'str'}, 'validate': {'descr[1381 chars]te'}]
E
E       First differing element 1:
E       {'def[509 chars]le': False, 'type': 'str'}, 'validate': {'desc[162 chars]ead'}
E       {'def[509 chars]le': True, 'type': 'str'}, 'validate': {'descr[161 chars]ead'}
E
E       Diff is 5870 characters long. Set self.maxDiff to None to see it. : The standard xlang transforms config file "standard_external_transforms.yaml" is out of sync! Please updateby running './gradlew generateExternalTransformsConfig'and committing the changes.
```

from https://github.com/apache/beam/actions/runs/14344468172/job/40211321870?pr=34411

Could you try running the Gradle command again? Looks like maybe just a minor out-of-sync issue.

##########  sdks/python/apache_beam/yaml/yaml_io.py  ##########

```diff
@@ -574,3 +578,87 @@ def write_to_iceberg(
 def io_providers():
   return yaml_provider.load_providers(
       yaml_utils.locate_data_file('standard_io.yaml'))
+
+
+def read_from_tfrecord(
+    file_pattern: str,
+    coder: Optional[coders.BytesCoder] = coders.BytesCoder(),
+    compression_type: str = "AUTO",
+    validate: Optional[bool] = True):
+  """Reads data from TFRecord.
+
+  Args:
+    file_pattern (str): A file glob pattern to read TFRecords from.
+    coder (coders.BytesCoder): Coder used to decode each record.
+    compression_type (CompressionTypes): Used to handle compressed input files.
+      Default value is CompressionTypes.AUTO, in which case the file_path's
+      extension will be used to detect the compression.
+    validate (bool): Boolean flag to verify that the files exist during the
+      pipeline creation time.
+  """
+  return ReadFromTFRecord(
+      file_pattern=file_pattern,
+      compression_type=getattr(CompressionTypes, compression_type),
+      validate=validate) | beam.Map(lambda s: beam.Row(record=s))
+
+
+@beam.ptransform_fn
+def write_to_tfrecord(
+    pcoll,
+    file_path_prefix: str,
+    coder: Optional[coders.BytesCoder] = coders.BytesCoder(),
+    file_name_suffix: Optional[str] = "",
+    num_shards: Optional[int] = 0,
+    shard_name_template: Optional[str] = None,
+    compression_type: str = "AUTO",
+    no_spilling: Optional[bool] = None):
```

Review Comment: Related to the above, this arg doesn't actually do anything.

##########  sdks/python/apache_beam/yaml/standard_io.yaml  ##########

```diff
@@ -323,4 +333,30 @@
       'ReadFromSpanner': 'beam:schematransform:org.apache.beam:spanner_read:v1'
       'WriteToSpanner': 'beam:schematransform:org.apache.beam:spanner_write:v1'
     config:
-      gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
\ No newline at end of file
+      gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+
+# TFRecord
+- type: renaming
+  transforms:
+    'ReadFromTFRecord': 'ReadFromTFRecord'
+    'WriteToTFRecord': 'WriteToTFRecord'
+  config:
+    mappings:
+      'ReadFromTFRecord':
+        file_pattern: 'file_pattern'
+        compression_type: 'compression'
+        validate: 'validate'
+      'WriteToTFRecord':
+        file_path_prefix: 'output_prefix'
+        shard_name_template: 'shard_template'
+        file_name_suffix: 'filename_suffix'
+        num_shards: 'num_shards'
+        compression_type: 'compression'
+        no_spilling: 'no_spilling'
```

Review Comment: Let's start by not exposing the spilling option.
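(Editor's note: for concreteness, dropping the spilling option from the mapping would leave the `WriteToTFRecord` entry in `standard_io.yaml` looking roughly like this — a sketch, simply the hunk above minus the `no_spilling` line:)

```yaml
'WriteToTFRecord':
  file_path_prefix: 'output_prefix'
  shard_name_template: 'shard_template'
  file_name_suffix: 'filename_suffix'
  num_shards: 'num_shards'
  compression_type: 'compression'
```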
I think this is an advanced feature that few folks will actually need, and it makes it harder to provide a Python version (since no equivalent exists).

##########  sdks/python/apache_beam/yaml/yaml_io.py  ##########

(The quoted hunk is the same `@@ -574,3 +578,87 @@` hunk as above; the comment targets the stray line inside the `write_to_tfrecord` docstring:)

```diff
+@beam.ptransform_fn
+def write_to_tfrecord(
+    pcoll,
+    file_path_prefix: str,
+    coder: Optional[coders.BytesCoder] = coders.BytesCoder(),
+    file_name_suffix: Optional[str] = "",
+    num_shards: Optional[int] = 0,
+    shard_name_template: Optional[str] = None,
+    compression_type: str = "AUTO",
+    no_spilling: Optional[bool] = None):
+  """Writes data to TFRecord.
+
+      public abstract Builder setNoSpilling(boolean value);
```

Review Comment:

```suggestion
```

I think this may be failing the docs precommit as well, though it may be something else (hard to tell with the whitespace from the diff):

```
/runner/_work/beam/beam/sdks/python/test-suites/tox/pycommon/build/srcs/sdks/python/target/.tox-docs/docs/lib/python3.9/site-packages/apache_beam/yaml/yaml_io.py:docstring of apache_beam.yaml.yaml_io.write_to_tfrecord:5: ERROR: Unexpected indentation.
```

from https://github.com/apache/beam/actions/runs/14344468180/job/40211320138?pr=34411
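(Editor's note: Sphinx's "Unexpected indentation" error typically points at a docstring line indented deeper than its neighbors — here, the leftover Java `setNoSpilling` line inside the Python docstring. Below is a sketch, not the actual Beam source, of what a cleanly indented Google-style docstring for this wrapper could look like once that line is removed; the body is a stub and the parameter descriptions are illustrative.)

```python
# Sketch: write_to_tfrecord with the stray Java line removed and every
# Args entry indented consistently, so Sphinx/napoleon parses it cleanly.
from typing import Optional


def write_to_tfrecord(
    pcoll,
    file_path_prefix: str,
    file_name_suffix: Optional[str] = "",
    num_shards: Optional[int] = 0,
    shard_name_template: Optional[str] = None,
    compression_type: str = "AUTO"):
  """Writes data to TFRecord.

  Args:
    file_path_prefix (str): The file path prefix to write TFRecords to.
    file_name_suffix (str): Suffix appended to each shard's file name.
    num_shards (int): Number of output shards; 0 lets the runner decide.
    shard_name_template (str): Template used to number the shards.
    compression_type (str): Compression to apply; "AUTO" infers it from
      the file suffix.
  """
  # Body stubbed out for illustration; the real wrapper delegates to
  # apache_beam.io.tfrecordio.WriteToTFRecord.
  return pcoll
```

With the Java line gone, every continuation line in the `Args:` block sits at a uniform indent, which should satisfy the docs precommit.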