This is an automated email from the ASF dual-hosted git repository.
cancai pushed a commit to branch dev
in repository
https://gitbox.apache.org/repos/asf/incubator-streampark-website.git
The following commit(s) were added to refs/heads/dev by this push:
new ffc877f1 [Improve] Add blog about streampark-usercase-joymaker (#391)
ffc877f1 is described below
commit ffc877f1d7903a474bfb6deba1109e25e6d913ff
Author: Kick156 <[email protected]>
AuthorDate: Fri Jul 12 17:33:15 2024 +0800
[Improve] Add blog about streampark-usercase-joymaker (#391)
[Improve] Add blog about streampark-usercase-joymaker (#391)
---
blog/12-streampark-usercase-joymaker.md | 1124 ++++++++++++++++++++
.../12-streampark-usercase-joymaker.md | 1121 +++++++++++++++++++
static/blog/joymaker/access_credential.png | Bin 0 -> 213942 bytes
static/blog/joymaker/alarm_configuration.png | Bin 0 -> 145651 bytes
.../joymaker/application_first_configuration.png | Bin 0 -> 137400 bytes
.../joymaker/application_second_configuration.png | Bin 0 -> 102577 bytes
.../joymaker/application_third_configuration.png | Bin 0 -> 57719 bytes
static/blog/joymaker/cloud_environment.png | Bin 0 -> 133621 bytes
static/blog/joymaker/clusters.png | Bin 0 -> 119312 bytes
static/blog/joymaker/cover.png | Bin 0 -> 251651 bytes
static/blog/joymaker/data_reporting.png | Bin 0 -> 154866 bytes
static/blog/joymaker/dynamic_parameter.png | Bin 0 -> 228888 bytes
static/blog/joymaker/first_configuration.png | Bin 0 -> 112445 bytes
static/blog/joymaker/flink_cluster.png | Bin 0 -> 129258 bytes
static/blog/joymaker/flink_home.png | Bin 0 -> 101016 bytes
static/blog/joymaker/incoming_parameter.png | Bin 0 -> 115661 bytes
static/blog/joymaker/instance_management.png | Bin 0 -> 153197 bytes
static/blog/joymaker/jar.png | Bin 0 -> 233382 bytes
static/blog/joymaker/jar_configuration.png | Bin 0 -> 93938 bytes
static/blog/joymaker/kubernetes_configuration.png | Bin 0 -> 103744 bytes
static/blog/joymaker/new_cover.png | Bin 0 -> 631452 bytes
static/blog/joymaker/running.png | Bin 0 -> 140531 bytes
static/blog/joymaker/second_configuration.png | Bin 0 -> 115569 bytes
static/blog/joymaker/session_cluster.png | Bin 0 -> 43437 bytes
static/blog/joymaker/sql_configuration.png | Bin 0 -> 192036 bytes
static/blog/joymaker/task_construction.png | Bin 0 -> 169912 bytes
static/blog/joymaker/third_configuration.png | Bin 0 -> 143633 bytes
static/blog/joymaker/variable_management.png | Bin 0 -> 90993 bytes
28 files changed, 2245 insertions(+)
diff --git a/blog/12-streampark-usercase-joymaker.md
b/blog/12-streampark-usercase-joymaker.md
new file mode 100644
index 00000000..fd383d40
--- /dev/null
+++ b/blog/12-streampark-usercase-joymaker.md
@@ -0,0 +1,1124 @@
+---
+slug: streampark-usercase-joymaker
+title: Apache StreamPark™ Cloud Native Platform Practice at Joymaker
+tags: [StreamPark, Production Practice]
+---
+
+
+
+
+**Introduction**: This article introduces Joymaker's big data technology architecture in practice and explains why Joymaker chose "Kubernetes + StreamPark" to continuously optimise and enhance its existing architecture. It systematically describes how these key technologies are deployed and applied in a real environment, and also walks through the practical use of StreamPark in depth, combining theory with hands-on practice. I believe that by [...]
+
+Github: https://github.com/apache/streampark
+
+Welcome to follow, Star, Fork and participate in contributing!
+
+Contributed by | Joymaker
+
+Author | Du Yao
+
+Article Organiser | Yang Linwei
+
+Proofreader | Pan Yuepeng
+
+<!-- truncate -->
+
+Joymaker is a global game development and publishing company whose products focus on two categories: MMORPG and MMOACT. Joymaker's product philosophy is: "It is better to make a product that makes 100 people scream than a product that 10,000 people applaud", and its products are consistently high quality. "RO Legend of Wonderland: Love is Like the First Time" was launched in Southeast Asia, reached the milestone of 10 million pre-registrations during the booking period, and achieved first place in [...]
+
+## **1. Operational Context and Challenges**
+
+The big data foundation currently used by Joymaker is Tencent Cloud's EMR, and the data warehouse architecture is mainly divided into the following categories:
+
+- **Offline Warehouse Architecture**: Hadoop + Hive + Dolphinscheduler + Trino
+
+- **Real-time Warehouse Architecture**: Kafka + Flink + StreamPark + Doris
+
+- **Streaming Warehouse Architecture (in planning)**: Paimon + Flink +
StreamPark + Doris
+
+Among them, the streaming warehouse architecture is one of our key plans for 2024, with the goal of improving big data analytics and the value output of various data products based on the elasticity of Kubernetes. However, due to earlier architectural decisions (mainly the mixed deployment of Apache Doris and EMR core nodes), we encountered many problems, such as:
+
+- **Resource contention**: Under the YARN resource scheduling platform, Apache Doris, Apache Flink real-time tasks, Hive offline tasks, and Spark SQL tasks often contend for CPU and memory resources, degrading Doris query stability and causing frequent query errors.
+
+- **Insufficient resources**: YARN's machine resource utilisation sits between 70% and 80% most of the time, and cluster resources are often insufficient, especially when a large number of data backfill jobs run.
+
+- **Manual intervention**: The number of real-time Flink tasks keeps growing, and each game's data shows obvious daily peaks and valleys. During weekend in-game events in particular, data volume doubles, sometimes exceeding 10 billion entries per day, which requires frequent manual adjustment of parallelism to keep up with the computational pressure and ensure data consumption capacity.
+
+## **2. Solution Selection**
+
+In order to solve the above problems, we looked for components that can perform "**resource elasticity scaling**" and "**task management**". Given the trend toward cloud-native big data, adopting a cloud-native architecture became urgent: it reduces the burden of big data operation and maintenance, and provides better and more stable capabilities such as high availability, load balancing, and canary updates under Kubernetes. The Kubernetes architecture reduces t [...]
+
+At the same time, simplifying the development, operation, and maintenance of Apache Flink real-time tasks is also a problem that has to be faced. Since mid-2022, we have focused on researching Flink-related open-source and commercial projects. In the technology selection process, the company conducted in-depth investigations along several key dimensions, including: **Kubernetes containerised deployment, lightweight design, multi-version support for Flink, an [...]
+
+
+
+In the process of using StreamPark, we have explored and applied it in depth, and have easily deployed 100+ Flink tasks to Kubernetes, which makes job management very convenient.
+
+Next, we will detail how we use StreamPark to run Flink on Kubernetes in our production environment. Although the content is long, it is packed with practical detail, and is divided into the following parts:
+
+- **Environmental Preparation**: This part introduces the installation and
configuration process of Kubernetes, StreamPark, Maven, Docker, kubectl tools
and Flink.
+
+- **StreamPark Use Chapter**: This part introduces how to use StreamPark to
start session clusters, as well as the process of deploying SQL and JAR jobs.
+
+- **Flink Task Application Scenarios**: In this section, we will share Joymaker's practical scenarios for Flink tasks.
+
+
+## **3. Practical - Environment Preparation**
+
+Next, we will give a detailed walkthrough of the StreamPark landing practice, starting from installation. Some Tencent Cloud product steps are only briefly introduced at the end of the article; here we mainly share the core steps.
+
+### **3.1. Preparing the Kubernetes Environment**
+
+Joymaker's Kubernetes environment uses the Tencent Cloud container service
product, in which the Kubernetes namespace is mainly divided as follows:
+
+- **Apache Doris**: Doris cluster + 3 FE (standard nodes) + CN (1 supernode).
+
+- **Apache Flink**: The Flink cluster consists of a supernode, which is the
equivalent of an oversized resource machine that is very easy to use and can
quickly and elastically scale to tens of thousands of pods.
+
+- **BigData**: contains various Big Data components.
+
+In addition, we have two native nodes that are mainly used to deploy StreamPark, Dolphinscheduler, and Prometheus + Grafana. Screenshots of the cluster information are shown below:
+
+
+
+### **3.2. Installing StreamPark**
+
+We are using the latest stable version of StreamPark 2.1.4, which fixes some
bugs and adds some new features, such as support for Flink 1.18, Jar task
support via pom or uploading dependencies and so on.
+
+Although StreamPark supports Docker and Kubernetes deployment, to save time (and pending a top-level design for containerising all big data components), we currently chose **to quickly set up StreamPark on a CVM in the cloud**. The following is the installation and deployment script for StreamPark:
+
+```shell
+# Use the root user and enter the installation directory (a Java environment is
+# required first). Keep the host on the same network segment as the k8s
+# environment to save yourself trouble later.
+cd /data
+
+# Download the binary package.
+wget https://dlcdn.apache.org/incubator/streampark/2.1.4/apache-streampark_2.12-2.1.4-incubating-bin.tar.gz
+
+# Compare the checksum with the published value to ensure the package is complete.
+sha512sum apache-streampark_2.12-2.1.4-incubating-bin.tar.gz
+
+# Unzip, rename, and create a soft link.
+tar -xzvf apache-streampark_2.12-2.1.4-incubating-bin.tar.gz
+mv apache-streampark_2.12-2.1.4-incubating-bin streampark_2.12-2.1.4
+ln -s streampark_2.12-2.1.4 streampark
+
+# Modify the database configuration.
+vim /data/streampark/conf/application-mysql.yml
+spring:
+  datasource:
+    username: root
+    password: xxxxx
+    driver-class-name: com.mysql.cj.jdbc.Driver
+    url: jdbc:mysql://10.16.x.x:3306/streampark_k8s?...
+
+# Modify the StreamPark core configuration: you can change the port (the k8s
+# NodePort range is 30000-32767, so plan ports with that range in mind), and
+# configure the account password. We use COS for storage, so we do not
+# configure HDFS.
+vim /data/streampark/conf/application.yml
+workspace:
+  local: /data/streampark/streampark_workspace
+
+# Create the required workspace directory.
+mkdir -p /data/streampark/streampark_workspace
+
+# Initialise the database tables for StreamPark (requires a MySQL client in the
+# environment). After execution, check in MySQL that the tables exist.
+mysql -u root -p xxxxx < /data/streampark/script/schema/mysql-schema.sql
+
+# Add the MySQL connector jar, because it is not shipped under streampark/lib.
+# Run this under /data/streampark/lib:
+wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
+
+# Start and log in. Verify the Java environment first (java -version), then
+# execute under /data/streampark/bin:
+./startup.sh
+```
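+After startup, a quick reachability check helps confirm the service came up. This is a minimal sketch assuming StreamPark's default web port 10000 (adjust if you changed it in application.yml):

```shell
# Probe the StreamPark web UI; the port is the assumed default 10000.
STREAMPARK_PORT=10000
if curl -sf "http://127.0.0.1:${STREAMPARK_PORT}/" >/dev/null 2>&1; then
  status="up"
else
  status="down"
fi
# "down" usually means the service is still starting or the port differs.
echo "StreamPark web UI status: ${status}"
```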
+
+### **3.3. Installing Maven**
+
+To configure Flink tasks on StreamPark, dependencies can be downloaded via Maven. Here is the Maven installation and configuration:
+
+```shell
+#Download maven as root user.
+cd /data && wget
https://dlcdn.apache.org/maven/maven-3/3.9.6/binaries/apache-maven-3.9.6-bin.tar.gz
+
+#Verify that the installation package is complete.
+sha512sum apache-maven-3.9.6-bin.tar.gz
+
+#Unzip and create a soft link.
+tar -xzvf apache-maven-3.9.6-bin.tar.gz
+ln -s apache-maven-3.9.6 maven
+
+#Set up a maven mirror to speed up downloads from the AliCloud repository.
+vim /data/maven/conf/settings.xml
+
+<mirror>
+  <id>alimaven</id>
+  <mirrorOf>central</mirrorOf>
+  <name>aliyun maven</name>
+  <url>https://maven.aliyun.com/repository/central</url>
+</mirror>
+
+#Finally, add the environment variables.
+vim /etc/profile
+export MAVEN_HOME=/data/maven
+PATH=$MAVEN_HOME/bin:$PATH
+
+#Validate.
+source /etc/profile && mvn -version
+```
+
+### **3.4. Installing Docker**
+
+Because StreamPark runs jobs on Kubernetes, you can use Docker to build images and upload them to your company's Harbor image registry. Here is the Docker install script:
+
+```shell
+#Uninstall any old version, if present.
+sudo yum remove docker \
+ docker-client \
+ docker-client-latest \
+ docker-common \
+ docker-latest \
+ docker-latest-logrotate \
+ docker-logrotate \
+ docker-engine
+
+#Download the repo file (AliCloud docker-ce mirror repository).
+wget -O /etc/yum.repos.d/docker-ce.repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
+
+#Update the package index
+yum makecache
+
+#Download and install (docker-ce is the Community Edition; ee is the Enterprise Edition).
+yum install docker-ce docker-ce-cli containerd.io -y
+
+#Start Docker and check the version.
+systemctl start docker && docker version
+
+#Start Docker on boot.
+systemctl enable docker
+```
+
+### **3.5. Kubectl Tool Installation**
+
+In order to interact with the Kubernetes cluster, view configurations, access logs, and so on, we also installed the kubectl tool. Before installing kubectl, you first need to obtain the access credentials for the Kubernetes cluster from the console, as shown in the following figure:
+
+
+
+Next, download a copy of the credentials to the root user's home directory, place it in the expected directory, and rename it:
+
+```shell
+# 1)Create the default hidden directory.
+mkdir -p /root/.kube
+
+# 2)Upload k8s intranet access credentials.
+rz cls-dtyxxxxl-config
+
+# 3)Modify the default name of the credentials.
+mv cls-dtyxxxxl-config config
+
+# 4)Place the credentials in the specified location.
+mv config /root/.kube
+```
+
+You can install the kubectl client as follows (you can also install **kubectx** to make it easier to access multiple Kubernetes clusters and switch between fixed namespaces):
+
+```shell
+# 1)Download the latest version.
+curl -LO "https://dl.k8s.io/release/$(curl -L -s
https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
+
+# 2)Add execute permissions.
+chmod +x kubectl
+
+# 3)Move to a common tool command location.
+mv ./kubectl /usr/local/bin/kubectl
+
+# 4)This can be configured in /etc/profile or in .bashrc in the root user's
home directory.
+vim /root/.bashrc
+export JAVA_HOME=/usr/local/jdk1.8.0_391
+PATH=/usr/local/bin:$JAVA_HOME/bin:$PATH
+
+# 5)Verify the client and cluster keys.
+kubectl cluster-info
+```
+
+Finally, you also need to create Flink-specific service account credentials (which will be used later):
+
+```shell
+# 1)Create namespace.
+kubectl create namespace flink
+
+# 2)Create the service account that Flink uses to access k8s. Remember to specify the namespace!
+kubectl create serviceaccount flink-service-account -n flink
+
+# 3)Bind the permissions needed for container operations to this account. Again, remember the namespace!
+kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account -n flink
+```
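+A quick way to confirm the binding took effect is `kubectl auth can-i`, impersonating the new service account. A small sketch (run it on a host with cluster access; it degrades gracefully elsewhere):

```shell
# Ask the API server whether the flink service account may create pods.
if command -v kubectl >/dev/null 2>&1; then
  answer=$(kubectl auth can-i create pods -n flink \
      --as=system:serviceaccount:flink:flink-service-account 2>/dev/null \
      || echo "no cluster reachable")
else
  answer="kubectl not installed"
fi
echo "create pods as flink-service-account: ${answer}"
```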
+
+### **3.6. Installing and Configuring Flink**
+
+We chose the Flink 1.17 image (Scala 2.12, Java 11) for our Kubernetes environment, because some of the dependencies for Flink 1.18 were not yet easy to find.
+
+Download and extract the flink-1.17.2-bin-scala_2.12.tgz installer with the
following script:
+
+```shell
+cd /data && wget
https://dlcdn.apache.org/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
+
+#Integrity checksum.
+sha512sum flink-1.17.2-bin-scala_2.12.tgz
+
+#Unzip
+tar -xzvf flink-1.17.2-bin-scala_2.12.tgz
+
+#Rename
+mv flink-1.17.2-bin-scala_2.12 flink-1.17.2
+
+#Create a soft link.
+ln -s flink-1.17.2 flink
+
+#Archive the old binary package.
+mv flink-1.17.2-bin-scala_2.12.tgz /data/softbag
+```
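+Optionally, the unpacked distribution can be exposed through the usual environment variables so the Flink CLI is on the PATH. A small convenience sketch, assuming the /data/flink soft link created above:

```shell
# Point FLINK_HOME at the soft link so upgrades only need the link changed.
export FLINK_HOME=/data/flink
export PATH="${FLINK_HOME}/bin:${PATH}"
echo "FLINK_HOME=${FLINK_HOME}"
```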
+
+At this point, the environment is ready. Next, we explain step by step how to use StreamPark.
+
+## **4. Practical - StreamPark in Action**
+
+Readers who want a quick overview of StreamPark running jobs on Kubernetes can watch the videos below:
+
+// Video link (Flink On Kubernetes Application Hands-On Tutorial)
+
+// Video link (Flink On Kubernetes Session Hands-On Tutorial)
+
+### **4.1. Configure Flink Home**
+
+After logging in to StreamPark, switch to the Flink Home menu and configure
the directory where we extracted the Flink installation package earlier, as
shown in the screenshot below:
+
+
+
+### **4.2. Establish Flink Session**
+
+Then switch to FlinkCluster and click Add New to add a new Flink cluster:
+
+
+
+The configuration details are as follows; here is the first part:
+
+**Configuration details**: We usually make the cluster name and the Kubernetes cluster ID the same. Fill in the correct Kubernetes service account; the image here comes from Tencent Cloud TCR, and an LB provides external access to the Flink UI. (We usually set slots per TaskManager to 2 and task parallelism to a multiple of 2, so the session cluster has no idle resources.)
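+The "2 slots, parallelism a multiple of 2" rule can be sanity-checked with a little arithmetic: when the parallelism divides evenly by the slot count, every TaskManager in the session is fully used:

```shell
# TaskManagers needed = ceil(parallelism / slots); a multiple of 2 leaves
# zero idle slots in the session cluster.
SLOTS_PER_TM=2
PARALLELISM=8
TM_COUNT=$(( (PARALLELISM + SLOTS_PER_TM - 1) / SLOTS_PER_TM ))
IDLE_SLOTS=$(( TM_COUNT * SLOTS_PER_TM - PARALLELISM ))
echo "TaskManagers: ${TM_COUNT}, idle slots: ${IDLE_SLOTS}"
```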
+
+
+
+The second part of the configuration content:
+
+**Configuration details**: We use Session mode, where one JobManager manages many tasks, so its resources can be slightly larger; we give it 2G, which observation shows is sufficient. However, because there are many tasks, JobManager metaspace consumption is relatively high, so we changed the default 256M to 500M. After observation, with one session managing fewer than 10 tasks, this configuration is reasonable.
+
+Once the TaskManager memory is set in the session configuration, it cannot be modified on the StreamPark application configuration page, so you should estimate the session's data volume in advance. If the data volume is large, you can adjust "CPU : Memory" to "1 core : 2G", or even larger, because data processing capacity is also related to the number of TaskManagers, that is, the [...]
+
+
+
+The last part is as follows:
+
+
+
+The exact text is posted here:
+
+```shell
+# Access method: reuse an LB + port. The LB can be created in advance (rather
+# than by k8s) so that the IP is fixed, which makes it easier to reuse.
+-Drest.port=8091
+-Dkubernetes.rest-service.annotations=service.kubernetes.io/qcloud-share-existed-lb:true,service.kubernetes.io/tke-existed-lbid:lb-iv9ixxxxx
+-Dkubernetes.rest-service.exposed.type=LoadBalancer
+
+# Checkpoints/savepoints go to Tencent Cloud COS via the S3 protocol plugin.
+-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.2.jar
+-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.2.jar
+-Dstate.checkpoints.dir=s3://k8s-bigdata-xxxxxx/flink-checkpoints
+-Dstate.savepoints.dir=s3://k8s-bigdata-xxxxx/flink-savepoints
+
+# Node selection policy in k8s: the flink namespace is only logical, so a node
+# selector pins tasks to the flink-specific physical supernode.
+-Dkubernetes.jobmanager.node-selector=usefor:flink
+-Dkubernetes.taskmanager.node-selector=usefor:flink
+
+# Enable JobManager high availability using the k8s implementation, with COS
+# directory storage (no HDFS anywhere).
+-Dhigh-availability.type=kubernetes
+-Dhigh-availability.storageDir=s3://k8s-bigdata-xxx/flink-recovery
+-Dkubernetes.cluster-id=streampark-share-session
+-Dkubernetes.jobmanager.replicas=2
+
+# Pod image pull policy, checkpoint retention on cancel, and time zone.
+-Dkubernetes.container.image.pull-policy=Always
+-Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION
+-Dtable.local-time-zone=Asia/Shanghai
+
+# Image pull cache acceleration, as explained below.
+-Dkubernetes.taskmanager.annotations=eks.tke.cloud.tencent.com/use-image-cache:imc-6iubofdt
+-Dkubernetes.jobmanager.annotations=eks.tke.cloud.tencent.com/use-image-cache:imc-6iubofdt
+```
+
+Note: basic Flink settings can be configured in the conf file, and more general settings are best written into the session configuration, while per-job settings such as parallelism, checkpoints/savepoints, fault tolerance, and main-class parameters are more appropriate to fill in StreamPark.
+
+Finally, you can start the session cluster. You can then use the kubectl command to check whether the Flink Kubernetes session cluster has started successfully:
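+For example (a sketch assuming the cluster-id `streampark-share-session` from the dynamic parameters above; Flink labels its pods with `app=<cluster-id>`):

```shell
# List the session cluster's pods and its rest service in the flink namespace.
if command -v kubectl >/dev/null 2>&1; then
  kubectl get pods -n flink -l app=streampark-share-session 2>/dev/null \
    || echo "no cluster reachable from this host"
  kubectl get svc -n flink 2>/dev/null | grep streampark-share-session \
    || echo "rest service not found"
else
  echo "run on a host with kubectl access to the flink namespace"
fi
checked=yes
```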
+
+
+
+### **4.3. Task Configuration**
+
+Here we will demonstrate using StreamPark to submit three types of tasks: **SQL tasks, JAR tasks, and Application mode tasks**.
+
+- **Submitting a Flink SQL job**
+
+A screenshot of the configuration of a Flink SQL job is shown below:
+
+
+
+The Kubernetes cluster ID should simply be the session name of the corresponding project:
+
+
+
+The dynamic parameters are configured as follows:
+
+```shell
+-Dstate.checkpoints.dir=s3://k8s-bigdata-12xxxxx/flink-checkpoints/ro-cn-sync
+-Dstate.savepoints.dir=s3://k8s-bigdata-12xxxxxxx/flink-savepoints/ro-cn-sync
+-Dkubernetes.container.image.pull-policy=Always
+-Dkubernetes.service-account=flink-service-account
+-Dexecution.checkpointing.interval=25s
+-Dexecution.checkpointing.mode=EXACTLY_ONCE
+-Dtable.local-time-zone=Asia/Shanghai
+-Drestart-strategy.fixed-delay.attempts=10
+-Drestart-strategy.fixed-delay.delay=3min
+```
+
+- **Submit Flink JAR Tasks**
+
+The JAR task is basically the same as the SQL task; the configuration screenshot is below:
+
+
+
+
+Dynamic parameters are as follows. (With a configuration template this can be more streamlined: because StreamPark supports task replication very well, you can write one standard JAR task and one standard SQL task as demos and copy the remaining tasks from them, which greatly improves task development efficiency.)
+
+```shell
+-Dstate.checkpoints.dir=s3://k8s-bigdata-1xxxx/flink-checkpoints/ro-cn-shushu
+-Dstate.savepoints.dir=s3://k8s-bigdata-1xxxx/flink-savepoints/ro-cn-shushu
+-Dkubernetes.container.image.pull-policy=Always
+-Dkubernetes.service-account=flink-service-account
+-Dexecution.checkpointing.interval=60s
+-Dexecution.checkpointing.mode=EXACTLY_ONCE
+-Dstate.checkpoints.num-retained=2
+-Drestart-strategy.type=failure-rate
+-Drestart-strategy.failure-rate.delay=3min
+-Drestart-strategy.failure-rate.failure-rate-interval=30min
+-Drestart-strategy.failure-rate.max-failures-per-interval=8
+```
+
+- **Submitting a Flink Application task**
+
+Of course, you can also deploy Application mode tasks to Kubernetes. Here is an example, starting with the first part of the configuration:
+
+
+
+The second part of the configuration is as follows:
+
+
+
+The third part of the configuration is as follows:
+
+
+
+Dynamic parameters:
+
+
+
+Detailed configuration content:
+
+```shell
+-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.2.jar
+-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.2.jar
+-Dstate.checkpoints.dir=s3://k8s-bigdata-xxx/flink-checkpoints/Kafka2Rocketmq2Kafka
+-Dstate.savepoints.dir=s3://k8s-bigdata-xxx/flink-savepoints/Kafka2Rocketmq2Kafka
+-Dkubernetes.container.image.pull-policy=Always
+-Dkubernetes.service-account=flink-service-account
+-Dexecution.checkpointing.interval=60s
+-Dexecution.checkpointing.mode=EXACTLY_ONCE
+-Dstate.checkpoints.num-retained=2
+-Drest.port=8092
+-Dkubernetes.jobmanager.node-selector=usefor:flink
+-Dkubernetes.taskmanager.node-selector=usefor:flink
+-Dkubernetes.rest-service.annotations=service.kubernetes.io/qcloud-share-existed-lb:true,service.kubernetes.io/tke-existed-lbid:lb-xxx
+-Dkubernetes.rest-service.exposed.type=LoadBalancer
+-Dfs.allowed-fallback-filesystems=s3
+```
+
+### **4.4. Build Tasks**
+
+The task is then built, and each step of the build process can be clearly seen here:
+
+
+
+Finally, you can run it and observe the task running:
+
+
+
+### **4.5. Notification Alarm Configuration (optional)**
+
+Of course, StreamPark supports multiple alert notification methods (e.g. email, Lark/Feishu, etc.). We use email, which can be configured directly from the interface:
+
+
+
+So far, we have shared how to deploy different types of Flink tasks to Kubernetes with StreamPark, and the whole experience is very smooth!
+
+## **5. Practical - Job Application Scenarios**
+
+Next, we will continue to dissect the jobs, covering tuning, hands-on scenarios, and more, to give a better understanding of how the Flink tasks are applied.
+
+### **5.1. Tuning Recommendations**
+
+**Recommendation 1 (time zone)**: Containers need to be in UTC+8 to make the logs easy to read; we can add the following to the conf configuration:
+
+```shell
+env.java.opts.jobmanager: -Duser.timezone=GMT+08
+env.java.opts.taskmanager: -Duser.timezone=GMT+08
+```
+
+**Recommendation 2 (StreamPark variable management)**: StreamPark provides variable management, which lets us centralise certain configurations to improve security and convenience:
+
+
+
+**Recommendation 3 (StreamPark parameter passing)**: On YARN we used Flink's own toolkit to pass parameters, and passing parameters into the Flink environment was difficult, so we did not pursue it further. We chose to pass program parameters through StreamPark instead, so the same approach works on both YARN and Kubernetes, **and with both application and session modes on Kubernetes; it is more general and easy to change, highly recommended**!
+
+
+
+**Recommendation 4 (use Session mode)**: Although the YARN application mode provides a high degree of task isolation, each task requires its own JobManager resources, which leads to excessive resource consumption. Merging tasks, while reducing resource consumption, can lead to problems such as high workload and unstable data. In contrast, Session mode allows tasks in a session to share a single JobManager's resources, which can maximise the saving of comp [...]
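+A rough calculation shows why. The per-JobManager figures below are illustrative assumptions (1 GiB for each dedicated application-mode JobManager, versus two 2 GiB HA JobManagers shared by the whole session), not measured values:

```shell
# Compare JobManager memory for 100 jobs: application mode (one JM each)
# versus one shared session cluster with 2 HA JobManagers.
JOBS=100
APP_MODE_JM_MB=$(( JOBS * 1024 ))   # assumed 1 GiB per dedicated JobManager
SESSION_JM_MB=$(( 2 * 2048 ))       # 2 HA replicas at an assumed 2 GiB each
SAVED_MB=$(( APP_MODE_JM_MB - SESSION_JM_MB ))
echo "JobManager memory saved by session mode: ${SAVED_MB} MiB"
```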
+
+### **5.2. Data CDC synchronisation**
+
+**[MySQL → Doris]**: The following code demonstrates the Flink SQL pipeline for **synchronising MySQL data to Apache Doris**:
+
+```sql
+CREATE TABLE IF NOT EXISTS `login_log_mysql` (
+ login_log_id bigint not null,
+ account_id bigint ,
+ long_account_id string ,
+ short_account_id string ,
+ game_code string ,
+ package_code string ,
+ channel_id int ,
+ login_at int ,
+ login_ip string ,
+ device_ua string ,
+ device_id string ,
+ device_net string ,
+ device_ratio string ,
+ device_os string ,
+ device_carrier string ,
+ ext string ,
+ created_at int ,
+ updated_at int ,
+ deleted_at int ,
+ PRIMARY KEY(`login_log_id`)
+ NOT ENFORCED
+) with (
+ 'connector' = 'mysql-cdc',
+ 'hostname' = '10.xxx.xxx.xxx',
+ 'port' = '3306',
+ 'username' = 'http_readxxxxx',
+ 'password' = 'xxxx@)',
+ 'database-name' = 'player_user_center',
+ -- 'scan.startup.mode' = 'latest-offset',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'table-name' = 'login_log_202[3-9][0-9]{2}',
+ 'server-time-zone' = '+07:00'
+);
+
+create table if not EXISTS login_log_doris(
+ `date` date ,
+ login_log_id bigint ,
+ account_id bigint ,
+ long_account_id string ,
+ short_account_id string ,
+ game_code string ,
+ `package_code` string ,
+ `channel_id` int ,
+ login_at string ,
+ login_ip string ,
+ device_ua string ,
+ device_id string ,
+ device_net string ,
+ device_ratio string ,
+ device_os string ,
+ device_carrier string ,
+ ext string ,
+ created_at string ,
+ updated_at string ,
+ deleted_at string ,
+ PRIMARY KEY(`date`,`login_log_id`)
+ NOT ENFORCED
+) WITH (
+ 'connector' = 'doris',
+
'jdbc-url'='jdbc:mysql://xxx.xx.xx.xx:9030,xxx.xxx.xxx.xxx:9030,xxx.xxx.xxx.xxx:9030',
+ 'load-url'='xxx.xx.xx.xx:8030;xxx.xx.xx.xx:8030;xxx.xx.xx.xx:8030',
+ 'database-name' = 'ro_sea_player_user_center',
+ 'table-name' = 'ods_login_log',
+ 'username' = 'root',
+ 'password' = 'root123',
+ 'sink.semantic' = 'exactly-once',
+ 'sink.max-retries' = '10'
+);
+
+create view login_log_flink_trans as
+select
+ to_date(cast(to_timestamp_ltz(login_at,0) as varchar) ) ,
+ login_log_id ,
+ account_id ,
+ long_account_id ,
+ short_account_id ,
+ game_code ,
+ package_code ,
+ channel_id,
+ cast(to_timestamp_ltz(login_at,0) as varchar) as login_at,
+ login_ip ,
+ device_ua ,
+ device_id ,
+ device_net ,
+ device_ratio ,
+ device_os ,
+ device_carrier ,
+ ext ,
+ cast(to_timestamp_ltz(created_at,0) as varchar) as created_at,
+ cast(to_timestamp_ltz(updated_at,0) as varchar) as updated_at,
+ cast(to_timestamp_ltz(deleted_at,0) as varchar) as deleted_at
+from login_log_mysql;
+
+insert into login_log_doris select * from login_log_flink_trans;
+
+CREATE TABLE IF NOT EXISTS `account_mysql` (
+ account_id bigint ,
+ open_id string ,
+ dy_openid string ,
+ dy_ios_openid string ,
+ ext string ,
+ last_channel int ,
+ last_login_time int ,
+ last_login_ip string ,
+ created_at int ,
+ updated_at int ,
+ deleted_at string ,
+ PRIMARY KEY(`account_id`)
+ NOT ENFORCED
+) with (
+ 'connector' = 'mysql-cdc',
+ 'hostname' = 'xxx.xx.xx.xx',
+ 'port' = '3306',
+ 'username' = 'http_readxxxx',
+ 'password' = 'xxxxx@)',
+ 'database-name' = 'player_user_center',
+ -- 'scan.startup.mode' = 'latest-offset',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'table-name' = 'account_[0-9]+',
+ 'server-time-zone' = '+07:00'
+);
+
+create table if not EXISTS account_doris(
+ `date` date ,
+ account_id bigint ,
+ open_id string ,
+ dy_openid string ,
+ dy_ios_openid string ,
+ ext string ,
+ `last_channel` int ,
+ last_login_time string ,
+ last_login_ip string ,
+ created_at string ,
+ updated_at string ,
+ deleted_at string ,
+ PRIMARY KEY(`date`,`account_id`)
+ NOT ENFORCED
+) WITH (
+ 'connector' = 'doris',
+
'jdbc-url'='jdbc:mysql://xxx.xx.xx.xx:9030,xxx.xx.xx.xx:9030,xxx.xx.xx.xx:9030',
+ 'load-url'='xxx.xx.xx.xx:8030;xxx.xx.xx.xx:8030;xxx.xx.xx.xx:8030',
+ 'database-name' = 'ro_sea_player_user_center',
+ 'table-name' = 'ods_account_log',
+ 'username' = 'root',
+ 'password' = 'root123',
+ 'sink.semantic' = 'exactly-once',
+ 'sink.max-retries' = '10'
+);
+
+create view account_flink_trans as
+select
+ to_date(cast(to_timestamp_ltz(created_at,0) as varchar) ) ,
+ account_id ,
+ open_id ,
+ dy_openid ,
+ dy_ios_openid ,
+ ext ,
+ last_channel ,
+cast(to_timestamp_ltz(last_login_time,0) as varchar) as last_login_time,
+ last_login_ip ,
+ cast(to_timestamp_ltz(created_at,0) as varchar) as created_at,
+ cast(to_timestamp_ltz(updated_at,0) as varchar) as updated_at,
+ deleted_at
+from account_mysql;
+
+insert into account_doris select * from account_flink_trans;
+```
+
+**[Game Data Synchronisation]**: pull Kafka data collected by overseas **Filebeat into domestic Kafka**, processing metadata along the way:
+
+```sql
+--Create a kafka source table for hmt.
+CREATE TABLE kafka_in (
+ `env` STRING comment 'Game environment hdf /qc/cbt/obt/prod',
+ `host` STRING comment 'Game server where logs are located',
+ `tags` STRING comment 'Log Tags normal | fix',
+ `log` STRING comment 'File and offset of the log',
+ `topic` STRING METADATA VIRTUAL comment 'kafka topic',
+ `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL comment 'The kafka
partition where the logs are located',
+ `offset` BIGINT METADATA VIRTUAL comment 'The offset of the kafka
partition',
+ `uuid` STRING comment 'uuid generated by filebeat',
+ `message` STRING comment 'tlog message',
+ `@timestamp` STRING comment 'filebeat capture time',
+ `ts` TIMESTAMP(3) METADATA FROM 'timestamp' comment 'kafka storage message
time'
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ro_sea_tlog',
+ 'properties.bootstrap.servers' =
'sea-kafka-01:9092,sea-kafka-02:9092,sea-kafka-03:9092',
+ 'properties.group.id' = 'streamx-ro-sea-total',
+ 'properties.client.id' = 'streamx-ro-sea-total',
+ 'properties.session.timeout.ms' = '60000',
+ 'properties.request.timeout.ms' = '60000',
+ 'scan.startup.mode' = 'group-offsets',
+  --'scan.startup.mode' = 'earliest-offset',
+ 'properties.fetch.max.bytes' = '123886080',
+ 'properties.max.partition.fetch.bytes' = '50388608',
+ 'properties.fetch.max.wait.ms' = '2000',
+ 'properties.max.poll.records' = '1000',
+ 'format' = 'json',
+ 'json.ignore-parse-errors' = 'false'
+);
+
+--Create the kafka output table for emr
+CREATE TABLE kafka_out_sea (
+`env` STRING comment 'Game environment qc/cbt/obt/prod',
+`hostname` STRING comment 'Game server where logs are located',
+`tags` STRING comment 'Log Tags normal | fix',
+`log_offset` STRING comment 'File and offset of the log',
+`uuid` STRING comment 'uuid generated by filebeat',
+`topic` STRING comment 'kafka topic',
+`partition_id` BIGINT comment 'The kafka partition where the logs are
located',
+`kafka_offset` BIGINT comment 'The offset of the kafka partition',
+ eventname string comment 'event name',
+`message` STRING comment 'tlog message',
+`filebeat_ts` STRING comment 'filebeat collection time',
+`kafka_ts` TIMESTAMP(3) comment 'kafka stored message time',
+`flink_ts` TIMESTAMP(3) comment 'flink processed message time'
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ro_sea_tlog',
+ --'properties.client.id' = 'flinkx-ro-sea-prod-v2',
+ 'properties.bootstrap.servers' =
'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'format' = 'json',
+ --'sink.partitioner'= 'fixed',
+ 'sink.delivery-guarantee' = 'exactly-once',
+ 'sink.transactional-id-prefix' = 'ro_sea_kafka_sync_v10',
+ 'properties.compression.type' = 'zstd',
+ 'properties.transaction.timeout.ms' = '600000',
+ 'properties.message.max.bytes' = '13000000',
+ 'properties.max.request.size' = '13048576',
+ --'properties.buffer.memory' = '83554432',
+ 'properties.acks' = '-1'
+);
+
+--etl create target view
+create view kafka_sea_in_view as
+select `env`,JSON_VALUE(`host`,'$.name') as hostname,`tags`,`log` as
log_offset,`uuid`,
+ `topic`,`partition_id`,`offset` as kafka_offset,
+ lower(SPLIT_INDEX(message,'|',0)) as eventname,`message`,
+ CONVERT_TZ(REPLACE(REPLACE(`@timestamp`,'T',' '),'Z',''), 'UTC',
'Asia/Bangkok') as filebeat_ts, `ts` as kafka_ts ,CURRENT_TIMESTAMP as flink_ts
+from kafka_in;
+
+--Write data to emr sea topic
+insert into kafka_out_sea
+select * from kafka_sea_in_view ;
+```
+
+**[Real-time data warehouse]**: a stream-splitting (shunt) JAR program plus the ODS -> DWD SQL processing; a partial code screenshot is shown below:
+
+
+
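The splitting step in the JAR program is essentially "read the raw tlog topic, extract the event name, and write each event to its own topic". A minimal, illustrative Python sketch of the routing rule (the `ro_sea_tlog_split_` topic prefix is inferred from the SQL below, and the helper name is ours, not from the original program):

```python
def route_tlog(message: str, topic_prefix: str = "ro_sea_tlog_split_") -> str:
    """Derive the target Kafka topic for a pipe-delimited tlog line.

    The first '|'-separated field is the event name, lower-cased to match
    lower(SPLIT_INDEX(message, '|', 0)) in the Flink SQL.
    """
    eventname = message.split("|", 1)[0].strip().lower()
    return topic_prefix + eventname
```

For example, a `MoneyFlow|...` line is routed to `ro_sea_tlog_split_moneyflow`, which is exactly the topic the downstream `kafka_in_money` table reads.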
+and the associated Flink sql:
+
+```sql
+--2 dwd_moneyflow_log
+CREATE TABLE kafka_in_money (
+`env` STRING comment 'Game environment qc/cbt/obt/prod',
+`hostname` STRING comment 'Game server where logs are located',
+`uuid` STRING comment 'Log unique id',
+`message` STRING comment 'tlog message',
+`filebeat_ts` STRING comment 'filebeat collection time'
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ro_sea_tlog_split_moneyflow',
+ 'properties.bootstrap.servers' =
'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'properties.group.id' = 'flinksql_kafka_in_moneyflow_v2',
+ 'scan.startup.mode' = 'group-offsets',
+ --'scan.startup.mode' = 'earliest-offset',
+ 'properties.isolation.level' = 'read_committed',
+ 'format' = 'json',
+ 'json.ignore-parse-errors' = 'false'
+);
+
+CREATE TABLE kafka_out_money(
+ `date` string not NULL COMMENT 'date',
+ `vroleid` int NULL COMMENT 'role id',
+ `moneytype` int NULL COMMENT 'Currency type',
+ `env` string NULL COMMENT 'Game environment hdf /qc/cbt/obt/prod',
+ `hostname` string NULL COMMENT 'The game server where the logs are located',
+ `uuid` string NULL COMMENT 'The uuid generated by the filebeat capture',
+ `filebeat_ts` string NULL COMMENT 'filebeat capture time',
+ flink_ts string NULL COMMENT 'message time written by flink',
+ `gamesvrid` int NULL COMMENT 'Logged in game server number',
+ `dteventtime` string NULL COMMENT 'The time of the game event, format
YYYY-MM-DD HH:MM:SS',
+ `vgameappid` string NULL COMMENT 'Game APPID',
+ `platid` int NULL COMMENT 'ios 0 /android 1',
+ `izoneareaid` int NULL COMMENT 'The zone id is used to uniquely identify a
zone for split-server games; 0 for non-split-server games',
+  `vopenid` string NULL COMMENT 'User OPENID number',
+ `vrolename` string NULL COMMENT 'Character name',
+ `jobid` int NULL COMMENT 'Role Occupation 0=Wizard 1=......',
+ `gender` int NULL COMMENT 'Character Gender 0=Male 1=Female',
+ `ilevel` int NULL COMMENT 'Character base level',
+  `ijoblevel` int NULL COMMENT 'Character career level',
+ `playerfriendsnum` int NULL COMMENT 'Number of player friends',
+  `chargegold` int NULL COMMENT 'Character recharge experience (cumulative recharge)',
+  `iviplevel` int NULL COMMENT 'Character VIP level',
+ `createtime` string NULL COMMENT 'Account creation time',
+ `irolece` int NULL COMMENT 'Battle power/rating',
+ `unionid` int NULL COMMENT 'Guild ID',
+ `unionname` string NULL COMMENT 'Guild name',
+ `regchannel` int NULL COMMENT 'registration channel',
+ `loginchannel` int NULL COMMENT 'login channel',
+ `sequence` int NULL COMMENT 'Used to correlate a single action to generate
multiple logs of different types of currency movement',
+ `reason` int NULL COMMENT 'Behavior (currency flow level 1 reason)',
+ `subreason` int NULL COMMENT 'Flow direction (item flow definition)',
+ `imoney` int NULL COMMENT 'Number of currency changes',
+ `aftermoney` int NULL COMMENT 'Number of aftermoney after action',
+ `afterboundmoney` int NULL COMMENT 'Number of money bound after the action',
+ `addorreduce` int NULL COMMENT 'Increase or decrease: 0 is increase; 1 is
decrease',
+ `serialnum` string NULL COMMENT 'The running number',
+ `sourceid` int NULL COMMENT 'Channel number',
+ `cmd` string NULL COMMENT 'command word',
+ `orderid` string NULL COMMENT 'Order id (contains meowgoincrease and
meowgoincrease, also contains cash recharge order id)',
+ `imoneytype` int NULL COMMENT 'Currency type 2',
+ `distincid` string NULL COMMENT 'Visitor ID',
+ `deviceuid` string NULL COMMENT 'device ID',
+ `guildjob` int NULL COMMENT 'Guild Position',
+ `regtime` string NULL COMMENT 'Account registration time'
+) WITH (
+ 'connector' = 'doris',
+ 'jdbc-url'='jdbc:mysql://xxx:9030,xxx:9030,xxx:9030',
+ 'load-url'='xx:8030;xxx:8030;xxx:8030',
+ 'database-name' = 'ro_sea',
+ 'table-name' = 'dwd_moneyflow_log',
+ 'username' = 'root',
+ 'password' = 'root123',
+ 'sink.semantic' = 'exactly-once'
+);
+
+create view kafka_out_money_view1 as
+select IF(to_date(SPLIT_INDEX(message,'|',2)) > CURRENT_DATE + interval '1' day, CURRENT_DATE, to_date(SPLIT_INDEX(message,'|',2))) as `date`,
+  `env`, `hostname` as hostname, `uuid`,
+  try_cast(SPLIT_INDEX(message,'|',1) as int) as gamesvrid,
+  SPLIT_INDEX(message,'|',2) as dteventtime,
+  SPLIT_INDEX(message,'|',3) as vgameappid,
+  try_cast(SPLIT_INDEX(message,'|',4) as int) as platid,
+  try_cast(SPLIT_INDEX(message,'|',5) as int) as izoneareaid,
+  SPLIT_INDEX(message,'|',6) as vopenid,
+  try_cast(SPLIT_INDEX(message,'|',7) as int) as vroleid,
+  SPLIT_INDEX(message,'|',8) as vrolename,
+  try_cast(SPLIT_INDEX(message,'|',9) as int) as jobid,
+  try_cast(SPLIT_INDEX(message,'|',10) as int) as gender,
+  try_cast(SPLIT_INDEX(message,'|',11) as int) as ilevel,
+  try_cast(SPLIT_INDEX(message,'|',12) as int) as ijoblevel,
+  try_cast(SPLIT_INDEX(message,'|',13) as int) as playerfriendsnum,
+  try_cast(SPLIT_INDEX(message,'|',14) as int) as chargegold,
+  try_cast(SPLIT_INDEX(message,'|',15) as int) as iviplevel,
+  SPLIT_INDEX(message,'|',16) as createtime,
+  try_cast(SPLIT_INDEX(message,'|',17) as int) as irolece,
+  try_cast(SPLIT_INDEX(message,'|',18) as int) as unionid,
+  SPLIT_INDEX(message,'|',19) as unionname,
+  try_cast(SPLIT_INDEX(message,'|',20) as int) as regchannel,
+  try_cast(SPLIT_INDEX(message,'|',21) as int) as loginchannel,
+  try_cast(SPLIT_INDEX(message,'|',22) as int) as `sequence`,
+  try_cast(SPLIT_INDEX(message,'|',23) as int) as `reason`,
+  try_cast(SPLIT_INDEX(message,'|',24) as int) as subreason,
+  try_cast(SPLIT_INDEX(message,'|',25) as int) as moneytype,
+  try_cast(SPLIT_INDEX(message,'|',26) as int) as imoney,
+  try_cast(SPLIT_INDEX(message,'|',27) as int) as aftermoney,
+  try_cast(SPLIT_INDEX(message,'|',28) as int) as afterboundmoney,
+  try_cast(SPLIT_INDEX(message,'|',29) as int) as addorreduce,
+  SPLIT_INDEX(message,'|',30) as serialnum,
+  try_cast(SPLIT_INDEX(message,'|',31) as int) as sourceid,
+  SPLIT_INDEX(message,'|',32) as `cmd`,
+  SPLIT_INDEX(message,'|',33) as orderid,
+  try_cast(SPLIT_INDEX(message,'|',34) as int) as imoneytype,
+  SPLIT_INDEX(message,'|',35) as distincid,
+  SPLIT_INDEX(message,'|',36) as deviceuid,
+  try_cast(SPLIT_INDEX(message,'|',37) as int) as guildjob,
+  SPLIT_INDEX(message,'|',38) as regtime,
+  filebeat_ts, CURRENT_TIMESTAMP as flink_ts
+from kafka_in_money;
+
+insert into kafka_out_money
+select
+cast(`date` as varchar) as `date`,
+`vroleid`,
+`moneytype`,
+`env`,
+`hostname`,
+`uuid`,
+`filebeat_ts`,
+cast(`flink_ts` as varchar) as flink_ts,
+`gamesvrid`,
+`dteventtime`,
+`vgameappid`,
+`platid`,
+`izoneareaid`,
+`vopenid`,
+`vrolename`,
+`jobid`,
+`gender`,
+`ilevel`,
+`ijoblevel`,
+`playerfriendsnum`,
+`chargegold`,
+`iviplevel`,
+`createtime`,
+`irolece`,
+`unionid`,
+`unionname`,
+`regchannel`,
+`loginchannel`,
+`sequence`,
+`reason`,
+`subreason`,
+`imoney`,
+`aftermoney`,
+`afterboundmoney`,
+`addorreduce`,
+`serialnum`,
+`sourceid`,
+`cmd`,
+`orderid`,
+`imoneytype`,
+`distincid`,
+`deviceuid`,
+`guildjob`,
+`regtime`
+from kafka_out_money_view1;
+```
+
+**[Advertisement system]**: use Flink SQL to slice a row of log data, parse out each field, and complete the business calculations:
+
+```sql
+CREATE TABLE IF NOT EXISTS ods_adjust_in (
+ log STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ad_adjust_callback',
+ 'properties.bootstrap.servers' =
'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'properties.group.id' = 'ods_adjust_etl_20231011',
+ 'properties.max.partition.fetch.bytes' = '4048576',
+ 'scan.startup.mode' = 'group-offsets',
+ 'properties.isolation.level' = 'read_committed',
+ 'format' = 'raw'
+);
+
+CREATE TABLE IF NOT EXISTS ods_af_in (
+ log STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ad_af_callback',
+ 'properties.bootstrap.servers' =
'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'properties.max.partition.fetch.bytes' = '4048576',
+ 'properties.group.id' = 'ods_af_etl_20231011',
+ 'scan.startup.mode' = 'group-offsets',
+ 'properties.isolation.level' = 'read_committed',
+ 'format' = 'raw'
+);
+
+CREATE TABLE IF NOT EXISTS ods_mmp_out (
+ `date` DATE,
+ `mmp_type` STRING,
+ `app_id` STRING,
+ `event_name` STRING,
+ `event_time` TIMESTAMP(3),
+ `mmp_id` STRING,
+ `distinct_id` STRING,
+ `open_id` STRING,
+ `account_id` STRING,
+ `os_name` STRING, -- platform
+ `country_code` STRING,
+ `install_time` TIMESTAMP(3),
+ `bundle_id` STRING,
+ `media` STRING,
+ `channel` STRING,
+ `campaign` STRING,
+ `campaign_id` STRING,
+ `adgroup` STRING,
+ `adgroup_id` STRING,
+ `ad` STRING,
+ `ad_id` STRING,
+ `flink_ts` TIMESTAMP(3) comment 'Message time processed by flink',
+ `device_properties` STRING,
+ `log` STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ods_mmp_log',
+ 'properties.bootstrap.servers' =
'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'scan.topic-partition-discovery.interval'='60000',
+ 'properties.group.id' = 'mmp2etl-out',
+ 'format' = 'json'
+);
+
+INSERT INTO ods_mmp_out
+SELECT
+ `date`
+ ,'Adjust' as `mmp_type`
+ ,`app_token` as `app_id`
+ ,`event_name`
+ ,`event_time`
+ ,`adid` as `mmp_id`
+ ,`distinct_id`
+ ,`open_id`
+ ,`account_id`
+ ,`os_name`
+ ,`country_code`
+ ,`install_time`
+ ,'' as `bundle_id`
+ ,'' as `media`
+ ,`network` as `channel`
+ ,`campaign`
+ ,REGEXP_EXTRACT(`campaign`, '([\\(=])([a-z0-9]+)', 2) as `campaign_id`
+ ,`adgroup`
+ ,REGEXP_EXTRACT(`adgroup`, '([\\(=])([a-z0-9]+)', 2) as `adgroup_id`
+ ,`creative` as `ad`
+ ,REGEXP_EXTRACT(`creative`, '([\\(=])([a-z0-9]+)', 2) as `ad_id`
+ ,`flink_ts`
+ ,`device_properties`
+ ,`log`
+FROM (
+ SELECT
+ to_date(FROM_UNIXTIME(cast(JSON_VALUE(log,'$.created_at') as bigint))) as
`date`
+ ,JSON_VALUE(log,'$.app_token') as `app_token`
+ ,JSON_VALUE(log,'$.adid') as `adid`
+ ,LOWER(REPLACE( TRIM( COALESCE(JSON_VALUE(log,'$.event_name'),
JSON_VALUE(log,'$.activity_kind')) ), ' ', '_')) as `event_name`
+ ,TO_TIMESTAMP(FROM_UNIXTIME(cast(JSON_VALUE(log,'$.created_at') as
bigint))) as `event_time`
+ ,TO_TIMESTAMP(FROM_UNIXTIME(cast(JSON_VALUE(log,'$.installed_at') as
bigint))) as `install_time`
+ ,LOWER(CASE WHEN (JSON_VALUE(log,'$.network_name') = 'Unattributed') THEN
COALESCE(JSON_VALUE(log,'$.fb_install_referrer_publisher_platform'),'facebook')
ELSE JSON_VALUE(log,'$.network_name') END) AS `network`
+ ,LOWER(CASE WHEN (JSON_VALUE(log,'$.network_name') = 'Unattributed') THEN
(concat(JSON_VALUE(log,'$.fb_install_referrer_campaign_group_name'), '(',
JSON_VALUE(log,'$.fb_install_referrer_campaign_group_id'), ')')) ELSE
JSON_VALUE(log,'$.campaign_name') END) AS `campaign`
+ ,LOWER(CASE WHEN (JSON_VALUE(log,'$.network_name') = 'Unattributed') THEN
(concat(JSON_VALUE(log,'$.fb_install_referrer_campaign_name'), '(',
JSON_VALUE(log,'$.fb_install_referrer_campaign_id'), ')')) ELSE
JSON_VALUE(log,'$.adgroup_name') END) AS `adgroup`
+ ,LOWER(CASE WHEN (JSON_VALUE(log,'$.network_name') = 'Unattributed') THEN
(concat(JSON_VALUE(log,'$.fb_install_referrer_adgroup_name'), '(',
JSON_VALUE(log,'$.fb_install_referrer_adgroup_id'), ')')) ELSE
JSON_VALUE(log,'$.creative_name') END) AS `creative`
+ ,JSON_VALUE(log,'$.os_name') as `os_name`
+ ,JSON_VALUE(log,'$.country_code') as `country_code`
+ ,JSON_VALUE(JSON_VALUE(log,'$.publisher_parameters'), '$.ta_distinct_id')
as `distinct_id`
+ ,JSON_VALUE(JSON_VALUE(log,'$.publisher_parameters'), '$.open_id') as
`open_id`
+ ,JSON_VALUE(JSON_VALUE(log,'$.publisher_parameters'), '$.ta_account_id')
as `account_id`
+ ,CURRENT_TIMESTAMP as `flink_ts`
+ ,JSON_OBJECT(
+ 'ip' VALUE JSON_VALUE(log,'$.ip')
+ ,'ua' VALUE JSON_VALUE(log,'$.ua')
+ ,'idfa' VALUE JSON_VALUE(log,'$.idfa')
+ ,'idfv' VALUE JSON_VALUE(log,'$.idfv')
+ ,'gps_adid' VALUE JSON_VALUE(log,'$.gps_adid')
+ ,'android_id' VALUE JSON_VALUE(log,'$.android_id')
+ ,'mac_md5' VALUE JSON_VALUE(log,'$.mac_md5')
+ ,'oaid' VALUE JSON_VALUE(log,'$.oaid')
+ ,'gclid' VALUE JSON_VALUE(log,'$.gclid')
+ ,'gbraid' VALUE JSON_VALUE(log,'$.gbraid')
+ ,'dcp_wbraid' VALUE JSON_VALUE(log,'$.dcp_wbraid')
+ ) as `device_properties`
+ ,`log`
+ FROM ods_adjust_in
+ WHERE COALESCE(JSON_VALUE(log,'$.activity_kind'),
JSON_VALUE(log,'$.event_name')) not in ('impression', 'click')
+)
+UNION ALL
+SELECT
+ to_date(CONVERT_TZ(JSON_VALUE(log,'$.event_time'), 'UTC', 'Asia/Shanghai'))
as `date`
+ ,'AppsFlyer' as `mmp_type`
+ ,JSON_VALUE(log,'$.app_id') as `app_id`
+ ,LOWER(REPLACE( TRIM(JSON_VALUE(log,'$.event_name') ), ' ', '-')) as
`event_name`
+ ,TO_TIMESTAMP(CONVERT_TZ(JSON_VALUE(log,'$.event_time'), 'UTC',
'Asia/Shanghai')) as `event_time`
+ ,JSON_VALUE(log,'$.appsflyer_id') as `mmp_id`
+ ,JSON_VALUE(JSON_VALUE(log,'$.custom_data'), '$.ta_distinct_id') as
`distinct_id`
+ ,JSON_VALUE(JSON_VALUE(log,'$.custom_data'), '$.open_id') as `open_id`
+ ,JSON_VALUE(JSON_VALUE(log,'$.custom_data'), '$.ta_account_id') as
`account_id`
+ ,LOWER(JSON_VALUE(log,'$.platform')) AS `os_name`
+ ,LOWER(JSON_VALUE(log,'$.country_code')) as `country_code`
+ ,TO_TIMESTAMP(CONVERT_TZ(JSON_VALUE(log,'$.install_time'), 'UTC',
'Asia/Shanghai')) as `install_time`
+ ,LOWER(JSON_VALUE(log,'$.bundle_id')) AS `bundle_id`
+ ,LOWER(JSON_VALUE(log,'$.media_source')) AS `media`
+ ,LOWER(JSON_VALUE(log,'$.af_channel')) AS `channel`
+ ,CONCAT(LOWER(JSON_VALUE(log,'$.campaign')), ' (',
LOWER(JSON_VALUE(log,'$.af_c_id')), ')') AS `campaign`
+ ,LOWER(JSON_VALUE(log,'$.af_c_id')) AS `campaign_id`
+ ,CONCAT(LOWER(JSON_VALUE(log,'$.af_adset')), ' (',
LOWER(JSON_VALUE(log,'$.af_adset_id')), ')') AS `adgroup`
+ ,LOWER(JSON_VALUE(log,'$.af_adset_id')) AS `adgroup_id`
+ ,CONCAT(LOWER(JSON_VALUE(log,'$.af_ad')), ' (',
LOWER(JSON_VALUE(log,'$.af_ad_id')), ')') AS `ad`
+ ,LOWER(JSON_VALUE(log,'$.af_ad_id')) AS `ad_id`
+ ,CURRENT_TIMESTAMP as `flink_ts`
+ ,JSON_OBJECT(
+ 'ip' VALUE JSON_VALUE(log,'$.ip')
+ ,'ua' VALUE JSON_VALUE(log,'$.user_agent')
+ ,'idfa' VALUE JSON_VALUE(log,'$.idfa')
+ ,'idfv' VALUE JSON_VALUE(log,'$.idfv')
+ ,'gps_adid' VALUE JSON_VALUE(log,'$.advertising_id')
+ ,'android_id' VALUE JSON_VALUE(log,'$.android_id')
+ ,'oaid' VALUE JSON_VALUE(log,'$.oaid')
+ ) as `device_properties`
+ ,`log`
+FROM ods_af_in;
+```
+
+**Data reporting**: similar to Shenzhe
+
+
+
+## **6. Practical - Tencent Cloud Environment**
+
+The Flink tasks described in this article run in Tencent Cloud's Kubernetes environment, which involves some related topics that we will not detail here, for example:
+
+- **Image synchronisation**: Harbor is configured to automatically synchronise images to Tencent Cloud's container registry service (TCR).
+
+- **Image configuration**: e.g. credential-free image pulls and cache acceleration.
+
+- **Flink Web UI external access**: an external load balancer must be purchased in order to access the Flink UI from outside the cluster.
+
+- **Environment access**: since the Kubernetes cluster is essentially a closed intranet computing environment, the system network, security groups, permissions, routing, etc. need to be properly configured and tested before tasks can truly run well in production.
+
+- **Security group configuration**: to pull real-time data from overseas games and write it to the domestic Kafka cluster with a smooth data flow, we use a dedicated line and open dedicated external access to the Kubernetes Flink super node.
+
+Relevant screenshots:
+
+
+
+
+
+## **7. Benefits and expectations**
+
+**After adopting Apache StreamPark, our strongest impression is how easy it is to use. With simple configuration, we can quickly deploy Flink tasks to any cluster and monitor and manage thousands of jobs in real time, which is very nice!**
+
+- A large number of real-time tasks were quickly migrated to Kubernetes via StreamPark. After moving off YARN, the Apache Doris cluster, which had been co-deployed in the early mixed setup, became very stable once there was no more resource contention.
+
+- When backfilling data, we can quickly borrow the capacity of super nodes in TKE, and through the StreamPark platform it is easy to scale thousands of pods up and down within a minute, which makes backfilling very efficient.
+
+- The technology stack has become more modern and unified. At present, Joy Big Data has completed the Kubernetes cloud-native migration of a series of components such as Doris, Flink, Spark, Hive, Grafana and Prometheus, which makes operation and maintenance simpler and more efficient, and StreamPark has played a major role in the Flink containerisation process.
+
+- The StreamPark platform is easy to operate, feature-complete and stable, and the community is responsive. It genuinely makes stream-processing job development simpler, brings us a lot of convenience, and lets us focus more on the business. Kudos!
+
+We also hope StreamPark keeps getting better and better; here are some optimisation suggestions:
+
+- **Support operator deployment**: we hope Flink on Kubernetes will support the operator deployment mode, given its benefits such as custom resource types, controlled invocation order and monitoring metrics.
+
+- **Support autoscaling**: real-time streaming tasks have business peaks and valleys, special weekly events and doubled data volumes, so periodically adjusting parallelism by hand is not ideal. In combination with Kubernetes, we hope StreamPark will soon support creating Flink clusters via the operator. On Flink 1.17 or above, suitable thresholds can be set and autoscaling used to obtain more resources during peak periods [...]
+
+- **Program error/alarm logic**: our Flink tasks are configured with a fixed retry count or failure-rate restart strategy, but the current StreamPark mail alarms fire particularly frequently, rather than the expected one alarm per task failure, so we hope this will be further improved.
+
+Finally, we would like to thank the **Apache StreamPark** community for their selfless help while we were adopting StreamPark. Their professionalism and user-centred attitude have enabled us to use this powerful framework more efficiently and smoothly. We look forward to StreamPark getting better and better, and becoming a model among up-and-coming Apache projects!
+
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-blog/12-streampark-usercase-joymaker.md
b/i18n/zh-CN/docusaurus-plugin-content-blog/12-streampark-usercase-joymaker.md
new file mode 100644
index 00000000..70d316fa
--- /dev/null
+++
b/i18n/zh-CN/docusaurus-plugin-content-blog/12-streampark-usercase-joymaker.md
@@ -0,0 +1,1121 @@
+---
+slug: streampark-usercase-joymaker
+title: Apache StreamPark™ Cloud-Native Platform Practice at Joymaker
+tags: [StreamPark, Production Practice]
+---
+
+
+
+
+**Foreword**: this article details Joymaker's hands-on application of its big data architecture and explains why "Kubernetes + StreamPark" was chosen to continuously optimise and strengthen the existing stack. It systematically describes how to deploy and use these key technologies in a real environment, and goes deep into the practical use of StreamPark, blending theory and practice. We believe reading it will help you understand and master the relevant techniques and make real progress in practice.
+
+Github: https://github.com/apache/streampark
+
+Welcome to follow, star, fork and contribute!
+
+Contributed by | Joymaker
+
+Author | Du Yao
+
+Compiled by | Yang Linwei
+
+Proofread by | Pan Yuepeng
+
+<!-- truncate -->
+
+Joymaker is a global game development and publishing company whose products focus on the MMORPG and MMOACT genres. Its product philosophy: "Rather make a product that 100 people scream for than one that 10,000 people merely applaud"; a Joymaker release is a quality release. The title "RO仙境传说:爱如初见" launched in Southeast Asia with a milestone of 10 million pre-registrations, topped the best-selling charts in five countries on launch day, and has also been released in Japan, Korea, the US, Hong Kong, Macao and Taiwan with good results.
+
+## **1. Business Background and Challenges**
+
+Joymaker's current big data foundation is Tencent Cloud EMR, with data warehouse architectures falling into the following categories:
+
+- **Offline warehouse architecture**: Hadoop + Hive + DolphinScheduler + Trino
+
+- **Real-time warehouse architecture**: Kafka + Flink + StreamPark + Doris
+
+- **Streaming warehouse architecture (planned)**: Paimon + Flink + StreamPark + Doris
+
+Among these, the streaming warehouse architecture is one of our key plans for 2024, aiming to leverage the elasticity of Kubernetes to improve big data analytics capability and the value output of our data products. However, the earlier architecture (mainly the co-deployment of Apache Doris with EMR core nodes) caused many problems, for example:
+
+- **Resource contention**: under YARN scheduling, Apache Doris, Apache Flink real-time tasks, Hive offline tasks and Spark SQL tasks frequently competed for CPU and memory, degrading Doris query stability and causing frequent query errors.
+
+- **Insufficient resources**: YARN machine utilisation sat between 70% and 80% most of the time, and cluster resources were often insufficient, especially during large backfill scenarios.
+
+- **Manual intervention**: real-time Flink tasks kept multiplying, and each game's data showed obvious intra-day peaks and valleys. During weekend idle-farming events the data volume doubled, even exceeding 10 billion records per day, requiring frequent manual parallelism adjustments to match the computing load and guarantee consumption capacity.
+
+## **2. Solution Selection**
+
+To solve these problems, we looked for components offering "**elastic resource scaling**" and "**task management**". Given the cloud-native trend in big data, adopting a cloud-native architecture was imperative: under Kubernetes, big data operations become lighter, with better and more stable capabilities such as high availability, load balancing and canary updates.
+
+Meanwhile, simplifying the development and operation of Apache Flink real-time tasks was a problem we had to face. Starting in mid-2022, we focused on researching Flink-related open-source and commercial projects. During technology selection, the company examined several key dimensions in depth, including: **Kubernetes containerised deployment, lightweightness, support for multiple Flink versions, feature completeness, multiple deployment modes, CI/CD support, stability and maturity. Unsurprisingly, we ultimately chose Apache StreamPark.**
+
+
+
+In the course of using StreamPark we have applied and explored it in depth, and have already deployed 100+ Flink tasks to Kubernetes with ease, making job management very convenient.
+
+Next, we describe in detail how we use StreamPark to run Flink on Kubernetes in production. Although it is long, it is packed with practical detail, organised into the following parts:
+
+- **Environment preparation**: installation and configuration of Kubernetes, StreamPark, Maven, Docker, the kubectl tool and Flink.
+
+- **Using StreamPark**: how to start a Session cluster with StreamPark, and the workflow for deploying SQL and JAR jobs.
+
+- **Flink task application scenarios**: practical scenarios in which Joymaker applies Flink tasks.
+
+## **3. Hands-on Practice - Environment Preparation**
+
+Starting from installation, this section details the hands-on deployment of StreamPark. Some usage steps for Tencent Cloud products are briefly covered at the end of the article; here we focus on the core steps.
+
+### **3.1. Preparing the Kubernetes Environment**
+
+Joymaker's Kubernetes environment uses Tencent Cloud's container service, with the Kubernetes namespaces divided mainly as follows:
+
+- **Apache Doris**: Doris cluster with 3 FE (standard nodes) + CN (1 super node).
+
+- **Apache Flink**: the Flink cluster consists of a single super node, which is effectively a machine with enormous resources; it is very simple to use and can rapidly scale out to tens of thousands of pods.
+
+- **BigData**: contains the various big data components.
+
+In addition, we have two native nodes, mainly used to deploy StreamPark, DolphinScheduler, and Prometheus + Grafana. Screenshots of the cluster information:
+
+
+
+### **3.2. Installing StreamPark**
+
+We adopted StreamPark 2.1.4, the latest stable release, which fixes a number of bugs and adds new features such as Flink 1.18 support and JAR tasks that declare dependencies via pom or uploaded jars.
+
+Although StreamPark supports Docker and Kubernetes deployment, to save time, and because containerising all big data components calls for top-level design first, we currently chose **a quick StreamPark setup on a cloud CVM**. The installation and deployment script is as follows:
+
+```shell
+#As root, enter the install directory (a Java environment is required first); keeping it on the same subnet as the k8s environment saves trouble!
+cd /data
+
+#Download the binary package
+wget https://dlcdn.apache.org/incubator/streampark/2.1.4/apache-streampark_2.12-2.1.4-incubating-bin.tar.gz
+
+#Compare against the published value to verify package integrity
+sha512sum apache-streampark_2.12-2.1.4-incubating-bin.tar.gz
+
+#Extract
+tar -xzvf apache-streampark_2.12-2.1.4-incubating-bin.tar.gz
+
+#Rename
+mv apache-streampark_2.12-2.1.4-incubating-bin streampark_2.12-2.1.4
+
+#Create a symlink
+ln -s streampark_2.12-2.1.4 streampark
+
+#Edit the database configuration
+vim /data/streampark/conf/application-mysql.yml
+spring:
+  datasource:
+    username: root
+    password: xxxxx
+    driver-class-name: com.mysql.cj.jdbc.Driver
+    url: jdbc:mysql://10.16.x.x:3306/streampark_k8s?...
+
+#Edit the StreamPark core configuration. The port can be changed (the k8s NodePort range is 30000-32767, so picking a port in that range saves exposing it separately), as can the account/password. We use COS for storage, so HDFS is not configured.
+vim /data/streampark/conf/application.yml
+workspace:
+  local: /data/streampark/streampark_workspace
+
+#Create the required workspace directory
+mkdir -p /data/streampark/streampark_workspace
+
+#Initialise the StreamPark database tables (a MySQL client is required); afterwards check in MySQL that the tables exist
+mysql -u root -p xxxxx < /data/streampark/script/schema/mysql-schema.sql
+
+#Add the mysql-connector jar (not shipped under streampark/lib); run in /data/streampark/lib
+wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
+
+#Verify the Java environment first (java -version), then start and log in; run in /data/streampark/bin
+./startup.sh
+```
+
+### **3.3. Installing Maven**
+
+When configuring Flink tasks in StreamPark, the related dependencies can be downloaded via Maven. Maven installation and configuration:
+
+```shell
+#Download Maven (as root)
+cd /data && wget https://dlcdn.apache.org/maven/maven-3/3.9.6/binaries/apache-maven-3.9.6-bin.tar.gz
+
+#Verify package integrity
+sha512sum apache-maven-3.9.6-bin.tar.gz
+
+#Extract and symlink
+tar -xzvf apache-maven-3.9.6-bin.tar.gz
+ln -s apache-maven-3.9.6 maven
+
+#Configure the Aliyun Maven mirror to speed up downloads (one mirror entry per id)
+vim /data/maven/conf/settings.xml
+
+<mirror>
+  <id>alimaven</id>
+  <mirrorOf>central</mirrorOf>
+  <name>aliyun maven</name>
+  <url>https://maven.aliyun.com/repository/central</url>
+</mirror>
+
+#Environment variables: append at the end
+vim /etc/profile
+export MAVEN_HOME=/data/maven
+PATH=$MAVEN_HOME/bin:$PATH
+
+#Apply and verify
+source /etc/profile && mvn -version
+```
+
+### **3.4. Installing Docker**
+
+When StreamPark targets Kubernetes, Docker can be used to build images and push them to the company's Harbor registry. The Docker installation script is as follows:
+
+```shell
+#Remove old versions, if any
+sudo yum remove docker \
+              docker-client \
+              docker-client-latest \
+              docker-common \
+              docker-latest \
+              docker-latest-logrotate \
+              docker-logrotate \
+              docker-engine
+
+#Add the package repository (Aliyun docker-ce mirror)
+wget -O /etc/yum.repos.d/docker-ce.repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
+
+#Refresh the package index
+yum makecache
+
+#Install (docker-ce = Community Edition; ee = Enterprise Edition)
+yum install docker-ce docker-ce-cli containerd.io -y
+
+#Start and check the version
+systemctl start docker && docker version
+
+#Enable start at boot
+systemctl enable docker
+```
+
+### **3.5. Installing kubectl**
+
+To conveniently interact with the Kubernetes cluster, inspect configuration, access logs and so on, we also installed the kubectl tool. Before installing kubectl, first obtain the Kubernetes cluster access credential, downloaded as shown below:
+
+
+
+Then download the credential, copy it to the root user's home directory, move it into the designated directory and rename it:
+
+```shell
+# 1) Create the default hidden directory
+mkdir -p /root/.kube
+
+# 2) Upload the k8s intranet access credential
+rz cls-dtyxxxxl-config
+
+# 3) Rename the credential to the default name
+mv cls-dtyxxxxl-config config
+
+# 4) Move the credential into place
+mv config /root/.kube
+```
+
+The kubectl client installation commands are as follows (you can also install **kubectx** to access multiple Kubernetes clusters and switch pinned namespaces more conveniently):
+
+```shell
+# 1) Download the latest version
+curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
+
+# 2) Make it executable
+chmod +x kubectl
+
+# 3) Move it to the common tools location
+mv ./kubectl /usr/local/bin/kubectl
+
+# 4) Configure in /etc/profile or root's .bashrc
+vim /root/.bashrc
+export JAVA_HOME=/usr/local/jdk1.8.0_391
+PATH=/usr/local/bin:$JAVA_HOME/bin:$PATH
+
+# 5) Verify the client and cluster credentials
+kubectl cluster-info
+```
+
+Finally, we also need to create a dedicated service-account credential for Flink (it will be used later):
+
+```shell
+# 1) Create the namespace
+kubectl create namespace flink
+
+# 2) Create the service account Flink uses to access k8s (remember the namespace!)
+kubectl create serviceaccount flink-service-account -n flink
+
+# 3) Bind container-operation permissions to the account (remember the namespace!)
+kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account -n flink
+```
+
+### **3.6. Installing and Configuring Flink**
+
+For our Kubernetes environment we chose the Flink 1.17 / Scala 2.12 / Java 11 image, because at the time some dependency packages for Flink 1.18 were hard to find.
+
+Download and extract the flink-1.17.2-bin-scala_2.12.tgz package; the script is as follows:
+
+```shell
+cd /data && wget https://dlcdn.apache.org/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
+
+#Integrity check
+sha512sum flink-1.17.2-bin-scala_2.12.tgz
+
+#Extract (the archive unpacks to flink-1.17.2)
+tar -xzvf flink-1.17.2-bin-scala_2.12.tgz
+
+#Symlink
+ln -s flink-1.17.2 flink
+
+#Archive the old binary package
+mv flink-1.17.2-bin-scala_2.12.tgz /data/softbag
+```
+
+With that, the environment is ready. Next, we walk through how to use StreamPark step by step.
+
+## **4. Hands-on Practice - Using StreamPark**
+
+Readers who want a quick overview of running StreamPark jobs on Kubernetes can watch the videos below:
+
+//Video link (Flink On Kubernetes Application tutorial)
+
+//Video link (Flink On Kubernetes Session tutorial)
+
+### **4.1. Configuring Flink Home**
+
+After logging in to StreamPark, switch to the Flink Home menu and configure the directory of the Flink package we extracted earlier, as shown:
+
+
+
+### **4.2. Creating a Flink Session Cluster**
+
+Then switch to Flink Cluster and click Add New to create a new Flink cluster:
+
+
+
+The configuration details are as follows; here is the first part:
+
+**Configuration notes**: we usually give the cluster name and the Kubernetes cluster ID the same value, and fill in the correct Kubernetes service account. The image here comes from Tencent Cloud TCR, and an LB is used to expose the Flink UI externally. Slots are generally set to 2, and task parallelism is usually a multiple of 2, so the Session cluster has no idle resources.
+
+
+
+The second part of the configuration:
+
+**Configuration notes**: we use Session mode, where one JobManager manages many jobs, so its resources can be somewhat larger. We give it 2G, which observation shows is sufficient; however, with many jobs the metaspace portion of the JobManager's memory is consumed heavily, so the default 256M can be raised to 500M. From our observation, this configuration is reasonable for one Session managing up to 10 jobs.
+
+Once TaskManager memory is set in the Session configuration, it can no longer be changed on the StreamPark application configuration page, so estimate the data volume of the Session's jobs in advance. For large volumes, adjust "CPU : memory" to "1 core : 2G" or even larger; since processing capacity also depends on the number of TaskManagers (i.e. on parallelism), this largely avoids OOM problems. For reference, our "1 : 2" Session cluster processes 6 billion records a day at a parallelism of 18. The default 1 core : 1G ratio is basically fine.
+
+
+
+The last part is as follows:
+
+
+
+Here is the exact text of the configuration:
+
+```shell
+#Access method: reuse an existing LB + port. The LB can be created in advance rather than by k8s, so the IP stays fixed; reuse makes this more convenient
+-Drest.port=8091
+-Dkubernetes.rest-service.annotations=service.kubernetes.io/qcloud-share-existed-lb:true,service.kubernetes.io/tke-existed-lbid:lb-iv9ixxxxx
+-Dkubernetes.rest-service.exposed.type=LoadBalancer
+
+#Checkpoints/savepoints: access Tencent Cloud COS via the s3 protocol, loading the dependency as a built-in plugin
+-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.2.jar
+-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.2.jar
+-Dstate.checkpoints.dir=s3://k8s-bigdata-xxxxxx/flink-checkpoints
+-Dstate.savepoints.dir=s3://k8s-bigdata-xxxxx/flink-savepoints
+
+#k8s scheduling / node-selection policy: the flink namespace is logical, while the node selector pins pods to the dedicated Flink physical super node
+-Dkubernetes.jobmanager.node-selector=usefor:flink
+-Dkubernetes.taskmanager.node-selector=usefor:flink
+
+#Enable JobManager high availability using the k8s implementation, with a COS directory for storage, so no HDFS is needed
+-Dhigh-availability.type=kubernetes
+-Dhigh-availability.storageDir=s3://k8s-bigdata-xxx/flink-recovery
+-Dkubernetes.cluster-id=streampark-share-session
+-Dkubernetes.jobmanager.replicas=2
+
+#Pod image pull policy, checkpoint retention on cancellation, and time zone
+-Dkubernetes.container.image.pull-policy=Always
+-Dexecution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION
+-Dtable.local-time-zone=Asia/Shanghai
+
+#Image pull cache acceleration (explained in detail below)
+-Dkubernetes.taskmanager.annotations=eks.tke.cloud.tencent.com/use-image-cache:imc-6iubofdt
+-Dkubernetes.jobmanager.annotations=eks.tke.cloud.tencent.com/use-image-cache:imc-6iubofdt
+```
+
+Note: basic Flink settings can be placed in the conf file, settings common to most programs are best written in the Session configuration, and parallelism, checkpoint/savepoint, fault-tolerance and main-args settings are best filled in within StreamPark.
+
+Finally, simply start the Session cluster; you can use kubectl to check whether the Flink Kubernetes session cluster started successfully:
+
+
+
+### **4.3. Task Configuration**
+
+Here we demonstrate submitting three types of tasks with StreamPark: **SQL tasks, JAR tasks, and Application-mode tasks**.
+
+- **Submitting a Flink SQL task**
+
+A screenshot of the Flink SQL job configuration:
+
+
+
+For Kubernetes cluster id, just fill in the session name of the corresponding project:
+
+
+
+Dynamic parameters are configured as follows:
+
+```shell
+-Dstate.checkpoints.dir=s3://k8s-bigdata-12xxxxx/flink-checkpoints/ro-cn-sync
+-Dstate.savepoints.dir=s3://k8s-bigdata-12xxxxxxx/flink-savepoints/ro-cn-sync
+-Dkubernetes.container.image.pull-policy=Always
+-Dkubernetes.service-account=flink-service-account
+-Dexecution.checkpointing.interval=25s
+-Dexecution.checkpointing.mode=EXACTLY_ONCE
+-Dtable.local-time-zone=Asia/Shanghai
+-Drestart-strategy.fixed-delay.attempts=10
+-Drestart-strategy.fixed-delay.delay=3min
+```
+
+- **Submitting a Flink JAR job**
+
+JAR jobs are configured in basically the same way as SQL jobs; a screenshot:
+
+
+
+The dynamic parameters are as follows (with a configuration template this can be even more concise; since StreamPark supports convenient job copying, you can write one standard JAR demo and one standard SQL demo and clone them for all remaining jobs, which greatly speeds up job development):
+
+```shell
+-Dstate.checkpoints.dir=s3://k8s-bigdata-1xxxx/flink-checkpoints/ro-cn-shushu
+-Dstate.savepoints.dir=s3://k8s-bigdata-1xxxx/flink-savepoints/ro-cn-shushu
+-Dkubernetes.container.image.pull-policy=Always
+-Dkubernetes.service-account=flink-service-account
+-Dexecution.checkpointing.interval=60s
+-Dexecution.checkpointing.mode=EXACTLY_ONCE
+-Dstate.checkpoints.num-retained=2
+-Drestart-strategy.type=failure-rate
+-Drestart-strategy.failure-rate.delay=3min
+-Drestart-strategy.failure-rate.failure-rate-interval=30min
+-Drestart-strategy.failure-rate.max-failures-per-interval=8
+```
+
+- **Submitting a Flink Application-mode job**
+
+Application-mode jobs can of course also be deployed to Kubernetes. Here is an example, starting with the first part of the configuration:
+
+
+
+The second part of the configuration:
+
+
+
+The third part of the configuration:
+
+
+
+Dynamic parameters:
+
+
+
+The full configuration:
+
+```shell
+-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.2.jar
+-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.2.jar
+-Dstate.checkpoints.dir=s3://k8s-bigdata-xxx/flink-checkpoints/Kafka2Rocketmq2Kafka
+-Dstate.savepoints.dir=s3://k8s-bigdata-xxx/flink-savepoints/Kafka2Rocketmq2Kafka
+-Dkubernetes.container.image.pull-policy=Always
+-Dkubernetes.service-account=flink-service-account
+-Dexecution.checkpointing.interval=60s
+-Dexecution.checkpointing.mode=EXACTLY_ONCE
+-Dstate.checkpoints.num-retained=2
+-Drest.port=8092
+-Dkubernetes.jobmanager.node-selector=usefor:flink
+-Dkubernetes.taskmanager.node-selector=usefor:flink
+-Dkubernetes.rest-service.annotations=service.kubernetes.io/qcloud-share-existed-lb:true,service.kubernetes.io/tke-existed-lbid:lb-xxx
+-Dkubernetes.rest-service.exposed.type=LoadBalancer
+-Dfs.allowed-fallback-filesystems=s3
+```
+
+### **4.4. Building the Job**
+
+Next, build the job. Every step of the build process is clearly visible:
+
+
+
+Finally the job starts up, and you can observe it running:
+
+
+
+### **4.5. Alert Configuration (Optional)**
+
+StreamPark supports multiple alerting channels (such as email and Lark). We use email, which can be configured directly in the UI:
+
+
+
+At this point we have shown how to successfully deploy and run all the different types of Flink jobs on Kubernetes through StreamPark. The whole experience is remarkably smooth!
+
+## **5. Hands-on Practice: Job Application Scenarios**
+
+Next we take a closer look at the jobs themselves, using tuning tips and hands-on scenarios to better understand how Flink jobs are applied.
+
+### **5.1. Tuning Tips**
+
+**Tip 1 (time zone)**: containers should run in UTC+8 so that logs are easy to read. Add the following to the conf file:
+
+```shell
+env.java.opts.jobmanager: -Duser.timezone=GMT+08
+env.java.opts.taskmanager: -Duser.timezone=GMT+08
+```
+
+**Tip 2 (StreamPark variable management)**: StreamPark provides variable management, letting us manage shared configuration centrally and improving both security and convenience:
+
+
+
+**Tip 3 (passing parameters via StreamPark)**: on YARN, parameters passed with Flink's own utility classes are hard to get into the Flink runtime, and we never dug into why. Instead we pass program arguments through StreamPark, which works on both YARN and Kubernetes, **and in both application and session mode on Kubernetes, making jobs more portable and easier to change. Highly recommended!**
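The convention of passing job parameters as plain `--key value` program arguments can be sketched as follows. This is only an illustration of the idea (the parsing mirrors what Flink's `ParameterTool.fromArgs` does, and the argument names are invented), not the actual code of our jobs:

```python
def parse_args(argv):
    """Parse --key value pairs into a dict, mirroring the ParameterTool.fromArgs convention."""
    params = {}
    i = 0
    while i < len(argv):
        token = argv[i]
        if token.startswith("--"):
            key = token[2:]
            if i + 1 < len(argv) and not argv[i + 1].startswith("--"):
                params[key] = argv[i + 1]
                i += 2
            else:
                # A key with no following value is treated as a boolean flag.
                params[key] = "true"
                i += 1
        else:
            i += 1
    return params

# Example: arguments as StreamPark would pass them to the job's main method.
print(parse_args(["--source.topic", "ro_sea_tlog", "--sink.parallelism", "4", "--dry-run"]))
# → {'source.topic': 'ro_sea_tlog', 'sink.parallelism': '4', 'dry-run': 'true'}
```

Because the arguments travel as plain strings, the same convention works unchanged on YARN and on Kubernetes in both session and application mode.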
+
+
+
+**Tip 4 (use session mode)**: although Yarn-application mode provides strong job isolation, every job needs its own JobManager, which drives resource consumption up. Merging jobs reduces resource usage but brings heavy workload and data instability. In contrast, session mode lets jobs share one JobManager within a session, saving compute resources to the greatest extent. It does introduce a potential single point of failure, but that can be solved by running two JobManager replicas for high availability. Even better, session mode makes it much easier to inspect the logs produced by failed jobs, which improves troubleshooting efficiency. Given its advantages in resource utilization, high availability, and failure handling, session mode is the more attractive choice.
+
+### **5.2. CDC Data Synchronization**
+
+**[MySQL → Doris]**: the Flink SQL pipeline below synchronizes **MySQL data into Apache Doris**:
+
+```sql
+CREATE TABLE IF NOT EXISTS `login_log_mysql` (
+ login_log_id bigint not null,
+ account_id bigint ,
+ long_account_id string ,
+ short_account_id string ,
+ game_code string ,
+ package_code string ,
+ channel_id int ,
+ login_at int ,
+ login_ip string ,
+ device_ua string ,
+ device_id string ,
+ device_net string ,
+ device_ratio string ,
+ device_os string ,
+ device_carrier string ,
+ ext string ,
+ created_at int ,
+ updated_at int ,
+ deleted_at int ,
+ PRIMARY KEY(`login_log_id`)
+ NOT ENFORCED
+) with (
+ 'connector' = 'mysql-cdc',
+ 'hostname' = '10.xxx.xxx.xxx',
+ 'port' = '3306',
+ 'username' = 'http_readxxxxx',
+ 'password' = 'xxxx@)',
+ 'database-name' = 'player_user_center',
+ -- 'scan.startup.mode' = 'latest-offset',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'table-name' = 'login_log_202[3-9][0-9]{2}',
+ 'server-time-zone' = '+07:00'
+);
+
+create table if not EXISTS login_log_doris(
+ `date` date ,
+ login_log_id bigint ,
+ account_id bigint ,
+ long_account_id string ,
+ short_account_id string ,
+ game_code string ,
+ `package_code` string ,
+ `channel_id` int ,
+ login_at string ,
+ login_ip string ,
+ device_ua string ,
+ device_id string ,
+ device_net string ,
+ device_ratio string ,
+ device_os string ,
+ device_carrier string ,
+ ext string ,
+ created_at string ,
+ updated_at string ,
+ deleted_at string ,
+ PRIMARY KEY(`date`,`login_log_id`)
+ NOT ENFORCED
+) WITH (
+ 'connector' = 'doris',
+  'jdbc-url'='jdbc:mysql://xxx.xx.xx.xx:9030,xxx.xxx.xxx.xxx:9030,xxx.xxx.xxx.xxx:9030',
+ 'load-url'='xxx.xx.xx.xx:8030;xxx.xx.xx.xx:8030;xxx.xx.xx.xx:8030',
+ 'database-name' = 'ro_sea_player_user_center',
+ 'table-name' = 'ods_login_log',
+ 'username' = 'root',
+ 'password' = 'root123',
+ 'sink.semantic' = 'exactly-once',
+ 'sink.max-retries' = '10'
+);
+
+create view login_log_flink_trans as
+select
+ to_date(cast(to_timestamp_ltz(login_at,0) as varchar) ) ,
+ login_log_id ,
+ account_id ,
+ long_account_id ,
+ short_account_id ,
+ game_code ,
+ package_code ,
+ channel_id,
+ cast(to_timestamp_ltz(login_at,0) as varchar) as login_at,
+ login_ip ,
+ device_ua ,
+ device_id ,
+ device_net ,
+ device_ratio ,
+ device_os ,
+ device_carrier ,
+ ext ,
+ cast(to_timestamp_ltz(created_at,0) as varchar) as created_at,
+ cast(to_timestamp_ltz(updated_at,0) as varchar) as updated_at,
+ cast(to_timestamp_ltz(deleted_at,0) as varchar) as deleted_at
+from login_log_mysql;
+
+insert into login_log_doris select * from login_log_flink_trans;
+
+CREATE TABLE IF NOT EXISTS `account_mysql` (
+ account_id bigint ,
+ open_id string ,
+ dy_openid string ,
+ dy_ios_openid string ,
+ ext string ,
+ last_channel int ,
+ last_login_time int ,
+ last_login_ip string ,
+ created_at int ,
+ updated_at int ,
+ deleted_at string ,
+ PRIMARY KEY(`account_id`)
+ NOT ENFORCED
+) with (
+ 'connector' = 'mysql-cdc',
+ 'hostname' = 'xxx.xx.xx.xx',
+ 'port' = '3306',
+ 'username' = 'http_readxxxx',
+ 'password' = 'xxxxx@)',
+ 'database-name' = 'player_user_center',
+ -- 'scan.startup.mode' = 'latest-offset',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'table-name' = 'account_[0-9]+',
+ 'server-time-zone' = '+07:00'
+);
+
+create table if not EXISTS account_doris(
+ `date` date ,
+ account_id bigint ,
+ open_id string ,
+ dy_openid string ,
+ dy_ios_openid string ,
+ ext string ,
+ `last_channel` int ,
+ last_login_time string ,
+ last_login_ip string ,
+ created_at string ,
+ updated_at string ,
+ deleted_at string ,
+ PRIMARY KEY(`date`,`account_id`)
+ NOT ENFORCED
+) WITH (
+ 'connector' = 'doris',
+  'jdbc-url'='jdbc:mysql://xxx.xx.xx.xx:9030,xxx.xx.xx.xx:9030,xxx.xx.xx.xx:9030',
+ 'load-url'='xxx.xx.xx.xx:8030;xxx.xx.xx.xx:8030;xxx.xx.xx.xx:8030',
+ 'database-name' = 'ro_sea_player_user_center',
+ 'table-name' = 'ods_account_log',
+ 'username' = 'root',
+ 'password' = 'root123',
+ 'sink.semantic' = 'exactly-once',
+ 'sink.max-retries' = '10'
+);
+
+create view account_flink_trans as
+select
+ to_date(cast(to_timestamp_ltz(created_at,0) as varchar) ) ,
+ account_id ,
+ open_id ,
+ dy_openid ,
+ dy_ios_openid ,
+ ext ,
+ last_channel,
+cast(to_timestamp_ltz(last_login_time,0) as varchar) as last_login_time,
+ last_login_ip ,
+ cast(to_timestamp_ltz(created_at,0) as varchar) as created_at,
+ cast(to_timestamp_ltz(updated_at,0) as varchar) as updated_at,
+ deleted_at
+from account_mysql;
+
+insert into account_doris select * from account_flink_trans;
+```
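The `cast(to_timestamp_ltz(login_at, 0) as varchar)` pattern in the views above renders epoch seconds as a local-time string. A minimal Python sketch of the same transformation, assuming a fixed UTC+7 offset (matching the `server-time-zone` of the source table; adjust the offset to whatever your session's `table.local-time-zone` is):

```python
from datetime import datetime, timezone, timedelta

def epoch_to_local_string(epoch_seconds, utc_offset_hours=7):
    """Render epoch seconds as 'YYYY-MM-DD HH:MM:SS' in a fixed-offset zone,
    like cast(to_timestamp_ltz(x, 0) as varchar) under the session time zone."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(epoch_seconds, tz).strftime("%Y-%m-%d %H:%M:%S")

print(epoch_to_local_string(0))  # → 1970-01-01 07:00:00
```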
+
+**[Game data synchronization]**: pull the **Kafka data collected overseas by Filebeat into the domestic Kafka cluster**, enriching the records with metadata along the way:
+
+```sql
+--Create the hmt Kafka source table
+CREATE TABLE kafka_in (
+ `env` STRING comment 'game environment: hdf/qc/cbt/obt/prod',
+ `host` STRING comment 'game server the log lives on',
+ `tags` STRING comment 'log tag: normal | fix',
+ `log` STRING comment 'log file and offset',
+ `topic` STRING METADATA VIRTUAL comment 'kafka topic',
+ `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL comment 'kafka partition of the log',
+ `offset` BIGINT METADATA VIRTUAL comment 'offset within the kafka partition',
+ `uuid` STRING comment 'uuid generated by filebeat',
+ `message` STRING comment 'tlog payload',
+ `@timestamp` STRING comment 'filebeat collection time',
+ `ts` TIMESTAMP(3) METADATA FROM 'timestamp' comment 'time the message was stored in kafka'
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ro_sea_tlog',
+ 'properties.bootstrap.servers' = 'sea-kafka-01:9092,sea-kafka-02:9092,sea-kafka-03:9092',
+ 'properties.group.id' = 'streamx-ro-sea-total',
+ 'properties.client.id' = 'streamx-ro-sea-total',
+ 'properties.session.timeout.ms' = '60000',
+ 'properties.request.timeout.ms' = '60000',
+ 'scan.startup.mode' = 'group-offsets',
+ --'scan.startup.mode' = 'earliest-offset',
+ 'properties.fetch.max.bytes' = '123886080',
+ 'properties.max.partition.fetch.bytes' = '50388608',
+ 'properties.fetch.max.wait.ms' = '2000',
+ 'properties.max.poll.records' = '1000',
+ 'format' = 'json',
+ 'json.ignore-parse-errors' = 'false'
+);
+
+--Create the EMR Kafka sink table
+CREATE TABLE kafka_out_sea (
+`env` STRING comment 'game environment: qc/cbt/obt/prod',
+`hostname` STRING comment 'game server the log lives on',
+`tags` STRING comment 'log tag: normal | fix',
+`log_offset` STRING comment 'log file and offset',
+`uuid` STRING comment 'uuid generated by filebeat',
+`topic` STRING comment 'kafka topic',
+`partition_id` BIGINT comment 'kafka partition of the log',
+`kafka_offset` BIGINT comment 'offset within the kafka partition',
+ eventname string comment 'event name',
+`message` STRING comment 'tlog payload',
+`filebeat_ts` STRING comment 'filebeat collection time',
+`kafka_ts` TIMESTAMP(3) comment 'time the message was stored in kafka',
+`flink_ts` TIMESTAMP(3) comment 'time the message was processed by flink'
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ro_sea_tlog',
+ --'properties.client.id' = 'flinkx-ro-sea-prod-v2',
+ 'properties.bootstrap.servers' = 'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'format' = 'json',
+ --'sink.partitioner'= 'fixed',
+ 'sink.delivery-guarantee' = 'exactly-once',
+ 'sink.transactional-id-prefix' = 'ro_sea_kafka_sync_v10',
+ 'properties.compression.type' = 'zstd',
+ 'properties.transaction.timeout.ms' = '600000',
+ 'properties.message.max.bytes' = '13000000',
+ 'properties.max.request.size' = '13048576',
+ --'properties.buffer.memory' = '83554432',
+ 'properties.acks' = '-1'
+);
+
+--ETL: create the target view
+create view kafka_sea_in_view as
+select `env`,JSON_VALUE(`host`,'$.name') as hostname,`tags`,`log` as log_offset,`uuid`,
+ `topic`,`partition_id`,`offset` as kafka_offset,
+ lower(SPLIT_INDEX(message,'|',0)) as eventname,`message`,
+ CONVERT_TZ(REPLACE(REPLACE(`@timestamp`,'T',' '),'Z',''), 'UTC', 'Asia/Bangkok') as filebeat_ts, `ts` as kafka_ts ,CURRENT_TIMESTAMP as flink_ts
+from kafka_in;
+
+--Write the data to the EMR sea topic
+insert into kafka_out_sea
+select * from kafka_sea_in_view ;
+```
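The `CONVERT_TZ(REPLACE(REPLACE(...,'T',' '),'Z',''), 'UTC', 'Asia/Bangkok')` expression in the view normalizes Filebeat's ISO-8601 UTC timestamp into Bangkok local time. A Python sketch of the same logic (the sample timestamp is invented; Asia/Bangkok is a fixed UTC+7 zone without DST, which is why a fixed offset suffices here):

```python
from datetime import datetime, timezone, timedelta

def filebeat_ts_to_bangkok(ts):
    """Mirror CONVERT_TZ(REPLACE(REPLACE(ts,'T',' '),'Z',''), 'UTC', 'Asia/Bangkok'):
    strip the ISO 'T'/'Z' markers, then shift the UTC wall time to UTC+7."""
    naive = ts.replace("T", " ").replace("Z", "")
    # Filebeat emits millisecond precision, e.g. 2024-05-01T03:15:42.123Z
    dt = datetime.strptime(naive, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc)
    local = dt.astimezone(timezone(timedelta(hours=7)))
    return local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

print(filebeat_ts_to_bangkok("2024-05-01T03:15:42.123Z"))  # → 2024-05-01 10:15:42.123
```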
+
+**[Real-time data warehouse]**: a stream-splitting JAR program plus ods -> dwd SQL processing; a partial code screenshot:
+
+
+
+And the related Flink SQL:
+
+```sql
+--2 dwd_moneyflow_log
+CREATE TABLE kafka_in_money (
+`env` STRING comment 'game environment: qc/cbt/obt/prod',
+`hostname` STRING comment 'game server the log lives on',
+`uuid` STRING comment 'unique log id',
+`message` STRING comment 'tlog payload',
+`filebeat_ts` STRING comment 'filebeat collection time'
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ro_sea_tlog_split_moneyflow',
+ 'properties.bootstrap.servers' = 'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'properties.group.id' = 'flinksql_kafka_in_moneyflow_v2',
+ 'scan.startup.mode' = 'group-offsets',
+ --'scan.startup.mode' = 'earliest-offset',
+ 'properties.isolation.level' = 'read_committed',
+ 'format' = 'json',
+ 'json.ignore-parse-errors' = 'false'
+);
+
+CREATE TABLE kafka_out_money(
+ `date` string not NULL COMMENT 'date',
+ `vroleid` int NULL COMMENT 'role id',
+ `moneytype` int NULL COMMENT 'currency type',
+ `env` string NULL COMMENT 'game environment: hdf/qc/cbt/obt/prod',
+ `hostname` string NULL COMMENT 'game server the log lives on',
+ `uuid` string NULL COMMENT 'uuid generated by filebeat',
+ `filebeat_ts` string NULL COMMENT 'filebeat collection time',
+ flink_ts string NULL COMMENT 'time the message was written by flink',
+ `gamesvrid` int NULL COMMENT 'id of the game server logged into',
+ `dteventtime` string NULL COMMENT 'game event time, format YYYY-MM-DD HH:MM:SS',
+ `vgameappid` string NULL COMMENT 'game app id',
+ `platid` int NULL COMMENT 'ios 0 / android 1',
+ `izoneareaid` int NULL COMMENT 'zone id for games partitioned by zone/server, uniquely identifying a zone; 0 for games without zones',
+ `vopenid` string NULL COMMENT 'user OPENID',
+ `vrolename` string NULL COMMENT 'role name',
+ `jobid` int NULL COMMENT 'role class, 0 = wizard, 1 = ...',
+ `gender` int NULL COMMENT 'role gender, 0 = male, 1 = female',
+ `ilevel` int NULL COMMENT 'role base level',
+ `ijoblevel` int NULL COMMENT 'role class level',
+ `playerfriendsnum` int NULL COMMENT 'number of friends',
+ `chargegold` int NULL COMMENT 'cumulative top-up amount of the role',
+ `iviplevel` int NULL COMMENT 'role VIP level',
+ `createtime` string NULL COMMENT 'account creation time',
+ `irolece` int NULL COMMENT 'combat power / rating',
+ `unionid` int NULL COMMENT 'guild id',
+ `unionname` string NULL COMMENT 'guild name',
+ `regchannel` int NULL COMMENT 'registration channel',
+ `loginchannel` int NULL COMMENT 'login channel',
+ `sequence` int NULL COMMENT 'links multiple currency-flow logs produced by a single action',
+ `reason` int NULL COMMENT 'action (primary reason for the currency flow)',
+ `subreason` int NULL COMMENT 'direction (item-flow definition)',
+ `imoney` int NULL COMMENT 'currency change amount',
+ `aftermoney` int NULL COMMENT 'money after the action',
+ `afterboundmoney` int NULL COMMENT 'bound money after the action',
+ `addorreduce` int NULL COMMENT 'increase or decrease: 0 = increase; 1 = decrease',
+ `serialnum` string NULL COMMENT 'serial number',
+ `sourceid` int NULL COMMENT 'channel number',
+ `cmd` string NULL COMMENT 'command word',
+ `orderid` string NULL COMMENT 'order id (covers in-game currency credits/debits and cash top-up orders)',
+ `imoneytype` int NULL COMMENT 'currency type 2',
+ `distincid` string NULL COMMENT 'visitor id',
+ `deviceuid` string NULL COMMENT 'device id',
+ `guildjob` int NULL COMMENT 'guild position',
+ `regtime` string NULL COMMENT 'account registration time'
+) WITH (
+ 'connector' = 'doris',
+ 'jdbc-url'='jdbc:mysql://xxx:9030,xxx:9030,xxx:9030',
+ 'load-url'='xx:8030;xxx:8030;xxx:8030',
+ 'database-name' = 'ro_sea',
+ 'table-name' = 'dwd_moneyflow_log',
+ 'username' = 'root',
+ 'password' = 'root123',
+ 'sink.semantic' = 'exactly-once'
+);
+
+create view kafka_out_money_view1 as
+select IF(to_date(SPLIT_INDEX(message,'|',2)) > CURRENT_DATE + interval '1' day,CURRENT_DATE,to_date(SPLIT_INDEX(message,'|',2))) as `date`,
+ `env`,`hostname` as hostname,`uuid`,
+ try_cast(SPLIT_INDEX(message,'|',1) as int) as gamesvrid,
+ SPLIT_INDEX(message,'|',2) as dteventtime,
+ SPLIT_INDEX(message,'|',3) as vgameappid,
+ try_cast(SPLIT_INDEX(message,'|',4) as int) as platid,
+ try_cast(SPLIT_INDEX(message,'|',5) as int) as izoneareaid,
+ SPLIT_INDEX(message,'|',6) as vopenid,
+ try_cast(SPLIT_INDEX(message,'|',7) as int) as vroleid,
+ SPLIT_INDEX(message,'|',8) as vrolename,
+ try_cast(SPLIT_INDEX(message,'|',9) as int) as jobid,
+ try_cast(SPLIT_INDEX(message,'|',10) as int) as gender,
+ try_cast(SPLIT_INDEX(message,'|',11) as int) as ilevel,
+ try_cast(SPLIT_INDEX(message,'|',12) as int) as ijoblevel,
+ try_cast(SPLIT_INDEX(message,'|',13) as int) as playerfriendsnum,
+ try_cast(SPLIT_INDEX(message,'|',14) as int) as chargegold,
+ try_cast(SPLIT_INDEX(message,'|',15) as int) as iviplevel,
+ SPLIT_INDEX(message,'|',16) as createtime,
+ try_cast(SPLIT_INDEX(message,'|',17) as int) as irolece,
+ try_cast(SPLIT_INDEX(message,'|',18) as int) as unionid,
+ SPLIT_INDEX(message,'|',19) as unionname,
+ try_cast(SPLIT_INDEX(message,'|',20) as int) as regchannel,
+ try_cast(SPLIT_INDEX(message,'|',21) as int) as loginchannel,
+ try_cast(SPLIT_INDEX(message,'|',22) as int) as `sequence`,
+ try_cast(SPLIT_INDEX(message,'|',23) as int) as `reason`,
+ try_cast(SPLIT_INDEX(message,'|',24) as int) as subreason,
+ try_cast(SPLIT_INDEX(message,'|',25) as int) as moneytype,
+ try_cast(SPLIT_INDEX(message,'|',26) as int) as imoney,
+ try_cast(SPLIT_INDEX(message,'|',27) as int) as aftermoney,
+ try_cast(SPLIT_INDEX(message,'|',28) as int) as afterboundmoney,
+ try_cast(SPLIT_INDEX(message,'|',29) as int) as addorreduce,
+ SPLIT_INDEX(message,'|',30) as serialnum,
+ try_cast(SPLIT_INDEX(message,'|',31) as int) as sourceid,
+ SPLIT_INDEX(message,'|',32) as `cmd`,
+ SPLIT_INDEX(message,'|',33) as orderid,
+ try_cast(SPLIT_INDEX(message,'|',34) as int) as imoneytype,
+ SPLIT_INDEX(message,'|',35) as distincid,
+ SPLIT_INDEX(message,'|',36) as deviceuid,
+ try_cast(SPLIT_INDEX(message,'|',37) as int) as guildjob,
+ SPLIT_INDEX(message,'|',38) as regtime,
+ filebeat_ts,CURRENT_TIMESTAMP as flink_ts
+from kafka_in_money;
+
+insert into kafka_out_money
+select
+cast(`date` as varchar) as `date`,
+`vroleid`,
+`moneytype`,
+`env`,
+`hostname`,
+`uuid`,
+`filebeat_ts`,
+cast(`flink_ts` as varchar) as flink_ts,
+`gamesvrid`,
+`dteventtime`,
+`vgameappid`,
+`platid`,
+`izoneareaid`,
+`vopenid`,
+`vrolename`,
+`jobid`,
+`gender`,
+`ilevel`,
+`ijoblevel`,
+`playerfriendsnum`,
+`chargegold`,
+`iviplevel`,
+`createtime`,
+`irolece`,
+`unionid`,
+`unionname`,
+`regchannel`,
+`loginchannel`,
+`sequence`,
+`reason`,
+`subreason`,
+`imoney`,
+`aftermoney`,
+`afterboundmoney`,
+`addorreduce`,
+`serialnum`,
+`sourceid`,
+`cmd`,
+`orderid`,
+`imoneytype`,
+`distincid`,
+`deviceuid`,
+`guildjob`,
+`regtime`
+from kafka_out_money_view1;
+```
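The view above is essentially positional parsing of a pipe-delimited tlog line. The semantics of `SPLIT_INDEX` combined with `try_cast` (return NULL instead of failing on bad or missing fields) can be sketched in Python; the sample record below is invented for illustration:

```python
def split_index(message, delimiter, index):
    """Like Flink's SPLIT_INDEX: return the index-th field, or None when out of range."""
    parts = message.split(delimiter)
    return parts[index] if 0 <= index < len(parts) else None

def try_cast_int(value):
    """Like Flink's TRY_CAST(... AS INT): None instead of an error on bad input."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

# A shortened tlog-style record: event|gamesvrid|dteventtime|vopenid|...
msg = "MoneyFlow|1001|2024-05-01 10:15:42|wx123|bad"
print(try_cast_int(split_index(msg, "|", 1)))   # → 1001
print(try_cast_int(split_index(msg, "|", 4)))   # → None  (not a number)
print(split_index(msg, "|", 99))                # → None  (out of range)
```

This forgiving behavior is what lets a single malformed log line degrade to NULL columns instead of failing the whole streaming job.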
+
+**[Ad system]: use Flink SQL to split a raw log line into individual fields and complete the business computation:**
+
+```sql
+CREATE TABLE IF NOT EXISTS ods_adjust_in (
+ log STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ad_adjust_callback',
+ 'properties.bootstrap.servers' = 'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'properties.group.id' = 'ods_adjust_etl_20231011',
+ 'properties.max.partition.fetch.bytes' = '4048576',
+ 'scan.startup.mode' = 'group-offsets',
+ 'properties.isolation.level' = 'read_committed',
+ 'format' = 'raw'
+);
+
+CREATE TABLE IF NOT EXISTS ods_af_in (
+ log STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ad_af_callback',
+ 'properties.bootstrap.servers' = 'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'properties.max.partition.fetch.bytes' = '4048576',
+ 'properties.group.id' = 'ods_af_etl_20231011',
+ 'scan.startup.mode' = 'group-offsets',
+ 'properties.isolation.level' = 'read_committed',
+ 'format' = 'raw'
+);
+
+CREATE TABLE IF NOT EXISTS ods_mmp_out (
+ `date` DATE,
+ `mmp_type` STRING,
+ `app_id` STRING,
+ `event_name` STRING,
+ `event_time` TIMESTAMP(3),
+ `mmp_id` STRING,
+ `distinct_id` STRING,
+ `open_id` STRING,
+ `account_id` STRING,
+ `os_name` STRING, -- platform
+ `country_code` STRING,
+ `install_time` TIMESTAMP(3),
+ `bundle_id` STRING,
+ `media` STRING,
+ `channel` STRING,
+ `campaign` STRING,
+ `campaign_id` STRING,
+ `adgroup` STRING,
+ `adgroup_id` STRING,
+ `ad` STRING,
+ `ad_id` STRING,
+ `flink_ts` TIMESTAMP(3) comment 'time the message was processed by flink',
+ `device_properties` STRING,
+ `log` STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'ods_mmp_log',
+ 'properties.bootstrap.servers' = 'emr_kafka1:9092,emr_kafka2:9092,emr_kafka3:9092',
+ 'scan.topic-partition-discovery.interval'='60000',
+ 'properties.group.id' = 'mmp2etl-out',
+ 'format' = 'json'
+);
+
+INSERT INTO ods_mmp_out
+SELECT
+ `date`
+ ,'Adjust' as `mmp_type`
+ ,`app_token` as `app_id`
+ ,`event_name`
+ ,`event_time`
+ ,`adid` as `mmp_id`
+ ,`distinct_id`
+ ,`open_id`
+ ,`account_id`
+ ,`os_name`
+ ,`country_code`
+ ,`install_time`
+ ,'' as `bundle_id`
+ ,'' as `media`
+ ,`network` as `channel`
+ ,`campaign`
+ ,REGEXP_EXTRACT(`campaign`, '([\\(=])([a-z0-9]+)', 2) as `campaign_id`
+ ,`adgroup`
+ ,REGEXP_EXTRACT(`adgroup`, '([\\(=])([a-z0-9]+)', 2) as `adgroup_id`
+ ,`creative` as `ad`
+ ,REGEXP_EXTRACT(`creative`, '([\\(=])([a-z0-9]+)', 2) as `ad_id`
+ ,`flink_ts`
+ ,`device_properties`
+ ,`log`
+FROM (
+ SELECT
+    to_date(FROM_UNIXTIME(cast(JSON_VALUE(log,'$.created_at') as bigint))) as `date`
+    ,JSON_VALUE(log,'$.app_token') as `app_token`
+    ,JSON_VALUE(log,'$.adid') as `adid`
+    ,LOWER(REPLACE( TRIM( COALESCE(JSON_VALUE(log,'$.event_name'), JSON_VALUE(log,'$.activity_kind')) ), ' ', '_')) as `event_name`
+    ,TO_TIMESTAMP(FROM_UNIXTIME(cast(JSON_VALUE(log,'$.created_at') as bigint))) as `event_time`
+    ,TO_TIMESTAMP(FROM_UNIXTIME(cast(JSON_VALUE(log,'$.installed_at') as bigint))) as `install_time`
+    ,LOWER(CASE WHEN (JSON_VALUE(log,'$.network_name') = 'Unattributed') THEN COALESCE(JSON_VALUE(log,'$.fb_install_referrer_publisher_platform'),'facebook') ELSE JSON_VALUE(log,'$.network_name') END) AS `network`
+    ,LOWER(CASE WHEN (JSON_VALUE(log,'$.network_name') = 'Unattributed') THEN (concat(JSON_VALUE(log,'$.fb_install_referrer_campaign_group_name'), '(', JSON_VALUE(log,'$.fb_install_referrer_campaign_group_id'), ')')) ELSE JSON_VALUE(log,'$.campaign_name') END) AS `campaign`
+    ,LOWER(CASE WHEN (JSON_VALUE(log,'$.network_name') = 'Unattributed') THEN (concat(JSON_VALUE(log,'$.fb_install_referrer_campaign_name'), '(', JSON_VALUE(log,'$.fb_install_referrer_campaign_id'), ')')) ELSE JSON_VALUE(log,'$.adgroup_name') END) AS `adgroup`
+    ,LOWER(CASE WHEN (JSON_VALUE(log,'$.network_name') = 'Unattributed') THEN (concat(JSON_VALUE(log,'$.fb_install_referrer_adgroup_name'), '(', JSON_VALUE(log,'$.fb_install_referrer_adgroup_id'), ')')) ELSE JSON_VALUE(log,'$.creative_name') END) AS `creative`
+    ,JSON_VALUE(log,'$.os_name') as `os_name`
+    ,JSON_VALUE(log,'$.country_code') as `country_code`
+    ,JSON_VALUE(JSON_VALUE(log,'$.publisher_parameters'), '$.ta_distinct_id') as `distinct_id`
+    ,JSON_VALUE(JSON_VALUE(log,'$.publisher_parameters'), '$.open_id') as `open_id`
+    ,JSON_VALUE(JSON_VALUE(log,'$.publisher_parameters'), '$.ta_account_id') as `account_id`
+    ,CURRENT_TIMESTAMP as `flink_ts`
+    ,JSON_OBJECT(
+      'ip' VALUE JSON_VALUE(log,'$.ip')
+      ,'ua' VALUE JSON_VALUE(log,'$.ua')
+      ,'idfa' VALUE JSON_VALUE(log,'$.idfa')
+      ,'idfv' VALUE JSON_VALUE(log,'$.idfv')
+      ,'gps_adid' VALUE JSON_VALUE(log,'$.gps_adid')
+      ,'android_id' VALUE JSON_VALUE(log,'$.android_id')
+      ,'mac_md5' VALUE JSON_VALUE(log,'$.mac_md5')
+      ,'oaid' VALUE JSON_VALUE(log,'$.oaid')
+      ,'gclid' VALUE JSON_VALUE(log,'$.gclid')
+      ,'gbraid' VALUE JSON_VALUE(log,'$.gbraid')
+      ,'dcp_wbraid' VALUE JSON_VALUE(log,'$.dcp_wbraid')
+    ) as `device_properties`
+    ,`log`
+  FROM ods_adjust_in
+  WHERE COALESCE(JSON_VALUE(log,'$.activity_kind'), JSON_VALUE(log,'$.event_name')) not in ('impression', 'click')
+)
+UNION ALL
+SELECT
+  to_date(CONVERT_TZ(JSON_VALUE(log,'$.event_time'), 'UTC', 'Asia/Shanghai')) as `date`
+  ,'AppsFlyer' as `mmp_type`
+  ,JSON_VALUE(log,'$.app_id') as `app_id`
+  ,LOWER(REPLACE( TRIM(JSON_VALUE(log,'$.event_name') ), ' ', '-')) as `event_name`
+  ,TO_TIMESTAMP(CONVERT_TZ(JSON_VALUE(log,'$.event_time'), 'UTC', 'Asia/Shanghai')) as `event_time`
+  ,JSON_VALUE(log,'$.appsflyer_id') as `mmp_id`
+  ,JSON_VALUE(JSON_VALUE(log,'$.custom_data'), '$.ta_distinct_id') as `distinct_id`
+  ,JSON_VALUE(JSON_VALUE(log,'$.custom_data'), '$.open_id') as `open_id`
+  ,JSON_VALUE(JSON_VALUE(log,'$.custom_data'), '$.ta_account_id') as `account_id`
+  ,LOWER(JSON_VALUE(log,'$.platform')) AS `os_name`
+  ,LOWER(JSON_VALUE(log,'$.country_code')) as `country_code`
+  ,TO_TIMESTAMP(CONVERT_TZ(JSON_VALUE(log,'$.install_time'), 'UTC', 'Asia/Shanghai')) as `install_time`
+  ,LOWER(JSON_VALUE(log,'$.bundle_id')) AS `bundle_id`
+  ,LOWER(JSON_VALUE(log,'$.media_source')) AS `media`
+  ,LOWER(JSON_VALUE(log,'$.af_channel')) AS `channel`
+  ,CONCAT(LOWER(JSON_VALUE(log,'$.campaign')), ' (', LOWER(JSON_VALUE(log,'$.af_c_id')), ')') AS `campaign`
+  ,LOWER(JSON_VALUE(log,'$.af_c_id')) AS `campaign_id`
+  ,CONCAT(LOWER(JSON_VALUE(log,'$.af_adset')), ' (', LOWER(JSON_VALUE(log,'$.af_adset_id')), ')') AS `adgroup`
+  ,LOWER(JSON_VALUE(log,'$.af_adset_id')) AS `adgroup_id`
+  ,CONCAT(LOWER(JSON_VALUE(log,'$.af_ad')), ' (', LOWER(JSON_VALUE(log,'$.af_ad_id')), ')') AS `ad`
+  ,LOWER(JSON_VALUE(log,'$.af_ad_id')) AS `ad_id`
+  ,CURRENT_TIMESTAMP as `flink_ts`
+  ,JSON_OBJECT(
+    'ip' VALUE JSON_VALUE(log,'$.ip')
+    ,'ua' VALUE JSON_VALUE(log,'$.user_agent')
+    ,'idfa' VALUE JSON_VALUE(log,'$.idfa')
+    ,'idfv' VALUE JSON_VALUE(log,'$.idfv')
+    ,'gps_adid' VALUE JSON_VALUE(log,'$.advertising_id')
+    ,'android_id' VALUE JSON_VALUE(log,'$.android_id')
+    ,'oaid' VALUE JSON_VALUE(log,'$.oaid')
+  ) as `device_properties`
+  ,`log`
+FROM ods_af_in;
+```
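The `REGEXP_EXTRACT(..., '([\(=])([a-z0-9]+)', 2)` calls above pull the tracker id that follows a `(` or `=` out of campaign/adgroup/creative names. The same extraction can be sketched in Python (the sample names are invented for illustration):

```python
import re

def extract_tracker_id(name):
    """Mirror REGEXP_EXTRACT(name, '([\\(=])([a-z0-9]+)', 2): grab the id that
    follows '(' or '=' in strings like 'summer_push(c92f1a3)'."""
    m = re.search(r"([\(=])([a-z0-9]+)", name or "")
    return m.group(2) if m else None

print(extract_tracker_id("summer_push(c92f1a3)"))  # → c92f1a3
print(extract_tracker_id("network=abc123"))        # → abc123
print(extract_tracker_id("no_id_here"))            # → None
```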
+
+**Data reporting**: similar to Sensors Data
+
+
+
+## **6. Hands-on Practice: Tencent Cloud Environment**
+
+The Flink jobs in this article run on Tencent's Kubernetes environment, which involves some related topics we will not elaborate on here, for example:
+
+- **Image synchronization**: Harbor is configured to automatically sync images to Tencent Cloud's container registry (TCR).
+
+- **Image configuration**: such as password-free pulls and cache acceleration.
+
+- **External access to the Flink Web UI**: an external load balancer needs to be purchased so the Flink UI can be reached from outside.
+
+- **Environment integration**: the Kubernetes cluster is largely a closed internal compute environment, so networking with surrounding systems, security groups, permissions, routing, and so on all need careful configuration and testing before jobs can really run well in production.
+
+- **Security group configuration**: to pull and process real-time data from overseas games and write it into the domestic Kafka cluster with a stable data flow, we use a dedicated line and grant the Kubernetes Flink super nodes external access over it.
+
+**Related screenshots**:
+
+
+
+
+
+## **7. Benefits and Expectations**
+
+**After adopting Apache StreamPark, our strongest impression is how simple it is to use: with just a little configuration, Flink jobs can be deployed to any cluster in no time, and thousands of jobs can be monitored and managed in real time. Our gains:**
+
+- A large number of real-time jobs were migrated to Kubernetes very quickly through StreamPark. After decommissioning YARN, the Apache Doris cluster that used to be co-located no longer competes for resources and has become very stable.
+
+- For backfills we can quickly borrow the capacity of TKE super nodes: through the StreamPark platform it is easy to scale a thousand pods up or down within a minute, which makes backfilling extremely efficient.
+
+- Our technology stack has become more modern and unified. The pile of components from the big-data era keeps getting more efficient and consistent: our big-data team has now moved Doris, Flink, Spark, Hive, Grafana, Prometheus, and other components to cloud-native Kubernetes, making operations simpler and more efficient, and StreamPark played a major role in containerizing Flink.
+
+- StreamPark is easy to operate, feature-complete, and stable, and the community is responsive. It genuinely makes stream-processing job development simpler, brings us a great deal of convenience, and lets us focus on the business. Kudos!
+
+We also hope StreamPark keeps getting better and better, and we offer a few suggestions:
+
+- **Support operator-based deployment**: we hope Flink on Kubernetes will support deployment through the operator, which offers benefits such as custom resource types, ordered invocation, and built-in monitoring metrics.
+
+- **Support an autoscaler**: real-time streaming workloads have business peaks and troughs, and during weekend events the data volume doubles, so periodically adjusting parallelism by hand is not a good fit. Combined with Kubernetes, we look forward to StreamPark soon supporting operator-based creation of Flink clusters; on Flink 1.17 and above, thresholds can then be configured so autoscaling adds resources to handle data during peaks and shrinks them to save cost during troughs. We are very much looking forward to it.
+
+- **Alerting on job errors**: our Flink jobs are configured with fixed-count or failure-rate restart strategies, but StreamPark currently sends email alerts far too frequently, rather than the expected one alert per job failure. We hope this can be further improved.
+
+Finally, we sincerely thank the **Apache StreamPark** community for the selfless help they gave us while we used StreamPark. Their professionalism and user-first attitude let us apply this powerful framework more efficiently and smoothly. We hope StreamPark keeps getting better and becomes a model among the newer Apache projects!
+
diff --git a/static/blog/joymaker/access_credential.png
b/static/blog/joymaker/access_credential.png
new file mode 100644
index 00000000..8f781f1d
Binary files /dev/null and b/static/blog/joymaker/access_credential.png differ
diff --git a/static/blog/joymaker/alarm_configuration.png
b/static/blog/joymaker/alarm_configuration.png
new file mode 100644
index 00000000..c4cb7f95
Binary files /dev/null and b/static/blog/joymaker/alarm_configuration.png differ
diff --git a/static/blog/joymaker/application_first_configuration.png
b/static/blog/joymaker/application_first_configuration.png
new file mode 100644
index 00000000..fcfcb0aa
Binary files /dev/null and
b/static/blog/joymaker/application_first_configuration.png differ
diff --git a/static/blog/joymaker/application_second_configuration.png
b/static/blog/joymaker/application_second_configuration.png
new file mode 100644
index 00000000..61bdcaee
Binary files /dev/null and
b/static/blog/joymaker/application_second_configuration.png differ
diff --git a/static/blog/joymaker/application_third_configuration.png
b/static/blog/joymaker/application_third_configuration.png
new file mode 100644
index 00000000..d58f9f86
Binary files /dev/null and
b/static/blog/joymaker/application_third_configuration.png differ
diff --git a/static/blog/joymaker/cloud_environment.png
b/static/blog/joymaker/cloud_environment.png
new file mode 100644
index 00000000..81157642
Binary files /dev/null and b/static/blog/joymaker/cloud_environment.png differ
diff --git a/static/blog/joymaker/clusters.png
b/static/blog/joymaker/clusters.png
new file mode 100644
index 00000000..0a78a01e
Binary files /dev/null and b/static/blog/joymaker/clusters.png differ
diff --git a/static/blog/joymaker/cover.png b/static/blog/joymaker/cover.png
new file mode 100644
index 00000000..1368f8be
Binary files /dev/null and b/static/blog/joymaker/cover.png differ
diff --git a/static/blog/joymaker/data_reporting.png
b/static/blog/joymaker/data_reporting.png
new file mode 100644
index 00000000..e79bdee1
Binary files /dev/null and b/static/blog/joymaker/data_reporting.png differ
diff --git a/static/blog/joymaker/dynamic_parameter.png
b/static/blog/joymaker/dynamic_parameter.png
new file mode 100644
index 00000000..bc89f59f
Binary files /dev/null and b/static/blog/joymaker/dynamic_parameter.png differ
diff --git a/static/blog/joymaker/first_configuration.png
b/static/blog/joymaker/first_configuration.png
new file mode 100644
index 00000000..aace8b5f
Binary files /dev/null and b/static/blog/joymaker/first_configuration.png differ
diff --git a/static/blog/joymaker/flink_cluster.png
b/static/blog/joymaker/flink_cluster.png
new file mode 100644
index 00000000..f5f624c2
Binary files /dev/null and b/static/blog/joymaker/flink_cluster.png differ
diff --git a/static/blog/joymaker/flink_home.png
b/static/blog/joymaker/flink_home.png
new file mode 100644
index 00000000..a67a4b96
Binary files /dev/null and b/static/blog/joymaker/flink_home.png differ
diff --git a/static/blog/joymaker/incoming_parameter.png
b/static/blog/joymaker/incoming_parameter.png
new file mode 100644
index 00000000..8bcc2e47
Binary files /dev/null and b/static/blog/joymaker/incoming_parameter.png differ
diff --git a/static/blog/joymaker/instance_management.png
b/static/blog/joymaker/instance_management.png
new file mode 100644
index 00000000..9d33067c
Binary files /dev/null and b/static/blog/joymaker/instance_management.png differ
diff --git a/static/blog/joymaker/jar.png b/static/blog/joymaker/jar.png
new file mode 100644
index 00000000..7b5d8b3b
Binary files /dev/null and b/static/blog/joymaker/jar.png differ
diff --git a/static/blog/joymaker/jar_configuration.png
b/static/blog/joymaker/jar_configuration.png
new file mode 100644
index 00000000..32cc046a
Binary files /dev/null and b/static/blog/joymaker/jar_configuration.png differ
diff --git a/static/blog/joymaker/kubernetes_configuration.png
b/static/blog/joymaker/kubernetes_configuration.png
new file mode 100644
index 00000000..2817419f
Binary files /dev/null and b/static/blog/joymaker/kubernetes_configuration.png
differ
diff --git a/static/blog/joymaker/new_cover.png
b/static/blog/joymaker/new_cover.png
new file mode 100644
index 00000000..985ac56f
Binary files /dev/null and b/static/blog/joymaker/new_cover.png differ
diff --git a/static/blog/joymaker/running.png b/static/blog/joymaker/running.png
new file mode 100644
index 00000000..ca727b18
Binary files /dev/null and b/static/blog/joymaker/running.png differ
diff --git a/static/blog/joymaker/second_configuration.png
b/static/blog/joymaker/second_configuration.png
new file mode 100644
index 00000000..5a2e6fa1
Binary files /dev/null and b/static/blog/joymaker/second_configuration.png
differ
diff --git a/static/blog/joymaker/session_cluster.png
b/static/blog/joymaker/session_cluster.png
new file mode 100644
index 00000000..7b13bc48
Binary files /dev/null and b/static/blog/joymaker/session_cluster.png differ
diff --git a/static/blog/joymaker/sql_configuration.png
b/static/blog/joymaker/sql_configuration.png
new file mode 100644
index 00000000..8a048fd1
Binary files /dev/null and b/static/blog/joymaker/sql_configuration.png differ
diff --git a/static/blog/joymaker/task_construction.png
b/static/blog/joymaker/task_construction.png
new file mode 100644
index 00000000..c6232e7a
Binary files /dev/null and b/static/blog/joymaker/task_construction.png differ
diff --git a/static/blog/joymaker/third_configuration.png
b/static/blog/joymaker/third_configuration.png
new file mode 100644
index 00000000..6d1984e7
Binary files /dev/null and b/static/blog/joymaker/third_configuration.png differ
diff --git a/static/blog/joymaker/variable_management.png
b/static/blog/joymaker/variable_management.png
new file mode 100644
index 00000000..a41683f2
Binary files /dev/null and b/static/blog/joymaker/variable_management.png differ