rszper commented on code in PR #29619:
URL: https://github.com/apache/beam/pull/29619#discussion_r1433177604


##########
website/www/site/content/en/blog/scaling-streaming-workload.md:
##########
@@ -0,0 +1,286 @@
+---
+layout: post
+title:  "Scaling a streaming workload on Apache Beam, 1 million events per 
second and beyond"
+date:   2023-12-01 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - pabs
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Scaling a streaming workload on Apache Beam
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/0-intro.png"
+    alt="Streaming Processing">
+
+Scaling a streaming workload is critical for ensuring that a pipeline can handle large volumes of data while minimizing latency and executing efficiently. Without proper scaling, a pipeline may experience performance issues or even fail entirely, delaying time to insights for the business.
+
+When using Apache Beam, developing a streaming pipeline is straightforward as long as the sources and sinks needed by the workload are supported. You can focus on the required processing (transformations, enrichments, or aggregations) and on setting the right configuration for each case.
+
+However, it is important to identify the key performance bottlenecks and make sure that the pipeline has the resources it needs to handle the load efficiently. This may involve right-sizing the number of workers, understanding the settings needed for the pipeline's source and sinks, optimizing the processing logic, and even reconsidering the transport formats in use.
+
+This article illustrates how to approach scaling and optimizing a streaming workload developed in Apache Beam and run on GCP Cloud Dataflow. The goal is to reach 1 million events per second while minimizing both latency and the resources used for the execution. The workload uses Pub/Sub as the streaming source and BigQuery as the sink. We describe the reasoning behind the configuration settings and code changes that help the workload achieve the desired scale and beyond.
+
+Finally, it is worth mentioning that the progression described in this article maps to the evolution of a real-life workload, with many simplifications. After the initial business requirements for the pipeline were met, the focus shifted to optimizing performance and reducing the resources needed for the execution.
+
+## Execution setup
+
+For this article we created a test suite that takes care of creating the necessary components for the pipelines to execute. The code can be found in this GitHub [repository](https://github.com/prodriguezdefino/apache-beam-streaming-tests), and the incremental changes introduced on every run can be found in this [folder](https://github.com/prodriguezdefino/apache-beam-streaming-tests/tree/main/scaling-streaming-workload-blog) as scripts that can be executed to reproduce similar results.
+
+Every execution script also runs Terraform-based automation to create a PubSub topic and subscription and a BigQuery dataset and table for the workload. It then launches two pipelines: a data generation pipeline that pushes events to the PubSub topic, and the ingestion pipeline, which is our main focus for understanding the potential improvement points.
+
+In all cases the pipelines start with an empty PubSub topic and subscription and an empty BigQuery table. The plan is to quickly ramp up to generating 1 million events per second and, after a few minutes, review how the ingestion pipeline scales over time. The data is auto-generated from the provided schemas or IDL (depending on the configuration), with messages ranging between 800 bytes and 2 KB, adding up to ~1 GB/s of volume throughput. Also, the ingestion pipelines use the same worker type on all runs (`n2d-standard-4` GCE machines) and cap the maximum number of workers to avoid very large fleets.
+
+As previously mentioned, all the executions run on GCP Cloud Dataflow, but the same configuration and format changes can be applied to the suite when executing on other supported Apache Beam runners (the changes and recommendations are not runner specific).
+
+### Local Environment Requirements
+
+Before launching the startup scripts, the local environment must fulfill a few prerequisites:
+
+*   gcloud must be installed and configured with the correct permissions.
+*   terraform must be installed.
+*   JDK 17 or later must be installed.
+*   Maven 3.6 or later must be present.
+
+See the [requirements](https://github.com/prodriguezdefino/apache-beam-streaming-tests#requisites) section in the repository for full details.
+
+It is also important to review the service quotas and resources available in the GCP project that will be used, specifically: PubSub regional capacity, the BigQuery ingestion quota, and the GCE instances available in the region selected for the tests.
+
+### Workload Description
+
+Focusing on the ingestion pipeline, our workload is [very simple](https://github.com/prodriguezdefino/apache-beam-streaming-tests/blob/main/canonical-streaming-pipelines/src/main/java/com/google/cloud/pso/beam/pipelines/StreamingSourceToBigQuery.java#L55): read data in a specific format from PubSub (Apache Thrift in this case), deal with optional compression and batching settings (not enabled by default), execute a UDF (an identity function by default), transform the input format into one of the formats supported by the BigQueryIO transform, and then write the data to the configured table.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/0-pipeline.png"
+    alt="Example Workload">
+
+The pipeline we used for the tests is highly configurable; see the [options](https://github.com/prodriguezdefino/apache-beam-streaming-tests/blob/main/canonical-streaming-pipelines/src/main/java/com/google/cloud/pso/beam/pipelines/StreamingSourceToBigQuery.java#L39) in the file for more details on how to tweak the ingestion. None of the steps in this article require code changes; the execution scripts take care of the needed configuration.
+
+Although these tests focus on reading data from PubSub, the ingestion pipeline is capable of reading data from a generic streaming source. The repository contains other [examples](https://github.com/prodriguezdefino/apache-beam-streaming-tests/tree/main/example-suite-scripts) that launch the same test suite reading data from PubSubLite and Kafka instead (in all cases taking care of the automation needed to set up the streaming infrastructure).
+
+Finally, as seen in the configuration [options](https://github.com/prodriguezdefino/apache-beam-ptransforms/blob/a0dd229081625c7b593512543614daf995a9f870/common/src/main/java/com/google/cloud/pso/beam/common/formats/options/TransportFormatOptions.java), the pipeline supports several transport formats for the input (Thrift, Avro and JSON). This suite focuses on Thrift because it is a common open source format and it forces a format transformation (the intent is to put some strain on the workload processing), but similar tests can be executed with Avro and JSON input data. The streaming data generator pipeline can generate random data for the [3 supported formats](https://github.com/prodriguezdefino/apache-beam-streaming-tests/tree/main/streaming-data-generator/src/main/java/com/google/cloud/pso/beam/generator/formats) by walking the schema (Avro and JSON) or IDL (Thrift) provided for the execution.
+
+## First Run : Default settings
+
+The default values for the execution imply writing the data to BigQuery using the `STREAMING_INSERTS` mode of BigQueryIO, which corresponds to the [tableData insertAll API](https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll) for BigQuery. This API accepts data in JSON format. From the Apache Beam perspective, the `BigQueryIO.writeTableRows` method lets us easily resolve the writes into BigQuery.
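+
+For reference, a minimal sketch of what that write step looks like with the `BigQueryIO` API (the table spec and the `tableRows` PCollection are illustrative placeholders, not the exact code from the repository):
+
+```java
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+
+// `tableRows` is a PCollection<TableRow> produced by the Thrift-to-TableRow conversion.
+tableRows.apply("StoreInBigQuery",
+    BigQueryIO.writeTableRows()
+        .to("some-project:some_dataset.some_table")             // illustrative table spec
+        .withCreateDisposition(CreateDisposition.CREATE_NEVER)  // table created beforehand (by the Terraform automation)
+        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS));
+```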
+
+For our ingestion pipeline this means that the Thrift format needs to be transformed into `TableRow`, and to do that we first need to work out how to translate the Thrift IDL into a BigQuery table schema. That can be achieved by translating the Thrift IDL into an Avro schema, and from there the BigQuery table schema can be derived with Beam utilities. Luckily, this only needs to be done at bootstrap, and the schema transformation is cached at the DoFn level.
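+
+As an illustration, and assuming we already derived an Avro schema from the Thrift IDL (that first hop is workload specific), the remaining translation can be done with Beam utility classes along these lines (depending on the Beam version, `AvroUtils` lives either in the core SDK or in the Avro extension module):
+
+```java
+import com.google.api.services.bigquery.model.TableSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+
+// `avroSchema` is the org.apache.avro.Schema obtained from the Thrift IDL.
+Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
+// The BigQuery table schema can then be derived from the Beam schema.
+TableSchema tableSchema = BigQueryUtils.toTableSchema(beamSchema);
+```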
+
+After setting up the data generation and ingestion pipelines and letting them run for a few minutes, we quickly see that the ingestion pipeline struggles to sustain the desired throughput.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/1-default-ps.png"
+    alt="PubSub metrics">
+
+As seen in the previous image, the messages that the ingestion pipeline cannot keep up with start to show up as unacknowledged messages in the PubSub metrics.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/1-default-throughput.png"
+    alt="Throughput">
+
+Reviewing the per-stage performance metrics, we clearly see that the pipeline shows the sawtooth shape normally associated with the throttling mechanisms the Dataflow runner applies when some of the stages act as throughput bottlenecks. We also clearly see that the Reshuffle step in the BigQueryIO write transform does not scale as expected.
+
+This happens because, by default, [`BigQueryOptions`](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java#L57) uses 50 different keys to shuffle data to workers before the writes to BigQuery happen. To solve this problem we can add a configuration to our launch script that allows the write operations to scale out to a larger number of workers, improving the performance.
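+
+As a sketch of what that configuration looks like (assuming the relevant option is `numStreamingKeys` from `BigQueryOptions`; the launch scripts pass the equivalent value as a pipeline argument):
+
+```java
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+// Raise the number of keys used to shuffle data before the streaming-insert writes.
+BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
+options.setNumStreamingKeys(512);  // the default of 50 limits write parallelism
+```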
+
+
+## Second Run : Address the write bottleneck
+
+After increasing the number of streaming keys (to 512 in our case), we restarted the test suite and saw the PubSub metrics begin to improve. After an initial ramp-up in the backlog size, the curve starts to ease off.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/2-skeys-ps.png"
+    alt="PubSub metrics">
+
+This is good, but we should take a look at the per-stage throughput numbers to understand whether we are achieving the goal we set for this exercise.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/2-skeys-throughput.png"
+    alt="Throughput">
+
+Although the performance has clearly improved and the PubSub backlog no longer grows monotonically, we are still far from the goal of processing 1 million events per second (1 GB/s) in our ingestion pipeline. In fact, the throughput metrics jump around over time, indicating that there are still bottlenecks preventing the processing from scaling further.
+
+## Third Run : Unleash autoscale
+
+Luckily for us, auto-sharding the writes is an option when writing into BigQuery. This simplifies the configuration and saves us from guessing the right number of shards. We switched the pipeline's configuration and enabled this setting for the next launch [script](https://github.com/prodriguezdefino/apache-beam-streaming-tests/blob/main/scaling-streaming-workload-blog/3-ps2bq-si-tr-streamingautoshard.sh).
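+
+In `BigQueryIO` terms, and still using streaming inserts as in the previous runs, this amounts to letting the sink shard dynamically instead of fixing the key count (a sketch with illustrative names):
+
+```java
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+
+tableRows.apply("StoreInBigQuery",
+    BigQueryIO.writeTableRows()
+        .to("some-project:some_dataset.some_table")
+        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
+        .withAutoSharding());  // let the runner pick and adjust the number of shards
+```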
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/3-autoshard-parallelism.png"
+    alt="Key Parallelism">
+
+Right off the bat, we can see that the auto-sharding mechanism tweaks the number of keys aggressively and dynamically. This is good, since different moments in time may have different scale needs (early backlog recoveries, spikes in the execution, and so on).
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/3-autoshard-throughput-tr.png"
+    alt="Throughput">
+
+Inspecting the per-stage throughput, we can clearly see that as the number of keys increases, the performance of the writes increases as well, in fact reaching very large numbers!
+
+Also, once the initial backlog is consumed and the pipeline stabilizes, we can see that the performance numbers we set as objectives are reached. The pipeline sustains processing of well over a million events per second from PubSub and several GB/s of BigQuery ingestion (yay!).
+
+But we can try to do better. There are several improvements we can introduce 
to the pipeline to make the execution more efficient, and in most cases those 
are simply configuration changes. We just need to know where to focus next.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/3-autoshard-autoscale.png"
+    alt="Resources">
+
+As seen in the previous image, the number of workers needed to sustain this throughput is still quite high. The workload itself is not really CPU intensive; most of the cost is spent on transforming formats and on IO interactions (shuffles and the actual writes). So we can look more closely at the transport formats first to understand what can be improved.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/3-autoshard-tr-input.png"
+    alt="Thrift Input Size">
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/3-autoshard-tr-output.png"
+    alt="TableRow Output Size">
+
+Looking at the input size right before the identity UDF executes, the data format is binary Thrift, which is a fairly compact format even when no compression is used. But when comparing the approximate PCollection size with the TableRow format needed for BigQuery ingestion, we can see a clear size increase. This is something we can act on by changing the BigQuery write API in use.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/3-autoshard-tr-overhead.png"
+    alt="Translation Overhead">
+
+If we inspect the StoreInBigQuery transform, we can see that the large majority of the wall time is spent on the actual writes. In fact, the wall time spent on the writes is roughly 13 times larger than the wall time spent converting the data to the destination format (TableRow). This can be improved by switching the pipeline's write mode.
+
+## Fourth Run : In with the new (using the Storage Write API)
+
+Enabling the Storage Write API for this pipeline is simple: we just need to set the write mode to `STORAGE_WRITE_API` and define a write triggering frequency. For this test we write data at most every 10 seconds. The triggering frequency controls how long per-stream data is accumulated; a higher value produces larger outputs to write after the stream assignment, but also imposes a larger end-to-end latency for every element read from PubSub. As with the `STREAMING_INSERTS` configuration, BigQueryIO can handle auto-sharding for the writes, which already proved to be the best setting for performance.
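+
+A sketch of the corresponding `BigQueryIO` configuration (the actual pipeline wires these values from its options; names are illustrative):
+
+```java
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.joda.time.Duration;
+
+tableRows.apply("StoreInBigQuery",
+    BigQueryIO.writeTableRows()
+        .to("some-project:some_dataset.some_table")
+        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+        .withTriggeringFrequency(Duration.standardSeconds(10))  // flush streams at most every 10 seconds
+        .withAutoSharding());
+```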
+
+After both pipelines, the data generator and the ingestion one, become stable, we can quickly and clearly spot the performance benefits of using the Storage Write API in BigQueryIO. The first one is the wall-time ratio between the format transformation and the write operation. After enabling the new implementation this ratio becomes much smaller, since the wall time spent on writes is only ~34% larger than that of the format transformation.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/4-format-transformation.png"
+    alt="Translation Overhead">
+
+We also see that the pipeline throughput is quite smooth after stabilization. 
This enables the runner to quickly and steadily downscale the pipeline 
resources needed to sustain the desired throughput.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/4-throughput.png"
+    alt="Throughput">
+
+Looking at the resources needed to process the data, we see another dramatic improvement: the streaming-inserts-based pipeline needed more than 80 workers to sustain the throughput once stable, while the storage-writes pipeline needs only 49 (a sizable 40% improvement).
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/4-ingestion-scale.png"
+    alt="Resources">
+
+As a reference we can use the data generation pipeline, which only needs to randomly generate data and write the events to PubSub. It executes steadily with an average of 40 workers; with the right configuration for the workload, the ingestion pipeline now needs close to the same resources as the generation pipeline.
+
+Similarly to the streaming-inserts-based pipeline, writing the data into BigQuery requires a format translation: from Thrift to TableRow in the former case, and from Thrift to protobuf in the latter. In fact, because we are using the `BigQueryIO.writeTableRows` method, we are adding another hop in the format translation. And, as seen previously, the TableRow format increases the size of the PCollection being processed, which is an improvement area we can delve into further.
+
+## Fifth Run : A better write format
+
+When using `STORAGE_WRITE_API`, the BigQueryIO transform exposes a method that can be used to write the Beam Row type directly into BigQuery. This is very useful for a large number of pipelines, given the flexibility the Row type provides for interoperability and schema management. It is also quite efficient for shuffling and, compared with TableRow, more dense, so we can expect smaller PCollection sizes for our pipeline.
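+
+A sketch of that variant, assuming an upstream step already converted the Thrift objects into `Row` elements and attached the corresponding schema to the PCollection:
+
+```java
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+
+// `beamRows` is a PCollection<Row> whose schema was set upstream (for example, with setRowSchema).
+beamRows.apply("StoreInBigQuery",
+    BigQueryIO.<Row>write()
+        .to("some-project:some_dataset.some_table")
+        .useBeamSchema()  // derive the destination table schema from the Row schema
+        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+        .withTriggeringFrequency(Duration.standardSeconds(10))
+        .withAutoSharding());
+```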
+
+For the next run we decrease the triggering frequency when writing to BigQuery, since our data volume is not small, and because we use a different format, a slightly different code path will run (the test pipeline script is configured with the flag `--formatToStore=BEAM_ROW` for this change).
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/5-input-size.png"
+    alt="Thrift input size">
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/5-output-size.png"
+    alt="Row output size">
+
+As expected, we can see that the PCollection being written into BigQuery is considerably smaller than in the previous executions; in fact, for this particular execution the Beam Row format even implies a smaller size than the Thrift format. This can be an important factor, given that a larger PCollection composed of bigger per-element sizes can put non-trivial memory pressure on smaller worker configurations, reducing the overall throughput.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/5-format-trasformation.png"
+    alt="Translation overhead">
+
+Also, the wall-clock ratio between the format transformation and the actual BigQuery writes stays very similar. Handling the Beam Row format does not impose a performance penalty on the format translation and subsequent writes. This is also confirmed by the number of workers in use by the pipeline once the throughput becomes stable: slightly smaller than in the previous run, but clearly in the same ballpark.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/5-ingestion-scale.png"
+    alt="Resources">
+
+We are in a much better position than when we started, but given our test 
pipeline input format we can improve things a little bit more.
+
+## Sixth Run : Further reduce the format translation effort
+
+There is another supported input format for the BigQueryIO transform that can be very advantageous for our input data: the `writeGenericRecords` method enables the transform to convert Avro GenericRecords directly into protobuf before the write operation. Apache Thrift can be transformed into Avro GenericRecords very efficiently, so we can make another test run configuring our ingestion pipeline with the option `--formatToStore=AVRO_GENERIC_RECORD` on our execution script.
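+
+A sketch of the `GenericRecord` variant, keeping the Storage Write API settings from the previous runs (the Thrift-to-GenericRecord conversion itself is part of the workload code):
+
+```java
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.joda.time.Duration;
+
+// `genericRecords` is a PCollection<GenericRecord> produced from the Thrift input.
+genericRecords.apply("StoreInBigQuery",
+    BigQueryIO.writeGenericRecords()
+        .to("some-project:some_dataset.some_table")
+        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+        .withTriggeringFrequency(Duration.standardSeconds(10))
+        .withAutoSharding());
+```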
+
+This time the difference between the format translation and the writes increases significantly, which is the performance improvement we were expecting: the translation to Avro GenericRecords takes only about 20% of the wall time spent writing those records into BigQuery. Given that all the test pipelines had similar runtimes, and that the wall clock seen in the WriteIntoBigQuery stage is also aligned with the other Storage Write API runs, it is clear that for this particular workload this format is the right call.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/6-format-transformation.png"
+    alt="Translation overhead">
+
+When checking resource utilization we also see further gains, since we need less CPU time to execute the format translations for our workload while achieving the desired throughput.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/6-ingestion-scale.png"
+    alt="Resources">
+
+This pipeline compares favorably with the previous run, running steadily on 42 workers once the throughput has stabilized. Given the worker configuration in use (`n2d-standard-4`, so 42 × 4 = 168 vCPU cores) and the volume throughput of the workload (~1 GB/s), we are achieving ~6 MB/s of throughput per CPU core, which is quite impressive for a streaming pipeline with exactly-once semantics.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/6-latencies.png"
+    alt="Latencies">
+
+Finally, the latency seen at this scale deserves a callout: the pipeline achieves sub-second end-to-end latencies over sustained periods of time (adding up all the stages executed in the main path of the pipeline).
+
+Given the workload requirements and the implemented pipeline code, this is the best performance we can extract without further tuning runner-specific settings (which are outside the scope of this article).
+
+## Seventh Run : Let's just relax (at least some constraints)
+
+When using the `STORAGE_WRITE_API` setting for BigQueryIO, we are enforcing exactly-once semantics on the writes. This is great for the many use cases that need strong consistency on the data that gets processed, but it imposes a performance and cost penalty.
+
+From a high-level perspective, writes into BigQuery are made in batches, which are released based on the current sharding and the triggering frequency. If writes fail during the execution of a particular bundle, they are retried, and a bundle of data is committed into BigQuery only when all the data in that bundle has been correctly appended to a stream. This implementation needs to shuffle the full volume of the data to create the batches to be written, plus the information about the finished batches for the later commit (although this last piece is very small compared with the first).
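+
+The relaxation discussed below boils down to switching the write method; a sketch, assuming the rest of the configuration stays as in the previous run (in this mode no triggering frequency is required):
+
+```java
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+
+genericRecords.apply("StoreInBigQuery",
+    BigQueryIO.writeGenericRecords()
+        .to("some-project:some_dataset.some_table")
+        .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE));
+```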
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/7-previous-data-input.png"
+    alt="Read data size">
+
+Looking at the previous pipeline execution, we can see that the total data processed from the Streaming Engine perspective is considerably larger than the data read from PubSub. In these images we see 7 TB of data read from PubSub, while processing the data for the whole execution of the pipeline implies moving 25 TB of data to and from Streaming Engine to achieve the desired level of consistency.
+
+<img class="center-block"
+    src="/images/blog/scaling-streaming-workload/7-previous-shuffle-total.png"
+    alt="Streamed data size">
+
+In the cases where data consistency is not a hard requirement for the 
ingestion, we can relax the BigQueryIO write mode to use an at-least-once 
semantic. By doing this, the implementation will avoid shuffling and grouping 
data for the writes, but this decision may cause a small number of repeated 
rows being written into the destination table (this could happen in case of 
append errors, infrequent worker restarts, and other even less frequent errors).

Review Comment:
   ```suggestion
   When data consistency is not a hard requirement for ingestion, you can use 
at-least-once semantics with `BigQueryIO` write mode. This implementation 
avoids shuffling and grouping data for the writes. However, this change might 
cause a small number of repeated rows to be written into the destination table. 
This can happen with append errors, infrequent worker restarts, and other even 
less frequent errors.
   ```


