This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bigtop-manager.git
The following commit(s) were added to refs/heads/main by this push:
new 272f24b BIGTOP-4135: Add Flink component on Bigtop-3.3.0 stack (#47)
272f24b is described below
commit 272f24bbbaa19e178abae7cdf8dc5088c074afbe
Author: Zhiguo Wu <[email protected]>
AuthorDate: Tue Aug 20 16:46:00 2024 +0800
BIGTOP-4135: Add Flink component on Bigtop-3.3.0 stack (#47)
---
.../services/flink/configuration/flink-conf.xml | 367 +++++++++++++++++++++
.../services/flink/configuration/flink-env.xml | 33 ++
.../configuration/flink-log4j-cli-properties.xml | 83 +++++
.../flink-log4j-console-properties.xml | 84 +++++
.../flink/configuration/flink-log4j-properties.xml | 73 ++++
.../flink-log4j-session-properties.xml | 58 ++++
.../bigtop/3.3.0/services/flink/metainfo.xml | 80 +++++
.../stacks/bigtop/3.3.0/services/flink/order.json | 3 +
.../zookeeper/configuration/zookeeper-env.xml | 2 +-
.../zookeeper/configuration/zookeeper-env.xml | 2 +-
.../bigtop/v3_3_0/flink/FlinkClientScript.java | 43 +++
.../FlinkHistoryServerScript.java} | 34 +-
.../stack/bigtop/v3_3_0/flink/FlinkParams.java | 133 ++++++++
.../stack/bigtop/v3_3_0/flink/FlinkSetup.java | 102 ++++++
.../bigtop/v3_3_0/kafka/KafkaBrokerScript.java | 4 +-
.../v3_3_0/zookeeper/ZookeeperServerScript.java | 4 +-
.../manager/stack/core/executor/StackExecutor.java | 4 +
17 files changed, 1084 insertions(+), 25 deletions(-)
diff --git
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-conf.xml
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-conf.xml
new file mode 100644
index 0000000..a865f5d
--- /dev/null
+++
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-conf.xml
@@ -0,0 +1,367 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ https://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+<configuration>
+ <property>
+ <name>jobmanager.archive.fs.dir</name>
+ <value>hdfs:///completed-jobs/</value>
+ <description>Directory for JobManager to store the archives of
completed jobs.</description>
+ </property>
+ <property>
+ <name>historyserver.archive.fs.dir</name>
+ <value>hdfs:///completed-jobs/</value>
+ <description>Comma separated list of directories to fetch archived
jobs from.</description>
+ </property>
+ <property>
+ <name>historyserver.web.port</name>
+ <value>8082</value>
+ <description>The port under which the web-based HistoryServer
listens.</description>
+ </property>
+ <property>
+ <name>historyserver.archive.fs.refresh-interval</name>
+ <value>10000</value>
+ <description>Interval in milliseconds for refreshing the monitored
directories.</description>
+ </property>
+ <property>
+ <name>security.kerberos.login.keytab</name>
+ <description>Flink keytab path</description>
+ <value>none</value>
+ </property>
+ <property>
+ <name>security.kerberos.login.principal</name>
+ <description>Flink principal name</description>
+ <value>none</value>
+ </property>
+ <!-- flink-conf.yaml -->
+ <property>
+ <name>content</name>
+ <display-name>flink-conf template</display-name>
+ <description>This is the freemarker template for flink-conf.xml
file</description>
+ <value><![CDATA[
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This
setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host hostname parameter of the bin/jobmanager.sh
executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and
setup
+# the conf/masters file, this will be taken care of automatically. Yarn
+# automatically configure the host name based on the hostname of the node
where the
+# JobManager runs.
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+# The host interface the JobManager will bind to. My default, this is
localhost, and will prevent
+# the JobManager from communicating outside the machine/container it is
running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting
to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an
outside facing network
+# interface, such as 0.0.0.0.
+
+jobmanager.bind-host: localhost
+
+
+# The total process memory size for the JobManager.
+#
+# Note this accounts for all memory usage within the JobManager process,
including JVM metaspace and other overhead.
+
+jobmanager.memory.process.size: 1024m
+
+# The host interface the TaskManager will bind to. By default, this is
localhost, and will prevent
+# the TaskManager from communicating outside the machine/container it is
running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting
to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an
outside facing network
+# interface, such as 0.0.0.0.
+
+taskmanager.bind-host: localhost
+
+# The address of the host on which the TaskManager runs and can be reached by
the JobManager and
+# other TaskManagers. If not specified, the TaskManager will try different
strategies to identify
+# the address.
+#
+# Note this address needs to be reachable by the JobManager and forward
traffic to one of
+# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
+#
+# Note also that unless all TaskManagers are running on the same machine, this
address needs to be
+# configured separately for each TaskManager.
+
+taskmanager.host: localhost
+
+# The total process memory size for the TaskManager.
+#
+# Note this accounts for all memory usage within the TaskManager process,
including JVM metaspace and other overhead.
+
+taskmanager.memory.process.size: 1024m
+
+# To exclude JVM metaspace and overhead, please, use total Flink memory size
instead of 'taskmanager.memory.process.size'.
+# It is not recommended to set both 'taskmanager.memory.process.size' and
Flink memory.
+#
+# taskmanager.memory.flink.size: 1280m
+
+# The number of task slots that each TaskManager offers. Each slot runs one
parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify and other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# JVM and Logging Options
+#==============================================================================
+# Java runtime to use
+env.java.home: ${java_home}
+
+# Path to hadoop configuration directory. It is required to read HDFS and/or
YARN configuration.
+# You can also set it via environment variable.
+env.hadoop.conf.dir: ${hadoop_conf_dir}
+
+# Defines the directory where the flink-<host>-<process>.pid files
are saved.
+env.pid.dir: ${flink_pid_dir}
+
+# Defines the directory where the Flink logs are saved.
+env.log.dir: ${flink_log_dir}
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper
stores
+# the small ground truth for checkpoint and leader election, this location
stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open"
(ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK
security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled. Checkpointing is enabled when
execution.checkpointing.interval > 0.
+#
+# Execution checkpointing related parameters. Please refer to CheckpointConfig
and ExecutionCheckpointingOptions for more details.
+#
+# execution.checkpointing.interval: 3min
+# execution.checkpointing.externalized-checkpoint-retention:
[DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
+# execution.checkpointing.max-concurrent-checkpoints: 1
+# execution.checkpointing.min-pause: 0
+# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
+# execution.checkpointing.timeout: 10min
+# execution.checkpointing.tolerable-failed-checkpoints: 0
+# execution.checkpointing.unaligned: false
+#
+# Supported backends are 'hashmap', 'rocksdb', or the
+# <class-name-of-factory>.
+#
+# state.backend: hashmap
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend).
+#
+# state.backend.incremental: false
+
+# The failover strategy, i.e., how the job computation recovers from task
failures.
+# Only restart tasks that may have been affected by the task failure, which
typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no
longer available for consumption.
+
+jobmanager.execution.failover-strategy: region
+
+#==============================================================================
+# REST & web frontend
+#==============================================================================
+
+# The port to which the REST client connects to. If rest.bind-port has
+# not been specified, then the server will bind to this port as well.
+#
+#rest.port: 8081
+
+# The address to which the REST client will connect to
+#
+rest.address: localhost
+
+# Port range for the REST and web server to bind to.
+#
+#rest.bind-port: 8080-8090
+
+# The address that the REST & web server binds to
+# By default, this is localhost, which prevents the REST & web server from
+# being able to communicate outside of the machine/container it is running on.
+#
+# To enable this, set the bind address to one that has access to outside-facing
+# network interface, such as 0.0.0.0.
+#
+rest.bind-address: localhost
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.submit.enable: false
+
+# Flag to specify whether job cancellation is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.cancel.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+# /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's
default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and
connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The below configure how Kerberos credentials are provided. A keytab will be
used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+# The configuration below defines which JAAS login contexts
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# Below configurations are applicable if ZK ensemble is configured for security
+
+# Override below configuration to provide custom ZK service name if configured
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in
"security.kerberos.login.contexts"
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh
(start|stop)
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+jobmanager.archive.fs.dir: ${jobmanager_archive_fs_dir}
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+historyserver.web.port: ${historyserver_web_port}
+
+# Comma separated list of directories to monitor for completed jobs.
+historyserver.archive.fs.dir: ${historyserver_archive_fs_dir}
+
+# Interval in milliseconds for refreshing the monitored directories.
+historyserver.archive.fs.refresh-interval:
${historyserver_archive_fs_refresh_interval}
+]]>
+ </value>
+ <attrs>
+ <type>longtext</type>
+ </attrs>
+ </property>
+</configuration>
diff --git
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-env.xml
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-env.xml
new file mode 100644
index 0000000..d754469
--- /dev/null
+++
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-env.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ https://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+<configuration>
+ <property>
+ <name>flink_log_dir</name>
+ <value>/var/log/flink</value>
+ <display-name>Flink Log Dir Prefix</display-name>
+ <description>Log Directories for Flink.</description>
+ </property>
+ <property>
+ <name>flink_pid_dir</name>
+ <display-name>Flink PID directory</display-name>
+ <value>/var/run/flink</value>
+ </property>
+</configuration>
diff --git
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-cli-properties.xml
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-cli-properties.xml
new file mode 100644
index 0000000..d1614cc
--- /dev/null
+++
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-cli-properties.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ https://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+<configuration>
+ <property>
+ <name>content</name>
+ <description>Flink log4j-cli Properties</description>
+ <value><![CDATA[
+<#noparse>
+# Allows this configuration to be modified at runtime. The file will be
checked every 30 seconds.
+monitorInterval=30
+
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = FileAppender
+
+# Log all infos in the given file
+appender.file.name = FileAppender
+appender.file.type = FILE
+appender.file.append = false
+appender.file.fileName = ${sys:log.file}
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# Log output from org.apache.flink.yarn to the console. This is used by the
+# CliFrontend class when using a per-job YARN cluster.
+logger.yarn.name = org.apache.flink.yarn
+logger.yarn.level = INFO
+logger.yarn.appenderRef.console.ref = ConsoleAppender
+logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
+logger.yarncli.level = INFO
+logger.yarncli.appenderRef.console.ref = ConsoleAppender
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.hadoop.appenderRef.console.ref = ConsoleAppender
+
+# Make sure hive logs go to the file.
+logger.hive.name = org.apache.hadoop.hive
+logger.hive.level = INFO
+logger.hive.additivity = false
+logger.hive.appenderRef.file.ref = FileAppender
+
+# Log output from org.apache.flink.kubernetes to the console.
+logger.kubernetes.name = org.apache.flink.kubernetes
+logger.kubernetes.level = INFO
+logger.kubernetes.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
%m%n
+
+# suppress the warning that hadoop native libraries are not loaded (irrelevant
for the client)
+logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
+logger.hadoopnative.level = OFF
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
+</#noparse>
+]]>
+ </value>
+ <attrs>
+ <type>longtext</type>
+ </attrs>
+ </property>
+</configuration>
diff --git
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-console-properties.xml
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-console-properties.xml
new file mode 100644
index 0000000..4e7f5f3
--- /dev/null
+++
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-console-properties.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ https://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+<configuration>
+ <property>
+ <name>content</name>
+ <description>Flink log4j-console Properties</description>
+ <value><![CDATA[
+<#noparse>
+# Allows this configuration to be modified at runtime. The file will be
checked every 30 seconds.
+monitorInterval=30
+
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+rootLogger.appenderRef.rolling.ref = RollingFileAppender
+
+# Uncomment this if you want to _only_ change Flink's logging
+#logger.flink.name = org.apache.flink
+#logger.flink.level = INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name= org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = INFO
+
+# Log all infos to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
%m%n
+
+# Log all infos in the given rolling file
+appender.rolling.name = RollingFileAppender
+appender.rolling.type = RollingFile
+appender.rolling.append = true
+appender.rolling.fileName = ${sys:log.file}
+appender.rolling.filePattern = ${sys:log.file}.%i
+appender.rolling.layout.type = PatternLayout
+appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
%m%n
+appender.rolling.policies.type = Policies
+appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=100MB
+appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
+appender.rolling.strategy.type = DefaultRolloverStrategy
+appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
+</#noparse>
+]]>
+ </value>
+ <attrs>
+ <type>longtext</type>
+ </attrs>
+ </property>
+</configuration>
diff --git
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-properties.xml
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-properties.xml
new file mode 100644
index 0000000..a9e40ff
--- /dev/null
+++
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-properties.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ https://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+<configuration>
+ <property>
+ <name>content</name>
+ <description>Flink log4j Properties</description>
+ <value><![CDATA[
+<#noparse>
+# Allows this configuration to be modified at runtime. The file will be
checked every 30 seconds.
+monitorInterval=30
+
+# This affects logging for both user code and Flink
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = MainAppender
+
+# Uncomment this if you want to _only_ change Flink's logging
+#logger.flink.name = org.apache.flink
+#logger.flink.level = INFO
+
+# The following lines keep the log level of common libraries/connectors on
+# log level INFO. The root logger does not override this. You have to manually
+# change the log levels here.
+logger.akka.name = akka
+logger.akka.level = INFO
+logger.kafka.name= org.apache.kafka
+logger.kafka.level = INFO
+logger.hadoop.name = org.apache.hadoop
+logger.hadoop.level = INFO
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = INFO
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = INFO
+
+# Log all infos in the given file
+appender.main.name = MainAppender
+appender.main.type = RollingFile
+appender.main.append = true
+appender.main.fileName = ${sys:log.file}
+appender.main.filePattern = ${sys:log.file}.%i
+appender.main.layout.type = PatternLayout
+appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.main.policies.type = Policies
+appender.main.policies.size.type = SizeBasedTriggeringPolicy
+appender.main.policies.size.size = 100MB
+appender.main.policies.startup.type = OnStartupTriggeringPolicy
+appender.main.strategy.type = DefaultRolloverStrategy
+appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
+</#noparse>
+]]>
+ </value>
+ <attrs>
+ <type>longtext</type>
+ </attrs>
+ </property>
+</configuration>
diff --git
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-session-properties.xml
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-session-properties.xml
new file mode 100644
index 0000000..0135311
--- /dev/null
+++
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/configuration/flink-log4j-session-properties.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ https://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+<configuration>
+ <property>
+ <name>content</name>
+ <description>Flink log4j-session Properties</description>
+ <value><![CDATA[
+<#noparse>
+# Allows this configuration to be modified at runtime. The file will be
checked every 30 seconds.
+monitorInterval=30
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
%m%n
+
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = OFF
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = WARN
+logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
+logger.shaded_zookeeper.level = WARN
+logger.curator.name = org.apache.flink.shaded.org.apache.curator.framework
+logger.curator.level = WARN
+logger.runtimeutils.name= org.apache.flink.runtime.util.ZooKeeperUtils
+logger.runtimeutils.level = WARN
+logger.runtimeleader.name =
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver
+logger.runtimeleader.level = WARN
+</#noparse>
+]]>
+ </value>
+ <attrs>
+ <type>longtext</type>
+ </attrs>
+ </property>
+</configuration>
diff --git
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/metainfo.xml
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/metainfo.xml
new file mode 100644
index 0000000..5598716
--- /dev/null
+++
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/metainfo.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ https://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+<metainfo>
+ <service>
+ <name>flink</name>
+ <display-name>Flink</display-name>
+ <desc>Flink is a framework and distributed processing engine for
stateful computations over unbounded and bounded data streams.</desc>
+ <version>1.16.2-1</version>
+ <user>flink</user>
+
+ <components>
+ <component>
+ <name>flink_history_server</name>
+ <display-name>Flink History Server</display-name>
+ <category>master</category>
+ <cardinality>1+</cardinality>
+ <command-script>
+
<script-id>org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink.FlinkHistoryServerScript</script-id>
+ <script-type>java</script-type>
+ <timeout>1200</timeout>
+ </command-script>
+ <quick-link>
+ <display-name>Flink HistoryServer UI</display-name>
+
<http-port-property>historyserver.web.port</http-port-property>
+ <http-port-default>8082</http-port-default>
+
<https-port-property>historyserver.web.port</https-port-property>
+ <https-port-default>8082</https-port-default>
+ </quick-link>
+ </component>
+ <component>
+ <name>flink_client</name>
+ <display-name>Flink Client</display-name>
+ <category>client</category>
+ <cardinality>1+</cardinality>
+ <command-script>
+
<script-id>org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink.FlinkClientScript</script-id>
+ <script-type>java</script-type>
+ <timeout>1200</timeout>
+ </command-script>
+ </component>
+ </components>
+
+ <os-specifics>
+ <os-specific>
+ <operating-systems>
+ <os>centos7</os>
+ <os>rocky8</os>
+ </operating-systems>
+ <architectures>
+ <arch>x86_64</arch>
+ </architectures>
+ <packages>
+ <package>flink_3_3_0</package>
+ </packages>
+ </os-specific>
+ </os-specifics>
+
+ <required-services>
+ <service>yarn</service>
+ </required-services>
+ </service>
+</metainfo>
\ No newline at end of file
diff --git
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/order.json
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/order.json
new file mode 100644
index 0000000..459414e
--- /dev/null
+++
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/flink/order.json
@@ -0,0 +1,3 @@
+{
+ "FLINK_HISTORY_SERVER-START": ["NAMENODE-START", "DATANODE-START"]
+}
diff --git
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/zookeeper/configuration/zookeeper-env.xml
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/zookeeper/configuration/zookeeper-env.xml
index c963833..a947ff6 100644
---
a/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/zookeeper/configuration/zookeeper-env.xml
+++
b/bigtop-manager-server/src/main/resources/stacks/bigtop/3.3.0/services/zookeeper/configuration/zookeeper-env.xml
@@ -35,7 +35,7 @@
<property>
<name>content</name>
<display-name>zookeeper-env template</display-name>
- <description>This is the jinja template for zookeeper-env.sh
file</description>
+ <description>This is the freemarker template for zookeeper-env.sh
file</description>
<value><![CDATA[
export JAVA_HOME=${java_home!}
export ZOOKEEPER_HOME=${zookeeper_home!}
diff --git
a/bigtop-manager-server/src/main/resources/stacks/nop/1.0.0/services/zookeeper/configuration/zookeeper-env.xml
b/bigtop-manager-server/src/main/resources/stacks/nop/1.0.0/services/zookeeper/configuration/zookeeper-env.xml
index 44bdc24..b5d1209 100644
---
a/bigtop-manager-server/src/main/resources/stacks/nop/1.0.0/services/zookeeper/configuration/zookeeper-env.xml
+++
b/bigtop-manager-server/src/main/resources/stacks/nop/1.0.0/services/zookeeper/configuration/zookeeper-env.xml
@@ -35,7 +35,7 @@
<property>
<name>content</name>
<display-name>zookeeper-env template</display-name>
- <description>This is the jinja template for zookeeper-env.sh
file</description>
+ <description>This is the freemarker template for zookeeper-env.sh
file</description>
<value><![CDATA[
export JAVA_HOME=${JAVA_HOME!}
export ZOOKEEPER_HOME=${ZOOKEEPER_HOME!}
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkClientScript.java
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkClientScript.java
new file mode 100644
index 0000000..cdda767
--- /dev/null
+++
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkClientScript.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink;
+
+import org.apache.bigtop.manager.common.shell.ShellResult;
+import org.apache.bigtop.manager.stack.core.param.Params;
+import org.apache.bigtop.manager.stack.core.spi.script.AbstractClientScript;
+import org.apache.bigtop.manager.stack.core.spi.script.Script;
+import org.apache.bigtop.manager.stack.core.utils.PackageUtils;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(Script.class)
+public class FlinkClientScript extends AbstractClientScript {
+
+ @Override
+ public ShellResult install(Params params) {
+ return PackageUtils.install(params.getPackageList());
+ }
+
+ @Override
+ public ShellResult configure(Params params) {
+ return FlinkSetup.config(params);
+ }
+}
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkHistoryServerScript.java
similarity index 65%
copy from
bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
copy to
bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkHistoryServerScript.java
index 46a14ed..e4dc063 100644
---
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
+++
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkHistoryServerScript.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bigtop.manager.stack.bigtop.v3_3_0.zookeeper;
+package org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink;
import org.apache.bigtop.manager.common.shell.ShellResult;
import org.apache.bigtop.manager.stack.core.exception.StackException;
@@ -27,14 +27,9 @@ import
org.apache.bigtop.manager.stack.core.utils.PackageUtils;
import org.apache.bigtop.manager.stack.core.utils.linux.LinuxOSUtils;
import com.google.auto.service.AutoService;
-import lombok.extern.slf4j.Slf4j;
-import java.io.IOException;
-import java.text.MessageFormat;
-
-@Slf4j
@AutoService(Script.class)
-public class ZookeeperServerScript extends AbstractServerScript {
+public class FlinkHistoryServerScript extends AbstractServerScript {
@Override
public ShellResult install(Params params) {
@@ -43,36 +38,37 @@ public class ZookeeperServerScript extends
AbstractServerScript {
@Override
public ShellResult configure(Params params) {
- return ZookeeperSetup.config(params);
+ return FlinkSetup.config(params);
}
@Override
public ShellResult start(Params params) {
configure(params);
- ZookeeperParams zookeeperParams = (ZookeeperParams) params;
-
- String cmd = MessageFormat.format("sh {0}/bin/zkServer.sh start",
zookeeperParams.serviceHome());
+ FlinkParams flinkParams = (FlinkParams) params;
+ String hadoopClasspath = flinkParams.stackBinDir() + "/hadoop
classpath";
+ String cmd = "export HADOOP_CLASSPATH=`" + hadoopClasspath + "`;" +
flinkParams.stackLibDir()
+ + "/flink/bin/historyserver.sh start";
try {
- return LinuxOSUtils.sudoExecCmd(cmd, zookeeperParams.user());
- } catch (IOException e) {
+ return LinuxOSUtils.sudoExecCmd(cmd, flinkParams.user());
+ } catch (Exception e) {
throw new StackException(e);
}
}
@Override
public ShellResult stop(Params params) {
- ZookeeperParams zookeeperParams = (ZookeeperParams) params;
- String cmd = MessageFormat.format("sh {0}/bin/zkServer.sh stop",
zookeeperParams.serviceHome());
+ FlinkParams flinkParams = (FlinkParams) params;
+ String cmd = flinkParams.stackLibDir() + "/flink/bin/historyserver.sh
stop";
try {
- return LinuxOSUtils.sudoExecCmd(cmd, zookeeperParams.user());
- } catch (IOException e) {
+ return LinuxOSUtils.sudoExecCmd(cmd, flinkParams.user());
+ } catch (Exception e) {
throw new StackException(e);
}
}
@Override
public ShellResult status(Params params) {
- ZookeeperParams zookeeperParams = (ZookeeperParams) params;
- return
LinuxOSUtils.checkProcess(zookeeperParams.getZookeeperPidFile());
+ FlinkParams flinkParams = (FlinkParams) params;
+ return
LinuxOSUtils.checkProcess(flinkParams.getHistoryServerPidFile());
}
}
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkParams.java
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkParams.java
new file mode 100644
index 0000000..8ae12cd
--- /dev/null
+++
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkParams.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink;
+
+import org.apache.bigtop.manager.common.message.entity.payload.CommandPayload;
+import org.apache.bigtop.manager.stack.core.annotations.GlobalParams;
+import org.apache.bigtop.manager.stack.core.param.BaseParams;
+import org.apache.bigtop.manager.stack.core.utils.LocalSettings;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.text.MessageFormat;
+import java.util.Map;
+
+@Getter
+@Slf4j
+public class FlinkParams extends BaseParams {
+
+ private String flinkLogDir;
+ private String flinkPidDir;
+ private String historyServerPidFile;
+ private String flinkConfContent;
+ private String flinkLog4jPropertiesContent;
+ private String flinkLog4jCLiPropertiesContent;
+ private String flinkLog4jConsolePropertiesContent;
+ private String flinkLog4jSessionPropertiesContent;
+
+ private String jobManagerArchiveFsDir;
+ private String historyServerWebPort;
+ private String historyServerArchiveFsDir;
+ private String historyServerArchiveFsRefreshInterval;
+
+ public FlinkParams(CommandPayload commandPayload) {
+ super(commandPayload);
+ globalParamsMap.put("flink_user", user());
+ globalParamsMap.put("flink_group", group());
+ globalParamsMap.put("java_home", "/usr/local/java");
+ globalParamsMap.put("hadoop_home", hadoopHome());
+ globalParamsMap.put("hadoop_conf_dir", hadoopConfDir());
+
+ globalParamsMap.put("jobmanager_archive_fs_dir",
jobManagerArchiveFsDir);
+ globalParamsMap.put("historyserver_web_port", historyServerWebPort);
+ globalParamsMap.put("historyserver_archive_fs_dir",
historyServerArchiveFsDir);
+ globalParamsMap.put("historyserver_archive_fs_refresh_interval",
historyServerArchiveFsRefreshInterval);
+ }
+
+ @GlobalParams
+ public Map<String, Object> flinkConf() {
+ Map<String, Object> configurations =
LocalSettings.configurations(serviceName(), "flink-conf");
+ flinkConfContent = (String) configurations.get("content");
+
+ jobManagerArchiveFsDir = (String)
configurations.get("jobmanager.archive.fs.dir");
+ historyServerWebPort = (String)
configurations.get("historyserver.web.port");
+ historyServerArchiveFsDir = (String)
configurations.get("historyserver.archive.fs.dir");
+ historyServerArchiveFsRefreshInterval =
+ (String)
configurations.get("historyserver.archive.fs.refresh-interval");
+ return configurations;
+ }
+
+ @GlobalParams
+ public Map<String, Object> flinkEnv() {
+ Map<String, Object> configurations =
LocalSettings.configurations(serviceName(), "flink-env");
+ flinkLogDir = (String) configurations.get("flink_log_dir");
+ flinkPidDir = (String) configurations.get("flink_pid_dir");
+ historyServerPidFile =
MessageFormat.format("{0}/flink-{1}-historyserver.pid", flinkPidDir, user());
+ return configurations;
+ }
+
+ @GlobalParams
+ public Map<String, Object> flinkLog4jProperties() {
+ Map<String, Object> configurations =
LocalSettings.configurations(serviceName(), "flink-log4j-properties");
+ flinkLog4jPropertiesContent = (String) configurations.get("content");
+ return configurations;
+ }
+
+ @GlobalParams
+ public Map<String, Object> flinkLog4jCLiProperties() {
+ Map<String, Object> configurations =
LocalSettings.configurations(serviceName(), "flink-log4j-cli-properties");
+ flinkLog4jCLiPropertiesContent = (String)
configurations.get("content");
+ return configurations;
+ }
+
+ @GlobalParams
+ public Map<String, Object> flinkLog4jConsoleProperties() {
+ Map<String, Object> configurations =
+ LocalSettings.configurations(serviceName(),
"flink-log4j-console-properties");
+ flinkLog4jConsolePropertiesContent = (String)
configurations.get("content");
+ return configurations;
+ }
+
+ @GlobalParams
+ public Map<String, Object> flinkLog4jSessionProperties() {
+ Map<String, Object> configurations =
+ LocalSettings.configurations(serviceName(),
"flink-log4j-session-properties");
+ flinkLog4jSessionPropertiesContent = (String)
configurations.get("content");
+ return configurations;
+ }
+
+ @Override
+ public String confDir() {
+ return "/etc/flink/conf";
+ }
+
+ @Override
+ public String serviceHome() {
+ return stackLibDir() + "/flink";
+ }
+
+ public String hadoopConfDir() {
+ return "/etc/hadoop/conf";
+ }
+
+ public String hadoopHome() {
+ return stackLibDir() + "/hadoop";
+ }
+}
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkSetup.java
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkSetup.java
new file mode 100644
index 0000000..c47319e
--- /dev/null
+++
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/flink/FlinkSetup.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.stack.bigtop.v3_3_0.flink;
+
+import org.apache.bigtop.manager.common.constants.Constants;
+import org.apache.bigtop.manager.common.shell.ShellResult;
+import org.apache.bigtop.manager.stack.bigtop.utils.HdfsUtil;
+import org.apache.bigtop.manager.stack.core.param.Params;
+import org.apache.bigtop.manager.stack.core.utils.linux.LinuxFileUtils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.text.MessageFormat;
+
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class FlinkSetup {
+
+ public static ShellResult config(Params params) {
+ FlinkParams flinkParams = (FlinkParams) params;
+ String flinkUser = params.user();
+ String flinkGroup = params.group();
+ String confDir = flinkParams.confDir();
+
+ LinuxFileUtils.createDirectories(
+ flinkParams.getFlinkLogDir(), flinkUser, flinkGroup,
Constants.PERMISSION_755, true);
+ LinuxFileUtils.createDirectories(
+ flinkParams.getFlinkPidDir(), flinkUser, flinkGroup,
Constants.PERMISSION_755, true);
+
+ // log4j.properties
+ log.info("Generating [{}/log4j.properties] file", confDir);
+ LinuxFileUtils.toFileByTemplate(
+ flinkParams.getFlinkLog4jPropertiesContent(),
+ MessageFormat.format("{0}/log4j.properties", confDir),
+ flinkUser,
+ flinkGroup,
+ Constants.PERMISSION_644,
+ flinkParams.getGlobalParamsMap());
+
+ // log4j-cli.properties
+ log.info("Generating [{}/log4j-cli.properties] file", confDir);
+ LinuxFileUtils.toFileByTemplate(
+ flinkParams.getFlinkLog4jCLiPropertiesContent(),
+ MessageFormat.format("{0}/log4j-cli.properties", confDir),
+ flinkUser,
+ flinkGroup,
+ Constants.PERMISSION_644,
+ flinkParams.getGlobalParamsMap());
+
+ // log4j-console.properties
+ log.info("Generating [{}/log4j-console.properties] file", confDir);
+ LinuxFileUtils.toFileByTemplate(
+ flinkParams.getFlinkLog4jConsolePropertiesContent(),
+ MessageFormat.format("{0}/log4j-console.properties", confDir),
+ flinkUser,
+ flinkGroup,
+ Constants.PERMISSION_644,
+ flinkParams.getGlobalParamsMap());
+
+ // log4j-session.properties
+ log.info("Generating [{}/log4j-session.properties] file", confDir);
+ LinuxFileUtils.toFileByTemplate(
+ flinkParams.getFlinkLog4jSessionPropertiesContent(),
+ MessageFormat.format("{0}/log4j-session.properties", confDir),
+ flinkUser,
+ flinkGroup,
+ Constants.PERMISSION_644,
+ flinkParams.getGlobalParamsMap());
+
+ // flink-conf.yaml
+ log.info("Generating [{}/flink-conf.yaml] file", confDir);
+ LinuxFileUtils.toFileByTemplate(
+ flinkParams.getFlinkConfContent(),
+ MessageFormat.format("{0}/flink-conf.yaml", confDir),
+ flinkUser,
+ flinkGroup,
+ Constants.PERMISSION_644,
+ flinkParams.getGlobalParamsMap());
+
+ HdfsUtil.createDirectory(flinkUser, "/completed-jobs");
+
+ return ShellResult.success("Flink Configure success!");
+ }
+}
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaBrokerScript.java
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaBrokerScript.java
index 97714ce..7cc29c4 100644
---
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaBrokerScript.java
+++
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaBrokerScript.java
@@ -52,7 +52,7 @@ public class KafkaBrokerScript extends AbstractServerScript {
KafkaParams kafkaParams = (KafkaParams) params;
String cmd = MessageFormat.format(
- "sh {0}/bin/kafka-server-start.sh {0}/config/server.properties
> /dev/null 2>&1 & echo -n $!>{1}",
+ "{0}/bin/kafka-server-start.sh {0}/config/server.properties >
/dev/null 2>&1 & echo -n $!>{1}",
kafkaParams.serviceHome(), kafkaParams.getKafkaPidFile());
try {
return LinuxOSUtils.sudoExecCmd(cmd, kafkaParams.user());
@@ -64,7 +64,7 @@ public class KafkaBrokerScript extends AbstractServerScript {
@Override
public ShellResult stop(Params params) {
KafkaParams kafkaParams = (KafkaParams) params;
- String cmd = MessageFormat.format("sh {0}/bin/kafka-server-stop.sh",
kafkaParams.serviceHome());
+ String cmd = MessageFormat.format("{0}/bin/kafka-server-stop.sh",
kafkaParams.serviceHome());
try {
return LinuxOSUtils.sudoExecCmd(cmd, kafkaParams.user());
} catch (IOException e) {
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
index 46a14ed..4483094 100644
---
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
+++
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/zookeeper/ZookeeperServerScript.java
@@ -51,7 +51,7 @@ public class ZookeeperServerScript extends
AbstractServerScript {
configure(params);
ZookeeperParams zookeeperParams = (ZookeeperParams) params;
- String cmd = MessageFormat.format("sh {0}/bin/zkServer.sh start",
zookeeperParams.serviceHome());
+ String cmd = MessageFormat.format("{0}/bin/zkServer.sh start",
zookeeperParams.serviceHome());
try {
return LinuxOSUtils.sudoExecCmd(cmd, zookeeperParams.user());
} catch (IOException e) {
@@ -62,7 +62,7 @@ public class ZookeeperServerScript extends
AbstractServerScript {
@Override
public ShellResult stop(Params params) {
ZookeeperParams zookeeperParams = (ZookeeperParams) params;
- String cmd = MessageFormat.format("sh {0}/bin/zkServer.sh stop",
zookeeperParams.serviceHome());
+ String cmd = MessageFormat.format("{0}/bin/zkServer.sh stop",
zookeeperParams.serviceHome());
try {
return LinuxOSUtils.sudoExecCmd(cmd, zookeeperParams.user());
} catch (IOException e) {
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java
b/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java
index 47f1015..af23aee 100644
---
a/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java
+++
b/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java
@@ -18,6 +18,7 @@
*/
package org.apache.bigtop.manager.stack.core.executor;
+import org.apache.bigtop.manager.common.constants.MessageConstants;
import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.common.message.entity.payload.CommandPayload;
import org.apache.bigtop.manager.common.message.entity.pojo.CustomCommandInfo;
@@ -114,6 +115,9 @@ public class StackExecutor {
log.info("Executing {}::{}", script.getName(), method.getName());
ShellResult result = (ShellResult) method.invoke(script, params);
+ if (result.getExitCode() != MessageConstants.SUCCESS_CODE) {
+ log.error("Error executing script: {}", result.getErrMsg());
+ }
runAfterHook(command, params);