This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 66a1b1b Issue 3655: Kerberos authentication for proxy (#3997)
66a1b1b is described below
commit 66a1b1b61a23fecbfbc158556434e04bd8f1c7fc
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Apr 9 22:56:38 2019 +0800
Issue 3655: Kerberos authentication for proxy (#3997)
Fixes #3655
Master Issue: #3491
### Motivation
Add support for Kerberos (SASL) authentication to the Pulsar proxy.
### Modifications
Add support for Kerberos authentication to the proxy;
add a unit test.
### Verifying this change
Unit tests passed.
---
pulsar-broker-auth-sasl/pom.xml | 7 +
.../authentication/AuthenticationProviderSasl.java | 7 +-
.../ProxySaslAuthenticationTest.java | 289 +++++++++++++++++++++
.../authentication/SaslAuthenticateTest.java | 2 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 4 +-
.../client/impl/auth/AuthenticationSasl.java | 8 +-
.../pulsar/client/impl/auth/PulsarSaslClient.java | 7 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 4 +-
.../org/apache/pulsar/common/api/Commands.java | 5 +-
.../apache/pulsar/common/sasl/SaslConstants.java | 13 +-
.../pulsar/proxy/server/DirectProxyHandler.java | 47 +++-
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 25 +-
.../pulsar/proxy/server/ProxyConfiguration.java | 26 +-
.../pulsar/proxy/server/ProxyConnection.java | 232 +++++++++++------
14 files changed, 558 insertions(+), 118 deletions(-)
diff --git a/pulsar-broker-auth-sasl/pom.xml b/pulsar-broker-auth-sasl/pom.xml
index cc19661..77aaf1e 100644
--- a/pulsar-broker-auth-sasl/pom.xml
+++ b/pulsar-broker-auth-sasl/pom.xml
@@ -77,6 +77,13 @@
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-proxy</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>managed-ledger-original</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
diff --git
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index d11a0e1..a43cdb0 100644
---
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.authentication;
-import static
org.apache.pulsar.common.sasl.SaslConstants.JAAS_BROKER_SECTION_NAME;
+import static
org.apache.pulsar.common.sasl.SaslConstants.JAAS_SERVER_SECTION_NAME;
import static
org.apache.pulsar.common.sasl.SaslConstants.JAAS_CLIENT_ALLOWED_IDS;
import static org.apache.pulsar.common.sasl.SaslConstants.KINIT_COMMAND;
@@ -53,7 +53,7 @@ public class AuthenticationProviderSasl implements
AuthenticationProvider {
this.configuration = Maps.newHashMap();
final String allowedIdsPatternRegExp =
config.getSaslJaasClientAllowedIds();
configuration.put(JAAS_CLIENT_ALLOWED_IDS, allowedIdsPatternRegExp);
- configuration.put(JAAS_BROKER_SECTION_NAME,
config.getSaslJaasBrokerSectionName());
+ configuration.put(JAAS_SERVER_SECTION_NAME,
config.getSaslJaasServerSectionName());
configuration.put(KINIT_COMMAND, config.getKinitCommand());
try {
@@ -63,7 +63,7 @@ public class AuthenticationProviderSasl implements
AuthenticationProvider {
throw new IOException(error);
}
- loginContextName = config.getSaslJaasBrokerSectionName();
+ loginContextName = config.getSaslJaasServerSectionName();
if (jaasCredentialsContainer == null) {
log.info("JAAS loginContext is: {}." , loginContextName);
try {
@@ -101,7 +101,6 @@ public class AuthenticationProviderSasl implements
AuthenticationProvider {
SocketAddress remoteAddress,
SSLSession sslSession) throws
AuthenticationException {
try {
- new PulsarSaslServer(jaasCredentialsContainer.getSubject(),
allowedIdsPattern);
return new SaslAuthenticationState(
new SaslAuthenticationDataSource(
new
PulsarSaslServer(jaasCredentialsContainer.getSubject(), allowedIdsPattern)));
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
new file mode 100644
index 0000000..9ab711f
--- /dev/null
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.auth.login.Configuration;
+
+import com.google.common.collect.Sets;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.proxy.server.ProxyConfiguration;
+import org.apache.pulsar.proxy.server.ProxyService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
+ private static final Logger log =
LoggerFactory.getLogger(ProxySaslAuthenticationTest.class);
+ private int webServicePort;
+ private int servicePort;
+
+ public static File kdcDir;
+ public static File kerberosWorkDir;
+
+ private static MiniKdc kdc;
+ private static Properties properties;
+
+ private static String localHostname = "localhost";
+
+ @BeforeClass
+ public static void startMiniKdc() throws Exception {
+ kdcDir = Files.createTempDirectory("test-kdc-dir").toFile();
+ kerberosWorkDir =
Files.createTempDirectory("test-kerberos-work-dir").toFile();
+
+ properties = MiniKdc.createConf();
+ kdc = new MiniKdc(properties, kdcDir);
+ kdc.start();
+
+ String principalBrokerNoRealm = "broker/" + localHostname;
+ String principalBroker = "broker/" + localHostname + "@" +
kdc.getRealm();
+ log.info("principalBroker: " + principalBroker);
+
+ String principalClientNoRealm = "client/" + localHostname;
+ String principalClient = principalClientNoRealm + "@" +
kdc.getRealm();
+ log.info("principalClient: " + principalClient);
+
+ String principalProxyNoRealm = "proxy/" + localHostname;
+ String principalProxy = principalProxyNoRealm + "@" +
kdc.getRealm();
+ log.info("principalProxy: " + principalProxy);
+
+ File keytabClient = new File(kerberosWorkDir,
"pulsarclient.keytab");
+ kdc.createPrincipal(keytabClient, principalClientNoRealm);
+
+ File keytabBroker = new File(kerberosWorkDir,
"pulsarbroker.keytab");
+ kdc.createPrincipal(keytabBroker, principalBrokerNoRealm);
+
+ File keytabProxy = new File(kerberosWorkDir,
"pulsarproxy.keytab");
+ kdc.createPrincipal(keytabProxy, principalProxyNoRealm);
+
+ File jaasFile = new File(kerberosWorkDir, "jaas.properties");
+ try (FileWriter writer = new FileWriter(jaasFile)) {
+ writer.write("\n"
+ + "PulsarBroker {\n"
+ + "
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ + " useKeyTab=true\n"
+ + " keyTab=\"" +
keytabBroker.getAbsolutePath() + "\n"
+ + " storeKey=true\n"
+ + " useTicketCache=false\n" // won't test
useTicketCache=true on JUnit tests
+ + " principal=\"" + principalBroker + "\";\n"
+ + "};\n"
+ + "\n"
+ + "\n"
+ + "\n"
+ + "PulsarProxy{\n"
+ + "
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ + " useKeyTab=true\n"
+ + " keyTab=\"" + keytabProxy.getAbsolutePath()
+ "\n"
+ + " storeKey=true\n"
+ + " useTicketCache=false\n" // won't test
useTicketCache=true on JUnit tests
+ + " principal=\"" + principalProxy + "\";\n"
+ + "};\n"
+ + "\n"
+ + "\n"
+ + "\n"
+ + "PulsarClient {\n"
+ + "
com.sun.security.auth.module.Krb5LoginModule required debug=true\n"
+ + " useKeyTab=true\n"
+ + " keyTab=\"" +
keytabClient.getAbsolutePath() + "\n"
+ + " storeKey=true\n"
+ + " useTicketCache=false\n"
+ + " principal=\"" + principalClient + "\";\n"
+ + "};\n"
+ );
+ }
+
+ File krb5file = new File(kerberosWorkDir, "krb5.properties");
+ try (FileWriter writer = new FileWriter(krb5file)) {
+ String conf = "[libdefaults]\n"
+ + " default_realm = " + kdc.getRealm() + "\n"
+ + " udp_preference_limit = 1\n" // force use TCP
+ + "\n"
+ + "\n"
+ + "[realms]\n"
+ + " " + kdc.getRealm() + " = {\n"
+ + " kdc = " + kdc.getHost() + ":" +
kdc.getPort() + "\n"
+ + " }";
+ writer.write(conf);
+ log.info("krb5.properties:\n" + conf);
+ }
+
+ System.setProperty("java.security.auth.login.config",
jaasFile.getAbsolutePath());
+ System.setProperty("java.security.krb5.properties",
krb5file.getAbsolutePath());
+ Configuration.getConfiguration().refresh();
+
+ // Client config
+
+ log.info("created AuthenticationSasl");
+ }
+
+ @AfterClass
+ public static void stopMiniKdc() {
+ System.clearProperty("java.security.auth.login.config");
+ System.clearProperty("java.security.krb5.properties");
+ if (kdc != null) {
+ kdc.stop();
+ }
+ FileUtils.deleteQuietly(kdcDir);
+ FileUtils.deleteQuietly(kerberosWorkDir);
+ Assert.assertFalse(kdcDir.exists());
+ Assert.assertFalse(kerberosWorkDir.exists());
+ }
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ log.info("-- {} --, start at host: {}", methodName,
localHostname);
+ webServicePort = PortManager.nextFreePort();
+ servicePort = PortManager.nextFreePort();
+ isTcpLookup = true;
+ conf.setAdvertisedAddress(localHostname);
+ conf.setAuthenticationEnabled(true);
+ conf.setSaslAuthentication(true);
+ conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
+ conf.setSaslJaasServerSectionName("PulsarBroker");
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderSasl.class.getName());
+ conf.setAuthenticationProviders(providers);
+ conf.setClusterName("test");
+
+ super.init();
+
+ lookupUrl = new URI("broker://" + "localhost" + ":" +
BROKER_PORT);
+
+ super.producerBaseSetup();
+ log.info("-- {} --, end.", methodName);
+ }
+
+ @Override
+ @AfterMethod
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ void testAuthentication() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ // Step 1: Create Admin Client
+ //updateAdminClient();
+ final String proxyServiceUrl = "pulsar://localhost:" +
servicePort;
+ // create a client which connects to proxy and pass authData
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setServicePort(servicePort);
+ proxyConfig.setWebServicePort(webServicePort);
+ proxyConfig.setBrokerServiceURL("pulsar://localhost:" +
BROKER_PORT);
+ proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname +
".*");
+ proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
+
+ // proxy connect to broker
+
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
+ /*proxyConfig.setBrokerClientAuthenticationParameters(
+ "{\"saslJaasClientSectionName\": " + "\"PulsarProxy\","
+
+ "\"serverType\": " + "\"broker\"}");*/
+ proxyConfig.setBrokerClientAuthenticationParameters(
+ "{\"saslJaasClientSectionName\": " + "\"PulsarProxy\","
+
+ "\"serverType\": " + "\"broker\"}");
+
+ // proxy as a server, it will use sasl to authn
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderSasl.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+
+ proxyConfig.setForwardAuthorizationCredentials(true);
+ AuthenticationService authenticationService = new
AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig));
+ ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService);
+
+ proxyService.start();
+ log.info("1 proxy service started {}", proxyService);
+
+ // Step 3: Pass correct client params
+ PulsarClient proxyClient = createProxyClient(proxyServiceUrl,
1);
+ log.info("2 create proxy client {}, {}", proxyServiceUrl,
proxyClient);
+
+ Producer<byte[]> producer =
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+ log.info("3 created producer.");
+
+ Consumer<byte[]> consumer =
proxyClient.newConsumer(Schema.BYTES).topic(topicName).subscriptionName("test-sub").subscribe();
+ log.info("4 created consumer.");
+
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ log.info("Produced message: [{}]", message);
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.info("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet,
receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+
+ proxyClient.close();
+ proxyService.close();
+ }
+
+ private PulsarClient createProxyClient(String proxyServiceUrl, int
numberOfConnections) throws PulsarClientException {
+ Map<String, String> clientSaslConfig = Maps.newHashMap();
+ clientSaslConfig.put("saslJaasClientSectionName",
"PulsarClient");
+ clientSaslConfig.put("serverType", "proxy");
+ log.info("set client jaas section name: PulsarClient,
serverType: proxy");
+ Authentication authSasl =
AuthenticationFactory.create(AuthenticationSasl.class.getName(),
clientSaslConfig);
+
+ return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+
.authentication(authSasl).connectionsPerBroker(numberOfConnections).build();
+ }
+}
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index 61d2b04..e098bcd 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -156,7 +156,7 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
conf.setAuthenticationEnabled(true);
conf.setSaslAuthentication(true);
conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
- conf.setSaslJaasBrokerSectionName("PulsarBroker");
+ conf.setSaslJaasServerSectionName("PulsarBroker");
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderSasl.class.getName());
conf.setAuthenticationProviders(providers);
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f7c0f93..b3262c1 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -226,7 +226,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
+ "(disable default-ttl with value 0)"
)
private int ttlDurationDefaultInSeconds = 0;
-
+
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Enable the deletion of inactive topics"
@@ -625,7 +625,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
category = CATEGORY_SASL_AUTH,
doc = "Service Principal, for login context name. Default value is
\"Broker\"."
)
- private String saslJaasBrokerSectionName =
SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME;
+ private String saslJaasServerSectionName =
SaslConstants.JAAS_DEFAULT_BROKER_SECTION_NAME;
@FieldContext(
category = CATEGORY_SASL_AUTH,
diff --git
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index 7b98be3..b8eca39 100644
---
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -1,4 +1,3 @@
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -53,6 +52,7 @@ public class AuthenticationSasl implements Authentication,
EncodedAuthentication
private Map<String, String> configuration;
private String loginContextName;
+ private String serverType = null;
public AuthenticationSasl() {
}
@@ -63,10 +63,10 @@ public class AuthenticationSasl implements Authentication,
EncodedAuthentication
}
@Override
- public AuthenticationDataProvider getAuthData(String brokerHostName)
throws PulsarClientException {
+ public AuthenticationDataProvider getAuthData(String serverHostname)
throws PulsarClientException {
// reuse this to return a DataProvider which contains a SASL client
try {
- PulsarSaslClient saslClient = new PulsarSaslClient(brokerHostName,
jaasCredentialsContainer.getSubject());
+ PulsarSaslClient saslClient = new PulsarSaslClient(serverHostname,
serverType, jaasCredentialsContainer.getSubject());
return new SaslAuthenticationDataProvider(saslClient);
} catch (Throwable t) {
log.error("Failed create sasl client: {}", t);
@@ -105,6 +105,8 @@ public class AuthenticationSasl implements Authentication,
EncodedAuthentication
// read section from config files of kerberos
this.loginContextName = authParams
.getOrDefault(SaslConstants.JAAS_CLIENT_SECTION_NAME,
SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
+ this.serverType = authParams
+ .getOrDefault(SaslConstants.SASL_SERVER_TYPE,
SaslConstants.SASL_BROKER_PROTOCOL);
// init the static jaasCredentialsContainer that shares amongst client.
if (!initializedJAAS) {
diff --git
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
index 23399a1..59ee47c5e 100644
---
a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
+++
b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/PulsarSaslClient.java
@@ -49,11 +49,14 @@ public class PulsarSaslClient {
private final SaslClient saslClient;
private final Subject clientSubject;
- public PulsarSaslClient(String serverHostname, Subject subject) throws
SaslException {
+ public PulsarSaslClient(String serverHostname, String serverType, Subject
subject) throws SaslException {
checkArgument(subject != null, "Cannot create SASL client with NULL
JAAS subject");
checkArgument(!Strings.isNullOrEmpty(serverHostname), "Cannot create
SASL client with NUll server name");
+
checkArgument(serverType.equalsIgnoreCase(SaslConstants.SASL_BROKER_PROTOCOL) ||
+ serverType.equalsIgnoreCase(SaslConstants.SASL_PROXY_PROTOCOL),
+ "Server type [" + serverType + "] invalid, should be broker or
proxy");
- String serverPrincipal = SaslConstants.SASL_PULSAR_PROTOCOL + "/" +
serverHostname;
+ String serverPrincipal = serverType.toLowerCase() + "/" +
serverHostname;
this.clientSubject = subject;
if (clientSubject.getPrincipals().isEmpty()) {
throw new SaslException("Cannot create SASL client with empty JAAS
subject principal");
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 948c027..2921777 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -123,14 +123,14 @@ public class ClientCnx extends PulsarHandler {
protected String proxyToTargetBrokerAddress = null;
// Remote hostName with which client is connected
- private String remoteHostName = null;
+ protected String remoteHostName = null;
private boolean isTlsHostnameVerificationEnable;
private DefaultHostnameVerifier hostnameVerifier;
private final ScheduledFuture<?> timeoutTask;
// Added for mutual authentication.
- private AuthenticationDataProvider authenticationDataProvider;
+ protected AuthenticationDataProvider authenticationDataProvider;
enum State {
None, SentConnectFrame, Ready, Failed, Connecting
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 0481e18..3692e84 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.api;
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFrom;
import static
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8;
@@ -154,7 +155,7 @@ public class Commands {
}
public static ByteBuf newConnect(String authMethodName, AuthData authData,
int protocolVersion, String libVersion,
- String targetBroker, String
originalPrincipal, String originalAuthData,
+ String targetBroker, String
originalPrincipal, AuthData originalAuthData,
String originalAuthMethod) {
CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
connectBuilder.setClientVersion(libVersion != null ? libVersion :
"Pulsar Client");
@@ -174,7 +175,7 @@ public class Commands {
}
if (originalAuthData != null) {
- connectBuilder.setOriginalAuthData(originalAuthData);
+ connectBuilder.setOriginalAuthData(new
String(originalAuthData.getBytes(), UTF_8));
}
if (originalAuthMethod != null) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
index 749d411..b2e3c3c 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/SaslConstants.java
@@ -29,13 +29,9 @@ public class SaslConstants {
public static final String AUTH_METHOD_NAME = "sasl";
- // service broker Principal
- public static final String JAAS_BROKER_SECTION_NAME =
"saslJaasBrokerSectionName";
+ // service section name, this is broker or proxy Principal
+ public static final String JAAS_SERVER_SECTION_NAME =
"saslJaasServerSectionName";
public static final String JAAS_DEFAULT_BROKER_SECTION_NAME =
"PulsarBroker";
-
- //TODO: for sasl proxy.
- // github issue #3655 {@link: https://github.com/apache/pulsar/issues/3655}
- public static final String JAAS_PROXY_SECTION_NAME =
"saslJaasProxySectionName";
public static final String JAAS_DEFAULT_PROXY_SECTION_NAME = "PulsarProxy";
// Client principal
@@ -53,8 +49,11 @@ public class SaslConstants {
public static final String KINIT_COMMAND = "kerberos.kinit";
+ // The sasl server type that client will connect to. default value broker,
could also be proxy.
+ public static final String SASL_SERVER_TYPE = "serverType";
// The non-null string name of the protocol for which the authentication
is being performed (e.g., "ldap").
- public static final String SASL_PULSAR_PROTOCOL = "broker";
+ public static final String SASL_BROKER_PROTOCOL = "broker";
+ public static final String SASL_PROXY_PROTOCOL = "proxy";
// The non-null fully-qualified host name of the server to authenticate to.
public static final String SASL_PULSAR_REALM = "EXAMPLE.COM";
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 0f96845..bfe3485 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -19,15 +19,23 @@
package org.apache.pulsar.proxy.server;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.net.URI;
import java.net.URISyntaxException;
import javax.net.ssl.SSLSession;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,20 +55,20 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
-import io.prometheus.client.Counter;
public class DirectProxyHandler {
private Channel inboundChannel;
Channel outboundChannel;
private String originalPrincipal;
- private String clientAuthData;
+ private AuthData clientAuthData;
private String clientAuthMethod;
private int protocolVersion;
public static final String TLS_HANDLER = "tls";
private final Authentication authentication;
private final SslContext sslCtx;
+ private AuthenticationDataProvider authenticationDataProvider;
public DirectProxyHandler(ProxyService service, ProxyConnection
proxyConnection, String targetBrokerUrl,
int protocolVersion, SslContext sslCtx) {
@@ -138,10 +146,8 @@ public class DirectProxyHandler {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
// Send the Connect command to broker
- String authData = "";
- if (authentication.getAuthData().hasDataFromCommand()) {
- authData = authentication.getAuthData().getCommandData();
- }
+ authenticationDataProvider =
authentication.getAuthData(remoteHostName);
+ AuthData authData =
authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
ByteBuf command = null;
command = Commands.newConnect(authentication.getAuthMethodName(),
authData, protocolVersion, "Pulsar proxy",
null /* target broker */, originalPrincipal,
clientAuthData, clientAuthMethod);
@@ -177,6 +183,35 @@ public class DirectProxyHandler {
}
@Override
+        protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+            // Handles an auth challenge received on the outbound (proxy -> broker) connection:
+            // feeds the challenge bytes to the proxy's own authentication data provider and
+            // writes the resulting auth response back on the same channel.
+            checkArgument(authChallenge.hasChallenge());
+            checkArgument(authChallenge.getChallenge().hasAuthData());
+
+            // Mutual authentication: compute the next auth data from the challenge and send it.
+            try {
+                AuthData authData = authenticationDataProvider
+                        .authenticate(AuthData.of(authChallenge.getChallenge().getAuthData().toByteArray()));
+
+                // Only a non-"complete" auth data is expected in reply to a challenge.
+                checkState(!authData.isComplete());
+
+                ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
+                        authData,
+                        this.protocolVersion,
+                        PulsarVersion.getVersion());
+
+                if (log.isDebugEnabled()) {
+                    log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
+                }
+
+                outboundChannel.writeAndFlush(request);
+                // Explicitly request the next inbound message on the outbound channel.
+                outboundChannel.read();
+            } catch (Exception e) {
+                // Pass the exception as the throwable argument so its stack trace is logged;
+                // a "{}" placeholder would be left unformatted in the message.
+                log.error("Error mutual verify", e);
+            }
+        }
+
+ @Override
public void operationComplete(Future<Void> future) throws Exception {
// This is invoked when the write operation on the paired
connection
// is completed
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 91e7345..624391e 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -22,9 +22,9 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import org.apache.pulsar.PulsarVersion;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,12 +32,12 @@ import org.slf4j.LoggerFactory;
public class ProxyClientCnx extends ClientCnx {
String clientAuthRole;
- String clientAuthData;
+ AuthData clientAuthData;
String clientAuthMethod;
int protocolVersion;
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, String clientAuthRole,
- String clientAuthData, String clientAuthMethod, int
protocolVersion) {
+ AuthData clientAuthData, String clientAuthMethod,
int protocolVersion) {
super(conf, eventLoopGroup);
this.clientAuthRole = clientAuthRole;
this.clientAuthData = clientAuthData;
@@ -46,19 +46,18 @@ public class ProxyClientCnx extends ClientCnx {
}
@Override
- protected ByteBuf newConnectCommand() throws PulsarClientException {
+ protected ByteBuf newConnectCommand() throws Exception {
if (log.isDebugEnabled()) {
- log.debug(
- "New Connection opened via ProxyClientCnx with params
clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}",
+ log.debug("New Connection opened via ProxyClientCnx with params
clientAuthRole = {}," +
+ " clientAuthData = {}, clientAuthMethod = {}",
clientAuthRole, clientAuthData, clientAuthMethod);
}
- String authData = null;
- if (authentication.getAuthData().hasDataFromCommand()) {
- authData = authentication.getAuthData().getCommandData();
- }
- return Commands.newConnect(authentication.getAuthMethodName(),
authData, protocolVersion,
- PulsarVersion.getVersion(), proxyToTargetBrokerAddress,
clientAuthRole, clientAuthData,
- clientAuthMethod);
+
+ authenticationDataProvider =
authentication.getAuthData(remoteHostName);
+ AuthData authData =
authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+ return Commands.newConnect(authentication.getAuthMethodName(),
authData, this.protocolVersion,
+ PulsarVersion.getVersion(), proxyToTargetBrokerAddress,
clientAuthRole, clientAuthData,
+ clientAuthMethod);
}
private static final Logger log =
LoggerFactory.getLogger(ProxyClientCnx.class);
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 37be744..7b9a2c5 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PropertiesContext;
import org.apache.pulsar.common.configuration.PropertyContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.apache.pulsar.common.sasl.SaslConstants;
@Getter
@Setter
@@ -62,6 +63,8 @@ public class ProxyConfiguration implements
PulsarConfiguration {
private static final String CATEGORY_TOKEN_AUTH = "Token Authentication
Provider";
@Category
private static final String CATEGORY_HTTP = "HTTP";
+ @Category
+ private static final String CATEGORY_SASL_AUTH = "SASL Authentication
Provider";
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
@@ -186,6 +189,27 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private boolean forwardAuthorizationCredentials = false;
+
+ @FieldContext(
+ category = CATEGORY_SASL_AUTH,
+ doc = "This is a regexp, which limits the range of possible ids which
can connect to the Broker using SASL.\n"
+ + " Default value is: \".*pulsar.*\", so only clients whose id
contains 'pulsar' are allowed to connect."
+ )
+ private String saslJaasClientAllowedIds =
SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT;
+
+ @FieldContext(
+ category = CATEGORY_SASL_AUTH,
+ doc = "Service Principal, for login context name. Default value is
\"PulsarProxy\"."
+ )
+ private String saslJaasServerSectionName =
SaslConstants.JAAS_DEFAULT_PROXY_SECTION_NAME;
+
+ @FieldContext(
+ category = CATEGORY_SASL_AUTH,
+ doc = "kerberos kinit command."
+ )
+ private String kinitCommand = "/usr/bin/kinit";
+
+
@FieldContext(
category = CATEGORY_RATE_LIMITING,
doc = "Max concurrent inbound connections. The proxy will reject
requests beyond that"
@@ -408,4 +432,4 @@ public class ProxyConfiguration implements
PulsarConfiguration {
return String.format("HttpReverseProxyConfig(%s, path=%s,
proxyTo=%s)", name, path, proxyTo);
}
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 90b979e..a1147f6 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -23,21 +23,27 @@ import static
com.google.common.base.Preconditions.checkArgument;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
+import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
@@ -70,12 +76,24 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
private LookupProxyHandler lookupProxyHandler = null;
private DirectProxyHandler directProxyHandler = null;
String clientAuthRole;
- String clientAuthData;
+ AuthData clientAuthData;
String clientAuthMethod;
+ private String authMethod = "none";
+ AuthenticationProvider authenticationProvider;
+ AuthenticationState authState;
+ private ClientConfigurationData clientConf;
+ private boolean hasProxyToBrokerUrl;
+ private int protocolVersionToAdvertise;
+ private String proxyToBrokerUrl;
+
enum State {
Init,
+ // Connecting between user client and proxy server.
+ // Mutual authn may need several verification rounds between client and proxy server.
+ Connecting,
+
// Proxy the lookup requests to a random broker
// Follow redirects
ProxyLookupRequests,
@@ -150,6 +168,7 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
switch (state) {
case Init:
+ case Connecting:
case ProxyLookupRequests:
// Do the regular decoding for the Connected message
super.channelRead(ctx, msg);
@@ -182,54 +201,160 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
}
}
- /**
- * handles connect request and sends {@code State.Connected} ack to client
- */
+ private void completeConnect() {
+ LOG.info("[{}] complete connection, init proxy handler. authenticated
with {} role {}, hasProxyToBrokerUrl: {}",
+ remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
+ if (hasProxyToBrokerUrl) {
+ // Client already knows which broker to connect. Let's open a
+ // connection there and just pass bytes in both directions
+ state = State.ProxyConnectionToBroker;
+ directProxyHandler = new DirectProxyHandler(service, this,
proxyToBrokerUrl,
+ protocolVersionToAdvertise, sslCtx);
+ cancelKeepAliveTask();
+ } else {
+ // Client is doing a lookup, we can consider the handshake complete
+ // and we'll take care of just topics and
+ // partitions metadata lookups
+ state = State.ProxyLookupRequests;
+ lookupProxyHandler = new LookupProxyHandler(service, this);
+
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
+ }
+ }
+
+ private void createClientAndCompleteConnect(AuthData clientData)
+ throws PulsarClientException {
+ if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+ this.clientAuthData = clientData;
+ this.clientAuthMethod = authMethod;
+ }
+ this.client = createClient(clientConf, this.clientAuthData,
this.clientAuthMethod, protocolVersionToAdvertise);
+
+ completeConnect();
+ }
+
+ // According to auth result, send newConnected or newAuthChallenge command.
+ private void doAuthentication(AuthData clientData) throws Exception {
+ AuthData brokerData = authState.authenticate(clientData);
+ // authentication has completed, will send newConnected command.
+ if (authState.isComplete()) {
+ clientAuthRole = authState.getAuthRole();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Client successfully authenticated with {} role
{}",
+ remoteAddress, authMethod, clientAuthRole);
+ }
+ createClientAndCompleteConnect(clientData);
+ return;
+ }
+
+ // auth not complete, continue auth with client side.
+ ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
protocolVersionToAdvertise));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Authentication in progress client by method {}.",
+ remoteAddress, authMethod);
+ }
+ state = State.Connecting;
+ return;
+ }
+
@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Init);
- remoteEndpointProtocolVersion = connect.getProtocolVersion();
+ this.remoteEndpointProtocolVersion = connect.getProtocolVersion();
+ this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
+ this.protocolVersionToAdvertise =
getProtocolVersionToAdvertise(connect);
+ this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ?
connect.getProxyToBrokerUrl() : "null";
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Received CONNECT from {} proxyToBroker={}",
remoteAddress,
- connect.hasProxyToBrokerUrl() ?
connect.getProxyToBrokerUrl() : "null");
+ LOG.debug("Received CONNECT from {} proxyToBroker={}",
remoteAddress, proxyToBrokerUrl);
+ LOG.debug(
+ "[{}] Protocol version to advertise to broker is {},
clientProtocolVersion={}, proxyProtocolVersion={}",
+ remoteAddress, protocolVersionToAdvertise,
remoteEndpointProtocolVersion,
+ Commands.getCurrentProtocolVersion());
}
- // Client need to do some minimal cooperation logic.
if (remoteEndpointProtocolVersion <
PulsarApi.ProtocolVersion.v10_VALUE) {
LOG.warn("[{}] Client doesn't support connecting through proxy",
remoteAddress);
ctx.close();
return;
}
- int protocolVersionToAdvertise =
getProtocolVersionToAdvertise(connect);
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "[{}] Protocol version to advertise to broker is {},
clientProtocolVersion={}, proxyProtocolVersion={}",
- remoteAddress, protocolVersionToAdvertise,
remoteEndpointProtocolVersion,
- Commands.getCurrentProtocolVersion());
- }
+ try {
+ // init authn
+ this.clientConf = createClientConfiguration();
+ this.clientAuthentication = clientConf.getAuthentication();
+ int protocolVersion = getProtocolVersionToAdvertise(connect);
+
+ // authn not enabled, complete
+ if (!service.getConfiguration().isAuthenticationEnabled()) {
+ this.client = new PulsarClientImpl(clientConf,
service.getWorkerGroup(),
+ new ProxyConnectionPool(clientConf,
service.getWorkerGroup(),
+ () -> new ClientCnx(clientConf,
service.getWorkerGroup(), protocolVersion)));
+
+ completeConnect();
+ return;
+ }
+
+ AuthData clientData =
AuthData.of(connect.getAuthData().toByteArray());
+ if (connect.hasAuthMethodName()) {
+ authMethod = connect.getAuthMethodName();
+ } else if (connect.hasAuthMethod()) {
+ // Legacy client is passing enum
+ authMethod =
connect.getAuthMethod().name().substring(10).toLowerCase();
+ } else {
+ authMethod = "none";
+ }
+
+ authenticationProvider = service
+ .getAuthenticationService()
+ .getAuthenticationProvider(authMethod);
+
+ // No provider named authMethod was found. Mostly used for tests.
+ // In AuthenticationDisabled, it will set authMethod "none".
+ if (authenticationProvider == null) {
+ clientAuthRole =
service.getAuthenticationService().getAnonymousUserRole()
+ .orElseThrow(() ->
+ new AuthenticationException("No anonymous role, and no
authentication provider configured"));
- if (!authenticateAndCreateClient(connect)) {
+ createClientAndCompleteConnect(clientData);
+ return;
+ }
+
+ // init authState and other var
+ ChannelHandler sslHandler =
ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
+ SSLSession sslSession = null;
+ if (sslHandler != null) {
+ sslSession = ((SslHandler) sslHandler).engine().getSession();
+ }
+
+ authState = authenticationProvider.newAuthState(clientData,
remoteAddress, sslSession);
+ doAuthentication(clientData);
+ } catch (Exception e) {
+ LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
ctx.writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, "Failed to authenticate"));
close();
return;
}
+ }
- if (connect.hasProxyToBrokerUrl()) {
- // Client already knows which broker to connect. Let's open a
- // connection
- // there and just pass bytes in both directions
- state = State.ProxyConnectionToBroker;
- directProxyHandler = new DirectProxyHandler(service, this,
connect.getProxyToBrokerUrl(),
- protocolVersionToAdvertise, sslCtx);
- cancelKeepAliveTask();
- } else {
- // Client is doing a lookup, we can consider the handshake complete
- // and we'll take care of just topics and
- // partitions metadata lookups
- state = State.ProxyLookupRequests;
- lookupProxyHandler = new LookupProxyHandler(service, this);
-
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
+ @Override
+ protected void handleAuthResponse(CommandAuthResponse authResponse) {
+ checkArgument(state == State.Connecting);
+ checkArgument(authResponse.hasResponse());
+ checkArgument(authResponse.getResponse().hasAuthData() &&
authResponse.getResponse().hasAuthMethodName());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received AuthResponse from {}, auth method: {}",
+ remoteAddress, authResponse.getResponse().getAuthMethodName());
+ }
+
+ try {
+ AuthData clientData =
AuthData.of(authResponse.getResponse().getAuthData().toByteArray());
+ doAuthentication(clientData);
+ } catch (Exception e) {
+ String msg = "Unable to handleAuthResponse";
+ LOG.warn("[{}] {} ", remoteAddress, msg, e);
+ ctx.writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, msg));
+ close();
}
}
@@ -282,50 +407,7 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
return clientConf;
}
- private boolean authenticateAndCreateClient(CommandConnect connect) {
- try {
- ClientConfigurationData clientConf = createClientConfiguration();
- this.clientAuthentication = clientConf.getAuthentication();
-
- final int protocolVersion = getProtocolVersionToAdvertise(connect);
- if (!service.getConfiguration().isAuthenticationEnabled()) {
- this.client = new PulsarClientImpl(clientConf,
service.getWorkerGroup(),
- new ProxyConnectionPool(clientConf,
service.getWorkerGroup(),
- () -> new ClientCnx(clientConf,
service.getWorkerGroup(), protocolVersion)));
- return true;
- }
-
- String authMethod = "none";
- if (connect.hasAuthMethodName()) {
- authMethod = connect.getAuthMethodName();
- } else if (connect.hasAuthMethod()) {
- // Legacy client is passing enum
- authMethod =
connect.getAuthMethod().name().substring(10).toLowerCase();
- }
- String authData = connect.getAuthData().toStringUtf8();
- ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
- SSLSession sslSession = null;
- if (sslHandler != null) {
- sslSession = ((SslHandler) sslHandler).engine().getSession();
- }
- authenticationData = new AuthenticationDataCommand(authData,
remoteAddress, sslSession);
- clientAuthRole =
service.getAuthenticationService().authenticate(authenticationData, authMethod);
- LOG.info("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod,
- clientAuthRole);
- if
(service.getConfiguration().isForwardAuthorizationCredentials()) {
- this.clientAuthData = authData;
- this.clientAuthMethod = authMethod;
- }
- this.client = createClient(clientConf, this.clientAuthData,
this.clientAuthMethod, protocolVersion);
-
- return true;
- } catch (Exception e) {
- LOG.warn("[{}] Unable to authenticate: {}", remoteAddress,
e.getMessage());
- return false;
- }
- }
-
- private PulsarClientImpl createClient(final ClientConfigurationData
clientConf, final String clientAuthData,
+ private PulsarClientImpl createClient(final ClientConfigurationData
clientConf, final AuthData clientAuthData,
final String clientAuthMethod, final int protocolVersion) throws
PulsarClientException {
return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
() -> new ProxyClientCnx(clientConf,