chamikaramj commented on a change in pull request #12188: URL: https://github.com/apache/beam/pull/12188#discussion_r458298289
########## File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""An example that writes to and reads from Kafka. + + This example reads from the PubSub NYC Taxi stream described in + https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a + given Kafka topic and reads back from the same Kafka topic. + """ + +# pytype: skip-file + +from __future__ import absolute_import + +import logging +import typing + +import apache_beam as beam +from apache_beam.io.kafka import ReadFromKafka +from apache_beam.io.kafka import WriteToKafka +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions + + +def run(bootstrap_servers, topic, pipeline_args): + # bootstrap_servers = '...' Review comment: Done. ########## File path: sdks/python/apache_beam/examples/kafkataxi/README.md ########## @@ -0,0 +1,190 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Python KafkaIO Example + +This example reads from the Google Cloud Pub/Sub NYC Taxi stream described +[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes +to a given Kafka topic, and reads back from the same Kafka topic. This example +uses cross-language transforms available in +[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py). +Transforms are implemented in Java and are available +[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java). + +## Prerequisites + +Install [Java Development Kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html) +on your system and make sure that the `JAVA_HOME` environment variable points to +your JDK installation. Make sure that the `java` command is available in +the environment. + +```sh +java -version +<Should print information regarding the installed Java version> +``` + +## Set up the Kafka cluster + +This example requires users to set up a Kafka cluster that the Beam runner +executing the pipeline has access to. + +See [here](https://kafka.apache.org/quickstart) for general instructions on +setting up a Kafka cluster. 
One option is to set up the Kafka cluster in +[GCE](https://cloud.google.com/compute). See +[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) +for step-by-step instructions on setting up a single-node Kafka cluster in GCE. +When using Dataflow, consider starting the Kafka cluster in the region where the +Dataflow pipeline will be running. See +[here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints) +for more details regarding selecting a GCP region for Dataflow. + +Let's assume that the IP address of one of the [bootstrap servers](https://kafka.apache.org/quickstart) +of the Kafka cluster is `123.45.67.89` and the port is `9092`. + +```sh +export BOOTSTRAP_SERVER="123.45.67.89:9092" +``` + +## Running the example on the latest released Beam version + +Perform Beam runner-specific setup. Note that cross-language transforms require +portable implementations of the Spark/Flink/Direct runners. Dataflow requires +[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). +See [here](https://beam.apache.org/documentation/runners/dataflow/) for +instructions on setting up Dataflow. + +Set up a virtual environment for running Beam Python programs. See +[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. +Dataflow requires the `gcp` tag when installing Beam. + +```sh +python -m venv env +source env/bin/activate +pip install 'apache-beam[gcp]' +``` + +Run the Beam pipeline. You can either use the default Kafka topic name or +specify a Kafka topic name. The following command assumes Dataflow. See +[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on +running Beam Python programs on other runners. 
+ +Note that this example is not available in Beam versions before 2.24.0; hence +you'll have to either get the example program from Beam or follow the steps +provided in the section *Running the Example from a Beam Git Clone*. + +```sh +export PROJECT="$(gcloud config get-value project)" +export TEMP_LOCATION="gs://MY-BUCKET/temp" +export REGION="us-central1" +export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`" +export NUM_WORKERS="5" + +python -m apache_beam.examples.kafkataxi.kafka_taxi \ + --runner DataflowRunner \ + --temp_location $TEMP_LOCATION \ + --project $PROJECT \ + --region $REGION \ + --num_workers $NUM_WORKERS \ + --job_name $JOB_NAME \ + --bootstrap_servers $BOOTSTRAP_SERVER \ + --experiments=use_runner_v2 +``` + +## *(Optional)* Running the Example from a Beam Git Clone + +Running this example from a Beam Git clone requires some additional steps. + +Check out a clone of the Beam Git repo. See +[here](https://beam.apache.org/contribute/) for prerequisites. + +Assume your GitHub username to be `GITHUB_USERNAME`. + +```sh +git clone git@github.com:${GITHUB_USERNAME}/beam +cd beam +``` + +Build the IO expansion service jar. + +```sh +./gradlew :sdks:java:io:expansion-service:build +``` + +Push a Java SDK harness container to [Docker](https://www.docker.com/get-started) +Hub. See +[here](https://beam.apache.org/documentation/runtime/environments/) for +prerequisites and additional information. + +```sh +export DOCKER_ROOT="Your Docker Repository Root" +./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest +docker push $DOCKER_ROOT/beam_java_sdk:latest +``` + +For portable Flink/Spark in local mode, instead of the above command, just build +the Java SDK harness container locally using the default values for the +repository root and the Docker tag. + +Activate your Python virtual environment. This example uses `venv`. 
See +[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for +instructions regarding setting up other types of Python virtual environments. + +```sh +cd .. # Creating the virtual environment in the top-level work directory. +python -m venv env +source env/bin/activate +``` + +Install Beam and its dependencies and build a Beam distribution. + +```sh +cd beam/sdks/python +pip install -r build-requirements.txt +pip install -e '.[gcp]' +python setup.py sdist +``` + +Run the Beam pipeline. You can either use the default Kafka topic name or specify +a Kafka topic name. The following command assumes Dataflow. See +[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on +running Beam Python programs on other runners. + +See [here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints) +for more details regarding selecting a GCP region for Dataflow. + +```sh +export PROJECT="$(gcloud config get-value project)" +export TEMP_LOCATION="gs://MY-BUCKET/temp" +export REGION="us-central1" +export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`" +export NUM_WORKERS="5" Review comment: Done. ########## File path: sdks/python/apache_beam/examples/kafkataxi/README.md ########## @@ -0,0 +1,190 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Python KafkaIO Example + +This example reads from the Google Cloud Pub/Sub NYC Taxi stream described +[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes +to a given Kafka topic, and reads back from the same Kafka topic. This example +uses cross-language transforms available in +[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py). +Transforms are implemented in Java and are available +[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java). + +## Prerequisites + +Install [Java Development Kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html) +on your system and make sure that the `JAVA_HOME` environment variable points to +your JDK installation. Make sure that the `java` command is available in +the environment. + +```sh +java -version +<Should print information regarding the installed Java version> +``` + +## Set up the Kafka cluster + +This example requires users to set up a Kafka cluster that the Beam runner +executing the pipeline has access to. + +See [here](https://kafka.apache.org/quickstart) for general instructions on +setting up a Kafka cluster. One option is to set up the Kafka cluster in +[GCE](https://cloud.google.com/compute). See +[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) +for step-by-step instructions on setting up a single-node Kafka cluster in GCE. +When using Dataflow, consider starting the Kafka cluster in the region where the +Dataflow pipeline will be running. See +[here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints) +for more details regarding selecting a GCP region for Dataflow. 
+ +Let's assume that the IP address of one of the [bootstrap servers](https://kafka.apache.org/quickstart) +of the Kafka cluster is `123.45.67.89` and the port is `9092`. + +```sh +export BOOTSTRAP_SERVER="123.45.67.89:9092" +``` + +## Running the example on the latest released Beam version + +Perform Beam runner-specific setup. Note that cross-language transforms require +portable implementations of the Spark/Flink/Direct runners. Dataflow requires +[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). +See [here](https://beam.apache.org/documentation/runners/dataflow/) for +instructions on setting up Dataflow. + +Set up a virtual environment for running Beam Python programs. See +[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. +Dataflow requires the `gcp` tag when installing Beam. + +```sh +python -m venv env +source env/bin/activate +pip install 'apache-beam[gcp]' +``` + +Run the Beam pipeline. You can either use the default Kafka topic name or +specify a Kafka topic name. The following command assumes Dataflow. See +[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on +running Beam Python programs on other runners. + +Note that this example is not available in Beam versions before 2.24.0 hence Review comment: Done. ########## File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""An example that writes to and reads from Kafka. + + This example reads from the PubSub NYC Taxi stream described in + https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a + given Kafka topic and reads back from the same Kafka topic. + """ + +# pytype: skip-file + +from __future__ import absolute_import + +import logging +import typing + +import apache_beam as beam +from apache_beam.io.kafka import ReadFromKafka +from apache_beam.io.kafka import WriteToKafka +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions + + +def run(bootstrap_servers, topic, pipeline_args): + # bootstrap_servers = '...' + # topic = 'kafka_taxirides_realtime' + # pipeline_args = ['--project', 'my-project', '--runner', 'DataflowRunner', + # '--temp_location', 'my-temp-location', + # '--region', 'my-region', '--num_workers', + # 'my-num-workers', '--experiments', 'use_runner_v2'] + + pipeline_options = PipelineOptions(pipeline_args) Review comment: Done. ########## File path: sdks/python/apache_beam/examples/kafkataxi/README.md ########## @@ -0,0 +1,190 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Python KafkaIO Example + +This example reads from the Google Cloud Pub/Sub NYC Taxi stream described +[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes +to a given Kafka topic, and reads back from the same Kafka topic. This example +uses cross-language transforms available in +[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py). +Transforms are implemented in Java and are available +[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java). + +## Prerequisites + +Install [Java Development kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html) Review comment: I'm using the same link as Beam quickstart: https://beam.apache.org/get-started/quickstart-java/ ########## File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""An example that writes to and reads from Kafka. + + This example reads from the PubSub NYC Taxi stream described in + https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a + given Kafka topic and reads back from the same Kafka topic. + """ + +# pytype: skip-file + +from __future__ import absolute_import + +import logging +import typing + +import apache_beam as beam +from apache_beam.io.kafka import ReadFromKafka +from apache_beam.io.kafka import WriteToKafka +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions + + +def run(bootstrap_servers, topic, pipeline_args): + # bootstrap_servers = '...' + # topic = 'kafka_taxirides_realtime' + # pipeline_args = ['--project', 'my-project', '--runner', 'DataflowRunner', + # '--temp_location', 'my-temp-location', + # '--region', 'my-region', '--num_workers', + # 'my-num-workers', '--experiments', 'use_runner_v2'] + + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = True + pipeline_options.view_as(StandardOptions).streaming = True + window_size = 15 # size of the Window in seconds. + + def log_ride(ride_bytes): + # Converting bytes record from Kafka to a dictionary. 
+ import ast + ride = ast.literal_eval(ride_bytes.decode("UTF-8")) + logging.info( + 'Found ride at latitude %r and longitude %r with %r ' + 'passengers', + ride['latitude'], + ride['longitude'], + ride['passenger_count']) + + with beam.Pipeline(options=pipeline_options) as pipeline: + _ = ( + pipeline + | beam.io.ReadFromPubSub( + topic='projects/pubsub-public-data/topics/taxirides-realtime'). + with_output_types(bytes) + | beam.Map(lambda x: (b'', x)).with_output_types( + typing.Tuple[bytes, bytes]) # Kafka write transforms expect KVs. + | beam.WindowInto(beam.window.FixedWindows(window_size)) + | WriteToKafka( + producer_config={'bootstrap.servers': bootstrap_servers}, + topic=topic)) + + _ = ( + pipeline + | ReadFromKafka( + consumer_config={'bootstrap.servers': bootstrap_servers}, + topics=[topic]) + | beam.FlatMap(lambda kv: log_ride(kv[1]))) + + pipeline.run().wait_until_finish() Review comment: Done. ########## File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py ########## @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""An example that writes to and reads from Kafka. 
+ + This example reads from the PubSub NYC Taxi stream described in + https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a + given Kafka topic and reads back from the same Kafka topic. + """ + +# pytype: skip-file + +from __future__ import absolute_import + +import argparse +import logging +import typing + +import apache_beam as beam +from apache_beam.io.kafka import ReadFromKafka +from apache_beam.io.kafka import WriteToKafka +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions + + +def run(argv=None): + """Main entry point; defines and runs the kafka_taxi pipeline.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--bootstrap_servers', + dest='bootstrap_servers', + required=True, + help='Bootstrap servers for the Kafka cluster. Should be accessible to ' + 'the runner') + parser.add_argument( + '--topic', + dest='topic', + default='kafka_taxirides_realtime', + help='Kafka topic to write to and read from') + known_args, pipeline_args = parser.parse_known_args(argv) + + pipeline_options = PipelineOptions(pipeline_args) Review comment: Done. ########## File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py ########## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""An example that writes to and reads from Kafka. + + This example reads from the PubSub NYC Taxi stream described in + https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a + given Kafka topic and reads back from the same Kafka topic. + """ + +# pytype: skip-file + +from __future__ import absolute_import + +import logging +import typing + +import apache_beam as beam +from apache_beam.io.kafka import ReadFromKafka +from apache_beam.io.kafka import WriteToKafka +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions + + +def run(bootstrap_servers, topic, pipeline_args): + # bootstrap_servers = '...' + # topic = 'kafka_taxirides_realtime' + # pipeline_args = ['--project', 'my-project', '--runner', 'DataflowRunner', Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
