reswqa commented on code in PR #29:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/29#discussion_r1107945178


##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarAdminBuilder.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.config;
+
+import 
org.apache.flink.connector.pulsar.common.handler.PulsarAdminInvocationHandler;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import java.lang.reflect.Proxy;
+
+/**
+ * {@link org.apache.pulsar.client.admin.PulsarAdminBuilder} didn't expose all 
the configurations to
+ * end user. We have to extend the default builder method for adding extra 
configurations.
+ */
+public class PulsarAdminBuilder extends PulsarAdminBuilderImpl {

Review Comment:
   nit: The class name here may be changed. After all, a class inherits its 
`Impl` class, which looks strange. 
   Maybe `PulsarAdminProxyBuilder`?



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarAdminBuilder.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.config;
+
+import 
org.apache.flink.connector.pulsar.common.handler.PulsarAdminInvocationHandler;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import java.lang.reflect.Proxy;
+
+/**
+ * {@link org.apache.pulsar.client.admin.PulsarAdminBuilder} didn't expose all 
the configurations to
+ * end user. We have to extend the default builder method for adding extra 
configurations.
+ */
+public class PulsarAdminBuilder extends PulsarAdminBuilderImpl {
+
+    private final PulsarConfiguration configuration;
+
+    public PulsarAdminBuilder(PulsarConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * This is used by the internal implementation of {@link 
AsyncHttpConnector} to set the max
+     * allowed number of request threads.
+     */
+    public void numIoThreads(int numIoThreads) {
+        conf.setNumIoThreads(numIoThreads);
+    }
+
+    /** Wrap the pulsar admin interface into a proxy instance which can retry 
the request. */

Review Comment:
   ```suggestion
       /** Wrap the pulsar admin interface into a proxy instance which can 
retry the request and limit the request rate. */
   ```



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarAdminBuilder.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.config;
+
+import 
org.apache.flink.connector.pulsar.common.handler.PulsarAdminInvocationHandler;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import java.lang.reflect.Proxy;
+
+/**
+ * {@link org.apache.pulsar.client.admin.PulsarAdminBuilder} didn't expose all 
the configurations to
+ * end user. We have to extend the default builder method for adding extra 
configurations.
+ */
+public class PulsarAdminBuilder extends PulsarAdminBuilderImpl {
+
+    private final PulsarConfiguration configuration;
+
+    public PulsarAdminBuilder(PulsarConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * This is used by the internal implementation of {@link 
AsyncHttpConnector} to set the max
+     * allowed number of request threads.
+     */
+    public void numIoThreads(int numIoThreads) {
+        conf.setNumIoThreads(numIoThreads);
+    }
+
+    /** Wrap the pulsar admin interface into a proxy instance which can retry 
the request. */
+    @Override
+    public PulsarAdmin build() throws PulsarClientException {
+        PulsarAdminInvocationHandler handler =
+                new PulsarAdminInvocationHandler(super.build(), configuration);
+        return (PulsarAdmin)
+                Proxy.newProxyInstance(
+                        Thread.currentThread().getContextClassLoader(),

Review Comment:
   Maybe `PulsarAdmin.class.getClassLoader()` is more suitable? WDYT?



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java:
##########
@@ -627,4 +628,28 @@ private PulsarOptions() {
                     .defaultValue(300000)
                     .withDescription(
                             "The auto cert refresh time (in ms) if Pulsar 
admin supports TLS authentication.");
+
+    // These config options below are passing to PulsarAdminRequest. A wrapper 
for PulsarAdmin.

Review Comment:
   I can't find the definition of `PulsarAdminRequest`.



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.handler;
+
+import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RATES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RETRIES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_WAIT_MILLIS;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/** A wrapper which wraps the {@link PulsarAdmin} with request retry and rate 
limit support. */
+public class PulsarAdminInvocationHandler implements InvocationHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarAdminInvocationHandler.class);
+
+    private final PulsarAdmin admin;
+    private final int retryTimes;
+    private final long waitMillis;
+    private final RateLimiter rateLimiter;
+    private final Map<String, Object> handlers;
+
+    public PulsarAdminInvocationHandler(PulsarAdmin admin, PulsarConfiguration 
configuration) {
+        this.admin = admin;
+        this.retryTimes = configuration.get(PULSAR_ADMIN_REQUEST_RETRIES);
+        this.waitMillis = configuration.get(PULSAR_ADMIN_REQUEST_WAIT_MILLIS);
+        this.rateLimiter = 
RateLimiter.create(configuration.get(PULSAR_ADMIN_REQUEST_RATES));
+        this.handlers = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        Class<?> returnType = method.getReturnType();
+
+        // No need to proxy the void return type.
+        // The non-interface type is not able to proxy.
+        if (returnType.equals(Void.TYPE) || !returnType.isInterface()) {
+            return method.invoke(admin, args);
+        }
+
+        String methodName = method.getName();
+        if (handlers.containsKey(methodName)) {
+            return handlers.get(methodName);
+        }
+
+        Object handler =
+                Proxy.newProxyInstance(
+                        Thread.currentThread().getContextClassLoader(),
+                        new Class[] {returnType},
+                        new RequestHandler(method.invoke(admin, args)));
+        this.handlers.put(methodName, handler);
+
+        return handler;
+    }
+
+    /** A proxy handler with retry support for all the admin request. */
+    @SuppressWarnings({"java:S1193", "java:S1181"})
+    private class RequestHandler implements InvocationHandler {
+
+        private final Object handler;
+
+        public RequestHandler(Object handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) 
throws Throwable {
+            return doInvoke(method, args, retryTimes);
+        }
+
+        private Object doInvoke(Method method, Object[] args, int 
remainingTimes) throws Throwable {
+            // Make sure the request is allowed in the given rates.
+            rateLimiter.acquire();
+
+            try {
+                return method.invoke(handler, args);
+            } catch (Throwable e) {
+                if (e instanceof PulsarAdminException) {
+                    if (e instanceof NotFoundException) {

Review Comment:
   The `PulsarAdminException` never thrown in `method.invoke(handler, args)`, I 
guess you should handle `InvocationTargetException` firstly.



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.handler;
+
+import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RATES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RETRIES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_WAIT_MILLIS;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/** A wrapper which wraps the {@link PulsarAdmin} with request retry and rate 
limit support. */
+public class PulsarAdminInvocationHandler implements InvocationHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarAdminInvocationHandler.class);
+
+    private final PulsarAdmin admin;
+    private final int retryTimes;
+    private final long waitMillis;
+    private final RateLimiter rateLimiter;
+    private final Map<String, Object> handlers;
+
+    public PulsarAdminInvocationHandler(PulsarAdmin admin, PulsarConfiguration 
configuration) {
+        this.admin = admin;
+        this.retryTimes = configuration.get(PULSAR_ADMIN_REQUEST_RETRIES);
+        this.waitMillis = configuration.get(PULSAR_ADMIN_REQUEST_WAIT_MILLIS);
+        this.rateLimiter = 
RateLimiter.create(configuration.get(PULSAR_ADMIN_REQUEST_RATES));
+        this.handlers = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        Class<?> returnType = method.getReturnType();
+
+        // No need to proxy the void return type.
+        // The non-interface type is not able to proxy.
+        if (returnType.equals(Void.TYPE) || !returnType.isInterface()) {
+            return method.invoke(admin, args);
+        }
+
+        String methodName = method.getName();
+        if (handlers.containsKey(methodName)) {
+            return handlers.get(methodName);
+        }
+
+        Object handler =
+                Proxy.newProxyInstance(
+                        Thread.currentThread().getContextClassLoader(),
+                        new Class[] {returnType},
+                        new RequestHandler(method.invoke(admin, args)));
+        this.handlers.put(methodName, handler);
+
+        return handler;
+    }
+
+    /** A proxy handler with retry support for all the admin request. */
+    @SuppressWarnings({"java:S1193", "java:S1181"})
+    private class RequestHandler implements InvocationHandler {
+
+        private final Object handler;
+
+        public RequestHandler(Object handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) 
throws Throwable {
+            return doInvoke(method, args, retryTimes);
+        }
+
+        private Object doInvoke(Method method, Object[] args, int 
remainingTimes) throws Throwable {
+            // Make sure the request is allowed in the given rates.
+            rateLimiter.acquire();

Review Comment:
   IIUC, The current working mode is that each task will create `PulsarAdmin` 
independently, so each task is independent limit the request rate. Do we need 
TM granularity global rate limiting?



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.handler;
+
+import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RATES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RETRIES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_WAIT_MILLIS;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/** A wrapper which wraps the {@link PulsarAdmin} with request retry and rate 
limit support. */
+public class PulsarAdminInvocationHandler implements InvocationHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarAdminInvocationHandler.class);
+
+    private final PulsarAdmin admin;
+    private final int retryTimes;
+    private final long waitMillis;
+    private final RateLimiter rateLimiter;
+    private final Map<String, Object> handlers;
+
+    public PulsarAdminInvocationHandler(PulsarAdmin admin, PulsarConfiguration 
configuration) {
+        this.admin = admin;
+        this.retryTimes = configuration.get(PULSAR_ADMIN_REQUEST_RETRIES);
+        this.waitMillis = configuration.get(PULSAR_ADMIN_REQUEST_WAIT_MILLIS);
+        this.rateLimiter = 
RateLimiter.create(configuration.get(PULSAR_ADMIN_REQUEST_RATES));
+        this.handlers = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        Class<?> returnType = method.getReturnType();
+
+        // No need to proxy the void return type.
+        // The non-interface type is not able to proxy.
+        if (returnType.equals(Void.TYPE) || !returnType.isInterface()) {
+            return method.invoke(admin, args);
+        }
+
+        String methodName = method.getName();
+        if (handlers.containsKey(methodName)) {
+            return handlers.get(methodName);
+        }
+
+        Object handler =
+                Proxy.newProxyInstance(
+                        Thread.currentThread().getContextClassLoader(),
+                        new Class[] {returnType},
+                        new RequestHandler(method.invoke(admin, args)));
+        this.handlers.put(methodName, handler);
+
+        return handler;
+    }
+
+    /** A proxy handler with retry support for all the admin request. */
+    @SuppressWarnings({"java:S1193", "java:S1181"})
+    private class RequestHandler implements InvocationHandler {
+
+        private final Object handler;
+
+        public RequestHandler(Object handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) 
throws Throwable {
+            return doInvoke(method, args, retryTimes);
+        }
+
+        private Object doInvoke(Method method, Object[] args, int 
remainingTimes) throws Throwable {
+            // Make sure the request is allowed in the given rates.
+            rateLimiter.acquire();
+
+            try {
+                return method.invoke(handler, args);
+            } catch (Throwable e) {
+                if (e instanceof PulsarAdminException) {
+                    if (e instanceof NotFoundException) {
+                        // No need to retry on such exceptions.
+                        throw e;
+                    }
+
+                    remainingTimes--;
+                    LOG.warn("Request error in Admin API, remain times: {}", 
remainingTimes, e);
+                    if (remainingTimes == 0) {
+                        throw e;
+                    } else {
+                        // Sleep for the given times before executing the next 
query.
+                        sleepUninterruptibly(waitMillis, MILLISECONDS);
+                        return doInvoke(method, args, remainingTimes);

Review Comment:
   If possible, can we convert recursion into a loop to avoid too deep call 
stack?



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java:
##########
@@ -52,11 +62,80 @@ public TopicPatternSubscriber(Pattern topicPattern, 
RegexSubscriptionMode subscr
         TopicName destination = TopicName.get(topicPattern.pattern());
         NamespaceName namespaceName = destination.getNamespaceObject();
         this.namespace = namespaceName.toString();
+        this.subscriptionMode = convertRegexSubscriptionMode(subscriptionMode);
+        this.useBinaryProtocol = true;
     }
 
     @Override
     public Set<TopicPartition> getSubscribedTopicPartitions(
             RangeGenerator generator, int parallelism) throws Exception {
+        Set<String> topics;
+        if (useBinaryProtocol) {
+            try {
+                topics = queryTopicsByInternalProtocols();

Review Comment:
   Can the current test case use the new protocol? If not, it needs to be 
covered. In addition, it may be necessary to test the case that new protocol is 
not compatible while fallback to the old protocol.



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.handler;
+
+import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RATES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RETRIES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_WAIT_MILLIS;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/** A wrapper which wraps the {@link PulsarAdmin} with request retry and rate 
limit support. */
+public class PulsarAdminInvocationHandler implements InvocationHandler {

Review Comment:
   We do need some test to cover these changes, including but not limited to 
the logic of request retry and rate limiting.



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.handler;
+
+import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
+
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RATES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RETRIES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_WAIT_MILLIS;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/** A wrapper which wraps the {@link PulsarAdmin} with request retry and rate 
limit support. */
+public class PulsarAdminInvocationHandler implements InvocationHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarAdminInvocationHandler.class);
+
+    private final PulsarAdmin admin;
+    private final int retryTimes;
+    private final long waitMillis;
+    private final RateLimiter rateLimiter;
+    private final Map<String, Object> handlers;
+
+    public PulsarAdminInvocationHandler(PulsarAdmin admin, PulsarConfiguration 
configuration) {
+        this.admin = admin;
+        this.retryTimes = configuration.get(PULSAR_ADMIN_REQUEST_RETRIES);
+        this.waitMillis = configuration.get(PULSAR_ADMIN_REQUEST_WAIT_MILLIS);
+        this.rateLimiter = 
RateLimiter.create(configuration.get(PULSAR_ADMIN_REQUEST_RATES));
+        this.handlers = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        Class<?> returnType = method.getReturnType();
+
+        // No need to proxy the void return type.
+        // The non-interface type is not able to proxy.
+        if (returnType.equals(Void.TYPE) || !returnType.isInterface()) {
+            return method.invoke(admin, args);
+        }
+
+        String methodName = method.getName();
+        if (handlers.containsKey(methodName)) {
+            return handlers.get(methodName);
+        }
+
+        Object handler =
+                Proxy.newProxyInstance(
+                        Thread.currentThread().getContextClassLoader(),
+                        new Class[] {returnType},
+                        new RequestHandler(method.invoke(admin, args)));
+        this.handlers.put(methodName, handler);
+
+        return handler;
+    }
+
+    /** A proxy handler with retry support for all the admin request. */
+    @SuppressWarnings({"java:S1193", "java:S1181"})
+    private class RequestHandler implements InvocationHandler {
+
+        private final Object handler;
+
+        public RequestHandler(Object handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) 
throws Throwable {
+            return doInvoke(method, args, retryTimes);
+        }
+
+        private Object doInvoke(Method method, Object[] args, int 
remainingTimes) throws Throwable {
+            // Make sure the request is allowed in the given rates.
+            rateLimiter.acquire();
+
+            try {
+                return method.invoke(handler, args);
+            } catch (Throwable e) {
+                if (e instanceof PulsarAdminException) {
+                    if (e instanceof NotFoundException) {
+                        // No need to retry on such exceptions.
+                        throw e;
+                    }
+
+                    remainingTimes--;
+                    LOG.warn("Request error in Admin API, remain times: {}", 
remainingTimes, e);
+                    if (remainingTimes == 0) {
+                        throw e;
+                    } else {
+                        // Sleep for the given times before executing the next 
query.
+                        sleepUninterruptibly(waitMillis, MILLISECONDS);

Review Comment:
   I'm just a little curious about whether we can replace fixed sleep with 
waiting based on some backoff algorithm, but at present, fixed sleep is also 
make sense to me.



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java:
##########
@@ -210,6 +210,10 @@ public PulsarSourceBuilder<OUT> setTopics(List<String> 
topics) {
      * Set a topic pattern to consume from the java regex str. You can set 
topics once either with
      * {@link #setTopics} or {@link #setTopicPattern} in this builder.
      *
+     * <p>Remember that we will only subscribe to one tenant and one namespace 
by using regular

Review Comment:
   This also applies to the changes of `VisibleForTesting` part in the first 
commit. Maybe it should belong to a hot-fix commit before all commits.



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java:
##########
@@ -210,6 +210,10 @@ public PulsarSourceBuilder<OUT> setTopics(List<String> 
topics) {
      * Set a topic pattern to consume from the java regex str. You can set 
topics once either with
      * {@link #setTopics} or {@link #setTopicPattern} in this builder.
      *
+     * <p>Remember that we will only subscribe to one tenant and one namespace 
by using regular

Review Comment:
   Emm, It seems that the changes in the class should not be included in the 
first commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to