gaoran10 commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661112139
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
##########
@@ -294,9 +294,13 @@ void closeAllConnections() {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
.initTls(channel, sniHost != null ? sniHost :
remoteAddress))
+ .thenCompose(channel -> channelInitializerHandler
+ .initSocks5IfConfig(channel))
Review comment:
Does the TLS could work well with socks5?
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+ private Socks5Server server;
+
+ final String topicName = "persistent://public/default/socks5";
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ baseSetup();
+ initData();
+ }
+
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+ clientBuilder.socks5ProxyAddress(new InetSocketAddress("localhost",
11080))
+ .socks5ProxyUsername("socks5")
+ .socks5ProxyPassword("pulsar");
+ }
+
+ private void startSocks5Server(boolean enableAuth) {
+ Socks5Config config = new Socks5Config();
+ config.setPort(11080);
+ config.setEnableAuth(enableAuth);
+ server = new Socks5Server(config);
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ server.start();
+ } catch (Exception e) {
+ e.printStackTrace();
Review comment:
Use log?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
}
}
+ public InetSocketAddress getSocks5ProxyAddress() {
+ if (Objects.nonNull(socks5ProxyAddress)) {
+ return socks5ProxyAddress;
+ }
+ String proxyAddress = System.getProperty("socks5Proxy.address");
Review comment:
Does it need to config client by the system property?
##########
File path: site2/docs/client-libraries-java.md
##########
@@ -121,7 +121,9 @@ int|`connectionTimeoutMs`|Duration of waiting for a
connection to a broker to be
int|`requestTimeoutMs`|Maximum duration for completing a request |60000
int|`defaultBackoffIntervalNanos`| Default duration for a backoff interval |
TimeUnit.MILLISECONDS.toNanos(100);
long|`maxBackoffIntervalNanos`|Maximum duration for a backoff
interval|TimeUnit.SECONDS.toNanos(30)
-
+SocketAddress|`socks5ProxyAddress`|SOCKS5 proxy address | None
Review comment:
The type is `String`?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
}
}
+ public InetSocketAddress getSocks5ProxyAddress() {
+ if (Objects.nonNull(socks5ProxyAddress)) {
+ return socks5ProxyAddress;
+ }
+ String proxyAddress = System.getProperty("socks5Proxy.address");
+ try {
+ URI uri = URI.create(proxyAddress);
+ return new InetSocketAddress(uri.getHost(), uri.getPort());
+ } catch (Exception ignore) {
+ return null;
Review comment:
Why ignore this exception?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
}
}
+ public InetSocketAddress getSocks5ProxyAddress() {
+ if (Objects.nonNull(socks5ProxyAddress)) {
+ return socks5ProxyAddress;
+ }
+ String proxyAddress = System.getProperty("socks5Proxy.address");
+ try {
+ URI uri = URI.create(proxyAddress);
+ return new InetSocketAddress(uri.getHost(), uri.getPort());
+ } catch (Exception ignore) {
+ return null;
+ }
+ }
+
+ public String getSocks5ProxyUsername() {
+ return Objects.nonNull(socks5ProxyUsername) ? socks5ProxyUsername :
System.getProperty("socks5Proxy.username");
+ }
+ public String getSocks5ProxyPassword() {
+ return Objects.nonNull(socks5ProxyPassword) ? socks5ProxyPassword :
System.getProperty("socks5Proxy.password");
Review comment:
Same as above.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
}
}
+ public InetSocketAddress getSocks5ProxyAddress() {
+ if (Objects.nonNull(socks5ProxyAddress)) {
+ return socks5ProxyAddress;
+ }
+ String proxyAddress = System.getProperty("socks5Proxy.address");
+ try {
+ URI uri = URI.create(proxyAddress);
+ return new InetSocketAddress(uri.getHost(), uri.getPort());
+ } catch (Exception ignore) {
+ return null;
+ }
+ }
+
+ public String getSocks5ProxyUsername() {
+ return Objects.nonNull(socks5ProxyUsername) ? socks5ProxyUsername :
System.getProperty("socks5Proxy.username");
Review comment:
Does it need to config client by the system property?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -105,6 +109,11 @@
@JsonIgnore
private Clock clock = Clock.systemDefaultZone();
+ // socks5
+ private InetSocketAddress socks5ProxyAddress;
Review comment:
Could using the `String` type? We could generate the `InetSocketAddress`
in the `PulsarChannelInitializer.java ` when initializing the socks5 handler.
##########
File path: site2/website/versioned_docs/version-2.8.1/client-libraries-java.md
##########
@@ -122,6 +122,9 @@ int|`connectionTimeoutMs`|Duration of waiting for a
connection to a broker to be
int|`requestTimeoutMs`|Maximum duration for completing a request |60000
int|`defaultBackoffIntervalNanos`| Default duration for a backoff interval |
TimeUnit.MILLISECONDS.toNanos(100);
long|`maxBackoffIntervalNanos`|Maximum duration for a backoff
interval|TimeUnit.SECONDS.toNanos(30)
+SocketAddress|`socks5ProxyAddress`|SOCKS5 proxy address | None
Review comment:
The type is `String`?
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+ private Socks5Server server;
+
+ final String topicName = "persistent://public/default/socks5";
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ baseSetup();
+ initData();
+ }
+
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+ clientBuilder.socks5ProxyAddress(new InetSocketAddress("localhost",
11080))
+ .socks5ProxyUsername("socks5")
+ .socks5ProxyPassword("pulsar");
+ }
+
+ private void startSocks5Server(boolean enableAuth) {
+ Socks5Config config = new Socks5Config();
+ config.setPort(11080);
+ config.setEnableAuth(enableAuth);
+ server = new Socks5Server(config);
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ server.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ server.shutdown();
+ }
+
+ private void initData() throws PulsarAdminException {
+ admin.tenants().createTenant("public", new TenantInfo() {
+ @Override
+ public Set<String> getAdminRoles() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getAllowedClusters() {
+ Set<String> clusters = new HashSet<>();
+ clusters.add("test");
+ return clusters;
+ }
+ });
+ admin.namespaces().createNamespace("public/default");
+ admin.topics().createNonPartitionedTopic(topicName);
+ }
+
+ @Test
+ public void testSendAndConsumer() throws PulsarClientException {
+ startSocks5Server(true);
+ // init consumer
+ final String subscriptionName = "socks5-subscription";
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ //init producer
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+
+ String msg = "abc";
+ producer.send(msg.getBytes());
+ Message<byte[]> message = consumer.receive();
+
+ assertEquals(new String(message.getData()), msg);
+
+ consumer.unsubscribe();
+ }
+
+ @Test
+ public void testDisableAuth() throws PulsarClientException {
+ startSocks5Server(false);
+ ClientBuilder clientBuilder = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .socks5ProxyAddress(new InetSocketAddress("localhost", 11080));
+ PulsarClient pulsarClient = replacePulsarClient(clientBuilder);
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+ String msg = "abc";
+ producer.send(msg.getBytes());
+ }
+
+ @Test(timeOut = 5000, expectedExceptions = {ThreadTimeoutException.class})
Review comment:
Is there any other clear flag for password error?
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/socks5/handler/InitialRequestHandler.java
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.socks5.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.socksx.SocksVersion;
+import io.netty.handler.codec.socksx.v5.DefaultSocks5InitialRequest;
+import io.netty.handler.codec.socksx.v5.DefaultSocks5InitialResponse;
+import io.netty.handler.codec.socksx.v5.Socks5AuthMethod;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.socks5.config.Socks5Config;
+
+@Slf4j
+public class InitialRequestHandler extends
SimpleChannelInboundHandler<DefaultSocks5InitialRequest> {
+
+ private final Socks5Config socks5Config;
+
+ public InitialRequestHandler(final Socks5Config socks5Config) {
+ this.socks5Config = socks5Config;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx,
DefaultSocks5InitialRequest msg) throws Exception {
+ if (SocksVersion.SOCKS5.equals(msg.version())) {
+ if (msg.decoderResult().isFailure()) {
+ log.warn("decode failure : {}",
msg.decoderResult());
+ ctx.fireChannelRead(msg);
+ } else {
+ if (SocksVersion.SOCKS5.equals(msg.version())) {
Review comment:
It seems that this check is duplicated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]