xiaotongwang1 opened a new issue #9367:
URL: https://github.com/apache/pulsar/issues/9367


   I recently carried out a pre-research project on Pulsar. I analyzed it from a security hardening perspective and made a number of changes to meet some basic security requirements.
   
   Basic testing has been completed within our project team. So that our future versions can keep following the open source community releases, we would like a community committer or PMC member to review the changes. If the community is willing to accept the modified features, I will apply to my company for approval and contribute the source code for the features below.
   
   # Intrusive modifications
   
   
   ## Explicitly set Unix line endings for configuration files during packaging
   
   **File path:** /distribution/server/src/assemble/bin.xml
   
   **Change description:**
   
   `<lineEnding>unix</lineEnding>`
   
   Set the line ending to unix so that a package built on Windows does not fail to run when deployed on Linux.
   
   ## Java files for all ZooKeeper access operations (security-related)
   
   **File path:** Too many to list individually; search for `Ids.OPEN_ACL_UNSAFE`.
   
   **Change description:**
   
   Example:
   
   Original: `zkc.create(path + "/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);`
   
   Modified: `zkc.create(path + "/ledgers/available", new byte[0], Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);`
   
   Todo: A smooth upgrade needs an additional system property switch.
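
The system property switch mentioned in the Todo could look roughly like the sketch below. The property name `pulsar.zookeeper.secureAcl` and the `ZkAclSwitch` class are hypothetical, and the enum only stands in for the real `ZooDefs.Ids` constants so that the example runs without the ZooKeeper library.

```java
/** Hypothetical helper: picks the ZooKeeper ACL mode from a JVM system property. */
final class ZkAclSwitch {
    /** Assumed property name; not an existing Pulsar or ZooKeeper setting. */
    static final String PROP = "pulsar.zookeeper.secureAcl";

    /** Stand-ins for ZooDefs.Ids.OPEN_ACL_UNSAFE and ZooDefs.Ids.CREATOR_ALL_ACL. */
    enum AclMode { OPEN_ACL_UNSAFE, CREATOR_ALL_ACL }

    static AclMode resolve() {
        // Boolean.getBoolean returns false when the property is unset,
        // so an upgraded cluster keeps the old open ACL until it opts in.
        return Boolean.getBoolean(PROP) ? AclMode.CREATOR_ALL_ACL : AclMode.OPEN_ACL_UNSAFE;
    }
}
```

Defaulting to the old behaviour means a rolling upgrade would not change ACLs on existing deployments until an operator starts the JVM with `-Dpulsar.zookeeper.secureAcl=true`.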
   
   ## Automatically use the local IP address as advertisedAddress
   
   
   **File path:** pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
   
   **Change description:**
   
   Because we have no intranet DNS resolver, advertisedAddress must be an IP address. When advertisedAddress is set to the literal value "ip", the broker's local IP address is returned instead.
   
   ```
         public String getAdvertisedAddress() {
             try {
               if ("ip".equals(advertisedAddress)) {
                   return getLocalIp();
               }
               return advertisedAddress;
             } catch (Exception e) {
               return advertisedAddress;
             }
         }
   
   ```
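
The issue does not show `getLocalIp()`. A minimal sketch of one way to implement it with the JDK alone follows; the `LocalIpResolver` class and its selection rule (first IPv4 address on an interface that is up and not loopback) are assumptions, not the author's code.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;

/** Hypothetical stand-in for the getLocalIp() helper that the issue does not show. */
final class LocalIpResolver {

    /** Returns the configured value unless it is the literal "ip". */
    static String resolveAdvertisedAddress(String configured) {
        return "ip".equals(configured) ? getLocalIp() : configured;
    }

    /** First IPv4 address on an interface that is up and not loopback; loopback as a last resort. */
    static String getLocalIp() {
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics != null) {
                for (NetworkInterface nic : Collections.list(nics)) {
                    if (!nic.isUp() || nic.isLoopback()) {
                        continue;
                    }
                    for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                        if (addr instanceof Inet4Address) {
                            return addr.getHostAddress();
                        }
                    }
                }
            }
        } catch (SocketException e) {
            // Fall through to the loopback address below.
        }
        return InetAddress.getLoopbackAddress().getHostAddress();
    }
}
```

On hosts with several network interfaces the first match may not be the address clients can reach, so a real implementation would probably need a way to pick the interface.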
   ## Prometheus client IP allowlist (security suggestion) and JSON output support
   
   **File paths:**
   
   pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
   
   pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
   
   **Change description:**
   
   Restrict which client IP addresses may access the Prometheus endpoint. This is a security improvement: although the endpoint only exposes metrics, requests from unauthorized clients should be rejected.
   
   The metrics servlet can also return results in JSON format, which makes them easier to process for monitoring and statistics systems other than Prometheus.
   
   Example:
   ```
                   // Reject clients that are not on the IP allowlist
                   if (limitClientIps != null && !limitClientIps.isEmpty()) {
                       String clientIp = getRemoteAddr(request);
                       if (StringUtils.isEmpty(clientIp) || !limitClientIps.contains(clientIp)) {
                           log.warn("clientIp {} not in the limitClientIps {}", clientIp, limitClientIps);
   
                           res.setStatus(HttpStatus.FORBIDDEN_403);
                           res.setContentType("text/plain");
                           res.getOutputStream().write("illegal access ip".getBytes());
                           context.complete();
                           return;
                       }
                   }
   
   //...............
   
           if (!jsonFormat) {
               // Prometheus text format
               metricType(stream, name);
               stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
                       .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
                       .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
                       .write("\"} ");
               stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
   
           } else {
               // JSON format: one object per metric, one per line
               JSONObject json = new JSONObject();
               json.put("plugin_id", "broker");
               json.put("ip", LOCAL_IP);
               json.put("type", "gauge");
               json.put(name, value);
               json.put("timestamp", System.currentTimeMillis());
               json.put("cluster", cluster);
   
               JSONObject jsonMetrics = new JSONObject();
               jsonMetrics.put("namespace", namespace);
               jsonMetrics.put("topic", topic);
               jsonMetrics.put("subscription", subscription);
               jsonMetrics.put("consumer_name", consumerName);
               jsonMetrics.put("consumer_id", consumerId);
               json.put("metric", jsonMetrics);
               stream.write(json.toString());
               stream.write('\n');
           }
   ```
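
The fragment above does not show how `limitClientIps` is built. As a rough sketch, the allowlist could be parsed from a comma-separated configuration value as below; the `ClientIpAllowlist` class and the configuration format are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical allowlist helper; the class name and config format are assumptions. */
final class ClientIpAllowlist {
    private final Set<String> allowed;

    /** Parses a comma-separated list such as "10.0.0.5, 10.0.0.6"; null or blank means no entries. */
    ClientIpAllowlist(String commaSeparated) {
        if (commaSeparated == null) {
            this.allowed = Collections.emptySet();
        } else {
            this.allowed = Arrays.stream(commaSeparated.split(","))
                    .map(String::trim)
                    .filter(entry -> !entry.isEmpty())
                    .collect(Collectors.toSet());
        }
    }

    /** An empty allowlist disables the check, matching the servlet fragment. */
    boolean isAllowed(String clientIp) {
        if (allowed.isEmpty()) {
            return true;
        }
        return clientIp != null && allowed.contains(clientIp);
    }
}
```

The match is an exact string comparison, so each allowed address has to be listed in the same textual form that the servlet sees for the remote address.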
   ## Make the ZooKeeper session timeout configurable for the BookKeeper client
   
   
   **File path:** /pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
   
   **Change description:**
   ```
   +          bkConf.setZkTimeout((int)conf.getZooKeeperSessionTimeoutMillis());
   ```
   
   ## Include secure connections in the ZooKeeper connection count reported to Prometheus
   
   
   **File path:** /pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
   
   **Change description:**
   
   When security mode is enabled in ZooKeeper, the number of secure connections must be added to the reported connection count.
   
   ```
           Gauge.build().name("zookeeper_server_connections").help("Number of currently opened connections").create()
                   .setChild(new Gauge.Child() {
                       @Override
                       public double get() {
                           // Start from 0: accumulating onto -1 would undercount by one
                           int connections = 0;
   
                           // Plain-text client connections
                           ServerCnxnFactory cnxFactory = zkServer.getServerCnxnFactory();
                           if (cnxFactory != null) {
                               connections += cnxFactory.getNumAliveConnections();
                           }
   
                           // TLS client connections
                           ServerCnxnFactory secCnxFactory = zkServer.getSecureServerCnxnFactory();
                           if (secCnxFactory != null) {
                               connections += secCnxFactory.getNumAliveConnections();
                           }
                           return connections;
                       }
                   }).register();
   ```
   # Non-intrusive modifications
   
   ## ZooKeeper security hardening
   
   **Hardening description:**
   
   1.  Enable SASL/DIGEST authentication between client and server.
   2.  Enable SASL/DIGEST authentication between servers.
   3.  Harden the default paths during initialization.
   4.  Encrypt sensitive information (the encryption/decryption class is implemented by the service itself).
   5.  Restrict the Prometheus listening IP address and, as on the broker, add a client IP allowlist.
   
   Why not SASL/Kerberos?
   
   ZooKeeper SASL/Kerberos requires a KDC, which makes the deployment topology more complex. In addition, the commonly used MIT Kerberos is subject to US EAR export controls. Heimdal Kerberos passed our functional tests, but it has few commercial deployments.
   
   SASL/DIGEST-MD5, however, relies on an insecure digest algorithm, so for now we recommend also enabling TLS channel encryption. Once a new ZooKeeper version supports pluggable authentication, further development can build on it (https://issues.apache.org/jira/browse/ZOOKEEPER-2159).
   
   
   **Modification examples:**
   
   1.  Environment variables (JVM system properties)
   ```
       -Djava.security.auth.login.config=xxxx.jaas.conf
       -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
       -Dzookeeper.ssl.keyStore.location=${ZOOBINDIR}/../keystore/zookeeper.jks
       -Dzookeeper.ssl.trustStore.location=${ZOOBINDIR}/../keystore/truststore.jks
       -Dzookeeper.ssl.protocol=TLSv1.2
       -Dzookeeper.ssl.clientAuth=none
       -Dzookeeper.ssl.hostnameVerification=false
       -Dzookeeper.ssl.ciphersuites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
       -Dzookeeper.sslQuorum=true
       -Dzookeeper.ssl.quorum.keyStore.location=${ZOOBINDIR}/../keystore/zookeeper.jks
       -Dzookeeper.ssl.quorum.trustStore.location=${ZOOBINDIR}/../keystore/truststore.jks
       -Dzookeeper.ssl.quorum.protocol=TLSv1.2
       -Dzookeeper.ssl.quorum.clientAuth=none
       -Dzookeeper.ssl.quorum.hostnameVerification=false
       -Dzookeeper.ssl.quorum.ciphersuites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
   ```
   2.  zoo.conf
   ```
       authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
       requireClientAuthScheme=sasl
       jaasLoginRenew=3600000
       admin.enableServer=false
       quorum.auth.enableSasl=true
       quorum.auth.learnerRequireSasl=true
       quorum.auth.serverRequireSasl=true
       quorum.auth.learner.loginContext=QuorumLearner
       quorum.auth.server.loginContext=QuorumServer
       quorum.cnxn.threads.size=20
       4lw.commands.whitelist=stat,ruok,mntr
       clientPortAddress=127.0.0.1
   ```
   3.  jaas.conf
   ```
       Server {
           xxx.DigestLoginModule required
           username="admin"
           password="xxx"
           user_admin="xxx";
       };
   
       Client {
           xxx.DigestLoginModule required
           username="admin"
           password="xxx";
       };
   
       QuorumServer {
           xxx.DigestLoginModule required
           username="Quorum"
           password="xxx"
           user_Quorum="xxx";
       };
   
       QuorumLearner {
           xxx.DigestLoginModule required
           username="Quorum"
           password="xxx";
       };
   ```
   4.  Initialization script
   ```
       sh zkCli.sh -server 127.0.0.1:2181 setAcl /zookeeper/config sasl:admin:cdrwa,world:anyone:r
       sh zkCli.sh -server 127.0.0.1:2181 setAcl /zookeeper/quota sasl:admin:cdrwa,world:anyone:r
       sh zkCli.sh -server 127.0.0.1:2181 setAcl /zookeeper sasl:admin:cdrwa,world:anyone:r
       sh zkCli.sh -server 127.0.0.1:2181 setAcl / sasl:admin:cdrwa,world:anyone:r
   ```
   
   ## Broker security hardening
   
   **Hardening description:**
   
   1.  New authentication modules:
       1.  The TCP side uses SASL/SCRAM-SHA256. The salt, storekey, serverkey, and iterations are encrypted before being saved to ZooKeeper.
       2.  The HTTP admin interface uses an OAuth 2.0 token mode or a signature mode. HTTP currently does not seem to support an authentication that is split into several round trips, so these single-request modes can be used.
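
To make the stored values concrete, here is a minimal sketch of how salt, storekey, serverkey, and iterations are derived for SCRAM-SHA-256 according to RFC 5802 and RFC 7677. The `ScramCredentials` class is hypothetical and is not the author's implementation; it skips the SASLprep normalization of the password and the encryption step before writing to ZooKeeper.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical holder for the per-user values a SCRAM-SHA-256 server stores. */
final class ScramCredentials {
    final byte[] salt;
    final int iterations;
    final byte[] storedKey;
    final byte[] serverKey;

    private ScramCredentials(byte[] salt, int iterations, byte[] storedKey, byte[] serverKey) {
        this.salt = salt;
        this.iterations = iterations;
        this.storedKey = storedKey;
        this.serverKey = serverKey;
    }

    static ScramCredentials derive(char[] password, byte[] salt, int iterations) {
        try {
            // SaltedPassword = Hi(password, salt, i), i.e. PBKDF2-HMAC-SHA-256 with a 256-bit output
            PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
            byte[] saltedPassword = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec).getEncoded();
            // ClientKey = HMAC(SaltedPassword, "Client Key"); StoredKey = H(ClientKey)
            byte[] clientKey = hmac(saltedPassword, "Client Key");
            byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
            // ServerKey = HMAC(SaltedPassword, "Server Key")
            byte[] serverKey = hmac(saltedPassword, "Server Key");
            return new ScramCredentials(salt.clone(), iterations, storedKey, serverKey);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("SCRAM key derivation failed", e);
        }
    }

    /** Random 16-byte salt; the length is an arbitrary choice for this sketch. */
    static byte[] newSalt() {
        byte[] salt = new byte[16];
        new SecureRandom().nextBytes(salt);
        return salt;
    }

    private static byte[] hmac(byte[] key, String message) throws GeneralSecurityException {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
    }
}
```

Only derived keys are stored, never the password itself, which is part of why SCRAM remains reasonable when TLS is disabled.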
   
   Why SASL/SCRAM?
   
   We based the choice on the following considerations:
   
   1. Avoid introducing extra components that would complicate the deployment topology.
   
   2. Keep the authentication process as secure as possible even when TLS is not enabled.
   
   We therefore ruled out Kerberos, OAuth 2.0 tokens, and similar options.
   
   
   **Modification examples:**
   
   **SASL/SCRAM-SHA256 sample code:**
   ```
   // class AuthenticationStateSCRAM implements AuthenticationState
       @Override
       public AuthData authenticate(AuthData authData) throws AuthenticationException {
   
           String clientAddress;
   
           if (!authenticationDataSource.hasDataFromPeer()) {
               throw new AuthenticationException("pulsar server SCRAM auth does not have a client address");
           }
   
           SocketAddress socketAddress = authenticationDataSource.getPeerAddress();
           clientAddress = socketAddress.toString();
   
           if (ScramStage.INIT.equals(stage)) {
               // Handle the client-first message and reply with the server-first message
               //....
               return AuthData.of(serverFirstMessage.getBytes(StandardCharsets.UTF_8));
   
           } else if (ScramStage.FIRST.equals(stage)) {
               // Verify the client proof and reply with the server-final message
               //....
               return AuthData.of(serverFinalMessage.getBytes(StandardCharsets.UTF_8));
   
           } else if (ScramStage.FINAL.equals(stage)) {
               stage = ScramStage.FINISH;
               LOG.info("pulsar server SCRAM auth finished successfully");
               return null;
           } else {
               LOG.error("pulsar server SCRAM auth error: invalid stage");
               throw new AuthenticationException(
                   "Authentication datasource stage is invalid: " + stage + ", from client: " + credential);
           }
       }
   ```
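
The elided step in the `ScramStage.FIRST` branch is where the server checks the client proof. A minimal sketch of that check, following RFC 5802, is shown below. The `ScramProofVerifier` class is hypothetical, and its `clientProof` method exists only so the example can be exercised without a real client.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical server-side check of a SCRAM-SHA-256 client proof. */
final class ScramProofVerifier {

    /** Returns true when clientProof is consistent with storedKey for this AuthMessage. */
    static boolean verifyClientProof(byte[] storedKey, String authMessage, byte[] clientProof) {
        if (clientProof.length != storedKey.length) {
            return false;
        }
        // ClientSignature = HMAC(StoredKey, AuthMessage)
        byte[] clientSignature = hmac(storedKey, authMessage);
        // ClientKey = ClientProof XOR ClientSignature
        byte[] clientKey = xor(clientProof, clientSignature);
        // The proof is valid when H(ClientKey) equals StoredKey (constant-time comparison)
        return MessageDigest.isEqual(sha256(clientKey), storedKey);
    }

    /** What a client sends: ClientProof = ClientKey XOR HMAC(H(ClientKey), AuthMessage). */
    static byte[] clientProof(byte[] clientKey, String authMessage) {
        return xor(clientKey, hmac(sha256(clientKey), authMessage));
    }

    static byte[] sha256(byte[] data) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(data);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    private static byte[] hmac(byte[] key, String message) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    private static byte[] xor(byte[] a, byte[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("length mismatch");
        }
        byte[] out = new byte[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = (byte) (a[i] ^ b[i]);
        }
        return out;
    }
}
```

Because the AuthMessage contains both nonces, a proof captured from one exchange does not verify in another, which is the property the server relies on when TLS is off.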
   
   
   **HTTP admin request sample code:**
   
   
   ```
   // class AuthenticationProviderSCRAM implements AuthenticationProvider
   
       @Override
       public String authenticate(AuthenticationDataSource authDataSource) throws AuthenticationException {
   
           if (!authDataSource.hasDataFromHttp()) {
               throw new AuthenticationException("Authentication failed: not an HTTP request");
           }
   
           String clientAddress = authDataSource.getPeerAddress().toString();
   
           // The header must be present and start with the expected prefix
           String httpHeaderValue = authDataSource.getHttpHeader(HTTP_REQ_HEADER_NAME);
           if ((httpHeaderValue == null) || !httpHeaderValue.startsWith(HTTP_REQ_HEADER_VALUE_PREFIX)) {
               throw new AuthenticationException(
                   "Authentication datasource has an invalid HTTP Authorization header: " + clientAddress);
           }
           // Everything after the prefix is the token
           String token = httpHeaderValue.substring(HTTP_REQ_HEADER_VALUE_PREFIX.length());
           if (token.isEmpty()) {
               throw new AuthenticationException(
                   "Authentication datasource does not have a client message: " + clientAddress);
           }
   
           return validateToken(token);
       }
   ```
   
   ## BookKeeper security hardening
   
   **Hardening description:**
   
   1.  New authentication module: SASL/SCRAM-SHA256.
   
   2.  Metrics interface: configurable listening IP address, client IP allowlist, and JSON output support.
   
   
   
   **Modification examples:**
   
   **SASL/SCRAM-SHA256 sample code:**
   
   ```
   // class SASLBookieAuthProvider implements BookieAuthProvider
   
       @Override
       public void process(AuthToken authData, AuthCallbacks.GenericCallback<AuthToken> cb) {
           try {
               // Handle the client's first authentication request
               if (ScramStage.INIT.equals(stage)) {
                   //....
   
                   cb.operationComplete(BKException.Code.OK,
                       AuthToken.wrap(serverFirstMessage.getBytes(StandardCharsets.UTF_8)));
   
               }
               // Verify the client signature and return the server signature
               else if (ScramStage.FIRST.equals(stage)) {
   
                   //....
   
                   cb.operationComplete(BKException.Code.OK,
                       AuthToken.wrap(serverFinalMessage.getBytes(StandardCharsets.UTF_8)));
   
               }
               // Authentication succeeded
               else if (ScramStage.FINAL.equals(stage)) {
                   stage = ScramStage.FINISH;
   
                   LOG.info("ScramStage.FINAL: bookie server SASL SCRAM handled {}", credential);
                   completeCb.operationComplete(BKException.Code.OK, null);
               } else {
                   LOG.info("ScramStage.UNKNOWN: bookie server SASL SCRAM handling failed {}", credential);
                   cb.operationComplete(BKException.Code.UnauthorizedAccessException,
                       AuthToken.wrap("error=unknowStage".getBytes(StandardCharsets.UTF_8)));
                   completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
               }
   
           } catch (Exception err) {
               // Pass the exception as the last argument so the stack trace is logged
               LOG.error("bookie server SASL SCRAM handling failed", err);
               cb.operationComplete(BKException.Code.UnauthorizedAccessException,
                   AuthToken.wrap(("error=" + err.getMessage()).getBytes(StandardCharsets.UTF_8)));
               completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
           }
       }
   ```
   
   **Metrics sample code:**
   ```
       @Override
       public void start(Configuration conf) {
           boolean httpEnabled = conf.getBoolean(PROMETHEUS_STATS_HTTP_ENABLE, DEFAULT_PROMETHEUS_STATS_HTTP_ENABLE);
           boolean bkHttpServerEnabled = conf.getBoolean("httpServerEnabled", false);
           // only start its own http server when prometheus http is enabled and bk http server is not enabled.
           if (httpEnabled && !bkHttpServerEnabled) {
               int httpPort = conf.getInt(PROMETHEUS_STATS_HTTP_PORT, DEFAULT_PROMETHEUS_STATS_HTTP_PORT);
               // Listening address is configurable; defaults to all interfaces
               String httpHost = conf.getString(PROMETHEUS_STATS_HTTP_HOST, "0.0.0.0");
               InetSocketAddress httpEndpoint = InetSocketAddress.createUnresolved(httpHost, httpPort);
               this.server = new Server(httpEndpoint);
               ServletContextHandler context = new ServletContextHandler();
               context.setContextPath("/");
               server.setHandler(context);
   
               // Second constructor argument selects JSON output
               context.addServlet(new ServletHolder(new PrometheusServlet(this, false)), "/metrics");
               context.addServlet(new ServletHolder(new PrometheusServlet(this, true)), "/metricsjson");
   
               try {
                   server.start();
                   LOG.info("Started Prometheus stats endpoint at {}", httpEndpoint);
               } catch (Exception e) {
                   throw new RuntimeException(e);
               }
           }
       }
   ```
   ## Other utility classes
   
   SCRAM signing and signature verification, AES-GCM encryption and decryption for sensitive information, and a Curator-based tool that watches the ZooKeeper SASL user information (salt, storekey, serverkey, and iterations).
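
As an illustration of the AES-GCM utility mentioned above, here is a minimal sketch using only the JDK. The `AesGcmCodec` class, the 12-byte random IV, and the "IV followed by ciphertext" layout are assumptions for this example; key management is out of scope.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical AES-GCM helper for sensitive values such as the SCRAM keys. */
final class AesGcmCodec {
    private static final int IV_BYTES = 12;   // recommended IV size for GCM
    private static final int TAG_BITS = 128;  // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Returns IV followed by ciphertext and tag. A fresh IV is drawn for every call. */
    static byte[] encrypt(byte[] key, byte[] plaintext) {
        try {
            byte[] iv = new byte[IV_BYTES];
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext);
            byte[] out = Arrays.copyOf(iv, iv.length + ciphertext.length);
            System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
            return out;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("encryption failed", e);
        }
    }

    /** Reverses encrypt(); throws IllegalStateException if the data was modified or the key is wrong. */
    static byte[] decrypt(byte[] key, byte[] ivAndCiphertext) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
                    new GCMParameterSpec(TAG_BITS, ivAndCiphertext, 0, IV_BYTES));
            return cipher.doFinal(ivAndCiphertext, IV_BYTES, ivAndCiphertext.length - IV_BYTES);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("decryption failed", e);
        }
    }
}
```

GCM authenticates as well as encrypts, so a value that was altered in ZooKeeper fails to decrypt instead of yielding corrupted key material.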
   

