rsamo opened a new issue #4011: Pulsar SQL - Add Support for TLS Auth
URL: https://github.com/apache/pulsar/issues/4011
 
 
   **Features that would be awesome to have:**
   
   1. Allow TLS Auth on Pulsar SQL so that it works in a secure cluster environment.
   2. Allow the certificates used to be those of specific client(s), so that a Presto user can only select from their own data and not from all namespaces, etc.
   
   **Attempt to Test**
   When using a Pulsar cluster that is currently secured via TLS certs, Pulsar 
SQL (Presto) doesn't seem to support this setup as of 2.4.0-SNAPSHOT. If you 
edit the org.apache.pulsar.sql.presto.PulsarConnectorConfig class 
implementation to add tlsCertFile, tlsKeyFile, and tlsTrustCertsFilePath 
capabilities, and then pass in the admin certificates, it seems to work at 
first but then replies with:
   ```sh
   presto> show tables in pulsar."public/default";
   Query 20190409_161104_00000_v2y8t failed: Failed to get tables/topics in 
public/default: HTTP 500 Internal Server Error
   ```
   Checking the logs of the SQL worker shows no issue. If you attempt to run a 
query against a topic you receive:
   ```sh
   presto> select * from pulsar."public/default".test_topic;
   Query 20190409_161558_00001_v2y8t failed: 
org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration.setStickyReadsEnabled(Z)Lorg/apache/pulsar/shade/org/apache/bookkeeper/conf/ClientConfiguration;
   ```
   And the SQL Worker shows:
   ```sh
   2019-04-09T12:15:58.776-0400    INFO    query-scheduler-3       stderr  
Exception in thread "query-scheduler-3"
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr  
java.lang.NoSuchMethodError: 
org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration.setStickyReadsEnabled(Z)Lorg/apache/pulsar/shade/org/apache/bookkeeper/conf/ClientConfiguration;
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
org.apache.pulsar.sql.presto.PulsarSplitManager.getManagedLedgerFactory(PulsarSplitManager.java:132)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
org.apache.pulsar.sql.presto.PulsarSplitManager.getSplitsPartitionedTopic(PulsarSplitManager.java:154)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
org.apache.pulsar.sql.presto.PulsarSplitManager.getSplits(PulsarSplitManager.java:115)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at com.facebook.presto.split.SplitManager.getSplits(SplitManager.java:64)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
com.facebook.presto.sql.planner.DistributedExecutionPlanner$Visitor.visitTableScan(DistributedExecutionPlanner.java:146)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
com.facebook.presto.sql.planner.DistributedExecutionPlanner$Visitor.visitTableScan(DistributedExecutionPlanner.java:122)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
com.facebook.presto.sql.planner.plan.TableScanNode.accept(TableScanNode.java:136)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
com.facebook.presto.sql.planner.DistributedExecutionPlanner.doPlan(DistributedExecutionPlanner.java:108)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
com.facebook.presto.sql.planner.DistributedExecutionPlanner.doPlan(DistributedExecutionPlanner.java:113)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
com.facebook.presto.sql.planner.DistributedExecutionPlanner.plan(DistributedExecutionPlanner.java:85)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
com.facebook.presto.execution.SqlQueryExecution.planDistribution(SqlQueryExecution.java:385)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
com.facebook.presto.execution.SqlQueryExecution.start(SqlQueryExecution.java:287)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   2019-04-09T12:15:58.778-0400    INFO    query-scheduler-3       stderr       
   at java.lang.Thread.run(Thread.java:748)
   2019-04-09T12:15:58.780-0400    INFO    query-execution-3       
com.facebook.presto.event.query.QueryMonitor    TIMELINE: Query 
20190409_161558_00001_v2y8t :: 
Transaction:[430085ea-a023-486f-812d-486a9f233c37] :: elapsed 485ms :: planning 
149ms :: scheduling 336ms :: running 0ms :: finishing 336ms :: begin 
2019-04-09T12:15:58.290-04:00 :: end 2019-04-09T12:15:58.775-04:00
   ```
   **The code**
   - I attempted to edit the org.apache.pulsar.sql.presto.PulsarConnectorConfig 
class to enable TLS and force this behavior. 
   - Used 2.4.0-SNAPSHOT source code. 
   - Deployed this code to 
/opt/pulsar/apache-pulsar-2.2.0/lib/presto/plugin/pulsar-presto-connector
   - Added the TLS Auth params to the config file under 
/opt/pulsar/apache-pulsar-2.2.0/conf/presto/catalog/pulsar.properties
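
A `NoSuchMethodError` generally means the class loaded at runtime differs from the one the caller was compiled against. Since the connector above was built from 2.4.0-SNAPSHOT source and copied into a path named apache-pulsar-2.2.0, one possible (unconfirmed) explanation is that an older shaded BookKeeper jar is being picked up. The following is a minimal diagnostic sketch, meant to be run with the same jars on the classpath as the SQL worker. `MethodPresenceCheck` and `hasMethod` are hypothetical names used for illustration only; the class and method being looked up are taken from the stack trace.

```java
import java.lang.reflect.Method;

/** Diagnostic sketch: reports whether a class on the current classpath declares a public method. */
final class MethodPresenceCheck {

    /**
     * Returns true if className can be loaded and exposes a public method
     * named methodName with exactly the given parameter types.
     */
    static boolean hasMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            Class<?> clazz = Class.forName(className);
            Method method = clazz.getMethod(methodName, parameterTypes);
            return method != null;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            // Either the class is missing entirely or it lacks this exact signature.
            return false;
        }
    }

    public static void main(String[] args) {
        // Names copied from the stack trace; (Z) in the JVM descriptor
        // denotes a single boolean parameter.
        String className = "org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration";
        boolean present = hasMethod(className, "setStickyReadsEnabled", boolean.class);
        System.out.println("setStickyReadsEnabled(boolean) present: " + present);
    }
}
```

If this prints `false` against the jars in the plugin directory, the connector and the shaded BookKeeper classes probably come from different builds.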
   
   **pulsar.properties**
   ```sh
   #
   # Licensed to the Apache Software Foundation (ASF) under one
   # or more contributor license agreements.  See the NOTICE file
   # distributed with this work for additional information
   # regarding copyright ownership.  The ASF licenses this file
   # to you under the Apache License, Version 2.0 (the
   # "License"); you may not use this file except in compliance
   # with the License.  You may obtain a copy of the License at
   #
   #   http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing,
   # software distributed under the License is distributed on an
   # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   # KIND, either express or implied.  See the License for the
   # specific language governing permissions and limitations
   # under the License.
   #
   
   # name of the connector to be displayed in the catalog
   connector.name=pulsar
   # the url of Pulsar broker service
   # pulsar.broker-service-url=http://localhost:8080
   pulsar.broker-service-url=https://localhost:8443
   # URI of Zookeeper cluster
   pulsar.zookeeper-uri=zkhost:2181
   # minimum number of entries to read at a single time
   pulsar.max-entry-read-batch-size=100
   # default number of splits to use per query
   pulsar.target-num-splits=2
   # max message queue size
   pulsar.max-split-message-queue-size=10000
   # max entry queue size
   pulsar.max-split-entry-queue-size = 1000
   # Path for the TLS admin client cert file
   pulsar.tlsCertFile = /opt/pulsar/certificates/admin.cert.pem
   # Path for the TLS admin client key file
   pulsar.tlsKeyFile = /opt/pulsar/certificates/admin.key-pk8.pem
   # Path for the trusted TLS certificate file
   pulsar.tlsTrustCertsFilePath = /opt/pulsar/certificates/ca2.cert.pem
   ```
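
Before suspecting the connector, it can help to rule out the properties file itself. The following is a rough sketch, not part of Pulsar, that loads a catalog file like the one above and reports TLS keys that are unset or point at unreadable files. `TlsPropertiesCheck` and `findProblems` are hypothetical names; the three `pulsar.tls*` keys are the ones introduced in this issue, not keys that an unmodified connector is known to accept.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Sketch: sanity-checks the TLS entries of a pulsar.properties catalog file. */
final class TlsPropertiesCheck {

    // Keys as used in this issue's modified PulsarConnectorConfig (an assumption elsewhere).
    static final String[] TLS_KEYS = {
        "pulsar.tlsCertFile", "pulsar.tlsKeyFile", "pulsar.tlsTrustCertsFilePath"
    };

    /** Returns one human-readable message per problem found; empty if none. */
    static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<>();
        for (String key : TLS_KEYS) {
            // Properties.load keeps trailing whitespace in values, so trim it.
            String value = props.getProperty(key, "").trim();
            if (value.isEmpty()) {
                problems.add(key + " is not set");
            } else if (!Files.isReadable(Paths.get(value))) {
                problems.add(key + " is not readable: " + value);
            }
        }
        String url = props.getProperty("pulsar.broker-service-url", "").trim();
        if (!url.startsWith("https://")) {
            problems.add("pulsar.broker-service-url does not use https: " + url);
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java TlsPropertiesCheck /path/to/pulsar.properties
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(Paths.get(args[0]))) {
            props.load(reader);
        }
        findProblems(props).forEach(System.out::println);
    }
}
```

Running it as the same OS user as the SQL worker matters, since readability depends on file permissions.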
   **org.apache.pulsar.sql.presto.PulsarConnectorConfig**
   ```java
   /**
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *   http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
   package org.apache.pulsar.sql.presto;
   
   import com.fasterxml.jackson.databind.ObjectMapper;
   import io.airlift.configuration.Config;
   import org.apache.pulsar.client.admin.PulsarAdmin;
   import org.apache.pulsar.client.api.Authentication;
   import org.apache.pulsar.client.api.AuthenticationFactory;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.impl.auth.AuthenticationTls;
   import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
   
   import javax.validation.constraints.NotNull;
   import java.io.IOException;
   import java.util.HashMap;
   import java.util.Map;
   
   public class PulsarConnectorConfig implements AutoCloseable {
   
       private String tlsCertFile = "";
       private String tlsKeyFile = "";
       private String tlsTrustCertsFilePath = "";
       private String brokerServiceUrl = "http://localhost:8080";
       private String zookeeperUri = "localhost:2181";
       private int entryReadBatchSize = 100;
       private int targetNumSplits = 2;
       private int maxSplitMessageQueueSize = 10000;
       private int maxSplitEntryQueueSize = 1000;
       private String statsProvider = NullStatsProvider.class.getName();
       private Map<String, String> statsProviderConfigs = new HashMap<>();
       private PulsarAdmin pulsarAdmin;
   
       @NotNull
       public String getBrokerServiceUrl() {
           return brokerServiceUrl;
       }
   
       @Config("pulsar.broker-service-url")
       public PulsarConnectorConfig setBrokerServiceUrl(String brokerServiceUrl) {
           this.brokerServiceUrl = brokerServiceUrl;
           return this;
       }
   
       @NotNull
       public String getTlsCertFile() {
           return tlsCertFile;
       }
   
       @Config("pulsar.tlsCertFile")
       public PulsarConnectorConfig setTlsCertFile(String tlsCertFile) {
           this.tlsCertFile = tlsCertFile;
           return this;
       }
   
       @NotNull
       public String getTlsKeyFile() {
           return tlsKeyFile;
       }
   
       @Config("pulsar.tlsKeyFile")
       public PulsarConnectorConfig setTlsKeyFile(String tlsKeyFile) {
           this.tlsKeyFile = tlsKeyFile;
           return this;
       }
   
       @NotNull
       public String getTlsTrustCertsFilePath() {
           return tlsTrustCertsFilePath;
       }
   
       @Config("pulsar.tlsTrustCertsFilePath")
       public PulsarConnectorConfig setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
           this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;
           return this;
       }
   
       @NotNull
       public String getZookeeperUri() {
           return this.zookeeperUri;
       }
   
       @Config("pulsar.zookeeper-uri")
       public PulsarConnectorConfig setZookeeperUri(String zookeeperUri) {
           this.zookeeperUri = zookeeperUri;
           return this;
       }
   
       @NotNull
       public int getMaxEntryReadBatchSize() {
           return this.entryReadBatchSize;
       }
   
       @Config("pulsar.max-entry-read-batch-size")
       public PulsarConnectorConfig setMaxEntryReadBatchSize(int batchSize) {
           this.entryReadBatchSize = batchSize;
           return this;
       }
   
       @NotNull
       public int getTargetNumSplits() {
           return this.targetNumSplits;
       }
   
       @Config("pulsar.target-num-splits")
       public PulsarConnectorConfig setTargetNumSplits(int targetNumSplits) {
           this.targetNumSplits = targetNumSplits;
           return this;
       }
   
       @NotNull
       public int getMaxSplitMessageQueueSize() {
           return this.maxSplitMessageQueueSize;
       }
   
       @Config("pulsar.max-split-message-queue-size")
       public PulsarConnectorConfig setMaxSplitMessageQueueSize(int maxSplitMessageQueueSize) {
           this.maxSplitMessageQueueSize = maxSplitMessageQueueSize;
           return this;
       }
   
       @NotNull
       public int getMaxSplitEntryQueueSize() {
           return this.maxSplitEntryQueueSize;
       }
   
       @Config("pulsar.max-split-entry-queue-size")
       public PulsarConnectorConfig setMaxSplitEntryQueueSize(int maxSplitEntryQueueSize) {
           this.maxSplitEntryQueueSize = maxSplitEntryQueueSize;
           return this;
       }
   
       @NotNull
       public String getStatsProvider() {
           return statsProvider;
       }
   
       @Config("pulsar.stats-provider")
       public PulsarConnectorConfig setStatsProvider(String statsProvider) {
           this.statsProvider = statsProvider;
           return this;
       }
   
       @NotNull
       public Map<String, String> getStatsProviderConfigs() {
           return statsProviderConfigs;
       }
   
       @Config("pulsar.stats-provider-configs")
       public PulsarConnectorConfig setStatsProviderConfigs(String statsProviderConfigs) throws IOException {
           this.statsProviderConfigs = new ObjectMapper().readValue(statsProviderConfigs, Map.class);
           return this;
       }
   
       @NotNull
       public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
           if (this.pulsarAdmin == null) {
               // The TLS fields default to "", so a null check alone never selects
               // the non-TLS branch; treat empty values as unset too.
               boolean tlsEnabled = isSet(tlsCertFile) && isSet(tlsKeyFile) && isSet(tlsTrustCertsFilePath);
               if (tlsEnabled) {
                   Map<String, String> authParams = new HashMap<>();
                   authParams.put("tlsCertFile", tlsCertFile);
                   authParams.put("tlsKeyFile", tlsKeyFile);
   
                   Authentication tlsAuth = AuthenticationFactory.create(AuthenticationTls.class.getName(), authParams);
                   this.pulsarAdmin = PulsarAdmin.builder()
                           .serviceHttpUrl(getBrokerServiceUrl())
                           .tlsTrustCertsFilePath(getTlsTrustCertsFilePath())
                           .authentication(tlsAuth)
                           .build();
               } else {
                   this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build();
               }
           }
   
           return this.pulsarAdmin;
       }
   
       // True when a config value is neither null nor empty.
       private static boolean isSet(String value) {
           return value != null && !value.isEmpty();
       }
   
       @Override
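       public void close() throws Exception {
           // getPulsarAdmin() creates the client lazily, so it may still be null here.
           if (this.pulsarAdmin != null) {
               this.pulsarAdmin.close();
           }
       }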
   
       @Override
       public String toString() {
           return "PulsarConnectorConfig{" +
                   "brokerServiceUrl='" + brokerServiceUrl + '\'' +
                   '}';
       }
   }
   ```
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
