xiaotongwang1 opened a new issue #9367:
URL: https://github.com/apache/pulsar/issues/9367
我最近针对pulsar开展项目预研,主要从安全加固方面做了一些分析并做了一些整改以满足一些基本安全要求。
Recently, I conducted a project pre-research for Pulsar, and analyzed
security hardening and made some improvements to meet some basic security
requirements.
当前已经在项目组内部完成基本测试,为了方便我们后续的版本能够跟随开源社区版本演进,我们期望社区commiter or
PMC帮忙审核下,如果社区能够接纳我们修改的功能点,我将向公司提出申请,提交下面功能源码到开源社区
Basic tests have been completed in the project team. To facilitate future
versions to evolve with the open source community version, we expect the
community committer or PMC to review the changes. If the community accepts the
modified functions, I will submit an application to company and submit the
following function source code to the open source community.
# 侵入式修改/Intrusive modification</a>
## 打包过程显示指定配置文件unix模式/Displaying the Unix Mode of a Specified Configuration
File During Packaging</a>
**文件路径:**/distribution/server/src/assemble/bin.xml
**修改说明:**
<lineEnding\>unix</lineEnding\>
指定unix,避免windows打包后部署linux上运行失败
Specify UNIX to prevent running failures on Linux after Windows is packed.
## 所有zookeeper访问操作java文件(安全相关)/Java files for all ZooKeeper access
operations (security-related)</a>
**文件路径:** 太多了,不一一列举,可以搜索Ids.OPEN\_ACL\_UNSAFE /You can search for
Ids.OPEN\_ACL\_UNSAFE.
**修改说明:**
示例:
原始: zkc.create\(path + "/ledgers", new byte\[0\], Ids.OPEN\_ACL\_UNSAFE,
CreateMode.PERSISTENT\);
修改后:zkc.create\(path +"/ledgers/available", new byte\[0\],
Ids.CREATOR\_ALL\_ACL, CreateMode.PERSISTENT\);
Todo:平滑升级,需要在额外加个System properties开关
Todo: Smooth upgrade requires additional system properties.
## 自动获取本地IP作为advertisedAddress/Automatically obtain the local IP address as
the advertisedAddress.</a>
**文件路径:**pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
**修改说明:**
由于没有内网DNS解析器,设置advertisedAddress为IP
Because there is no intranet DNS resolver, set advertisedAddress to the IP
address.
```
public String getAdvertisedAddress() {
try {
if ("ip".equals(advertisedAddress)) {
return getLocalIp();
}
return advertisedAddress;
} catch (Exception e) {
return advertisedAddress;
}
}
```
## Prometheus客户端IP白名单限制(安全建议)+支持json格式/Prometheus client IP address
trustlist restriction (security suggestion) + JSON format supported</a>
**文件路径:**
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
**修改说明:**
Prometheus 访问的IP限制(安全改进,虽说都是metrics日志,从安全角度,建议非合法客户端访问失败)
IP address restriction for Prometheus access (Security improvement. Although
metric logs are used, it is recommended that unauthorized clients fail to
access the Prometheus.)
Metrics servlet接口支持返回JSON格式,方便除了Prometheus系统外的其他监控、统计系统更好计算
The Metrics servlet interface can return the result in JSON format, which
facilitates calculation by other monitoring and statistics systems except
Prometheus.
示例:
```
// ip limit
if (limitClientIps != null && !limitClientIps.isEmpty()) {
String clientIp = getRemoteAddr(request);
if (StringUtils.isEmpty(clientIp) ||
!limitClientIps.contains(clientIp)) {
log.warn("clientIp {} not in the limitClientIps {}
", clientIp, limitClientIps);
res.setStatus(HttpStatus.FORBIDDEN_403);
res.setContentType("text/plain");
res.getOutputStream().write("illegal access
ip".getBytes());
context.complete();
return;
}
}
//...............
if (!jsonFormat) {
metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace)
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
.write("\"} ");
stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
} else {
JSONObject json = new JSONObject();
json.put("plugin_id", "broker");
json.put("ip", LOCAL_IP);
json.put("type", "gauge");
json.put(name, value);
json.put("timestamp", System.currentTimeMillis());
json.put("cluster", cluster);
JSONObject jsonMetrics = new JSONObject();
jsonMetrics.put("namespace", namespace);
jsonMetrics.put("topic", topic);
jsonMetrics.put("subscription", subscription);
jsonMetrics.put("consumer_name", consumerName);
jsonMetrics.put("consumer_id", consumerId);
json.put("metric", jsonMetrics);
stream.write(json.toString());
stream.write('\n');
}
```
## 支持Bookkeeper Client可配置zookeeper session timeout/The ZooKeeper session
timeout can be configured on the Bookkeeper client.</a>
**文件路径:**/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
**修改说明:**
```
+ bkConf.setZkTimeout((int)conf.getZooKeeperSessionTimeoutMillis());
```
## Prometheus统计zookeeper链接数建议加上安全连接数/Prometheus measures the number of
ZooKeeper connections. You are advised to add the number of secure connections
to the number of ZooKeeper connections.</a>
**文件路径:**/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
**修改说明:**
当前zookeeper开启安全模式时,链接数需要加上安全链接数
When the security mode is enabled for the current ZooKeeper, the number of
secure connections must be added to the number of secure connections.
```
Gauge.build().name("zookeeper_server_connections").help("Number of
currently opened connections").create()
.setChild(new Gauge.Child() {
@Override
public double get() {
int connections = -1;
ServerCnxnFactory cnxFactory =
zkServer.getServerCnxnFactory();
if (cnxFactory != null) {
connections +=
cnxFactory.getNumAliveConnections();
}
ServerCnxnFactory secCnxFactory =
zkServer.getSecureServerCnxnFactory();
if (secCnxFactory != null) {
connections +=
secCnxFactory.getNumAliveConnections();
}
return connections;
}
}).register();
```
# 非侵入式修改/Non-intrusive modification</a>
## Zookeeper安全加固/ZooKeeper Security Hardening</a>
**加固说明:**
1. client-server之间开启SASL/DIGEST认证
2. server-server之间开启SASL/DIGEST认证
3. 初始化时加固默认路径
4. 敏感信息加密\(业务自己实现加密解密类)
5、增加Prometheus监听ip限制,类似broker,加ip白名单
**Security Hardening Description:**
1. SASL/DIGEST authentication is enabled between the client and server.
2. SASL/DIGEST authentication is enabled between servers.
3. Harden the default path during initialization.
4. Sensitive information encryption/ (encryption/decryption class
implemented by the service side)
5. Add the Prometheus listening IP address restriction, similar to the
broker, and add the IP address trustlist.
为什么不采用SASL/Kerberos?
zookeeper SASL/kerberos需要引入KDC,增加系统组网复杂度,此外常用的MIT kerberos受美国EAR出口管制,heimdal
kerberos功能测试可用,只是商用案例少
可是SASL/DIGEST-MD5,由于DIGEST签名算法不安全,目前建议开启TLS通道加密,等待zookeeper新版本支持可插拔认证后,再做二次开发(https://issues.apache.org/jira/browse/ZOOKEEPER-2159)
Why not SASL/Kerberos?
KDC is required for ZooKeeper SASL/kerberos to increase the system
networking complexity. In addition, common MIT Kerberos are subject to export
control of the US EAR. The function of the kerberos is available in the test,
but there are few commercial cases.
However, the SASL/DIGEST-MD5 signature algorithm is insecure. Therefore, you
are advised to enable TLS channel encryption. After the new ZooKeeper version
supports pluggable authentication, perform secondary
development.https://issues.apache.org/jira/browse/ZOOKEEPER-2159)
**修改示例:**
1. 环境变量
```
-Djava.security.auth.login.config=xxxx.jaas.conf
-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
-Dzookeeper.ssl.keyStore.location=$\{ZOOBINDIR\}/../keystore/zookeeper.jks
-Dzookeeper.ssl.trustStore.location=$\{ZOOBINDIR\}/../keystore/truststore.jks
-Dzookeeper.ssl.protocol=TLSv1.2
-Dzookeeper.ssl.clientAuth=none
-Dzookeeper.ssl.hostnameVerification=false
-Dzookeeper.ssl.ciphersuites=TLS\_ECDHE\_ECDSA\_WITH\_AES\_128\_GCM\_SHA256,TLS\_ECDHE\_ECDSA\_WITH\_AES\_256\_GCM\_SHA384,TLS\_ECDHE\_RSA\_WITH\_AES\_128\_GCM\_SHA256,TLS\_ECDHE\_RSA\_WITH\_AES\_256\_GCM\_SHA384
-Dzookeeper.sslQuorum=true
-Dzookeeper.ssl.quorum.keyStore.location=$\{ZOOBINDIR\}/../keystore/zookeeper.jks
-Dzookeeper.ssl.quorum.trustStore.location=$\{ZOOBINDIR\}/../keystore/truststore.jks
-Dzookeeper.ssl.quorum.protocol=TLSv1.2
-Dzookeeper.ssl.quorum.clientAuth=none
-Dzookeeper.ssl.quorum.hostnameVerification=false
-Dzookeeper.ssl.quorum.ciphersuites=TLS\_ECDHE\_ECDSA\_WITH\_AES\_128\_GCM\_SHA256,TLS\_ECDHE\_ECDSA\_WITH\_AES\_256\_GCM\_SHA384,TLS\_ECDHE\_RSA\_WITH\_AES\_128\_GCM\_SHA256,TLS\_ECDHE\_RSA\_WITH\_AES\_256\_GCM\_SHA384
```
2. zoo.conf
```
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
admin.enableServer=false
quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.cnxn.threads.size=20
4lw.commands.whitelist==stat,ruok,mntr,stat
clientPortAddress=127.0.0.1
```
3. jaas.conf
```
Server \{
xxx.DigestLoginModule required
username="admin"
password="xxx"
user\_admin="xxx";
\};
Client \{
xxx.DigestLoginModule required
username="admin"
password="xxx";
\};
QuorumServer \{
xxx.DigestLoginModule required
username="Quorum"
password="xxx"
user\_Quorum="xxx";
\};
QuorumLearner \{
xxx.DigestLoginModule required
username="Quorum"
password="xxx";
\};
```
4. 初始化脚本
```
sh zkCli.sh -server 127.0.0.1:2181 setAcl /zookeeper/config
sasl:admin:cdrwa,world:anyone:r
sh zkCli.sh -server 127.0.0.1:2181 setAcl /zookeeper/quota
sasl:admin:cdrwa,world:anyone:r
sh zkCli.sh -server 127.0.0.1:2181 setAcl /zookeeper
sasl:admin:cdrwa,world:anyone:r
sh zkCli.sh -server 127.0.0.1:2181 setAcl /
sasl:admin:cdrwa,world:anyone:r
```
## Broker安全加固/Broker Security Hardening</a>
**加固说明:**
1. 新增认证模块:
1.
TCP采用SASL/SCRAM-SHA256(我们将salt、storekey、serverkey、iterations加密后保存到zookeeper )
2. HTTP 管理接口采用OAuth2.0 token模式/或者签名模式(HTTP目前好像不支持一次认证分多次情况,因此可以采用)
**Security Hardening Description:**
1. Added the following authentication module::
1. The TCP uses SASL/SCRAM-SHA256. (The salt, storekey, serverkey, and
iterations are encrypted and saved to the ZooKeeper.)
2. The HTTP management interface uses the OAuth2.0 token mode or
signature mode. (Currently, HTTP does not support multiple authentications.
Therefore, the OAuth2.0 token mode or signature mode can be used.)
为什么要使用SASL/SCRAM?
我们基于以下几点考虑:
1、尽量不要引入额外的组件,导致组网变复杂
2、尽量保证在不开启TLS的时候,尽可能保证认证过程安全
因此我们排除了kerberos、oauth2 token等
Why SASL/SCRAM?
We consider the following points:
1. Do not introduce extra components to complicate the networking.
2. Ensure that the authentication process is secure when TLS is disabled.
Therefore, we exclude kerberos, oauth2 tokens, and so on.
**修改示例:**
**SASL/SCRAM-SHA256示例代码:**
```
class: AuthenticationStateSCRAM implements AuthenticationState
@Override
public AuthData authenticate(AuthData authData) throws
AuthenticationException {
String clientAddress;
if (!authenticationDataSource.hasDataFromPeer()) {
throw new AuthenticationException("pulsar server scrame auth
does not have a client address ");
}
SocketAddress socketAddress =
authenticationDataSource.getPeerAddress();
clientAddress = socketAddress.toString();
if (ScramStage.INIT.equals(stage)) {
//....
return
AuthData.of(serverFirstMessage.getBytes(StandardCharsets.UTF_8));
} else if (ScramStage.FIRST.equals(stage)) {
//....
return
AuthData.of(serverFinalMessage.getBytes(StandardCharsets.UTF_8));
} else if (ScramStage.FINAL.equals(stage)) {
stage = ScramStage.FINISH;
LOG.info("pulsar server scrame auth finish with success");
return null;
} else {
LOG.error("pulsar server scrame auth error with invalid stage");
throw new AuthenticationException(
"Authentication datasource stage is invalid: " + stage + ",
from client: " + credential);
}
}
```
**HTTP Admin Request示例代码:**
```
class :AuthenticationProviderSCRAM implements AuthenticationProvider
@Override
@Override
public String authenticate(AuthenticationDataSource authDataSource)
throws AuthenticationException {
if (!authDataSource.hasDataFromHttp()) {
throw new AuthenticationException("Authentication failed: not a
HTTP request");
}
String clientAddress = authDataSource.getPeerAddress().toString();
String httpHeaderValue =
authDataSource.getHttpHeader(HTTP_REQ_HEADER_NAME);
if ((httpHeaderValue == null) ||
!httpHeaderValue.startsWith(HTTP_REQ_HEADER_VALUE_PREFIX)) {
throw new AuthenticationException(
"Authentication datasource have invalid HTTP Authorization
header: " + clientAddress);
}
String token =
httpHeaderValue.substring(HTTP_REQ_HEADER_VALUE_PREFIX.length());
if (token.isEmpty()) {
throw new AuthenticationException(
"Authentication datasource does not have a client message: "
+ clientAddress);
}
return validateToken(token);
}
```
## Bookkeeper安全加固</a>
**加固说明:**
1. 新增认证模块:SASL/SCRAM-SHA256 / New authentication module: SASL/SCRAM-SHA256
2、metrics 接口设置监听IP、Client IP白名单、支持JSON风格 /Set the listening IP address and
client IP address trustlist for the metrics interface, and support the JSON
style.
**修改示例:**
**SASL/SCRAM-SHA256示例代码:**
```
class SASLBookieAuthProvider implements BookieAuthProvider
@Override
public void process(AuthToken authData,
AuthCallbacks.GenericCallback<AuthToken> cb) {
try {
// 处理客户端首次认证请求
if (ScramStage.INIT.equals(stage)) {
//....
cb.operationComplete(BKException.Code.OK,
AuthToken.wrap(serverFirstMessage.getBytes(StandardCharsets.UTF_8)));
}
// 校验客户端签名,并返回服务端签名
else if (ScramStage.FIRST.equals(stage)) {
//....
cb.operationComplete(BKException.Code.OK,
AuthToken.wrap(serverFinalMessage.getBytes(StandardCharsets.UTF_8)));
}
// 处理成功
else if (ScramStage.FINAL.equals(stage)) {
stage = ScramStage.FINISH;
LOG.info("ScramStage.FINAL:bookie server sasl scrame handle
{}", credential);
completeCb.operationComplete(BKException.Code.OK, null);
} else {
LOG.info("ScramStage.UNKNOW:bookie server sasl scrame handle
failed {}", credential);
cb.operationComplete(BKException.Code.UnauthorizedAccessException,
AuthToken.wrap("error=unknowStage".getBytes(StandardCharsets.UTF_8)));
completeCb.operationComplete(BKException.Code.UnauthorizedAccessException,
null);
}
} catch (Exception err) {
LOG.error("bookie server sasl scrame handle failed {}", err);
cb.operationComplete(BKException.Code.UnauthorizedAccessException,
AuthToken.wrap(("error=" +
err.getMessage()).getBytes(StandardCharsets.UTF_8)));
completeCb.operationComplete(BKException.Code.UnauthorizedAccessException,
null);
}
}
```
**Metrics示例代码:**
```
@Override
public void start(Configuration conf) {
boolean httpEnabled = conf.getBoolean(PROMETHEUS_STATS_HTTP_ENABLE,
DEFAULT_PROMETHEUS_STATS_HTTP_ENABLE);
boolean bkHttpServerEnabled = conf.getBoolean("httpServerEnabled",
false);
// only start its own http server when prometheus http is enabled
and bk http server is not enabled.
if (httpEnabled && !bkHttpServerEnabled) {
int httpPort = conf.getInt(PROMETHEUS_STATS_HTTP_PORT,
DEFAULT_PROMETHEUS_STATS_HTTP_PORT);
String httpHost = conf.getString(PROMETHEUS_STATS_HTTP_HOST,
"0.0.0.0");
InetSocketAddress httpEndpoint =
InetSocketAddress.createUnresolved(httpHost, httpPort);
this.server = new Server(httpEndpoint);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
server.setHandler(context);
context.addServlet(new ServletHolder(new PrometheusServlet(this,
false)), "/metrics");
context.addServlet(new ServletHolder(new PrometheusServlet(this,
true)), "/metricsjson");
try {
server.start();
LOG.info("Started Prometheus stats endpoint at {}",
httpEndpoint);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
```
## 其他一些工具类:/Other tool classes:</a>
SCRAM签名、验签算法、敏感信息加密解密AES-GCM算法、curator watch zookeeper
sasl用户信息(salt、storekey、serverkey、iterations)工具
SCRAM signature, signature verification algorithm, sensitive information
encryption and decryption AES-GCM algorithm, curator watch ZooKeeper sasl user
information (salt, storekey, serverkey, and iterations) tool
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]