This is an automated email from the ASF dual-hosted git repository.

cancai pushed a commit to branch dev
in repository 
https://gitbox.apache.org/repos/asf/incubator-streampark-website.git


The following commit(s) were added to refs/heads/dev by this push:
     new 04786f1  [improve] localization Ziroom's Real-Time Computing Platform 
Practice Based on Apache StreamPark (#315)
04786f1 is described below

commit 04786f1696e7fcfccb2a96de89a458688926b39a
Author: VampireAchao <[email protected]>
AuthorDate: Tue Jan 16 11:37:09 2024 +0800

    [improve] localization Ziroom's Real-Time Computing Platform Practice Based 
on Apache StreamPark (#315)
    
    Co-authored-by: VampireAchao <[email protected]>
---
 blog/8-streampark-usercase-ziru.md | 555 +++++++++++++++++++++++++++++++++++++
 1 file changed, 555 insertions(+)

diff --git a/blog/8-streampark-usercase-ziru.md 
b/blog/8-streampark-usercase-ziru.md
new file mode 100644
index 0000000..eda608f
--- /dev/null
+++ b/blog/8-streampark-usercase-ziru.md
@@ -0,0 +1,555 @@
+---
+slug: streampark-usercase-ziru
+title: Ziroom's Real-Time Computing Platform Practice Based on Apache 
StreamPark
+tags: [StreamPark, Production Practice]
+---
+
+![](/blog/ziru/cover.png)
+
+**Introduction:** Ziroom, an O2O internet company focusing on providing rental 
housing products and services, has built an online, data-driven, and 
intelligent platform that covers the entire chain of urban living. Real-time 
computing has always played an important role in Ziroom. To date, Ziroom 
processes TB-level data daily. This article, brought by the real-time computing 
team from Ziroom, introduces the in-depth practice of Ziroom's real-time 
computing platform based on StreamPark.
+
+- Challenges in real-time computing
+- The journey to the solution
+- In-depth practice based on StreamPark
+- Summary of practical experience and examples
+- Benefits brought by the implementation
+- Future plans
+
+<!-- truncate -->
+
+As an O2O internet brand offering rental housing products and services, Ziroom 
was established in October 2011. To date, Ziroom has served nearly 500,000 
landlords and 5 million customers, managing over 1 million housing units. As of 
March 2021, Ziroom has expanded to 10 major cities including Beijing, Shanghai, 
Shenzhen, Hangzhou, Nanjing, Guangzhou, Chengdu, Tianjin, Wuhan, and Suzhou. 
Ziroom has created an online, data-driven, and intelligent platform for quality 
residential products  [...]
+
+With a vast user base, Ziroom has been committed to providing superior product 
experiences and achieving digital transformation of the enterprise. Since 2021, 
real-time computing, particularly Flink, has played an important role in 
Ziroom. To date, Ziroom processes TB-level data daily, with over 500 real-time 
jobs supporting more than 10 million data calls per day.
+
+## **Challenges in Real-Time Computing**
+
+At Ziroom, real-time computing is mainly divided into two application 
scenarios:
+
+- Data synchronization: Includes Kafka, MySQL, and MongoDB data 
synchronization to Hive / Paimon / ClickHouse, etc.
+
+- Real-time data warehouse: Includes real-time indicators for businesses like 
rentals, acquisitions, and home services.
+
+In the process of implementing real-time computing, we faced several 
challenges, roughly as follows:
+
+### **01 Low Efficiency in Job Deployment**
+
+The process of developing and deploying real-time jobs at Ziroom is as 
follows: data warehouse developers embed Flink SQL code in the program, debug 
locally, compile into FatJar, and then submit the job as a work order and JAR 
package to the operation team. The operation team member responsible for job 
deployment then deploys the job to the online Kubernetes session environment 
through the command line. This process involves many steps, each requiring 
manual intervention, resulting in ex [...]
+
+![](/blog/ziru/job_goes_online.png)
+
+### **02 Unclear Job Ownership Information**
+
+Due to the lack of unified management of the real-time computing platform, 
business code is managed by GitLab. Although this solved some problems, we 
found deficiencies between the repository code and the management of online 
Flink jobs: lack of clear ownership, lack of grouping and effective permission 
control, leading to chaotic job management and difficult responsibility 
tracing. To ensure the consistency and controllability of code and online jobs, 
there is an urgent need to establis [...]
+
+### **03 Difficulty in Job Maintenance**
+
+At Ziroom, multiple versions of Flink jobs are running. Due to frequent API 
changes and lack of backward compatibility in major version upgrades of Apache 
Flink, the cost of upgrading project code becomes very high. Therefore, 
managing these different versions of jobs has become a headache.
+
+Without a
+
+unified job platform, these jobs are submitted using scripts. Jobs vary in 
importance and data volume, requiring different resources and runtime 
parameters, necessitating corresponding modifications. Modifications can be 
made by editing the submission script or directly setting parameters in the 
code, but this makes configuration information difficult to access, especially 
when jobs restart or fail and FlinkUI is unavailable, turning configuration 
information into a black box. Therefore, [...]
+
+### **04 Difficulty in Job Development and Debugging**
+
+In our previous development process, we typically embedded SQL code within 
program code in the local IDEA environment for job development and debugging, 
verifying the correctness of the program. However, this approach has several 
disadvantages:
+
+1. Difficulty in multi-data source debugging. Often, a requirement involves 
multiple different data sources. For local environment debugging, developers 
need to apply for white-list access to data, which is both time-consuming and 
cumbersome.
+
+2. SQL code is hard to read and modify. As SQL code is embedded in program 
code, it's difficult to read and inconvenient to modify. More challenging is 
debugging through SQL segments, as the lack of SQL version control and syntax 
verification makes it hard for developers to locate specific SQL lines in 
client logs to identify the cause of execution failure.
+
+Therefore, there is a need to improve the efficiency of development and 
debugging.
+
+## **The Journey to the Solution**
+
+In the early stages of platform construction, we comprehensively surveyed 
almost all relevant projects in the industry, covering both commercial paid 
versions and open-source versions, starting from early 2022. After 
investigation and comparison, we found that these projects have their 
limitations to varying extents, and their usability and stability could not be 
effectively guaranteed.
+
+Overall, StreamPark performed best in our evaluation. It was the only project 
without major flaws and with strong extensibility: supporting both SQL and JAR 
jobs, with the most complete and stable deployment mode for Flink jobs. Its 
unique architectural design not only avoids locking in specific Flink versions 
but also supports convenient version switching and parallel processing, 
effectively solving job dependency isolation and conflict issues. The job 
management & operations capabiliti [...]
+
+In the latest 2.2 version, the community has already restructured this part.
+
+After analyzing the pros and cons of many open-source projects, we decided to 
participate in projects with excellent architecture, development potential, and 
an actively dedicated core team. Based on this understanding, we made the 
following decisions:
+
+1. In terms of job deployment mode, we decided to adopt the On Kubernetes 
mode. Real-time jobs have dynamic resource consumption, creating a strong need 
for Kubernetes' elastic scaling, which helps us better cope with data output 
fluctuations and ensure job stability.
+
+2. In the selection of open-source components, after comprehensive comparison 
and evaluation of various indicators, we finally chose what was then StreamX. 
Subsequent close communication with the community allowed us to deeply 
appreciate the serious and responsible attitude of the founders and the united 
and friendly atmosphere of the community. We also witnessed the project's 
inclusion in the Apache Incubator in September 2022, making us hopeful for its 
future.
+
+3. On the basis of StreamPark, we aim to promote integration with the existing 
ecosystem of the company to better meet our business needs.
+
+## **In-depth Practice Based on StreamPark**
+
+Based on the above decisions, we initiated the evolution of the real-time 
computing platform, oriented by "pain point needs," and built a stable, 
efficient, and easy-to-maintain real-time computing platform based on 
StreamPark. Since the beginning of 2022, we have participated in the 
construction of the community while officially scheduling our internal platform 
construction.
+
+First, we further improved related functionalities on the basis of StreamPark:
+
+![](/blog/ziru/platform_construction.png)
+
+### **01 LDAP Login Support**
+
+On the basis of StreamPark, we further improved related functionalities, 
including support for LDAP, so that in the future we can fully open up 
real-time capabilities, allowing analysts from the company's four business 
lines to use the platform, expected to reach about 170 people. With the 
increase in numbers, account management becomes increasingly important, 
especially in the case of personnel changes, account cancellation, and 
application become frequent and time-consuming operations. [...]
+
+#### step1: Fill in the corresponding LDAP
+
+configuration:
+
+Edit the application.yml file, setting the LDAP basic information as follows:
+
+```yaml
+ldap:
+  # Is ldap enabled?
+  enable: false
+  ## AD server IP, default port 389
+  urls: ldap://99.99.99.99:389
+  ## Login Account
+  base-dn: dc=streampark,dc=com
+  username: cn=Manager,dc=streampark,dc=com
+  password: streampark
+  user:
+    identity-attribute: uid
+    email-attribute: mail
+```
+
+#### step2: LDAP Login
+
+On the login interface, click LDAP login method, then enter the corresponding 
account and password, and click to log in.
+
+![](/blog/ziru/ldap.png)
+
+### **02 Automatic Ingress Generation for Job Submission**
+
+Due to the company's network security policy, only port 80 is opened on the 
Kubernetes host machines by the operation team, making it impossible to 
directly access the job WebUI on Kubernetes via "domain + random port." To 
solve this problem, we needed to use Ingress to add a proxy layer to the access 
path, achieving the effect of access routing. In StreamPark version 2.0, we 
contributed the functionality related to Ingress [3]. We adopted a strategy 
pattern implementation, initially obt [...]
+
+The specific configuration steps are as follows:
+
+#### step1: Click to enter Setting-> Choose Ingress Setting, fill in the 
domain name
+
+![](/blog/ziru/ingress_setting.png)
+
+#### step2: Submit a job
+
+![](/blog/ziru/k8s_job.png)
+
+Upon entering the K8s management platform, you can observe that submitting a 
Flink job also corresponds to submitting an Ingress with the same name.
+
+#### step3: Click to enter Flink's WebUI
+
+You will notice that the generated address consists of three parts: domain + 
job submission namespace + job name.
+
+![](/blog/ziru/flink_webui.png)
+
+### **03 Support for Viewing Job Deployment Logs**
+
+In the process of continuous job deployment, we gradually realized that 
without logs, we cannot perform effective operations. Log retention, archiving, 
and viewing became an important part in our later problem-solving process. 
Therefore, in StreamPark version 2.0, we contributed the ability to archive 
startup logs in On Kubernetes mode and view them on the page [4]. Now, by 
clicking the log viewing button in the job list, it is very convenient to view 
the real-time logs of the job.
+
+![](/blog/ziru/k8s_log.png)
+
+### **04 Integration of Grafana Monitoring Chart Links**
+
+In actual use, as the number of jobs increased, the number of users rose, and 
more departments were involved, we faced the problem of difficulty in 
self-troubleshooting. Our team's operational capabilities are actually very 
limited. Due to the difference in professional fields, when we tell users to 
view charts and logs on Grafana and ELK, users often feel at a loss and do not 
know how to find information related to their jobs.
+
+To solve this problem, we proposed a demand in the community: we hoped that 
each job could directly jump to the corresponding monitoring chart and log 
archive page through a hyperlink, so that users could directly view the 
monitoring information and logs related to their jobs. This avoids tedious 
searches in complex system interfaces, thus improving the efficiency of 
troubleshooting.
+
+We had a discussion in the community, and it was quickly responded to, as 
everyone thought it was a common need. Soon, a developer contributed a design 
and related PR, and the issue was quickly resolved. Now, to enable this feature 
in StreamPark has become very simple:
+
+#### step1: Create a badge label
+
+![](/blog/ziru/create_badge_label.png)
+
+#### step2: Associate the badge label with a redirect link
+
+![](/blog/ziru/relevancy.png)
+
+#### step3: Click the badge label for link redirection
+
+![](/blog/ziru/link_jump.png)
+
+### **05 Integration of Flink SQL Security for Permission Control**
+
+In our system, lineage management is based on Apache Atlas, and permission 
management is based on the open-source project Flink-sql-security, which 
supports user-level data desensitization and row-level data access control, 
allowing specific users to only access desensitized data or authorized rows.
+
+This design is to handle some complex inheritance logic. For example, when 
joining encrypted field age in Table A with Table B to obtain Table C, the age 
field in Table C should inherit the encryption logic of Table A to ensure the 
encryption status of data is not lost during processing. This way, we can 
better protect data security and ensure that data complies with security 
standards throughout the processing process.
+
+For permission control, we developed a Flink-sql-security-streampark plugin 
based on Flink-s
+
+ql-security. The basic implementation is as follows:
+
+1. During submission check, the system parses the submitted SQL, obtaining 
InputTable and OutputTable datasets.
+
+2. The system queries the remote permission service to obtain the user's bound 
RBAC (Role-Based Access Control) permissions.
+
+3. Based on the RBAC permissions, the system gets the encryption rules for the 
corresponding tables.
+
+4. The system rewrites the SQL, wrapping the original SQL query fields with a 
preset encryption algorithm, thereby reorganizing the logic.
+
+5. Finally, the system submits according to the reorganized logic.
+
+Through this integration and plugin development, we implemented permission 
control for user query requests, thereby ensuring data security.
+
+**01 Row-level Permission Conditions**
+
+![](/blog/ziru/row_level_permissions.png)
+
+![](/blog/ziru/row_level_permissions_table.png)
+
+Input SQL
+
+```shell
+SELECT * FROM orders;
+```
+User A's actual execution SQL:
+```shell
+SELECT * FROM orders WHERE region = 'beijing';
+```
+User B's actual execution SQL:
+```shell
+SELECT * FROM orders WHERE region = 'hangzhou';
+```
+
+**02 Field Desensitization Conditions**
+
+![](/blog/ziru/field_desensitization.png)
+
+![](/blog/ziru/field_desensitization_table.png)
+
+Input SQL
+```shell
+SELECT name, age, price, phone FROM user;
+```
+Execution SQL:
+
+User A's actual execution SQL:
+```shell
+SELECT Encryption_function(name), age, price, Sensitive_field_functions(phone) 
FROM user;
+```
+User B's actual execution SQL:
+```shell
+SELECT name, Encryption_function(age), price, Sensitive_field_functions(phone) 
FROM user;
+```
+
+### **06 Data Synchronization Platform Based on StreamPark**
+
+With the successful implementation of StreamPark's technical solutions in the 
company, we achieved deep support for Flink jobs, bringing a qualitative leap 
in data processing. This prompted us to completely revamp our past data 
synchronization logic, aiming to reduce operational costs through technical 
optimization and integration. Therefore, we gradually replaced historical Sqoop 
jobs, Canal jobs, and Hive JDBC Handler jobs with Flink CDC jobs, Flink stream, 
and batch jobs. In this proc [...]
+
+External system integration with StreamPark is simple, requiring only a few 
steps:
+
+1. First, create a token for API access:
+
+![](/blog/ziru/token.png)
+
+2. View the external call link of the Application:
+
+![](/blog/ziru/call_link.png)
+
+```shell
+curl -X POST '/flink/app/start' \
+-H 'Authorization: $token' \
+-H 'Content-Type: application/x-www-form-urlencoded; charset=UTF-8' \
+--data-urlencode 'savePoint=' \
+--data-urlencode 'allowNonRestored=false' \
+--data-urlencode 'savePointed=false' \
+--data-urlencode 'id=100501'
+```
+
+3. Configure Http scheduling in DolphinScheduler
+
+![](/blog/ziru/http_scheduling.png)
+
+## **Summary of Practical Experience**
+
+During our in-depth use of StreamPark, we summarized some common issues and 
explored solutions in the practice process, which we have compiled into 
examples for reference.
+
+### **01 Building Base Images**
+
+To deploy a Flink job on Kubernetes using StreamPark, you first need to 
prepare a Base image built on Flink. Then, on the Kubernetes platform, the 
user-provided image is used to start the Flink job. If we continue to use the 
official "bare image," it is far from sufficient for actual development. 
Business logic developed by users often involves multiple upstream and 
downstream data sources, requiring related data source Connectors and 
dependencies like Hadoop. Therefore, these dependenci [...]
+
+#### step1: First, create a folder containing two folders and a Dockerfile file
+
+![](/blog/ziru/dockerfile.png)
+
+conf folder: Contains HDFS configuration files, mainly for configuring Flink's 
Checkpoint writing and using Hive metadata in FlinkSQL
+
+![](/blog/ziru/hdfs_conf.png)
+
+lib folder: Contains the related Jar dependency packages, as follows:
+
+![](/blog/ziru/lib.png)
+
+Dockerfile file for defining image construction
+
+```dockerfile
+FROM apache/flink:1.14.5-scala_2.11-java8
+ENV TIME_ZONE=Asia/Shanghai
+COPY ./conf /opt/hadoop/conf
+COPY lib $FLINK_HOME/lib/
+```
+
+#### step2: Image build command using multi-architecture build mode, as 
follows:
+
+```dockerfile
+docker buildx build --push --platform linux/amd64 -t ${private image 
repository address}
+```
+
+### **02 Base Image Integration with Arthas Example**
+
+As more jobs are released and go live within our company, we often encounter 
performance degradation in long-running jobs, such as reduced Kafka consumption 
capacity, increased memory usage, and extended GC time. We recommend using 
Arthas, an open-source Java diagnostic tool by Alibaba. It allows real-time 
global viewing of Java application load, memory, GC, thread status, and without 
modifying application code, it enables viewing method call parameters, 
exceptions, monitoring method exe [...]
+
+![](/blog/ziru/arthas.png)
+
+![](/blog/ziru/advanced.png)
+
+![](/blog/ziru/arthas_log.png)
+
+Therefore, we integrated Arthas into the base image to facilitate runtime 
problem troubleshooting.
+
+```dockerfile
+FROM apache/flink:1.14.5-scala_2.11-java8
+ENV TIME_ZONE=Asia/Shanghai
+COPY ./conf /opt/hadoop/conf
+COPY lib $FLINK_HOME/lib/
+RUN apt-get update --fix-missing && apt-get install -y fontconfig 
--fix-missing && \
+    apt-get install -y openjdk-8-jdk && \
+    apt-get install -y ant && \
+    apt-get clean;
+
+RUN apt-get install sudo -y
+
+# Fix certificate issues
+RUN apt-get update && \
+    apt-get install ca-certificates-java && \
+    apt-get clean && \
+    update-ca-certificates -f;
+
+# Setup JAVA_HOME -- useful for docker commandline
+ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
+RUN export JAVA_HOME
+RUN apt-get install -y unzip
+RUN curl -Lo arthas-packaging-latest-bin.zip  
'https://arthas.aliyun.com/download/latest_version?mirror=aliyun'
+RUN unzip -d arthas-latest-bin arthas-packaging-latest-bin.zip
+```
+
+### **03 Resolution of Dependency Conflicts in Images**
+
+In the process of using StreamPark, we often encounter dependency conflict 
exceptions like NoClassDefFoundError, ClassNotFoundException, and 
NoSuchMethodError in Flink jobs running on base images. The troubleshooting 
approach is to find the package path of the conflicting class indicated in the 
error. For example, if the error class is in org.apache.orc:orc-core, go to the 
corresponding module directory, run `mvn dependency::tree`, search for 
orc-core, see who brought in the dependency,  [...]
+
+#### step1: Clone the flink-shaded project locally👇
+
+```shell
+git clone https://github.com/apache/flink-shaded.git
+```
+
+![](/blog/ziru/flink_shaded.png)
+
+#### step2: Load the project into IDEA
+
+![](/blog/ziru/idea.png)
+
+#### step3: Exclude the conflicting parts and then package them.
+
+### **04 Centralized Job Configuration Example**
+
+One of the great conveniences of using StreamPark is centralized configuration 
management. You can configure all settings in the conf file in the Flink 
directory bound to the platform.
+
+```shell
+cd /flink-1.14.5/conf
+vim flink-conf.yaml
+```
+
+![](/blog/ziru/conf.png)
+
+After completing the configuration, save it. Then go to the platform's 
Setting, and click on the Flink Conf icon.
+
+![](/blog/ziru/flink_conf.png)
+
+Clicking Sync Conf will synchronize the global configuration file, and new 
jobs will be submitted with the new configuration.
+
+![](/blog/ziru/sync_conf.png)
+
+### **05 StreamPark DNS Resolution Configuration**
+
+A correct and reasonable DNS resolution configuration is very important when 
submitting FlinkSQL on the StreamPark platform. It mainly involves the 
following points:
+
+1. Flink jobs' Checkpoint writing to HDFS requires a snapshot write to an HDFS 
node obtained through ResourceManager. If there are expansions in the Hadoop 
cluster in the enterprise, and these new nodes are not covered by the DNS 
resolution service, this will directly lead to Checkpoint failure, affecting 
online stability.
+
+2. Flink jobs typically need to configure connection strings for different 
internal data sources. Configuring the database's real IP address often leads 
to job exits due to IP changes during database migration. Therefore, in 
production, connection strings are often composed of domain names and attribute 
parameters, with DNS services resolving them to real IP addresses for access.
+
+Initially, we maintained DNS configuration through Pod Template.
+
+```shell
+apiVersion: v1
+kind: Pod
+metadata:
+  name: pod-template
+spec:
+  hostAliases:
+    - ip: 10.216.xxx.79
+      hostnames:
+        - handoop1
+    - host
+
+names:
+        - handoop2
+      ip: 10.16.xx.48
+    - hostnames:
+        - handoop3
+      ip: 10.16.xx.49
+    - hostnames:
+        - handoop4
+      ip: 10.16.xx.50
+   .......
+```
+
+Although theoretically feasible, we encountered a series of problems in 
practice. When expanding HDFS, we found failures in Flink's Checkpoint 
function, and database migration also faced connection failures, causing sudden 
online service outages. After in-depth investigation, we found that the root 
cause was in DNS resolution.
+
+Previously, we used hostAliases to maintain the mapping between domain names 
and IP addresses. However, this method was costly in practice, as every update 
of hostAliases required stopping all Flink jobs, undoubtedly increasing our 
operational costs. To seek a more flexible and reliable method to manage DNS 
resolution configuration and ensure the normal operation of Flink jobs, we 
decided to build dnsmasq for bidirectional DNS resolution.
+
+After configuring and installing dnsmasq, we first needed to override the 
resolv.conf configuration file in the /etc directory of the Flink image. 
However, since resolv.conf is a read-only file, if we want to override it, we 
need to use mounting. Therefore, we first configured resolv.conf as a ConfigMap 
for use during the override. This way, we can more flexibly and reliably manage 
DNS resolution configuration, ensuring stable operation of Flink jobs.
+
+```yaml
+apiVersion: v1
+data:
+  resolv.conf: "nameserver  10.216.138.226" //DNS service
+kind: ConfigMap
+metadata:
+  creationTimestamp: "2022-07-13T10:16:18Z"
+  managedFields:
+  name: dns-configmap
+  namespace: native-flink
+```
+
+Mounting it through Pod Template.
+
+![](/blog/ziru/pod_template.png)
+
+This way, DNS related to the big data platform can be maintained on dnsmasq, 
while the host machine running Flink jobs can follow the DNS resolution process.
+
+1. First, check the local hosts file to see if there is a corresponding 
relationship, read the record for resolution, and proceed to the next step if 
not.
+
+2. The operating system checks the local DNS cache, and if not found, moves to 
the next step.
+
+3. The operating system searches the DNS server address defined in our network 
configuration.
+
+This achieves dynamic recognition of DNS changes.
+
+### **06 Multi-Instance Deployment Practice**
+
+In actual production environments, we often need to operate multiple clusters, 
including a set for testing and a set for official online use. Tasks are first 
verified and performance tested in the test cluster, then released to the 
official online cluster after ensuring accuracy.
+
+#### step1: Modify the port number to avoid conflicts between multiple service 
ports
+
+![](/blog/ziru/port_number.png)
+
+#### step2: Modify workspace
+
+Different instance services need to configure different workspaces to avoid 
resource interference leading to strange bugs.
+
+#### step3: Launch multi-instance services
+
+To achieve isolation between production and testing environments, we 
introduced a key step at the beginning of the startup process. We input the 
command (for the Hadoop B cluster):
+
+```shell
+export HADOOP_CONF_DIR=/home/streamx/conf
+```
+
+This effectively cut off the default logic of Flink on K8s loading HDFS 
configuration. This operation ensures that A StreamPark only connects to A 
Hadoop environment, while B StreamPark connects to B Hadoop environment, thus 
achieving complete isolation between testing and production environments.
+
+Specifically, after this command takes effect, we can ensure that Flink jobs 
submitted on port 10002 connect to the B Hadoop environment. Thus, the B Hadoop 
environment is isolated from the Hadoop environment used by Flink jobs 
submitted on port 10000 in the past, effectively preventing interference 
between different environments and ensuring system stability and reliability.
+
+The following content is an analysis of Flink's logic for loading the Hadoop 
environment:
+
+```yaml
+// Process of finding Hadoop configuration files
+//1. First, check if the parameter kubernetes.hadoop.conf.config-map.name is 
added
+@Override
+public Optional<String> get
+
+ExistingHadoopConfigurationConfigMap() {
+    final String existingHadoopConfigMap =
+            
flinkConfig.getString(KubernetesConfigOptions.HADOOP_CONF_CONFIG_MAP);
+    if (StringUtils.isBlank(existingHadoopConfigMap)) {
+        return Optional.empty();
+    } else {
+        return Optional.of(existingHadoopConfigMap.trim());
+    }
+}
+
+@Override
+public Optional<String> getLocalHadoopConfigurationDirectory() {
+    // 2. If parameter 1 is not specified, check for the HADOOP_CONF_DIR 
environment variable in the local environment where the native command is 
submitted
+    final String hadoopConfDirEnv = 
System.getenv(Constants.ENV_HADOOP_CONF_DIR);
+    if (StringUtils.isNotBlank(hadoopConfDirEnv)) {
+        return Optional.of(hadoopConfDirEnv);
+    }
+    // 3. If environment variable 2 is not present, continue to check for the 
HADOOP_HOME environment variable
+    final String hadoopHomeEnv = System.getenv(Constants.ENV_HADOOP_HOME);
+    if (StringUtils.isNotBlank(hadoopHomeEnv)) {
+        // Hadoop 2.2+
+        final File hadoop2ConfDir = new File(hadoopHomeEnv, "/etc/hadoop");
+        if (hadoop2ConfDir.exists()) {
+            return Optional.of(hadoop2ConfDir.getAbsolutePath());
+        }
+
+        // Hadoop 1.x
+        final File hadoop1ConfDir = a new File(hadoopHomeEnv, "/conf");
+        if (hadoop1ConfDir.exists()) {
+            return Optional.of(hadoop1ConfDir.getAbsolutePath());
+        }
+    }
+
+    return Optional.empty();
+}
+
+final List<File> hadoopConfigurationFileItems = 
getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
+// If 1, 2, 3 are not found, it means there is no Hadoop environment
+if (hadoopConfigurationFileItems.isEmpty()) {
+    LOG.warn(
+            "Found 0 files in directory {}, skip to mount the Hadoop 
Configuration ConfigMap.",
+            localHadoopConfigurationDirectory.get());
+    return flinkPod;
+}
+// If 2 or 3 exists, it will look for core-site.xml and hdfs-site.xml files in 
the path
+private List<File> getHadoopConfigurationFileItems(String 
localHadoopConfigurationDirectory) {
+    final List<String> expectedFileNames = new ArrayList<>();
+    expectedFileNames.add("core-site.xml");
+    expectedFileNames.add("hdfs-site.xml");
+
+    final File directory = new File(localHadoopConfigurationDirectory);
+    if (directory.exists() and directory.isDirectory()) {
+        return Arrays.stream(directory.listFiles())
+                .filter(
+                        file ->
+                                file.isFile()
+                                        and expectedFileNames.stream()
+                                                .anyMatch(name -> 
file.getName().equals(name)))
+                .collect(Collectors.toList());
+    } else {
+        return Collections.emptyList();
+    }
+}
+
+// If a Hadoop environment is present, the above two files will be parsed as 
key-value pairs, and then constructed into a ConfigMap, with the name following 
this naming rule
+public static String getHadoopConfConfigMapName(String clusterId) {
+    return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;
+}
+```
+
+Then conduct process port occupancy queries:
+
+```shell
+netstat -tlnp | grep 10000
+netstat -tlnp | grep 10002
+```
+
+## **Benefits Brought**
+
+Our team has been using StreamX (the predecessor of StreamPark) and, after 
more than a year of practice and refinement, StreamPark has significantly 
improved our challenges in developing, managing, and operating Apache Flink 
jobs. As a one-stop service platform, StreamPark greatly simplifies the entire 
development process. Now, we can complete job development, compilation, and 
release directly on the StreamPark platform, not only lowering the management 
and deployment threshold of Flink  [...]
+
+Since deploying StreamPark, we have been using the platform on a large scale 
in a production environment. From initially managing over 50 FlinkSQL jobs to 
nearly 500 jobs now, as shown in the diagram, StreamPark is divided into 7 
teams, each with dozens of jobs. This transformation not only demonstrates 
StreamPark's scalability and efficiency but also fully proves its strong 
practical value in actual business.
+
+![](/blog/ziru/production_environment.png)
+
+## **Future Expectations**
+
+As one of the early users of StreamPark, we have maintained close 
communication with the community, participating in the stability improvement of 
StreamPark. We have submitted bugs encountered in production operation and new 
features to the community. In the future, we hope to manage the metadata 
information of Apache Paimon lake tables and the capability of auxiliary jobs 
for
+
+Paimon's Actions on StreamPark. Based on the Flink engine, by interfacing with 
the Catalog of lake tables and Action jobs, we aim to realize the management 
and optimization of lake table jobs in one integrated capability. Currently, 
StreamPark is working on integrating the capabilities with Paimon data, which 
will greatly assist in real-time data lake ingestion in the future.
+
+We are very grateful for the technical support that the StreamPark team has 
provided us all along. We wish Apache StreamPark continued success, more users, 
and its early graduation to become a top-level Apache project.

Reply via email to