Please use beam.io.fileio.WriteToFiles instead of WriteToText for the streaming writes. WriteToText's write/finalize step relies on a GroupByKey internally, which is what raises that error on an unbounded, globally windowed PCollection with the default trigger.
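A rough sketch of what that could look like in your pipeline (untested; the 60-second fixed window, the str() formatting of the Kafka records, and shards=1 are just placeholder choices, and options.output is your existing --output value):

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.transforms import window

# The unbounded Kafka stream has to be windowed before a streaming file
# write; otherwise the write's internal grouping hits the same GroupByKey
# error you are seeing.
records = (
    msg_kv_bytes
    | 'Fixed window 60s' >> beam.WindowInto(window.FixedWindows(60))
    | 'Format record' >> beam.Map(str)  # placeholder: turn each record into a text line
)

_ = records | 'Write to GCS' >> fileio.WriteToFiles(
    path=options.output,  # e.g. gs://<bucket>/<prefix>/
    shards=1,             # small fixed shard count just for the sketch
)

WriteToFiles finalizes its output per window/pane, which is why the WindowInto step is needed before it in a streaming pipeline.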
On Thu, Jul 25, 2024 at 2:48 AM Dhirendra Singh <[email protected]>
wrote:
> Hello Beam Devs,
>
> Thank you for your help.
> I have been trying to connect to a Kafka-enabled Azure Event Hub from a
> Dataflow pipeline written with the Beam Python SDK.
> Apache Beam version: 2.56.0
> Python version: 3.10.13 (main, Feb 1 2024, 05:36:44) [GCC 12.2.0]
>
>
> The code is shown below:
>
> import argparse
> import logging
> import sys
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.transforms import window
> from apache_beam.transforms import trigger
> from apache_beam.transforms.trigger import AfterProcessingTime, Repeatedly, AccumulationMode
>
> class KafkaToGCSOptions(PipelineOptions):
>     @classmethod
>     def _add_argparse_args(cls, parser):
>         #parser.add_argument('--bootstrapServer', required=True, help='Kafka bootstrap servers')
>         parser.add_argument('--inputTopic', required=True, help='Input Kafka topic')
>         parser.add_argument('--output', required=True, help='Output GCS bucket path')
>         parser.add_argument('--output2', required=True, help='Output GCS bucket path2')
>         #parser.add_argument('--authenticationString', required=True, help='SASL JAAS config for Kafka authentication')
>
> def run(argv=None):
>     #options = PipelineOptions(argv)
>     #pipeline_options = options.view_as(KafkaToGCSOptions)
>     parser = argparse.ArgumentParser()
>     known_args, pipeline_args = parser.parse_known_args(argv)
>
>     pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
>     options = pipeline_options.view_as(KafkaToGCSOptions)
>     options.view_as(StandardOptions).streaming = True
>
>     kafka_config = {
>         'bootstrap.servers': 'custom.servicebus.windows.net:9093',
>         'security.protocol': 'SASL_SSL',
>         'sasl.mechanism': 'PLAIN',
>         'sasl.jaas.config': 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://custom.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=";',
>         'auto.offset.reset': 'earliest'
>     }
>     wordsList = ["1", "2", "3", "4"]
>     with beam.Pipeline(options=options) as pipeline:
>         msg_kv_bytes = (
>             pipeline
>             | 'Read from Kafka' >> ReadFromKafka(
>                 topics=[options.inputTopic],
>                 consumer_config=kafka_config,
>                 # Only for Direct Runner and when testing
>                 #max_num_records=10,
>                 with_metadata=True
>             )
>             # | 'Fixed window 5s' >> beam.WindowInto(
>             #     window.FixedWindows(5),
>             #     trigger=Repeatedly(AfterProcessingTime(5)),
>             #     accumulation_mode=AccumulationMode.DISCARDING
>             # )
>         )
>         (msg_kv_bytes | 'Print results' >> beam.Map(lambda x: logging.info(x)))
>         (msg_kv_bytes | 'Write to GCS' >> beam.io.textio.WriteToText(options.output))
>         (msg_kv_bytes | 'Write to GCS2' >> beam.io.textio.WriteToText(options.output2))
>
>     pipeline_result = pipeline.run()
>     pipeline_result.wait_until_finish()
>
>
> if __name__ == '__main__':
>     print("Streaming Pub/Sub messages to BigQuery...")
>     print(f"Python version: {sys.version}")
>     print(f"Apache Beam version: {beam.__version__}")
>     print("running to BigQuery...")
>     logging.getLogger().setLevel(logging.INFO)
>     run()
>
> ########################################################################
>
> Every time I try to run this with the Dataflow runner, I get the error
> attached to this email:
>
> ValueError: GroupByKey cannot be applied to an unbounded PCollection
> with global windowing and a default trigger
>
> I tried troubleshooting on my end and looked at these links:
>
> https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
> https://github.com/apache/beam/issues/25598
> https://beam.apache.org/releases/pydoc/current/apache_beam.io.kafka.html
>
> Is this issue still open, or am I doing something wrong?
>
> Appreciate your help.
>
> --
> Warm Regards
> Dhiren
>