This is an automated email from the ASF dual-hosted git repository.

wuzhiguo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bigtop-manager.git


The following commit(s) were added to refs/heads/main by this push:
     new 272f24b  BIGTOP-4135: Add Flink component on Bigtop-3.3.0 stack (#47)
272f24b is described below

commit 272f24bbbaa19e178abae7cdf8dc5088c074afbe
Author: Zhiguo Wu <[email protected]>
AuthorDate: Tue Aug 20 16:46:00 2024 +0800

    BIGTOP-4135: Add Flink component on Bigtop-3.3.0 stack (#47)
---
 .../services/flink/configuration/flink-conf.xml    | 367 +++++++++++++++++++++
 .../services/flink/configuration/flink-env.xml     |  33 ++
 .../configuration/flink-log4j-cli-properties.xml   |  83 +++++
 .../flink-log4j-console-properties.xml             |  84 +++++
 .../flink/configuration/flink-log4j-properties.xml |  73 ++++
 .../flink-log4j-session-properties.xml             |  58 ++++
 .../bigtop/3.3.0/services/flink/metainfo.xml       |  80 +++++
 .../stacks/bigtop/3.3.0/services/flink/order.json  |   3 +
 .../zookeeper/configuration/zookeeper-env.xml      |   2 +-
 .../zookeeper/configuration/zookeeper-env.xml      |   2 +-
 .../bigtop/v3_3_0/flink/FlinkClientScript.java     |  43 +++
 .../FlinkHistoryServerScript.java}                 |  34 +-
 .../stack/bigtop/v3_3_0/flink/FlinkParams.java     | 133 ++++++++
 .../stack/bigtop/v3_3_0/flink/FlinkSetup.java      | 102 ++++++
 .../bigtop/v3_3_0/kafka/KafkaBrokerScript.java     |   4 +-
 .../v3_3_0/zookeeper/ZookeeperServerScript.java    |   4 +-
 .../manager/stack/core/executor/StackExecutor.java |   4 +
 17 files changed, 1084 insertions(+), 25 deletions(-)

diff --git 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-conf.xml
 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-conf.xml
new file mode 100644
index 0000000..a865f5d
--- /dev/null
+++ 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-conf.xml
@@ -0,0 +1,367 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    https://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+<configuration>
+    <property>
+        <name>jobmanager.archive.fs.dir</name>
+        <value>hdfs:///completed-jobs/</value>
+        <description>Directory for JobManager to store the archives of 
completed jobs.</description>
+    </property>
+    <property>
+        <name>historyserver.archive.fs.dir</name>
+        <value>hdfs:///completed-jobs/</value>
+        <description>Comma separated list of directories to fetch archived 
jobs from.</description>
+    </property>
+    <property>
+        <name>historyserver.web.port</name>
+        <value>8082</value>
+        <description>The port under which the web-based HistoryServer 
listens.</description>
+    </property>
+    <property>
+        <name>historyserver.archive.fs.refresh-interval</name>
+        <value>10000</value>
+        <description>Interval in milliseconds for refreshing the monitored 
directories.</description>
+    </property>
+    <property>
+        <name>security.kerberos.login.keytab</name>
+        <description>Flink keytab path</description>
+        <value>none</value>
+    </property>
+    <property>
+        <name>security.kerberos.login.principal</name>
+        <description>Flink principal name</description>
+        <value>none</value>
+    </property>
+    <!-- flink-conf.yaml -->
+    <property>
+        <name>content</name>
+        <display-name>flink-conf template</display-name>
+        <description>This is the freemarker template for flink-conf.yaml 
file</description>
+        <value><![CDATA[
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This 
setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host hostname parameter of the bin/jobmanager.sh 
executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and 
setup
+# the conf/masters file, this will be taken care of automatically. Yarn
+# automatically configures the host name based on the hostname of the node 
where the
+# JobManager runs.
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+# The host interface the JobManager will bind to. By default, this is 
localhost, and will prevent
+# the JobManager from communicating outside the machine/container it is 
running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting 
to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an 
outside facing network
+# interface, such as 0.0.0.0.
+
+jobmanager.bind-host: localhost
+
+
+# The total process memory size for the JobManager.
+#
+# Note this accounts for all memory usage within the JobManager process, 
including JVM metaspace and other overhead.
+
+jobmanager.memory.process.size: 1024m
+
+# The host interface the TaskManager will bind to. By default, this is 
localhost, and will prevent
+# the TaskManager from communicating outside the machine/container it is 
running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting 
to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an 
outside facing network
+# interface, such as 0.0.0.0.
+
+taskmanager.bind-host: localhost
+
+# The address of the host on which the TaskManager runs and can be reached by 
the JobManager and
+# other TaskManagers. If not specified, the TaskManager will try different 
strategies to identify
+# the address.
+#
+# Note this address needs to be reachable by the JobManager and forward 
traffic to one of
+# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
+#
+# Note also that unless all TaskManagers are running on the same machine, this 
address needs to be
+# configured separately for each TaskManager.
+
+taskmanager.host: localhost
+
+# The total process memory size for the TaskManager.
+#
+# Note this accounts for all memory usage within the TaskManager process, 
including JVM metaspace and other overhead.
+
+taskmanager.memory.process.size: 1024m
+
+# To exclude JVM metaspace and overhead, please, use total Flink memory size 
instead of 'taskmanager.memory.process.size'.
+# It is not recommended to set both 'taskmanager.memory.process.size' and 
Flink memory.
+#
+# taskmanager.memory.flink.size: 1280m
+
+# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify any other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# JVM and Logging Options
+#==============================================================================
+# Java runtime to use
+env.java.home: ${java_home}
+
+# Path to hadoop configuration directory. It is required to read HDFS and/or 
YARN configuration.
+# You can also set it via environment variable.
+env.hadoop.conf.dir: ${hadoop_conf_dir}
+
+# Defines the directory where the flink-&lt;host&gt;-&lt;process&gt;.pid files 
are saved.
+env.pid.dir: ${flink_pid_dir}
+
+# Defines the directory where the Flink logs are saved.
+env.log.dir: ${flink_log_dir}
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper 
stores
+# the small ground truth for checkpoint and leader election, this location 
stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on 
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" 
(ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK 
security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled. Checkpointing is enabled when 
execution.checkpointing.interval > 0.
+#
+# Execution checkpointing related parameters. Please refer to CheckpointConfig 
and ExecutionCheckpointingOptions for more details.
+#
+# execution.checkpointing.interval: 3min
+# execution.checkpointing.externalized-checkpoint-retention: 
[DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
+# execution.checkpointing.max-concurrent-checkpoints: 1
+# execution.checkpointing.min-pause: 0
+# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
+# execution.checkpointing.timeout: 10min
+# execution.checkpointing.tolerable-failed-checkpoints: 0
+# execution.checkpointing.unaligned: false
+#
+# Supported backends are 'hashmap', 'rocksdb', or the
+# &lt;class-name-of-factory&gt;.
+#
+# state.backend: hashmap
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend).
+#
+# state.backend.incremental: false
+
+# The failover strategy, i.e., how the job computation recovers from task 
failures.
+# Only restart tasks that may have been affected by the task failure, which 
typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no 
longer available for consumption.
+
+jobmanager.execution.failover-strategy: region
+
+#==============================================================================
+# REST &amp; web frontend
+#==============================================================================
+
+# The port to which the REST client connects to. If rest.bind-port has
+# not been specified, then the server will bind to this port as well.
+#
+#rest.port: 8081
+
+# The address to which the REST client will connect to
+#
+rest.address: localhost
+
+# Port range for the REST and web server to bind to.
+#
+#rest.bind-port: 8080-8090
+
+# The address that the REST &amp; web server binds to
+# By default, this is localhost, which prevents the REST &amp; web server from
+# being able to communicate outside of the machine/container it is running on.
+#
+# To enable this, set the bind address to one that has access to outside-facing
+# network interface, such as 0.0.0.0.
+#
+rest.bind-address: localhost
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.submit.enable: false
+
+# Flag to specify whether job cancellation is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.cancel.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+# /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's 
default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and 
connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The below configure how Kerberos credentials are provided. A keytab will be 
used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+# The configuration below defines which JAAS login contexts
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# Below configurations are applicable if ZK ensemble is configured for security
+
+# Override below configuration to provide custom ZK service name if configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in 
"security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh 
(start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+jobmanager.archive.fs.dir: ${jobmanager_archive_fs_dir}
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+historyserver.web.port: ${historyserver_web_port}
+
+# Comma separated list of directories to monitor for completed jobs.
+historyserver.archive.fs.dir: ${historyserver_archive_fs_dir}
+
+# Interval in milliseconds for refreshing the monitored directories.
+historyserver.archive.fs.refresh-interval: 
${historyserver_archive_fs_refresh_interval}
+]]>
+        </value>
+        <attrs>
+            <type>longtext</type>
+        </attrs>
+    </property>
+</configuration>
diff --git 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-env.xml
 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-env.xml
new file mode 100644
index 0000000..d754469
--- /dev/null
+++ 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-env.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    https://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+<configuration>
+    <property>
+        <name>flink_log_dir</name>
+        <value>/var/log/flink</value>
+        <display-name>Flink Log Dir Prefix</display-name>
+        <description>Log Directories for Flink.</description>
+    </property>
+    <property>
+        <name>flink_pid_dir</name>
+        <display-name>Flink PID directory</display-name>
+        <value>/var/run/flink</value>
+    </property>
+</configuration>
diff --git 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-cli-properties.xml
 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-cli-properties.xml
new file mode 100644
index 0000000..d1614cc
--- /dev/null
+++ 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-cli-properties.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    https://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+<configuration>
+    <property>
+        <name>content</name>
+        <description>Flink log4j-cli Properties</description>
+        <value><![CDATA[
+<#noparse>
+# Allows this configuration to be modified at runtime. The file will be 
checked every 30 seconds.
+monitorInterval=30
+
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = FileAppender
+
+# Log all infos in the given file
+appender.file.name = FileAppender
+appender.file.type = FILE
+appender.file.append = false
+appender.file.fileName = ${sys:log.file}
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Log output from org.apache.flink.yarn to the console. This is used by the
+# CliFrontend class when using a per-job YARN cluster.
+logger.yarn.name = org.apache.flink.yarn
+logger.yarn.level = INFO
+logger.yarn.appenderRef.console.ref = ConsoleAppender
+logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
+logger.yarncli.level = INFO
+logger.yarncli.appenderRef.console.ref = ConsoleAppender
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.hadoop.appenderRef.console.ref = ConsoleAppender
+
+# Make sure hive logs go to the file.
+logger.hive.name = org.apache.hadoop.hive
+logger.hive.level = INFO
+logger.hive.additivity = false
+logger.hive.appenderRef.file.ref = FileAppender
+
+# Log output from org.apache.flink.kubernetes to the console.
+logger.kubernetes.name = org.apache.flink.kubernetes
+logger.kubernetes.level = INFO
+logger.kubernetes.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - 
%m%n
+
+# suppress the warning that hadoop native libraries are not loaded (irrelevant 
for the client)
+logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
+logger.hadoopnative.level = OFF
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
+</#noparse>
+]]>
+        </value>
+        <attrs>
+            <type>longtext</type>
+        </attrs>
+    </property>
+</configuration>
diff --git 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-console-properties.xml
 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-console-properties.xml
new file mode 100644
index 0000000..4e7f5f3
--- /dev/null
+++ 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-console-properties.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    https://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+<configuration>
+    <property>
+        <name>content</name>
+        <description>Flink log4j-console Properties</description>
+        <value><![CDATA[
+<#noparse>
+# Allows this configuration to be modified at runtime. The file will be 
checked every 30 seconds.
+monitorInterval=30
+
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+rootLogger.appenderRef.rolling.ref = RollingFileAppender
+
+# Uncomment this if you want to _only_ change Flink's logging
+#logger.flink.name = org.apache.flink
+#logger.flink.level = INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name= org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = INFO
+
+# Log all infos to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - 
%m%n
+
+# Log all infos in the given rolling file
+appender.rolling.name = RollingFileAppender
+appender.rolling.type = RollingFile
+appender.rolling.append = true
+appender.rolling.fileName = ${sys:log.file}
+appender.rolling.filePattern = ${sys:log.file}.%i
+appender.rolling.layout.type = PatternLayout
+appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - 
%m%n
+appender.rolling.policies.type = Policies
+appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=100MB
+appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
+appender.rolling.strategy.type = DefaultRolloverStrategy
+appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
+</#noparse>
+]]>
+        </value>
+        <attrs>
+            <type>longtext</type>
+        </attrs>
+    </property>
+</configuration>
diff --git 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-properties.xml
 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-properties.xml
new file mode 100644
index 0000000..a9e40ff
--- /dev/null
+++ 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-properties.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    https://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+<configuration>
+    <property>
+        <name>content</name>
+        <description>Flink log4j Properties</description>
+        <value><![CDATA[
+<#noparse>
+# Allows this configuration to be modified at runtime. The file will be 
checked every 30 seconds.
+monitorInterval=30
+
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = MainAppender
+
+# Uncomment this if you want to _only_ change Flink's logging
+#logger.flink.name = org.apache.flink
+#logger.flink.level = INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name= org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = INFO
+
+# Log all infos in the given file
+appender.main.name = MainAppender
+appender.main.type = RollingFile
+appender.main.append = true
+appender.main.fileName = ${sys:log.file}
+appender.main.filePattern = ${sys:log.file}.%i
+appender.main.layout.type = PatternLayout
+appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.main.policies.type = Policies
+appender.main.policies.size.type = SizeBasedTriggeringPolicy
+appender.main.policies.size.size = 100MB
+appender.main.policies.startup.type = OnStartupTriggeringPolicy
+appender.main.strategy.type = DefaultRolloverStrategy
+appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
+</#noparse>
+]]>
+        </value>
+        <attrs>
+            <type>longtext</type>
+        </attrs>
+    </property>
+</configuration>
diff --git 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-session-properties.xml
 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-session-properties.xml
new file mode 100644
index 0000000..0135311
--- /dev/null
+++ 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-session-properties.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    https://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+<configuration>
+    <property>
+        <name>content</name>
+        <description>Flink log4j-session Properties</description>
+        <value><![CDATA[
+<#noparse>
+# Allows this configuration to be modified at runtime. The file will be 
checked every 30 seconds.
+monitorInterval=30
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - 
%m%n
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = WARN
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = WARN
+logger.curator.name = org.apache.flink.shaded.org.apache.curator.framework
+logger.curator.level = WARN
+logger.runtimeutils.name= org.apache.flink.runtime.util.ZooKeeperUtils
+logger.runtimeutils.level = WARN
+logger.runtimeleader.name = 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver
+logger.runtimeleader.level = WARN
+</#noparse>
+]]>
+        </value>
+        <attrs>
+            <type>longtext</type>
+        </attrs>
+    </property>
+</configuration>
diff --git 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/metainfo.xml
 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/metainfo.xml
new file mode 100644
index 0000000..5598716
--- /dev/null
+++ 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/metainfo.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    https://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+-->
+
+<metainfo>
+    <service>
+        <name>flink</name>
+        <display-name>Flink</display-name>
+        <desc>Flink is a framework and distributed processing engine for 
stateful computations over unbounded and bounded data streams.</desc>
+        <version>1.16.2-1</version>
+        <user>flink</user>
+
+        <components>
+            <component>
+                <name>flink_history_server</name>
+                <display-name>Flink History Server</display-name>
+                <category>master</category>
+                <cardinality>1+</cardinality>
+                <command-script>
+                    
<script-id>org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink.FlinkHistoryServerScript</script-id>
+                    <script-type>java</script-type>
+                    <timeout>1200</timeout>
+                </command-script>
+                <quick-link>
+                    <display-name>Flink HistoryServer UI</display-name>
+                    
<http-port-property>historyserver.web.port</http-port-property>
+                    <http-port-default>8082</http-port-default>
+                    
<https-port-property>historyserver.web.port</https-port-property>
+                    <https-port-default>8082</https-port-default>
+                </quick-link>
+            </component>
+            <component>
+                <name>flink_client</name>
+                <display-name>Flink Client</display-name>
+                <category>client</category>
+                <cardinality>1+</cardinality>
+                <command-script>
+                    
<script-id>org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink.FlinkClientScript</script-id>
+                    <script-type>java</script-type>
+                    <timeout>1200</timeout>
+                </command-script>
+            </component>
+        </components>
+
+        <os-specifics>
+            <os-specific>
+                <operating-systems>
+                    <os>centos7</os>
+                    <os>rocky8</os>
+                </operating-systems>
+                <architectures>
+                    <arch>x86_64</arch>
+                </architectures>
+                <packages>
+                    <package>flink_3_3_0</package>
+                </packages>
+            </os-specific>
+        </os-specifics>
+
+        <required-services>
+            <service>yarn</service>
+        </required-services>
+    </service>
+</metainfo>
\ No newline at end of file
diff --git 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/order.json
 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/order.json
new file mode 100644
index 0000000..459414e
--- /dev/null
+++ 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/order.json
@@ -0,0 +1,3 @@
+{
+    "FLINK_HISTORY_SERVER-START": ["NAMENODE-START", "DATANODE-START"]
+}
diff --git 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/zookeeper/configuration/zookeeper-env.xml
 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/zookeeper/configuration/zookeeper-env.xml
index c963833..a947ff6 100644
--- 
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/zookeeper/configuration/zookeeper-env.xml
+++ 
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/zookeeper/configuration/zookeeper-env.xml
@@ -35,7 +35,7 @@
     <property>
         <name>content</name>
         <display-name>zookeeper-env template</display-name>
-        <description>This is the jinja template for zookeeper-env.sh 
file</description>
+        <description>This is the freemarker template for zookeeper-env.sh 
file</description>
         <value><![CDATA[
 export JAVA_HOME=${java_home!}
 export ZOOKEEPER_HOME=${zookeeper_home!}
diff --git 
a/bigtop-manager-server/src/main/resources/stacks/nop/1.0.0/services/zookeeper/configuration/zookeeper-env.xml
 
b/bigtop-manager-server/src/main/resources/stacks/nop/1.0.0/services/zookeeper/configuration/zookeeper-env.xml
index 44bdc24..b5d1209 100644
--- 
a/bigtop-manager-server/src/main/resources/stacks/nop/1.0.0/services/zookeeper/configuration/zookeeper-env.xml
+++ 
b/bigtop-manager-server/src/main/resources/stacks/nop/1.0.0/services/zookeeper/configuration/zookeeper-env.xml
@@ -35,7 +35,7 @@
     <property>
         <name>content</name>
         <display-name>zookeeper-env template</display-name>
-        <description>This is the jinja template for zookeeper-env.sh 
file</description>
+        <description>This is the freemarker template for zookeeper-env.sh 
file</description>
         <value><![CDATA[
 export JAVA_HOME=${JAVA_HOME!}
 export ZOOKEEPER_HOME=${ZOOKEEPER_HOME!}
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkClientScript.java
 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkClientScript.java
new file mode 100644
index 0000000..cdda767
--- /dev/null
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkClientScript.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink;
+
+import org.apache.bigtop.manager.common.shell.ShellResult;
+import org.apache.bigtop.manager.stack.core.param.Params;
+import org.apache.bigtop.manager.stack.core.spi.script.AbstractClientScript;
+import org.apache.bigtop.manager.stack.core.spi.script.Script;
+import org.apache.bigtop.manager.stack.core.utils.PackageUtils;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(Script.class)
+public class FlinkClientScript extends AbstractClientScript {
+
+    @Override
+    public ShellResult install(Params params) {
+        return PackageUtils.install(params.getPackageList());
+    }
+
+    @Override
+    public ShellResult configure(Params params) {
+        return FlinkSetup.config(params);
+    }
+}
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkHistoryServerScript.java
similarity index 65%
copy from 
bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
copy to 
bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkHistoryServerScript.java
index 46a14ed..e4dc063 100644
--- 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkHistoryServerScript.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bigtop.manager.stack.bigtop.v3_3_0.zookeeper;
+package org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink;
 
 import org.apache.bigtop.manager.common.shell.ShellResult;
 import org.apache.bigtop.manager.stack.core.exception.StackException;
@@ -27,14 +27,9 @@ import 
org.apache.bigtop.manager.stack.core.utils.PackageUtils;
 import org.apache.bigtop.manager.stack.core.utils.linux.LinuxOSUtils;
 
 import com.google.auto.service.AutoService;
-import lombok.extern.slf4j.Slf4j;
 
-import java.io.IOException;
-import java.text.MessageFormat;
-
-@Slf4j
 @AutoService(Script.class)
-public class ZookeeperServerScript extends AbstractServerScript {
+public class FlinkHistoryServerScript extends AbstractServerScript {
 
     @Override
     public ShellResult install(Params params) {
@@ -43,36 +38,37 @@ public class ZookeeperServerScript extends 
AbstractServerScript {
 
     @Override
     public ShellResult configure(Params params) {
-        return ZookeeperSetup.config(params);
+        return FlinkSetup.config(params);
     }
 
     @Override
     public ShellResult start(Params params) {
         configure(params);
-        ZookeeperParams zookeeperParams = (ZookeeperParams) params;
-
-        String cmd = MessageFormat.format("sh {0}/bin/zkServer.sh start", 
zookeeperParams.serviceHome());
+        FlinkParams flinkParams = (FlinkParams) params;
+        String hadoopClasspath = flinkParams.stackBinDir() + "/hadoop 
classpath";
+        String cmd = "export HADOOP_CLASSPATH=`" + hadoopClasspath + "`;" + 
flinkParams.stackLibDir()
+                + "/flink/bin/historyserver.sh start";
         try {
-            return LinuxOSUtils.sudoExecCmd(cmd, zookeeperParams.user());
-        } catch (IOException e) {
+            return LinuxOSUtils.sudoExecCmd(cmd, flinkParams.user());
+        } catch (Exception e) {
             throw new StackException(e);
         }
     }
 
     @Override
     public ShellResult stop(Params params) {
-        ZookeeperParams zookeeperParams = (ZookeeperParams) params;
-        String cmd = MessageFormat.format("sh {0}/bin/zkServer.sh stop", 
zookeeperParams.serviceHome());
+        FlinkParams flinkParams = (FlinkParams) params;
+        String cmd = flinkParams.stackLibDir() + "/flink/bin/historyserver.sh 
stop";
         try {
-            return LinuxOSUtils.sudoExecCmd(cmd, zookeeperParams.user());
-        } catch (IOException e) {
+            return LinuxOSUtils.sudoExecCmd(cmd, flinkParams.user());
+        } catch (Exception e) {
             throw new StackException(e);
         }
     }
 
     @Override
     public ShellResult status(Params params) {
-        ZookeeperParams zookeeperParams = (ZookeeperParams) params;
-        return 
LinuxOSUtils.checkProcess(zookeeperParams.getZookeeperPidFile());
+        FlinkParams flinkParams = (FlinkParams) params;
+        return 
LinuxOSUtils.checkProcess(flinkParams.getHistoryServerPidFile());
     }
 }
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkParams.java
 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkParams.java
new file mode 100644
index 0000000..8ae12cd
--- /dev/null
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkParams.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink;
+
+import org.apache.bigtop.manager.common.message.entity.payload.CommandPayload;
+import org.apache.bigtop.manager.stack.core.annotations.GlobalParams;
+import org.apache.bigtop.manager.stack.core.param.BaseParams;
+import org.apache.bigtop.manager.stack.core.utils.LocalSettings;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.text.MessageFormat;
+import java.util.Map;
+
+@Getter
+@Slf4j
+public class FlinkParams extends BaseParams {
+
+    private String flinkLogDir;
+    private String flinkPidDir;
+    private String historyServerPidFile;
+    private String flinkConfContent;
+    private String flinkLog4jPropertiesContent;
+    private String flinkLog4jCLiPropertiesContent;
+    private String flinkLog4jConsolePropertiesContent;
+    private String flinkLog4jSessionPropertiesContent;
+
+    private String jobManagerArchiveFsDir;
+    private String historyServerWebPort;
+    private String historyServerArchiveFsDir;
+    private String historyServerArchiveFsRefreshInterval;
+
+    public FlinkParams(CommandPayload commandPayload) {
+        super(commandPayload);
+        globalParamsMap.put("flink_user", user());
+        globalParamsMap.put("flink_group", group());
+        globalParamsMap.put("java_home", "/usr/local/java");
+        globalParamsMap.put("hadoop_home", hadoopHome());
+        globalParamsMap.put("hadoop_conf_dir", hadoopConfDir());
+
+        globalParamsMap.put("jobmanager_archive_fs_dir", 
jobManagerArchiveFsDir);
+        globalParamsMap.put("historyserver_web_port", historyServerWebPort);
+        globalParamsMap.put("historyserver_archive_fs_dir", 
historyServerArchiveFsDir);
+        globalParamsMap.put("historyserver_archive_fs_refresh_interval", 
historyServerArchiveFsRefreshInterval);
+    }
+
+    @GlobalParams
+    public Map<String, Object> flinkConf() {
+        Map<String, Object> configurations = 
LocalSettings.configurations(serviceName(), "flink-conf");
+        flinkConfContent = (String) configurations.get("content");
+
+        jobManagerArchiveFsDir = (String) 
configurations.get("jobmanager.archive.fs.dir");
+        historyServerWebPort = (String) 
configurations.get("historyserver.web.port");
+        historyServerArchiveFsDir = (String) 
configurations.get("historyserver.archive.fs.dir");
+        historyServerArchiveFsRefreshInterval =
+                (String) 
configurations.get("historyserver.archive.fs.refresh-interval");
+        return configurations;
+    }
+
+    @GlobalParams
+    public Map<String, Object> flinkEnv() {
+        Map<String, Object> configurations = 
LocalSettings.configurations(serviceName(), "flink-env");
+        flinkLogDir = (String) configurations.get("flink_log_dir");
+        flinkPidDir = (String) configurations.get("flink_pid_dir");
+        historyServerPidFile = 
MessageFormat.format("{0}/flink-{1}-historyserver.pid", flinkPidDir, user());
+        return configurations;
+    }
+
+    @GlobalParams
+    public Map<String, Object> flinkLog4jProperties() {
+        Map<String, Object> configurations = 
LocalSettings.configurations(serviceName(), "flink-log4j-properties");
+        flinkLog4jPropertiesContent = (String) configurations.get("content");
+        return configurations;
+    }
+
+    @GlobalParams
+    public Map<String, Object> flinkLog4jCLiProperties() {
+        Map<String, Object> configurations = 
LocalSettings.configurations(serviceName(), "flink-log4j-cli-properties");
+        flinkLog4jCLiPropertiesContent = (String) 
configurations.get("content");
+        return configurations;
+    }
+
+    @GlobalParams
+    public Map<String, Object> flinkLog4jConsoleProperties() {
+        Map<String, Object> configurations =
+                LocalSettings.configurations(serviceName(), 
"flink-log4j-console-properties");
+        flinkLog4jConsolePropertiesContent = (String) 
configurations.get("content");
+        return configurations;
+    }
+
+    @GlobalParams
+    public Map<String, Object> flinkLog4jSessionProperties() {
+        Map<String, Object> configurations =
+                LocalSettings.configurations(serviceName(), 
"flink-log4j-session-properties");
+        flinkLog4jSessionPropertiesContent = (String) 
configurations.get("content");
+        return configurations;
+    }
+
+    @Override
+    public String confDir() {
+        return "/etc/flink/conf";
+    }
+
+    @Override
+    public String serviceHome() {
+        return stackLibDir() + "/flink";
+    }
+
+    public String hadoopConfDir() {
+        return "/etc/hadoop/conf";
+    }
+
+    public String hadoopHome() {
+        return stackLibDir() + "/hadoop";
+    }
+}
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkSetup.java
 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkSetup.java
new file mode 100644
index 0000000..c47319e
--- /dev/null
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkSetup.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink;
+
+import org.apache.bigtop.manager.common.constants.Constants;
+import org.apache.bigtop.manager.common.shell.ShellResult;
+import org.apache.bigtop.manager.stack.bigtop.utils.HdfsUtil;
+import org.apache.bigtop.manager.stack.core.param.Params;
+import org.apache.bigtop.manager.stack.core.utils.linux.LinuxFileUtils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.text.MessageFormat;
+
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class FlinkSetup {
+
+    public static ShellResult config(Params params) {
+        FlinkParams flinkParams = (FlinkParams) params;
+        String flinkUser = params.user();
+        String flinkGroup = params.group();
+        String confDir = flinkParams.confDir();
+
+        LinuxFileUtils.createDirectories(
+                flinkParams.getFlinkLogDir(), flinkUser, flinkGroup, 
Constants.PERMISSION_755, true);
+        LinuxFileUtils.createDirectories(
+                flinkParams.getFlinkPidDir(), flinkUser, flinkGroup, 
Constants.PERMISSION_755, true);
+
+        // log4j.properties
+        log.info("Generating [{}/log4j.properties] file", confDir);
+        LinuxFileUtils.toFileByTemplate(
+                flinkParams.getFlinkLog4jPropertiesContent(),
+                MessageFormat.format("{0}/log4j.properties", confDir),
+                flinkUser,
+                flinkGroup,
+                Constants.PERMISSION_644,
+                flinkParams.getGlobalParamsMap());
+
+        // log4j-cli.properties
+        log.info("Generating [{}/log4j-cli.properties] file", confDir);
+        LinuxFileUtils.toFileByTemplate(
+                flinkParams.getFlinkLog4jCLiPropertiesContent(),
+                MessageFormat.format("{0}/log4j-cli.properties", confDir),
+                flinkUser,
+                flinkGroup,
+                Constants.PERMISSION_644,
+                flinkParams.getGlobalParamsMap());
+
+        // log4j-console.properties
+        log.info("Generating [{}/log4j-console.properties] file", confDir);
+        LinuxFileUtils.toFileByTemplate(
+                flinkParams.getFlinkLog4jConsolePropertiesContent(),
+                MessageFormat.format("{0}/log4j-console.properties", confDir),
+                flinkUser,
+                flinkGroup,
+                Constants.PERMISSION_644,
+                flinkParams.getGlobalParamsMap());
+
+        // log4j-session.properties
+        log.info("Generating [{}/log4j-session.properties] file", confDir);
+        LinuxFileUtils.toFileByTemplate(
+                flinkParams.getFlinkLog4jSessionPropertiesContent(),
+                MessageFormat.format("{0}/log4j-session.properties", confDir),
+                flinkUser,
+                flinkGroup,
+                Constants.PERMISSION_644,
+                flinkParams.getGlobalParamsMap());
+
+        // flink-conf.yaml
+        log.info("Generating [{}/flink-conf.yaml] file", confDir);
+        LinuxFileUtils.toFileByTemplate(
+                flinkParams.getFlinkConfContent(),
+                MessageFormat.format("{0}/flink-conf.yaml", confDir),
+                flinkUser,
+                flinkGroup,
+                Constants.PERMISSION_644,
+                flinkParams.getGlobalParamsMap());
+
+        HdfsUtil.createDirectory(flinkUser, "/completed-jobs");
+
+        return ShellResult.success("Flink Configure success!");
+    }
+}
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaBrokerScript.java
 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaBrokerScript.java
index 97714ce..7cc29c4 100644
--- 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaBrokerScript.java
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaBrokerScript.java
@@ -52,7 +52,7 @@ public class KafkaBrokerScript extends AbstractServerScript {
         KafkaParams kafkaParams = (KafkaParams) params;
 
         String cmd = MessageFormat.format(
-                "sh {0}/bin/kafka-server-start.sh {0}/config/server.properties 
> /dev/null 2>&1 & echo -n $!>{1}",
+                "{0}/bin/kafka-server-start.sh {0}/config/server.properties > 
/dev/null 2>&1 & echo -n $!>{1}",
                 kafkaParams.serviceHome(), kafkaParams.getKafkaPidFile());
         try {
             return LinuxOSUtils.sudoExecCmd(cmd, kafkaParams.user());
@@ -64,7 +64,7 @@ public class KafkaBrokerScript extends AbstractServerScript {
     @Override
     public ShellResult stop(Params params) {
         KafkaParams kafkaParams = (KafkaParams) params;
-        String cmd = MessageFormat.format("sh {0}/bin/kafka-server-stop.sh", 
kafkaParams.serviceHome());
+        String cmd = MessageFormat.format("{0}/bin/kafka-server-stop.sh", 
kafkaParams.serviceHome());
         try {
             return LinuxOSUtils.sudoExecCmd(cmd, kafkaParams.user());
         } catch (IOException e) {
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
index 46a14ed..4483094 100644
--- 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
@@ -51,7 +51,7 @@ public class ZookeeperServerScript extends 
AbstractServerScript {
         configure(params);
         ZookeeperParams zookeeperParams = (ZookeeperParams) params;
 
-        String cmd = MessageFormat.format("sh {0}/bin/zkServer.sh start", 
zookeeperParams.serviceHome());
+        String cmd = MessageFormat.format("{0}/bin/zkServer.sh start", 
zookeeperParams.serviceHome());
         try {
             return LinuxOSUtils.sudoExecCmd(cmd, zookeeperParams.user());
         } catch (IOException e) {
@@ -62,7 +62,7 @@ public class ZookeeperServerScript extends 
AbstractServerScript {
     @Override
     public ShellResult stop(Params params) {
         ZookeeperParams zookeeperParams = (ZookeeperParams) params;
-        String cmd = MessageFormat.format("sh {0}/bin/zkServer.sh stop", 
zookeeperParams.serviceHome());
+        String cmd = MessageFormat.format("{0}/bin/zkServer.sh stop", 
zookeeperParams.serviceHome());
         try {
             return LinuxOSUtils.sudoExecCmd(cmd, zookeeperParams.user());
         } catch (IOException e) {
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java
 
b/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java
index 47f1015..af23aee 100644
--- 
a/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bigtop.manager.stack.core.executor;
 
+import org.apache.bigtop.manager.common.constants.MessageConstants;
 import org.apache.bigtop.manager.common.enums.Command;
 import org.apache.bigtop.manager.common.message.entity.payload.CommandPayload;
 import org.apache.bigtop.manager.common.message.entity.pojo.CustomCommandInfo;
@@ -114,6 +115,9 @@ public class StackExecutor {
 
             log.info("Executing {}::{}", script.getName(), method.getName());
             ShellResult result = (ShellResult) method.invoke(script, params);
+            if (result.getExitCode() != MessageConstants.SUCCESS_CODE) {
+                log.error("Error executing script: {}", result.getErrMsg());
+            }
 
             runAfterHook(command, params);
 

Reply via email to