This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 09aa10c52f1 Python examples to use IObase IOs sinks in streaming (and
batch) (#35811)
09aa10c52f1 is described below
commit 09aa10c52f1d24846ab30e791c3e5bd544e9321f
Author: Razvan Culea <[email protected]>
AuthorDate: Mon Oct 6 18:54:38 2025 +0200
Python examples to use IObase IOs sinks in streaming (and batch) (#35811)
* examples to use IObase IOs sinks (TextIO, ParquetIO, AvroIO, TFRecordsIO)
in batch and streaming
* add __init__.py for module import
* Fix lint (unused imports)
* move the samples in to sinks (as iobased_sinks might be too obscure for
users).
add folder README
add mode comments per damccorm comments.
* fix imports
* formatter fix
* spaces
* test pylint line length
* add no qa
* fix extra line
---
sdks/python/apache_beam/examples/sinks/README.md | 59 ++++++++
sdks/python/apache_beam/examples/sinks/__init__.py | 16 ++
.../apache_beam/examples/sinks/generate_event.py | 144 ++++++++++++++++++
.../examples/sinks/test_periodicimpulse.py | 68 +++++++++
.../examples/sinks/test_write_bounded.py | 98 ++++++++++++
.../examples/sinks/test_write_unbounded.py | 166 +++++++++++++++++++++
6 files changed, 551 insertions(+)
diff --git a/sdks/python/apache_beam/examples/sinks/README.md
b/sdks/python/apache_beam/examples/sinks/README.md
new file mode 100644
index 00000000000..b0e43ba2b52
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/README.md
@@ -0,0 +1,59 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Examples of writing to Sinks
+
+This module contains example pipelines that use the [Beam IO
connectors](https://beam.apache.org/documentation/io/connectors/), also known
as sinks, to write output in both streaming and batch mode.
+
+## Batch
+
+test_write_bounded.py - a simple pipeline that takes a bounded PCollection
+as input, created with the
[Create](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Create)
+ transform (useful for testing), and writes it to files using multiple IOs.
+
+### Running the pipeline
+
+To run the pipeline locally:
+
+```sh
+python -m apache_beam.examples.sinks.test_write_bounded
+```
+
+## Streaming
+
+Two example pipelines that use two different approaches for creating the input.
+
+test_write_unbounded.py uses
[TestStream](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/testing/TestStream.html),
+a transform that lets you control when data arrives and how the watermark
+advances. This is especially useful in unit tests.
+
+test_periodicimpulse.py uses
[PeriodicImpulse](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.periodicsequence.html#apache_beam.transforms.periodicsequence.PeriodicImpulse),
+a transform useful for testing pipelines in real time. You can run it on
Dataflow as well.
+
+### Running the pipeline
+
+To run the pipelines locally:
+
+```sh
+python -m apache_beam.examples.sinks.test_write_unbounded
+```
+
+```sh
+python -m apache_beam.examples.sinks.test_periodicimpulse
+```
\ No newline at end of file
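The bounded example below passes `shard_name_template` values like `'-U-SSSSS-of-NNNNN'` to `WriteToText`. As a rough, pure-Python illustration of how such a template expands into output file names (the `expand_shard_template` helper is illustrative only, not part of Beam):

```python
def expand_shard_template(template: str, shard: int, num_shards: int) -> str:
    """Expand runs of 'S' (shard index) and 'N' (shard count) in a
    shard name template, zero-padding each run to its length.
    Other characters pass through unchanged."""
    out = []
    i = 0
    while i < len(template):
        ch = template[i]
        if ch in ('S', 'N'):
            j = i
            while j < len(template) and template[j] == ch:
                j += 1
            value = shard if ch == 'S' else num_shards
            out.append(str(value).zfill(j - i))
            i = j
        else:
            out.append(ch)
            i += 1
    return ''.join(out)

# The default-style template used in test_write_bounded.py:
print(expand_shard_template('-U-SSSSS-of-NNNNN', 0, 2))  # -U-00000-of-00002
```

So with `num_shards=2`, shard 0 of a prefix `output_WriteToText` ends up named roughly `output_WriteToText-U-00000-of-00002.txt`.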
diff --git a/sdks/python/apache_beam/examples/sinks/__init__.py
b/sdks/python/apache_beam/examples/sinks/__init__.py
new file mode 100644
index 00000000000..cce3acad34a
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/examples/sinks/generate_event.py
b/sdks/python/apache_beam/examples/sinks/generate_event.py
new file mode 100644
index 00000000000..6566a82ef6e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/generate_event.py
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+import pytz
+
+import apache_beam as beam
+from apache_beam.testing.test_stream import TestStream
+
+
+class GenerateEvent(beam.PTransform):
+ # pylint: disable=line-too-long
+ """This class simulates streaming data.
+ It leverages
[TestStream](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/testing/TestStream.html),
+ a transform that lets you control when data arrives and how the watermark advances.
+ This is especially useful in unit tests.""" # noqa
+
+ @staticmethod
+ def sample_data():
+ return GenerateEvent()
+
+ def expand(self, input):
+ # these are the elements that will arrive in the simulated TestStream
+ # at multiple timestamps
+ elem = [{'age': 10}, {'age': 20}, {'age': 30}]
+
+ # The simulated TestStream adds elements at specific timestamps
+ # using add_elements and advances the watermark after 1 or more
+ # elements arrive using advance_watermark_to
+ return (
+ input
+ | TestStream().add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 1, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 2, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 3, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 4, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 5, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 5, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 6,
+ 0, tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 7, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 8, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 9, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 10, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 10, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 11, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 12, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 13, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 14, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 15, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 15, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 16, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 17, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 18, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 19, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 20, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 20, 0,
+
tzinfo=pytz.UTC).timestamp()).advance_watermark_to(
+ datetime(
+ 2021, 3, 1, 0, 0, 25, 0, tzinfo=pytz.UTC).
+ timestamp()).advance_watermark_to_infinity())
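GenerateEvent above emits the same three elements at event times 1 through 20 seconds and advances the watermark every 5 seconds; the unbounded examples then group these elements with FixedWindows. A pure-Python sketch of fixed-window assignment (the `fixed_window` helper is illustrative only, not the Beam API):

```python
def fixed_window(ts, size, offset=0.0):
    """Return the (start, end) pair of the fixed window containing
    timestamp ts, mirroring the semantics of Beam's FixedWindows:
    windows are [start, start + size), aligned to the offset."""
    start = ts - (ts - offset) % size
    return (start, start + size)

# Event times 1-5 (seconds), as produced by GenerateEvent, in 5s windows:
for t in [1, 2, 3, 4, 5]:
    print(t, fixed_window(t, 5))
# Times 1-4 fall in [0, 5); time 5 starts the next window [5, 10)
```

This is why the watermark advances at 5, 10, 15, and 20 in GenerateEvent: each advance closes one 5-second window's worth of elements.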
diff --git a/sdks/python/apache_beam/examples/sinks/test_periodicimpulse.py
b/sdks/python/apache_beam/examples/sinks/test_periodicimpulse.py
new file mode 100644
index 00000000000..0480d064b15
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/test_periodicimpulse.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# To run the pipelines locally:
+# python -m apache_beam.examples.sinks.test_periodicimpulse
+
+# This file contains examples of writing an unbounded PCollection to files
+# using PeriodicImpulse
+
+import argparse
+import logging
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.transforms.window import FixedWindows
+
+
+def run(argv=None, save_main_session=True) -> PipelineResult:
+ """Main entry point; defines and runs the example pipeline."""
+ parser = argparse.ArgumentParser()
+ _, pipeline_args = parser.parse_known_args(argv)
+
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+ p = beam.Pipeline(options=pipeline_options)
+
+ _ = (
+ p
+ | "Create elements" >> beam.transforms.periodicsequence.PeriodicImpulse(
+ start_timestamp=1,
+ stop_timestamp=100,
+ fire_interval=10,
+ apply_windowing=False)
+ | 'ApplyWindowing' >> beam.WindowInto(FixedWindows(20))
+ | beam.io.WriteToText(
+ file_path_prefix="__output__/output_WriteToText",
+ file_name_suffix=".txt"))
+
+ # Execute the pipeline and return the result.
+ result = p.run()
+ result.wait_until_finish()
+ return result
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
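The pipeline above asks PeriodicImpulse to fire every 10 seconds between timestamps 1 and 100. Conceptually the firing schedule looks like the sketch below (an illustrative helper, not the transform's actual implementation; treating the stop bound as exclusive is an assumption here):

```python
def impulse_times(start, stop, interval):
    """Timestamps at which a periodic impulse fires: start,
    start + interval, ... while strictly before stop.
    A simplification of PeriodicImpulse's schedule."""
    times = []
    t = start
    while t < stop:
        times.append(t)
        t += interval
    return times

print(impulse_times(1, 100, 10))
# [1, 11, 21, 31, 41, 51, 61, 71, 81, 91]
```

Each impulse then lands in one of the 20-second FixedWindows applied by the pipeline before being written out by WriteToText.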
diff --git a/sdks/python/apache_beam/examples/sinks/test_write_bounded.py
b/sdks/python/apache_beam/examples/sinks/test_write_bounded.py
new file mode 100644
index 00000000000..a7ce0931882
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/test_write_bounded.py
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# To run the pipelines locally:
+# python -m apache_beam.examples.sinks.test_write_bounded
+
+# This file contains multiple examples of writing a bounded PCollection to files
+
+import argparse
+import json
+import logging
+
+import pyarrow
+
+import apache_beam as beam
+from apache_beam.io.fileio import WriteToFiles
+from apache_beam.io.textio import WriteToText
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.transforms.util import LogElements
+
+
+def run(argv=None, save_main_session=True) -> PipelineResult:
+ """Main entry point; defines and runs the example pipeline."""
+ parser = argparse.ArgumentParser()
+ _, pipeline_args = parser.parse_known_args(argv)
+
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+ p = beam.Pipeline(options=pipeline_options)
+
+ output = (
+ p | beam.Create([{
+ 'age': 10
+ }, {
+ 'age': 20
+ }, {
+ 'age': 30
+ }])
+ | beam.LogElements(
+ prefix='before write ', with_window=False, level=logging.INFO))
+ #TextIO
+ output2 = output | 'Write to text' >> WriteToText(
+ file_path_prefix="__output_batch__/output_WriteToText",
+ file_name_suffix=".txt",
+ shard_name_template='-U-SSSSS-of-NNNNN')
+ _ = output2 | 'LogElements after WriteToText' >> LogElements(
+ prefix='after WriteToText ', with_window=False, level=logging.INFO)
+
+ #FileIO
+ output3 = (
+ output | 'Serialize' >> beam.Map(json.dumps)
+ | 'Write to files' >>
+ WriteToFiles(path="__output_batch__/output_WriteToFiles"))
+ _ = output3 | 'LogElements after WriteToFiles' >> LogElements(
+ prefix='after WriteToFiles ', with_window=False, level=logging.INFO)
+
+ #ParquetIO
+ output4 = output | 'Write' >> beam.io.WriteToParquet(
+ file_path_prefix="__output_batch__/output_parquet",
+ schema=pyarrow.schema([('age', pyarrow.int64())]))
+ _ = output4 | 'LogElements after WriteToParquet' >> LogElements(
+ prefix='after WriteToParquet ', with_window=False, level=logging.INFO)
+ _ = output | 'Write parquet' >> beam.io.WriteToParquet(
+ file_path_prefix="__output_batch__/output_WriteToParquet",
+ schema=pyarrow.schema([('age', pyarrow.int64())]),
+ record_batch_size=10,
+ num_shards=0)
+
+ # Execute the pipeline and return the result.
+ result = p.run()
+ result.wait_until_finish()
+ return result
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
diff --git a/sdks/python/apache_beam/examples/sinks/test_write_unbounded.py
b/sdks/python/apache_beam/examples/sinks/test_write_unbounded.py
new file mode 100644
index 00000000000..95cab44f622
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/test_write_unbounded.py
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# To run the pipelines locally:
+# python -m apache_beam.examples.sinks.test_write_unbounded
+
+# This file contains multiple examples of writing an unbounded PCollection to
files
+
+import argparse
+import json
+import logging
+
+import pyarrow
+
+import apache_beam as beam
+from apache_beam.examples.sinks.generate_event import GenerateEvent
+from apache_beam.io.fileio import WriteToFiles
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import AfterWatermark
+from apache_beam.transforms.util import LogElements
+from apache_beam.transforms.window import FixedWindows
+from apache_beam.utils.timestamp import Duration
+
+
+def run(argv=None, save_main_session=True) -> PipelineResult:
+ """Main entry point; defines and runs the example pipeline."""
+ parser = argparse.ArgumentParser()
+ _, pipeline_args = parser.parse_known_args(argv)
+
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+ p = beam.Pipeline(options=pipeline_options)
+
+ output = p | GenerateEvent.sample_data()
+
+ #TextIO
+ output2 = output | 'TextIO WriteToText' >> beam.io.WriteToText(
+ file_path_prefix="__output__/output_WriteToText",
+ file_name_suffix=".txt",
+ #shard_name_template='-V-SSSSS-of-NNNNN',
+ num_shards=2,
+ triggering_frequency=5,
+ )
+ _ = output2 | 'LogElements after WriteToText' >> LogElements(
+ prefix='after WriteToText ', with_window=True, level=logging.INFO)
+
+ #FileIO
+ _ = (
+ output
+ | 'FileIO window' >> beam.WindowInto(
+ FixedWindows(5),
+ trigger=AfterWatermark(),
+ accumulation_mode=AccumulationMode.DISCARDING,
+ allowed_lateness=Duration(seconds=0))
+ | 'Serialize' >> beam.Map(json.dumps)
+ | 'FileIO WriteToFiles' >>
+ WriteToFiles(path="__output__/output_WriteToFiles"))
+
+ #ParquetIO
+ pyschema = pyarrow.schema([('age', pyarrow.int64())])
+
+ output4a = output | 'WriteToParquet' >> beam.io.WriteToParquet(
+ file_path_prefix="__output__/output_parquet",
+ #shard_name_template='-V-SSSSS-of-NNNNN',
+ file_name_suffix=".parquet",
+ num_shards=2,
+ triggering_frequency=5,
+ schema=pyschema)
+ _ = output4a | 'LogElements after WriteToParquet' >> LogElements(
+ prefix='after WriteToParquet 4a ', with_window=True, level=logging.INFO)
+
+ output4aw = (
+ output
+ | 'ParquetIO window' >> beam.WindowInto(
+ FixedWindows(20),
+ trigger=AfterWatermark(),
+ accumulation_mode=AccumulationMode.DISCARDING,
+ allowed_lateness=Duration(seconds=0))
+ | 'WriteToParquet windowed' >> beam.io.WriteToParquet(
+ file_path_prefix="__output__/output_parquet",
+ shard_name_template='-W-SSSSS-of-NNNNN',
+ file_name_suffix=".parquet",
+ num_shards=2,
+ schema=pyschema))
+ _ = output4aw | 'LogElements after WriteToParquet windowed' >> LogElements(
+ prefix='after WriteToParquet 4aw ', with_window=True, level=logging.INFO)
+
+ output4b = (
+ output
+ | 'To PyArrow Table' >>
+ beam.Map(lambda x: pyarrow.Table.from_pylist([x], schema=pyschema))
+ | 'WriteToParquetBatched to parquet' >> beam.io.WriteToParquetBatched(
+ file_path_prefix="__output__/output_parquet_batched",
+ shard_name_template='-V-SSSSS-of-NNNNN',
+ file_name_suffix=".parquet",
+ num_shards=2,
+ triggering_frequency=5,
+ schema=pyschema))
+ _ = output4b | 'LogElements after WriteToParquetBatched' >> LogElements(
+ prefix='after WriteToParquetBatched 4b ',
+ with_window=True,
+ level=logging.INFO)
+
+ #AvroIO
+ avroschema = {
+ 'name': 'dummy',  # the record name, required by the Avro spec
+ 'type': 'record',  # the Avro complex type; 'record' suits row-like
+ # data (other types exist; see the Avro docs)
+ 'fields': [  # the record's fields and their types
+ {'name': 'age', 'type': 'int'},
+ ],
+ }
+ output5 = output | 'WriteToAvro' >> beam.io.WriteToAvro(
+ file_path_prefix="__output__/output_avro",
+ #shard_name_template='-V-SSSSS-of-NNNNN',
+ file_name_suffix=".avro",
+ num_shards=2,
+ triggering_frequency=5,
+ schema=avroschema)
+ _ = output5 | 'LogElements after WriteToAvro' >> LogElements(
+ prefix='after WriteToAvro 5 ', with_window=True, level=logging.INFO)
+
+ #TFrecordIO
+ output6 = (
+ output
+ | "encode" >> beam.Map(lambda s: json.dumps(s).encode('utf-8'))
+ | 'WriteToTFRecord' >> beam.io.WriteToTFRecord(
+ file_path_prefix="__output__/output_tfrecord",
+ #shard_name_template='-V-SSSSS-of-NNNNN',
+ file_name_suffix=".tfrecord",
+ num_shards=2,
+ triggering_frequency=5))
+ _ = output6 | 'LogElements after WriteToTFRecord' >> LogElements(
+ prefix='after WriteToTFRecord 6 ', with_window=True, level=logging.INFO)
+
+ # Execute the pipeline and return the result.
+ result = p.run()
+ result.wait_until_finish()
+ return result
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()