rsamo opened a new issue #4011: Pulsar SQL - Add Support for TLS Auth URL: https://github.com/apache/pulsar/issues/4011 **Features that would be awesome to have:** 1. Allow TLS Auth on Pulsar SQL so that it works in a secure cluster environment 2. Allow the certificates used to be from a specific client(s) so that the user of Presto only has select to their data and not all namespaces, etc. **Attempt to Test** When using a Pulsar cluster that is currently secured via TLS certs, Pulsar SQL (Presto) doesn't seem to support this setup as of 2.4.0-SNAPSHOT. If you edit the org.apache.pulsar.sql.presto.PulsarConnectorConfig class implementation to add tlsCertFile, tlsKeyFile, and tlsTrustCertsFilePath capabilities, and then pass in the admin certificates, it seems to work at first but then replies with: ```sh presto> show tables in pulsar."public/default"; Query 20190409_161104_00000_v2y8t failed: Failed to get tables/topics in public/default: HTTP 500 Internal Server Error ``` Checking the logs of the SQL worker shows no issue. If you attempt to run a query against a topic you receive: ```sh presto> select * from pulsar."public/default".test_topic; Query 20190409_161558_00001_v2y8t failed: org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration.setStickyReadsEnabled(Z)Lorg/apache/pulsar/shade/org/apache/bookkeeper/conf/ClientConfiguration; ``` And the SQL Worker shows: ```sh 2019-04-09T12:15:58.776-0400 INFO query-scheduler-3 stderr Exception in thread "query-scheduler-3" 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr java.lang.NoSuchMethodError: org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration.setStickyReadsEnabled(Z)Lorg/apache/pulsar/shade/org/apache/bookkeeper/conf/ClientConfiguration; 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at org.apache.pulsar.sql.presto.PulsarSplitManager.getManagedLedgerFactory(PulsarSplitManager.java:132) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at org.apache.pulsar.sql.presto.PulsarSplitManager.getSplitsPartitionedTopic(PulsarSplitManager.java:154) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at org.apache.pulsar.sql.presto.PulsarSplitManager.getSplits(PulsarSplitManager.java:115) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at com.facebook.presto.split.SplitManager.getSplits(SplitManager.java:64) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at com.facebook.presto.sql.planner.DistributedExecutionPlanner$Visitor.visitTableScan(DistributedExecutionPlanner.java:146) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at com.facebook.presto.sql.planner.DistributedExecutionPlanner$Visitor.visitTableScan(DistributedExecutionPlanner.java:122) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at com.facebook.presto.sql.planner.plan.TableScanNode.accept(TableScanNode.java:136) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at com.facebook.presto.sql.planner.DistributedExecutionPlanner.doPlan(DistributedExecutionPlanner.java:108) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at com.facebook.presto.sql.planner.DistributedExecutionPlanner.doPlan(DistributedExecutionPlanner.java:113) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at com.facebook.presto.sql.planner.DistributedExecutionPlanner.plan(DistributedExecutionPlanner.java:85) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at com.facebook.presto.execution.SqlQueryExecution.planDistribution(SqlQueryExecution.java:385) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at com.facebook.presto.execution.SqlQueryExecution.start(SqlQueryExecution.java:287) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 2019-04-09T12:15:58.778-0400 INFO query-scheduler-3 stderr at java.lang.Thread.run(Thread.java:748) 2019-04-09T12:15:58.780-0400 INFO query-execution-3 com.facebook.presto.event.query.QueryMonitor TIMELINE: Query 20190409_161558_00001_v2y8t :: Transaction:[430085ea-a023-486f-812d-486a9f233c37] :: elapsed 485ms :: planning 149ms :: scheduling 336ms :: running 0ms :: finishing 336ms :: begin 2019-04-09T12:15:58.290-04:00 :: end 2019-04-09T12:15:58.775-04:00 ``` **The code** - I attempted to edit the org.apache.pulsar.sql.presto.PulsarConnectorConfig class to enable TLS and force this behavior. - Used 2.4.0-SNAPSHOT source code. - Deployed this code to /opt/pulsar/apache-pulsar-2.2.0/lib/presto/plugin/pulsar-presto-connector - Added TLS Auth params to config file is under /opt/pulsar/apache-pulsar-2.2.0/conf/presto/catalog/pulsar.properties **pulsar.properties** ```sh # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # name of the connector to be displayed in the catalog connector.name=pulsar # the url of Pulsar broker service # pulsar.broker-service-url=http://localhost:8080 pulsar.broker-service-url=https://localhost:8443 # URI of Zookeeper cluster pulsar.zookeeper-uri=zkhost:2181 # minimum number of entries to read at a single time pulsar.max-entry-read-batch-size=100 # default number of splits to use per query pulsar.target-num-splits=2 # max message queue size pulsar.max-split-message-queue-size=10000 # max entry queue size pulsar.max-split-entry-queue-size = 1000 # Path for the trusted TLS certificate file pulsar.tlsCertFile = /opt/pulsar/certificates/admin.cert.pem # Path for the trusted TLS admin client cert file pulsar.tlsKeyFile = /opt/pulsar/certificates/admin.key-pk8.pem # Path for the trusted TLS admin client key file pulsar.tlsTrustCertsFilePath = /opt/pulsar/certificates/ca2.cert.pem ``` **org.apache.pulsar.sql.presto.PulsarConnectorConfig** ```java /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.pulsar.sql.presto; import com.fasterxml.jackson.databind.ObjectMapper; import io.airlift.configuration.Config; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider; import javax.validation.constraints.NotNull; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class PulsarConnectorConfig implements AutoCloseable { private String tlsCertFile = ""; private String tlsKeyFile = ""; private String tlsTrustCertsFilePath = ""; private String brokerServiceUrl = "http://localhost:8080"; private String zookeeperUri = "localhost:2181"; private int entryReadBatchSize = 100; private int targetNumSplits = 2; private int maxSplitMessageQueueSize = 10000; private int maxSplitEntryQueueSize = 1000; private String statsProvider = NullStatsProvider.class.getName(); private Map<String, String> statsProviderConfigs = new HashMap<>(); private PulsarAdmin pulsarAdmin; @NotNull public String getBrokerServiceUrl() { return brokerServiceUrl; } @Config("pulsar.broker-service-url") public PulsarConnectorConfig setBrokerServiceUrl(String brokerServiceUrl) { this.brokerServiceUrl = brokerServiceUrl; return this; } @NotNull public String getTlsCertFile() { return tlsCertFile; } @Config("pulsar.tlsCertFile") public PulsarConnectorConfig setTlsCertFile(String tlsCertFile) { this.tlsCertFile = tlsCertFile; return this; } @NotNull public String getTlsKeyFile() { return tlsKeyFile; } @Config("pulsar.tlsKeyFile") public PulsarConnectorConfig setTlsKeyFile(String tlsKeyFile) { this.tlsKeyFile = tlsKeyFile; return this; } @NotNull public String getTlsTrustCertsFilePath() { return tlsTrustCertsFilePath; } @Config("pulsar.tlsTrustCertsFilePath") public PulsarConnectorConfig setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) { this.tlsTrustCertsFilePath = tlsTrustCertsFilePath; return this; } @NotNull public String getZookeeperUri() { return this.zookeeperUri; } @Config("pulsar.zookeeper-uri") public PulsarConnectorConfig setZookeeperUri(String zookeeperUri) { this.zookeeperUri = zookeeperUri; return this; } @NotNull public int getMaxEntryReadBatchSize() { return this.entryReadBatchSize; } @Config("pulsar.max-entry-read-batch-size") public PulsarConnectorConfig setMaxEntryReadBatchSize(int batchSize) { this.entryReadBatchSize = batchSize; return this; } @NotNull public int getTargetNumSplits() { return this.targetNumSplits; } @Config("pulsar.target-num-splits") public PulsarConnectorConfig setTargetNumSplits(int targetNumSplits) { this.targetNumSplits = targetNumSplits; return this; } @NotNull public int getMaxSplitMessageQueueSize() { return this.maxSplitMessageQueueSize; } @Config("pulsar.max-split-message-queue-size") public PulsarConnectorConfig setMaxSplitMessageQueueSize(int maxSplitMessageQueueSize) { this.maxSplitMessageQueueSize = maxSplitMessageQueueSize; return this; } @NotNull public int getMaxSplitEntryQueueSize() { return this.maxSplitEntryQueueSize; } @Config("pulsar.max-split-entry-queue-size") public PulsarConnectorConfig setMaxSplitEntryQueueSize(int maxSplitEntryQueueSize) { this.maxSplitEntryQueueSize = maxSplitEntryQueueSize; return this; } @NotNull public String getStatsProvider() { return statsProvider; } @Config("pulsar.stats-provider") public PulsarConnectorConfig setStatsProvider(String statsProvider) { this.statsProvider = statsProvider; return this; } @NotNull public Map<String, String> getStatsProviderConfigs() { return statsProviderConfigs; } @Config("pulsar.stats-provider-configs") public PulsarConnectorConfig setStatsProviderConfigs(String statsProviderConfigs) throws IOException { this.statsProviderConfigs = new ObjectMapper().readValue(statsProviderConfigs, Map.class); return this; } @NotNull public PulsarAdmin getPulsarAdmin() throws PulsarClientException { if (this.pulsarAdmin == null && (this.tlsCertFile == null || this.tlsKeyFile == null || this.tlsTrustCertsFilePath == null)) { this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build(); } else if(this.pulsarAdmin == null && (this.tlsCertFile != null && this.tlsKeyFile != null && this.tlsTrustCertsFilePath != null)) { // TLS enabled Map<String, String> authParams = new HashMap<>(); authParams.put("tlsCertFile",tlsCertFile); authParams.put("tlsKeyFile",tlsKeyFile); Authentication tlsAuth = AuthenticationFactory.create(AuthenticationTls.class.getName(), authParams); this.pulsarAdmin = PulsarAdmin.builder() .serviceHttpUrl(getBrokerServiceUrl()) .tlsTrustCertsFilePath(getTlsTrustCertsFilePath()) .authentication(tlsAuth) .build(); } return this.pulsarAdmin; } @Override public void close() throws Exception { this.pulsarAdmin.close(); } @Override public String toString() { return "PulsarConnectorConfig{" + "brokerServiceUrl='" + brokerServiceUrl + '\'' + '}'; } } ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
