This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 09aa10c52f1 Python examples to use IObase IOs sinks in streaming (and batch) (#35811)
09aa10c52f1 is described below

commit 09aa10c52f1d24846ab30e791c3e5bd544e9321f
Author: Razvan Culea <[email protected]>
AuthorDate: Mon Oct 6 18:54:38 2025 +0200

    Python examples to use IObase IOs sinks in streaming (and batch) (#35811)
    
    * examples to use IObase IOs sinks (TextIO, ParquetIO, AvroIO, TFRecordsIO) in batch and streaming
    
    * add __init__.py for module import
    
    * Fix lint (unused imports)
    
    * move the samples into sinks (as iobased_sinks might be too obscure for users).
    add folder README
    add mode comments per damccorm comments.
    
    * fix imports
    
    * formatter fix
    
    * spaces
    
    * test pylint line length
    
    * add no qa
    
    * fix extra line
---
 sdks/python/apache_beam/examples/sinks/README.md   |  59 ++++++++
 sdks/python/apache_beam/examples/sinks/__init__.py |  16 ++
 .../apache_beam/examples/sinks/generate_event.py   | 144 ++++++++++++++++++
 .../examples/sinks/test_periodicimpulse.py         |  68 +++++++++
 .../examples/sinks/test_write_bounded.py           |  98 ++++++++++++
 .../examples/sinks/test_write_unbounded.py         | 166 +++++++++++++++++++++
 6 files changed, 551 insertions(+)

diff --git a/sdks/python/apache_beam/examples/sinks/README.md b/sdks/python/apache_beam/examples/sinks/README.md
new file mode 100644
index 00000000000..b0e43ba2b52
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/README.md
@@ -0,0 +1,59 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Examples of writing to Sinks
+
+This module contains example pipelines that use the [Beam IO connectors](https://beam.apache.org/documentation/io/connectors/), also known as sinks, to write output in both streaming and batch mode.
+
+## Batch
+
+`test_write_bounded.py` - a simple pipeline that takes a bounded PCollection
+as input, created with the [Create](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Create)
+transform (useful for testing), and writes it to files using multiple IOs.
+
+### Running the pipeline
+
+To run the pipeline locally:
+
+```sh
+python -m apache_beam.examples.sinks.test_write_bounded
+```
+
+## Streaming
+
+Two example pipelines that use two different approaches for creating the input.
+
+`test_write_unbounded.py` uses [TestStream](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/testing/TestStream.html),
+a transform that lets you control when data arrives and how the watermark advances.
+This is especially useful in unit tests.
+
+`test_periodicimpulse.py` uses [PeriodicImpulse](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.periodicsequence.html#apache_beam.transforms.periodicsequence.PeriodicImpulse),
+a transform useful for testing pipelines in real time. You can run it on Dataflow as well.
+
+### Running the pipeline
+
+To run the pipelines locally:
+
+```sh
+python -m apache_beam.examples.sinks.test_write_unbounded
+```
+
+```sh
+python -m apache_beam.examples.sinks.test_periodicimpulse
+```
\ No newline at end of file
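Both streaming examples window their input before writing; which fixed window a given timestamp lands in is simple arithmetic. A plain-Python sketch of that assignment (no Beam required; `fixed_window` is illustrative, not a Beam API):

```python
def fixed_window(timestamp, size):
  """Return the [start, end) bounds of the fixed window containing timestamp."""
  start = timestamp - (timestamp % size)
  return (start, start + size)


# With FixedWindows(20), as in test_periodicimpulse.py:
print(fixed_window(1, 20))   # (0, 20)
print(fixed_window(31, 20))  # (20, 40)
print(fixed_window(91, 20))  # (80, 100)
```

Elements whose timestamps fall in the same window are flushed to the same set of output files when the watermark passes the window's end.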
diff --git a/sdks/python/apache_beam/examples/sinks/__init__.py b/sdks/python/apache_beam/examples/sinks/__init__.py
new file mode 100644
index 00000000000..cce3acad34a
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/examples/sinks/generate_event.py b/sdks/python/apache_beam/examples/sinks/generate_event.py
new file mode 100644
index 00000000000..6566a82ef6e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/generate_event.py
@@ -0,0 +1,144 @@
+#   Licensed to the Apache Software Foundation (ASF) under one
+#   or more contributor license agreements.  See the NOTICE file
+#   distributed with this work for additional information
+#   regarding copyright ownership.  The ASF licenses this file
+#   to you under the Apache License, Version 2.0 (the
+#   "License"); you may not use this file except in compliance
+#   with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+from datetime import datetime
+
+import pytz
+
+import apache_beam as beam
+from apache_beam.testing.test_stream import TestStream
+
+
+class GenerateEvent(beam.PTransform):
+  # pylint: disable=line-too-long
+  """This class simulates streaming data.
+  It leverages 
[TestStream](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/testing/TestStream.html),
+  a method where you can control when data arrives and how watermark advances.
+  This is especially useful in unit tests.""" # noqa
+
+  @staticmethod
+  def sample_data():
+    return GenerateEvent()
+
+  def expand(self, input):
+    # Three elements arrive at each whole second from 1 to 20.
+    elem = [{'age': 10}, {'age': 20}, {'age': 30}]
+
+    def ts(second):
+      return datetime(
+          2021, 3, 1, 0, 0, second, 0, tzinfo=pytz.UTC).timestamp()
+
+    # The simulated TestStream adds elements at specific timestamps using
+    # add_elements and advances the watermark after one or more elements
+    # have arrived using advance_watermark_to: the watermark moves to t
+    # just before the elements at t = 5, 10, 15 and 20, then to 25, and
+    # finally to infinity.
+    stream = TestStream()
+    for second in range(1, 21):
+      if second % 5 == 0:
+        stream.advance_watermark_to(ts(second))
+      stream.add_elements(elements=elem, event_timestamp=ts(second))
+    stream.advance_watermark_to(ts(25))
+    stream.advance_watermark_to_infinity()
+    return input | stream
diff --git a/sdks/python/apache_beam/examples/sinks/test_periodicimpulse.py b/sdks/python/apache_beam/examples/sinks/test_periodicimpulse.py
new file mode 100644
index 00000000000..0480d064b15
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/test_periodicimpulse.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# To run the pipeline locally:
+# python -m apache_beam.examples.sinks.test_periodicimpulse
+
+# This file contains examples of writing an unbounded PCollection,
+# generated with PeriodicImpulse, to files
+
+import argparse
+import logging
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.transforms.window import FixedWindows
+
+
+def run(argv=None, save_main_session=True) -> PipelineResult:
+  """Main entry point; defines and runs the wordcount pipeline."""
+  parser = argparse.ArgumentParser()
+  _, pipeline_args = parser.parse_known_args(argv)
+
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  p = beam.Pipeline(options=pipeline_options)
+
+  _ = (
+      p
+      | "Create elements" >> beam.transforms.periodicsequence.PeriodicImpulse(
+          start_timestamp=1,
+          stop_timestamp=100,
+          fire_interval=10,
+          apply_windowing=False)
+      | 'ApplyWindowing' >> beam.WindowInto(FixedWindows(20))
+      | beam.io.WriteToText(
+          file_path_prefix="__output__/ouput_WriteToText",
+          file_name_suffix=".txt"))
+
+  # Execute the pipeline and return the result.
+  result = p.run()
+  result.wait_until_finish()
+  return result
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
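With `start_timestamp=1`, `stop_timestamp=100` and `fire_interval=10` as above, the impulse schedule follows from plain arithmetic. A sketch of the expected firing times (not Beam code; the exact endpoint behavior of PeriodicImpulse is an assumption here):

```python
start, stop, interval = 1, 100, 10

# Firing times: start, start + interval, ... while still before stop.
firings = []
t = start
while t < stop:
  firings.append(t)
  t += interval

print(firings)       # [1, 11, 21, 31, 41, 51, 61, 71, 81, 91]
print(len(firings))  # 10
```

With `FixedWindows(20)` applied afterwards, those ten timestamps fall into five windows of two impulses each.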
diff --git a/sdks/python/apache_beam/examples/sinks/test_write_bounded.py b/sdks/python/apache_beam/examples/sinks/test_write_bounded.py
new file mode 100644
index 00000000000..a7ce0931882
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/test_write_bounded.py
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# To run the pipeline locally:
+# python -m apache_beam.examples.sinks.test_write_bounded
+
+# This file contains multiple examples of writing a bounded PCollection to files
+
+import argparse
+import json
+import logging
+
+import pyarrow
+
+import apache_beam as beam
+from apache_beam.io.fileio import WriteToFiles
+from apache_beam.io.textio import WriteToText
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.transforms.util import LogElements
+
+
+def run(argv=None, save_main_session=True) -> PipelineResult:
+  """Main entry point; defines and runs the wordcount pipeline."""
+  parser = argparse.ArgumentParser()
+  _, pipeline_args = parser.parse_known_args(argv)
+
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  p = beam.Pipeline(options=pipeline_options)
+
+  output = (
+      p | beam.Create([{
+          'age': 10
+      }, {
+          'age': 20
+      }, {
+          'age': 30
+      }])
+      | beam.LogElements(
+          prefix='before write ', with_window=False, level=logging.INFO))
+  #TextIO
+  output2 = output | 'Write to text' >> WriteToText(
+      file_path_prefix="__output_batch__/ouput_WriteToText",
+      file_name_suffix=".txt",
+      shard_name_template='-U-SSSSS-of-NNNNN')
+  _ = output2 | 'LogElements after WriteToText' >> LogElements(
+      prefix='after WriteToText ', with_window=False, level=logging.INFO)
+
+  #FileIO
+  output3 = (
+      output | 'Serialize' >> beam.Map(json.dumps)
+      | 'Write to files' >>
+      WriteToFiles(path="__output_batch__/output_WriteToFiles"))
+  _ = output3 | 'LogElements after WriteToFiles' >> LogElements(
+      prefix='after WriteToFiles ', with_window=False, level=logging.INFO)
+
+  #ParquetIO
+  output4 = output | 'Write' >> beam.io.WriteToParquet(
+      file_path_prefix="__output_batch__/output_parquet",
+      schema=pyarrow.schema([('age', pyarrow.int64())]))
+  _ = output4 | 'LogElements after WriteToParquet' >> LogElements(
+      prefix='after WriteToParquet ', with_window=False, level=logging.INFO)
+  _ = output | 'Write parquet' >> beam.io.WriteToParquet(
+      file_path_prefix="__output_batch__/output_WriteToParquet",
+      schema=pyarrow.schema([('age', pyarrow.int64())]),
+      record_batch_size=10,
+      num_shards=0)
+
+  # Execute the pipeline and return the result.
+  result = p.run()
+  result.wait_until_finish()
+  return result
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
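The `shard_name_template='-U-SSSSS-of-NNNNN'` argument above controls how output files are named. A hedged sketch of how such Beam-style templates expand (`expand_template` is illustrative, not a Beam API; Beam's actual substitution handles variable-length runs of `S` and `N`):

```python
def expand_template(prefix, template, suffix, shard, num_shards):
  # The run of 'S' becomes the zero-padded shard index; the run of 'N'
  # becomes the zero-padded shard count; other characters (like the 'U'
  # separator here) are kept as-is.
  out = template.replace('SSSSS', '%05d' % shard)
  out = out.replace('NNNNN', '%05d' % num_shards)
  return prefix + out + suffix


print(expand_template('out/part', '-U-SSSSS-of-NNNNN', '.txt', 0, 1))
# out/part-U-00000-of-00001.txt
```

So a single-shard run of the bounded pipeline would produce one file whose name encodes both its index and the total shard count.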
diff --git a/sdks/python/apache_beam/examples/sinks/test_write_unbounded.py b/sdks/python/apache_beam/examples/sinks/test_write_unbounded.py
new file mode 100644
index 00000000000..95cab44f622
--- /dev/null
+++ b/sdks/python/apache_beam/examples/sinks/test_write_unbounded.py
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# To run the pipeline locally:
+# python -m apache_beam.examples.sinks.test_write_unbounded
+
+# This file contains multiple examples of writing an unbounded PCollection to files
+
+import argparse
+import json
+import logging
+
+import pyarrow
+
+import apache_beam as beam
+from apache_beam.examples.sinks.generate_event import GenerateEvent
+from apache_beam.io.fileio import WriteToFiles
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import AfterWatermark
+from apache_beam.transforms.util import LogElements
+from apache_beam.transforms.window import FixedWindows
+from apache_beam.utils.timestamp import Duration
+
+
+def run(argv=None, save_main_session=True) -> PipelineResult:
+  """Main entry point; defines and runs the wordcount pipeline."""
+  parser = argparse.ArgumentParser()
+  _, pipeline_args = parser.parse_known_args(argv)
+
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  p = beam.Pipeline(options=pipeline_options)
+
+  output = p | GenerateEvent.sample_data()
+
+  #TextIO
+  output2 = output | 'TextIO WriteToText' >> beam.io.WriteToText(
+      file_path_prefix="__output__/ouput_WriteToText",
+      file_name_suffix=".txt",
+      #shard_name_template='-V-SSSSS-of-NNNNN',
+      num_shards=2,
+      triggering_frequency=5,
+  )
+  _ = output2 | 'LogElements after WriteToText' >> LogElements(
+      prefix='after WriteToText ', with_window=True, level=logging.INFO)
+
+  #FileIO
+  _ = (
+      output
+      | 'FileIO window' >> beam.WindowInto(
+          FixedWindows(5),
+          trigger=AfterWatermark(),
+          accumulation_mode=AccumulationMode.DISCARDING,
+          allowed_lateness=Duration(seconds=0))
+      | 'Serialize' >> beam.Map(json.dumps)
+      | 'FileIO WriteToFiles' >>
+      WriteToFiles(path="__output__/output_WriteToFiles"))
+
+  #ParquetIO
+  pyschema = pyarrow.schema([('age', pyarrow.int64())])
+
+  output4a = output | 'WriteToParquet' >> beam.io.WriteToParquet(
+      file_path_prefix="__output__/output_parquet",
+      #shard_name_template='-V-SSSSS-of-NNNNN',
+      file_name_suffix=".parquet",
+      num_shards=2,
+      triggering_frequency=5,
+      schema=pyschema)
+  _ = output4a | 'LogElements after WriteToParquet' >> LogElements(
+      prefix='after WriteToParquet 4a ', with_window=True, level=logging.INFO)
+
+  output4aw = (
+      output
+      | 'ParquetIO window' >> beam.WindowInto(
+          FixedWindows(20),
+          trigger=AfterWatermark(),
+          accumulation_mode=AccumulationMode.DISCARDING,
+          allowed_lateness=Duration(seconds=0))
+      | 'WriteToParquet windowed' >> beam.io.WriteToParquet(
+          file_path_prefix="__output__/output_parquet",
+          shard_name_template='-W-SSSSS-of-NNNNN',
+          file_name_suffix=".parquet",
+          num_shards=2,
+          schema=pyschema))
+  _ = output4aw | 'LogElements after WriteToParquet windowed' >> LogElements(
+      prefix='after WriteToParquet 4aw ', with_window=True, level=logging.INFO)
+
+  output4b = (
+      output
+      | 'To PyArrow Table' >>
+      beam.Map(lambda x: pyarrow.Table.from_pylist([x], schema=pyschema))
+      | 'WriteToParquetBatched to parquet' >> beam.io.WriteToParquetBatched(
+          file_path_prefix="__output__/output_parquet_batched",
+          shard_name_template='-V-SSSSS-of-NNNNN',
+          file_name_suffix=".parquet",
+          num_shards=2,
+          triggering_frequency=5,
+          schema=pyschema))
+  _ = output4b | 'LogElements after WriteToParquetBatched' >> LogElements(
+      prefix='after WriteToParquetBatched 4b ',
+      with_window=True,
+      level=logging.INFO)
+
+  #AvroIO
+  avroschema = {
+      'name': 'dummy',  # the record name, required by the Avro spec
+      'type': 'record',  # the Avro complex type; 'record' covers most cases
+      'fields': [  # the record's fields and their types
+          {'name': 'age', 'type': 'int'},
+      ],
+  }
+  output5 = output | 'WriteToAvro' >> beam.io.WriteToAvro(
+      file_path_prefix="__output__/output_avro",
+      #shard_name_template='-V-SSSSS-of-NNNNN',
+      file_name_suffix=".avro",
+      num_shards=2,
+      triggering_frequency=5,
+      schema=avroschema)
+  _ = output5 | 'LogElements after WriteToAvro' >> LogElements(
+      prefix='after WriteToAvro 5 ', with_window=True, level=logging.INFO)
+
+  #TFrecordIO
+  output6 = (
+      output
+      | "encode" >> beam.Map(lambda s: json.dumps(s).encode('utf-8'))
+      | 'WriteToTFRecord' >> beam.io.WriteToTFRecord(
+          file_path_prefix="__output__/output_tfrecord",
+          #shard_name_template='-V-SSSSS-of-NNNNN',
+          file_name_suffix=".tfrecord",
+          num_shards=2,
+          triggering_frequency=5))
+  _ = output6 | 'LogElements after WriteToTFRecord' >> LogElements(
+      prefix='after WriteToTFRecord 6 ', with_window=True, level=logging.INFO)
+
+  # Execute the pipeline and return the result.
+  result = p.run()
+  result.wait_until_finish()
+  return result
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
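The `avroschema` dict in `test_write_unbounded.py` follows the Avro record-schema shape. A minimal, hypothetical type check (pure Python, not the avro library) makes explicit what the schema promises about each element:

```python
# Map a few primitive Avro types to the Python types they serialize from.
AVRO_TO_PY = {'int': int, 'long': int, 'string': str, 'double': float}


def matches_schema(record, schema):
  # Every declared field must be present with a compatible Python type.
  return all(
      isinstance(record.get(f['name']), AVRO_TO_PY[f['type']])
      for f in schema['fields'])


schema = {
    'name': 'dummy',
    'type': 'record',
    'fields': [{'name': 'age', 'type': 'int'}],
}
print(matches_schema({'age': 10}, schema))     # True
print(matches_schema({'age': 'ten'}, schema))  # False
```

Elements like `{'age': 10}` produced by `GenerateEvent` satisfy this schema, which is why `WriteToAvro` can serialize them directly.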
