damondouglas commented on code in PR #29305: URL: https://github.com/apache/beam/pull/29305#discussion_r1385276000
########## website/www/site/content/en/blog/apache-beam-flink-and-kubernetes.md: ##########
@@ -0,0 +1,355 @@
---
title: "How to Build a Scalable Self-Managed Streaming Infrastructure with Beam and Flink"
date: 2023-11-03 09:00:00 -0400
categories:
  - blog
authors:
  - talat
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

In this blog series, [Talat Uyarer (Senior Principal Engineer)](https://www.linkedin.com/in/talatuyarer/), [Rishabh Kedia (Principal Engineer)](https://www.linkedin.com/in/rishabhkedia/), and [David He (Engineering Director)](https://www.linkedin.com/in/davidqhe/) describe how we built a self-managed streaming platform using Apache Beam and Flink, and what we learned along the way. In Part 1, we describe why and how we built a large-scale self-managed streaming infrastructure and services based on Flink by migrating from a cloud managed streaming service, along with our learnings on operational scalability and observability, performance, and cost effectiveness. We summarize useful techniques and experience from our journey.

<!--more-->

# How to Build a Scalable Self-Managed Streaming Infrastructure with Flink - Part 1

# Introduction

Palo Alto Networks (PANW) is a leader in cybersecurity, providing products, services, and solutions to our customers. Data is at the center of our products and services. We stream and store exabytes of data in our data lake, with near real-time ingestion, data transformation, insertion into data stores, and forwarding of data to our internal ML-based systems and external SIEMs. We support multi-tenancy in each component so that we can isolate tenants and provide optimal performance and SLAs. Stream processing plays a critical role in these pipelines.

In this blog series, [Talat Uyarer (Senior Principal Engineer)](https://www.linkedin.com/in/talatuyarer/), [Rishabh Kedia (Principal Engineer)](https://www.linkedin.com/in/rishabhkedia/), and [David He (Engineering Director)](https://www.linkedin.com/in/davidqhe/) describe how we built a self-managed streaming platform and what we learned. In Part 1, we describe why and how we built a large-scale self-managed streaming infrastructure and services based on Flink by migrating from a cloud managed streaming service, along with our learnings on operational scalability and observability, performance, and cost effectiveness.

In Part 2, we will describe the core building blocks of the streaming infrastructure, such as the autoscaler, in more depth. We will also give more details on the heavy customization that allowed us to build a high-performance, large-scale streaming system, along with the lessons learned while solving challenging problems.


# Why Self-Managed Streaming Infrastructure Matters

In the past several years, we have built a large-scale data platform on GCP.
We used GCP Dataflow as a managed streaming service, including the streaming engine that runs our Apache Beam application code, plus observability tooling such as Cloud Logging and Cloud Monitoring; more details can be found in [1]. The system handles 15 million events per second, or 1 trillion events and 4 petabytes of data daily. We run ~30,000 Dataflow jobs; each job can have one or hundreds of workers, depending on the customer's event throughput. We support various applications using different endpoints: the BigQuery data store, HTTPS-based external SIEMs or internal endpoints, Syslog-based SIEMs, and GCS endpoints. Our customers and products rely on this data platform to handle cybersecurity postures and responses. Our streaming infrastructure is highly flexible: a use case can be added, updated, or deleted through a streaming job subscription. As an example of a use case, a customer wants to ingest log events from a firewall device into the data lake, buffered in Kafka topics; a streaming job is subscribed to extract and filter the data, transform its format, and stream inserts into our BigQuery data warehouse endpoint in real time, so that the customer can view traffic or threats captured by this firewall through our visualization and dashboard products. This diagram illustrates the event producer and use case subscription workflow, and the key components of the streaming platform:

<img class="center-block"
src="/images/blog/apache-beam-flink-and-kubernetes/image1.png"
alt="Streaming service design">

This managed, Dataflow-based streaming infrastructure works well, but with some caveats:

1. Cost is high, as it is a managed service. For the same resources used by a Dataflow application, such as vCPU and memory, the cost is much higher than running the same Beam application code on an open source streaming engine such as Flink.
2. It is not easy to extend features, such as autoscaling tailored to different applications, endpoints, or even different parameters within the same application, so that we can meet our latency SLAs.
3. It runs only on GCP.

Another important factor in wanting a self-managed service is the uniqueness of PANW's streaming use cases. We support multi-tenancy: a tenant (a customer) can ingest data at a very high rate (>100,000 requests per second) or a very low rate (<100 requests per second). A Dataflow job runs on VMs instead of Kubernetes and requires a minimum of one vCPU core; for a small tenant, this wastes resources. Our streaming infrastructure supports thousands of jobs, and CPU utilization becomes more efficient if we do not have to dedicate one core to each job. It is therefore natural for us to use a streaming engine running on Kubernetes, so that we can allocate minimal resources to a small tenant, for example a GKE pod with half a vCPU core or less.


# Choosing Apache Flink and Kubernetes

To address the above problems, we evaluated various streaming frameworks, including Apache Samza, Apache Flink, and Apache Spark, against GCP Dataflow to find the most efficient solution.

**Performance:**

* One standout factor was Apache Flink's native Kubernetes support. Unlike Samza, which lacked native Kubernetes support and required Apache Zookeeper for coordination, Flink integrated seamlessly with Kubernetes, eliminating unnecessary complexity. Performance-wise, Samza and Flink were close competitors.
* Apache Spark, while popular, proved to be significantly slower in our tests.
A presentation at the Beam Summit revealed that Apache Beam's Spark Runner was approximately 10 times slower than native Apache Spark [3]. We could not afford such a drastic performance hit, and rewriting everything in native Spark was not a viable option, given the extensive codebase we had built over the past four years with Apache Beam.

**Community:**

* The robustness of community support played a pivotal role in our decision-making. GCP Dataflow had provided excellent support, but we needed the same assurance for our choice of an open source framework. Apache Flink's vibrant community and active contributions from multiple companies offered a level of confidence that was unmatched. This collaborative environment means that bug identification and fixes are ongoing processes. In our journey, we have patched our system with many Flink fixes from the community, such as fixing GCS file reading exceptions by merging the Flink 1.15 fix [FLINK-26063](https://issues.apache.org/jira/browse/FLINK-26063?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17504555#comment-17504555) (we are using 1.13), and a worker restart issue for stateful jobs from [FLINK-31963](https://issues.apache.org/jira/browse/FLINK-31963). We also found and fixed some bugs in the open source code and contributed them back to the community, such as [FLINK-32700](https://issues.apache.org/jira/browse/FLINK-32700) for the Flink Kubernetes Operator. In addition, we created GKE Auth support for the Kubernetes client and merged it on GitHub at [4].

**Integration:**

* Additionally, the seamless integration of Apache Flink with Kubernetes provided us with a flexible and scalable platform for orchestration. The synergy between Apache Flink and Kubernetes not only optimized our data processing workflows but also future-proofed our system.


# Architecture and Deployment Workflow

In the realm of real-time data processing and analytics, Apache Flink stands out as a powerful and versatile framework. Combined with Kubernetes, the industry-standard container orchestration system, Flink applications can scale horizontally and enjoy robust management capabilities. Here we explore a design in which Apache Flink and Kubernetes synergize seamlessly, thanks to the Apache Flink Kubernetes Operator.

At its core, the Flink Kubernetes Operator serves as a control plane, mirroring the knowledge and actions of a human operator managing Flink deployments. Unlike traditional methods, the Operator automates critical activities, from starting and stopping applications to handling upgrades and errors. Its versatile feature set includes fully automated job lifecycle management, support for different Flink versions, and multiple deployment modes, such as application clusters and session jobs. The Operator also covers metrics, logging, and even dynamic scaling via the Job Autoscaler.


## Building a Seamless Deployment Workflow

Imagine a robust system where Flink jobs are deployed effortlessly, monitored diligently, and managed proactively. This is precisely what our team achieved by integrating Apache Flink, the Apache Flink Kubernetes Operator, and Kubernetes. Central to this setup is our custom-built Apache Flink Kubernetes Operator client library. This library acts as a bridge, enabling atomic operations such as starting, stopping, updating, and canceling Flink jobs; a sketch of such an interface follows.
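
To make the library's role concrete, here is a minimal sketch of the surface such a client library might expose. The names and shapes (`FlinkJobClient`, `JobSpec`, and so on) are illustrative assumptions for this post, not our actual internal API:

```java
/**
 * Illustrative sketch only: the names and signatures below are assumptions
 * for this post, not the actual PANW client library API.
 */
public interface FlinkJobClient {

    /** What a caller supplies; mirrors the Beam pipeline options described below. */
    record JobSpec(String application, String customerId, String workerType, int parallelism) {}

    enum JobState { DEPLOYING, RUNNING, FAILED, STOPPED }

    /** Uploads the JAR and configs, renders and applies the FlinkDeployment YAML; returns the generated job ID. */
    String start(JobSpec spec);

    /** Applies a new spec; the operator drives the upgrade (for example, via savepoint). */
    void update(String jobId, JobSpec spec);

    /** Stops the job with a savepoint so it can be resumed later. */
    void stop(String jobId);

    /** Cancels the job without taking a savepoint. */
    void cancel(String jobId);

    JobState status(String jobId);
}
```
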
<img class="center-block"
src="/images/blog/apache-beam-flink-and-kubernetes/stream-service-changes.png"
alt="Streaming service changes">


## The Deployment Process

In our code, the client provides Apache Beam pipeline options, which include essential information such as the Kubernetes cluster's API endpoint, authentication details, the GCS/S3 temporary location for uploading the JAR file, and worker type specifications. The Kubernetes Operator client library uses this information to orchestrate a seamless deployment process. We break the steps down below; note that most of the core steps are automated in our code base.

**Step 1:**

* The client asks to start a job for a customer and a specific application.

**Step 2:**

* **Generating a unique job ID**: The library generates a unique job ID, which is set as a Kubernetes label. This identifier helps in tracking and managing the deployed Flink job.
* **Configuration and code upload**: The library uploads all necessary configuration and user code to a designated location on Google Cloud Storage (GCS) or Amazon S3. This ensures that the Flink application's resources are readily available for deployment.
* **YAML payload generation**: Once the upload completes, the library constructs a YAML payload. This payload contains crucial deployment information, including resource settings determined by the specified worker type.

A brief note on the worker type naming convention: we borrow a convention similar to the one Google Cloud uses today. The name n1-standard-1 refers to a specific predefined worker type. Here is what each component of the name means:

* **n1**: This indicates the CPU type of the instance. In this case, it refers to the Intel-based N1 series. Google Cloud Platform has multiple generations of instances with varying hardware and performance characteristics.
* **standard**: This signifies the machine type family. "Standard" machine types offer a balanced ratio: 1 virtual CPU and 4 GB of memory for the Task Manager, and 0.5 vCPU and 2 GB of memory for the Job Manager.
* **1**: This represents the number of virtual CPUs (vCPUs) available in the instance. In the case of n1-standard-1, the instance has 1 virtual CPU.

**Step 3:**

* **Calling the Kubernetes API with Fabric8**: To initiate the deployment, the library interacts with the Kubernetes API using Fabric8 (see the sketch after this step). It's worth noting that Fabric8 initially lacked support for authentication in Google Kubernetes Engine (GKE) and Amazon Elastic Kubernetes Service (EKS). To address this limitation, our team implemented the necessary authentication support, which can be found in our Merge Request at GitHub PR [4].
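
To illustrate Step 3, the snippet below shows roughly what applying a generated manifest with Fabric8 looks like. It is a minimal sketch assuming the Fabric8 6.x client, a rendered manifest on local disk, and a hypothetical `flink-jobs` namespace; our actual library wires the cluster endpoint and credentials from the pipeline options into the client rather than relying on the local kubeconfig:

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class FlinkDeploymentSubmitter {
    public static void main(String[] args) throws FileNotFoundException {
        // Builds a client from the environment (kubeconfig); see the note above
        // about the GKE/EKS authentication support we contributed.
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Apply the generated FlinkDeployment manifest. The Flink Kubernetes
            // Operator watches for this resource and performs the actual
            // deployment (Step 4).
            client.load(new FileInputStream("flink-deployment.yaml"))
                  .inNamespace("flink-jobs") // hypothetical namespace
                  .createOrReplace();
        }
    }
}
```
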
**Step 4:**

* **Flink Operator deployment**: Upon receiving the YAML payload, the Flink Operator takes charge of deploying the various components of the Flink job. This includes provisioning resources and managing the deployment of the Flink Job Manager, Task Manager, and Job Service.

**Step 5:**

* **Job submission and execution**: Once the Flink Job Manager is up and running, it fetches the JAR file and configurations from the designated GCS or S3 location. With all the necessary resources in place, it submits the Flink job to the standalone Flink cluster for execution.

**Step 6:**

* **Continuous monitoring**: Post-deployment, our operator continuously monitors the status of the running Flink job. This real-time feedback loop enables us to promptly address any issues that arise, ensuring the overall health and optimal performance of our Flink applications.

In summary, our deployment process leverages Apache Beam pipeline options, integrates seamlessly with Kubernetes and the Flink Operator, and employs custom logic to handle configuration uploads and authentication. This end-to-end workflow ensures reliable and efficient deployment of Flink applications on Kubernetes clusters while maintaining vigilant monitoring for smooth operation. The sequence diagram below follows these steps.

<img class="center-block"
src="/images/blog/apache-beam-flink-and-kubernetes/job-start-activity-diagram.png"
alt="Job Start Activity Diagram">


# Developing an Autoscaler

The autoscaler is key to a self-managed streaming service. There are not many resources available on the internet about building your own autoscaler, which makes this problem even trickier.

The autoscaler scales up the number of Task Managers to drain the lag and keep up with the throughput. It also scales down to the minimum number of resources required to process the incoming traffic, to save cost. All of this must happen frequently, while keeping processing disruption to a minimum.

We have tuned the autoscaler intensively so that we can meet our latency SLAs while making trade-offs on cost. We have also made the autoscaler application-specific, so that we can meet the particular needs of certain applications. Every decision has a hidden cost. We dive into the details of the autoscaler in Part 2 of this blog.


# Creating a Client Library for Streaming Job Development

To deploy a job via the Flink Kubernetes Operator, one has to be knowledgeable about the workings of Kubernetes. The steps to create a single Flink job are:

1. Define a YAML file with the proper specifications, such as the example shown here and reproduced as a code block below:

<img class="center-block"
src="/images/blog/apache-beam-flink-and-kubernetes/flink-deployment-yaml.png"
alt="Flink Deployment Yaml">
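
As a textual counterpart to the screenshot, here is a minimal hand-written FlinkDeployment manifest in the same spirit; the name, image, and JAR path are placeholders rather than a copy of our actual spec. The resources follow the n1-standard-1 convention described earlier (1 vCPU and 4 GB of memory for the Task Manager, 0.5 vCPU and 2 GB for the Job Manager):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: customer-job-example           # placeholder name
  labels:
    jobId: "example-unique-job-id"     # the unique job ID generated by the client library
spec:
  image: my-registry/flink-beam:1.13   # placeholder image
  flinkVersion: v1_13
  serviceAccount: flink
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  jobManager:
    resource:
      cpu: 0.5
      memory: "2048m"
  taskManager:
    resource:
      cpu: 1
      memory: "4096m"
  job:
    jarURI: local:///opt/flink/usrlib/beam-pipeline.jar  # in our setup, staged from GCS/S3
    parallelism: 1
    upgradeMode: savepoint
```
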
Review Comment:
   Thank you for doing this! Is there a way that you might provide the flink-deployment-yaml as a code block?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]