This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 78d00ea  [FLINK-32938] Replace pulsar admin calls (#59)
78d00ea is described below

commit 78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858
Author: Neng Lu <[email protected]>
AuthorDate: Wed Sep 20 20:14:43 2023 -0700

    [FLINK-32938] Replace pulsar admin calls (#59)
---
 .../f4d91193-72ba-4ce4-ad83-98f780dce581           |  14 +-
 .../common/config/PulsarAdminProxyBuilder.java     |  64 -----
 .../pulsar/common/config/PulsarClientFactory.java  |  52 +---
 .../pulsar/common/config/PulsarOptions.java        |  81 +-----
 .../handler/PulsarAdminInvocationHandler.java      | 145 ----------
 .../flink/connector/pulsar/sink/PulsarSink.java    |   1 -
 .../connector/pulsar/sink/PulsarSinkBuilder.java   |  13 +-
 .../pulsar/sink/config/PulsarSinkConfigUtils.java  |   2 -
 .../sink/writer/context/PulsarSinkContext.java     |   4 +-
 .../sink/writer/context/PulsarSinkContextImpl.java |   4 +-
 .../pulsar/sink/writer/topic/MetadataListener.java |  51 ++--
 .../connector/pulsar/source/PulsarSource.java      |   1 -
 .../pulsar/source/PulsarSourceBuilder.java         |  11 +-
 .../source/config/PulsarSourceConfigUtils.java     |   2 -
 .../source/enumerator/PulsarSourceEnumerator.java  |  18 +-
 .../source/enumerator/cursor/CursorPosition.java   |  77 +++--
 .../source/enumerator/cursor/StopCursor.java       |   4 +-
 .../cursor/start/MessageIdStartCursor.java         |   2 +-
 .../cursor/stop/LatestMessageStopCursor.java       |  27 +-
 .../enumerator/subscriber/PulsarSubscriber.java    |   4 +-
 .../subscriber/impl/BasePulsarSubscriber.java      |  34 +--
 .../subscriber/impl/TopicListSubscriber.java       |   5 +-
 .../source/reader/PulsarPartitionSplitReader.java  |  14 +-
 .../pulsar/source/reader/PulsarSourceReader.java   |   9 -
 .../pulsar/source/split/PulsarPartitionSplit.java  |   6 +-
 .../connector/pulsar/table/PulsarTableFactory.java |   9 +-
 .../connector/pulsar/table/PulsarTableOptions.java |  21 --
 .../pulsar/table/UpsertPulsarTableFactory.java     |   9 +-
 .../handler/PulsarAdminInvocationHandlerTest.java  | 309 ---------------------
 .../pulsar/sink/PulsarSinkBuilderTest.java         |   5 +-
 .../connector/pulsar/sink/PulsarSinkITCase.java    |   1 -
 .../pulsar/source/PulsarSourceBuilderTest.java     |   4 -
 .../source/enumerator/cursor/StopCursorTest.java   |   1 -
 .../subscriber/PulsarSubscriberTest.java           |  14 +-
 .../reader/PulsarPartitionSplitReaderTest.java     |   1 -
 .../GenericRecordDeserializationSchemaTest.java    |   1 -
 .../PulsarDeserializationSchemaTest.java           |   1 -
 .../pulsar/table/PulsarChangelogTableITCase.java   |   6 -
 .../pulsar/table/PulsarTableFactoryTest.java       |   9 -
 .../connector/pulsar/table/PulsarTableITCase.java  |  24 --
 .../pulsar/table/PulsarTableOptionsTest.java       |   6 +-
 .../pulsar/table/UpsertPulsarTableITCase.java      |  12 +-
 .../testutils/runtime/PulsarRuntimeOperator.java   |   2 -
 .../testutils/sink/PulsarSinkTestContext.java      |   1 -
 .../testutils/source/PulsarSourceTestContext.java  |   1 -
 45 files changed, 131 insertions(+), 951 deletions(-)

diff --git 
a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
 
b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
index 1638c6a..7a71d25 100644
--- 
a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
+++ 
b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -9,16 +9,4 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase 
does not satisfy: on
 * reside outside of package 'org.apache.flink.runtime.*' and contain any 
fields that are static, final, and of type MiniClusterExtension and annotated 
with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
-org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only 
one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that 
are static, final, and of type InternalMiniClusterExtension and annotated with 
@RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any 
fields that are static, final, and of type MiniClusterExtension and annotated 
with @RegisterExtension or are , and of type MiniClusterTestEnvironment and 
annotated with @TestEnv\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
-org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: 
only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that 
are static, final, and of type InternalMiniClusterExtension and annotated with 
@RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any 
fields that are static, final, and of type MiniClusterExtension and annotated 
with @RegisterExtension or are , and of type MiniClusterTestEnvironment and 
annotated with @TestEnv\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
+ or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
\ No newline at end of file
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarAdminProxyBuilder.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarAdminProxyBuilder.java
deleted file mode 100644
index 5c8da6b..0000000
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarAdminProxyBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.common.config;
-
-import 
org.apache.flink.connector.pulsar.common.handler.PulsarAdminInvocationHandler;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
-import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-import java.lang.reflect.Proxy;
-
-/**
- * {@link org.apache.pulsar.client.admin.PulsarAdminBuilder} didn't expose all 
the configurations to
- * end user. We have to extend the default builder method for adding extra 
configurations.
- */
-public class PulsarAdminProxyBuilder extends PulsarAdminBuilderImpl {
-
-    private final PulsarConfiguration configuration;
-
-    public PulsarAdminProxyBuilder(PulsarConfiguration configuration) {
-        this.configuration = configuration;
-    }
-
-    /**
-     * This is used by the internal implementation of {@link 
AsyncHttpConnector} to set the max
-     * allowed number of request threads.
-     */
-    public void numIoThreads(int numIoThreads) {
-        conf.setNumIoThreads(numIoThreads);
-    }
-
-    /**
-     * Wrap the pulsar admin interface into a proxy instance which can retry 
the request and limit
-     * the request rate.
-     */
-    @Override
-    public PulsarAdmin build() throws PulsarClientException {
-        PulsarAdminInvocationHandler handler =
-                new PulsarAdminInvocationHandler(super.build(), configuration);
-        return (PulsarAdmin)
-                Proxy.newProxyInstance(
-                        PulsarAdmin.class.getClassLoader(),
-                        new Class[] {PulsarAdmin.class},
-                        handler);
-    }
-}
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
index 7b45ef1..caaf05e 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.pulsar.common.config;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -39,16 +38,13 @@ import static java.util.Collections.singletonMap;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTO_CERT_REFRESH_TIME;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_MAX_IDLE_SECONDS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECT_TIMEOUT;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_DNS_LOOKUP_BIND_ADDRESS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_BUSY_WAIT;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
@@ -66,8 +62,6 @@ import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_PROTOCOL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_SERVICE_URL;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_READ_TIMEOUT;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT_MS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SOCKS5_PROXY_ADDRESS;
@@ -187,50 +181,8 @@ public final class PulsarClientFactory {
     }
 
     /**
-     * PulsarAdmin shares almost the same configuration with PulsarClient, but 
we separate this
-     * creating method for directly use it.
-     */
-    public static PulsarAdmin createAdmin(PulsarConfiguration configuration)
-            throws PulsarClientException {
-        PulsarAdminProxyBuilder builder = new 
PulsarAdminProxyBuilder(configuration);
-
-        // Create the authentication instance for the Pulsar client.
-        builder.authentication(createAuthentication(configuration));
-
-        configuration.useOption(PULSAR_ADMIN_URL, builder::serviceHttpUrl);
-        configuration.useOption(PULSAR_TLS_KEY_FILE_PATH, 
builder::tlsKeyFilePath);
-        configuration.useOption(PULSAR_TLS_CERTIFICATE_FILE_PATH, 
builder::tlsCertificateFilePath);
-        configuration.useOption(PULSAR_TLS_TRUST_CERTS_FILE_PATH, 
builder::tlsTrustCertsFilePath);
-        configuration.useOption(
-                PULSAR_TLS_ALLOW_INSECURE_CONNECTION, 
builder::allowTlsInsecureConnection);
-        configuration.useOption(
-                PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE, 
builder::enableTlsHostnameVerification);
-        configuration.useOption(PULSAR_USE_KEY_STORE_TLS, 
builder::useKeyStoreTls);
-        configuration.useOption(PULSAR_SSL_PROVIDER, builder::sslProvider);
-        configuration.useOption(PULSAR_TLS_KEY_STORE_TYPE, 
builder::tlsKeyStoreType);
-        configuration.useOption(PULSAR_TLS_KEY_STORE_PATH, 
builder::tlsKeyStorePath);
-        configuration.useOption(PULSAR_TLS_KEY_STORE_PASSWORD, 
builder::tlsKeyStorePassword);
-        configuration.useOption(PULSAR_TLS_TRUST_STORE_TYPE, 
builder::tlsTrustStoreType);
-        configuration.useOption(PULSAR_TLS_TRUST_STORE_PATH, 
builder::tlsTrustStorePath);
-        configuration.useOption(PULSAR_TLS_TRUST_STORE_PASSWORD, 
builder::tlsTrustStorePassword);
-        configuration.useOption(PULSAR_TLS_CIPHERS, TreeSet::new, 
builder::tlsCiphers);
-        configuration.useOption(PULSAR_TLS_PROTOCOLS, TreeSet::new, 
builder::tlsProtocols);
-        configuration.useOption(
-                PULSAR_CONNECT_TIMEOUT, v -> builder.connectionTimeout(v, 
MILLISECONDS));
-        configuration.useOption(PULSAR_READ_TIMEOUT, v -> 
builder.readTimeout(v, MILLISECONDS));
-        configuration.useOption(
-                PULSAR_REQUEST_TIMEOUT, v -> builder.requestTimeout(v, 
MILLISECONDS));
-        configuration.useOption(
-                PULSAR_AUTO_CERT_REFRESH_TIME, v -> 
builder.autoCertRefreshTime(v, MILLISECONDS));
-        configuration.useOption(PULSAR_NUM_IO_THREADS, builder::numIoThreads);
-
-        return builder.build();
-    }
-
-    /**
-     * Create the {@link Authentication} instance for both {@code 
PulsarClient} and {@code
-     * PulsarAdmin}. If the user didn't provide configuration, a {@link 
AuthenticationDisabled}
-     * instance would be returned.
+     * Create the {@link Authentication} instance for {@code PulsarClient}. If 
the user didn't
+     * provide configuration, a {@link AuthenticationDisabled} instance would 
be returned.
      *
      * <p>This method behavior is the same as the pulsar command line tools.
      */
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
index 9fdc1cf..c27e30a 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.description.Description;
 
 import org.apache.pulsar.client.api.ProxyProtocol;
 
-import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -35,7 +34,6 @@ import java.util.concurrent.TimeUnit;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.flink.configuration.description.TextElement.code;
 import static org.apache.flink.configuration.description.TextElement.text;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.ADMIN_CONFIG_PREFIX;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIENT_CONFIG_PREFIX;
 
 /**
@@ -43,17 +41,11 @@ import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIE
  * table.
  */
 @PublicEvolving
-@ConfigGroups(
-        groups = {
-            @ConfigGroup(name = "PulsarClient", keyPrefix = 
CLIENT_CONFIG_PREFIX),
-            @ConfigGroup(name = "PulsarAdmin", keyPrefix = ADMIN_CONFIG_PREFIX)
-        })
+@ConfigGroups(groups = {@ConfigGroup(name = "PulsarClient", keyPrefix = 
CLIENT_CONFIG_PREFIX)})
 public final class PulsarOptions {
 
     // Pulsar client API config prefix.
     public static final String CLIENT_CONFIG_PREFIX = "pulsar.client.";
-    // Pulsar admin API config prefix.
-    public static final String ADMIN_CONFIG_PREFIX = "pulsar.admin.";
 
     private PulsarOptions() {
         // This is a constant class
@@ -582,75 +574,4 @@ public final class PulsarOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("Password of SOCKS5 proxy.");
-
-    
///////////////////////////////////////////////////////////////////////////////
-    //
-    // The configuration for PulsarAdmin part.
-    // All the configuration listed below should have the pulsar.admin prefix.
-    //
-    
///////////////////////////////////////////////////////////////////////////////
-
-    public static final ConfigOption<String> PULSAR_ADMIN_URL =
-            ConfigOptions.key(ADMIN_CONFIG_PREFIX + "adminUrl")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "The Pulsar service HTTP URL for 
the admin endpoint. For example, %s, or %s for TLS.",
-                                            
code("http://my-broker.example.com:8080";),
-                                            
code("https://my-broker.example.com:8443";))
-                                    .build());
-
-    public static final ConfigOption<Integer> PULSAR_CONNECT_TIMEOUT =
-            ConfigOptions.key(ADMIN_CONFIG_PREFIX + "connectTimeout")
-                    .intType()
-                    .defaultValue(60000)
-                    .withDescription("The connection time out (in ms) for the 
PulsarAdmin client.");
-
-    public static final ConfigOption<Integer> PULSAR_READ_TIMEOUT =
-            ConfigOptions.key(ADMIN_CONFIG_PREFIX + "readTimeout")
-                    .intType()
-                    .defaultValue(60000)
-                    .withDescription(
-                            "The server response read timeout (in ms) for the 
PulsarAdmin client for any request.");
-
-    public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT =
-            ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestTimeout")
-                    .intType()
-                    .defaultValue(300000)
-                    .withDescription(
-                            "The server request timeout (in ms) for the 
PulsarAdmin client for any request.");
-
-    public static final ConfigOption<Integer> PULSAR_AUTO_CERT_REFRESH_TIME =
-            ConfigOptions.key(ADMIN_CONFIG_PREFIX + "autoCertRefreshTime")
-                    .intType()
-                    .defaultValue(300000)
-                    .withDescription(
-                            "The auto cert refresh time (in ms) if Pulsar 
admin supports TLS authentication.");
-
-    // These config options below are passing to PulsarAdminInvocationHandler.
-    // A wrapper for PulsarAdmin.
-
-    public static final ConfigOption<Integer> PULSAR_ADMIN_REQUEST_RETRIES =
-            ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestRetries")
-                    .intType()
-                    .defaultValue(5)
-                    .withDescription(
-                            "For PulsarAdmin request, it will retry until we 
get a success response,"
-                                    + " fail if we exhausted retry count.");
-
-    public static final ConfigOption<Long> PULSAR_ADMIN_REQUEST_WAIT_MILLIS =
-            ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestWaitMillis")
-                    .longType()
-                    .defaultValue(Duration.ofSeconds(3).toMillis())
-                    .withDescription(
-                            "For PulsarAdmin request, We will sleep the given 
time before retrying the failed request.");
-
-    public static final ConfigOption<Integer> PULSAR_ADMIN_REQUEST_RATES =
-            ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestRates")
-                    .intType()
-                    .defaultValue(5)
-                    .withDescription(
-                            "It will add ratelimit for PulsarAdmin metadata 
requests, stands for requests per second.");
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java
deleted file mode 100644
index 14d5ad2..0000000
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.common.handler;
-
-import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
-import org.apache.pulsar.shade.com.google.common.util.concurrent.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RATES;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RETRIES;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_WAIT_MILLIS;
-import static 
org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
-/** A wrapper which wraps the {@link PulsarAdmin} with request retry and rate 
limit support. */
-public class PulsarAdminInvocationHandler implements InvocationHandler {
-    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarAdminInvocationHandler.class);
-
-    @SuppressWarnings("java:S3077")
-    private static volatile RateLimiter rateLimiter;
-
-    private final PulsarAdmin admin;
-    private final int retryTimes;
-    private final long waitMillis;
-    private final int requestRates;
-    private final Map<String, Object> handlers;
-
-    public PulsarAdminInvocationHandler(PulsarAdmin admin, PulsarConfiguration 
configuration) {
-        this.admin = admin;
-        this.retryTimes = configuration.get(PULSAR_ADMIN_REQUEST_RETRIES);
-        this.waitMillis = configuration.get(PULSAR_ADMIN_REQUEST_WAIT_MILLIS);
-        this.requestRates = configuration.get(PULSAR_ADMIN_REQUEST_RATES);
-        this.handlers = new ConcurrentHashMap<>();
-    }
-
-    @Override
-    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
-        Class<?> returnType = method.getReturnType();
-
-        // No need to proxy the void return type.
-        // The non-interface type is not able to proxy.
-        if (returnType.equals(Void.TYPE) || !returnType.isInterface()) {
-            return method.invoke(admin, args);
-        }
-
-        String methodName = method.getName();
-        if (handlers.containsKey(methodName)) {
-            return handlers.get(methodName);
-        }
-
-        Object handler =
-                Proxy.newProxyInstance(
-                        Thread.currentThread().getContextClassLoader(),
-                        new Class[] {returnType},
-                        new RequestHandler(method.invoke(admin, args)));
-        this.handlers.put(methodName, handler);
-
-        return handler;
-    }
-
-    /** A proxy handler with retry support for all the admin request. */
-    @SuppressWarnings({"java:S1193", "java:S1181", "java:S3776", "java:S112"})
-    private class RequestHandler implements InvocationHandler {
-
-        private final Object handler;
-
-        public RequestHandler(Object handler) {
-            this.handler = handler;
-        }
-
-        @Override
-        public Object invoke(Object proxy, Method method, Object[] args) 
throws Throwable {
-            return doInvoke(method, args, retryTimes);
-        }
-
-        private Object doInvoke(Method method, Object[] args, int 
remainingTimes) throws Throwable {
-            while (true) {
-                // Make sure the request is allowed in the given rates.
-                requestRateLimit(requestRates);
-
-                try {
-                    return method.invoke(handler, args);
-                } catch (InvocationTargetException e) {
-                    Throwable throwable = e.getTargetException();
-                    if (throwable instanceof NotFoundException) {
-                        // No need to retry on such exceptions.
-                        throw throwable;
-                    } else if (throwable instanceof PulsarAdminException) {
-                        remainingTimes--;
-                        LOG.warn("Request error in Admin API, remain times: 
{}", remainingTimes, e);
-                        if (remainingTimes == 0) {
-                            throw throwable;
-                        } else {
-                            // Sleep for the given times before executing the 
next query.
-                            sleepUninterruptibly(waitMillis, MILLISECONDS);
-                        }
-                    } else {
-                        throw throwable;
-                    }
-                }
-            }
-        }
-    }
-
-    /** Request a global ratelimit which limits the total admin API request 
rates. */
-    private static void requestRateLimit(int requestRates) {
-        if (rateLimiter == null) {
-            synchronized (PulsarAdminInvocationHandler.class) {
-                if (rateLimiter == null) {
-                    rateLimiter = RateLimiter.create(requestRates);
-                }
-            }
-        }
-
-        rateLimiter.acquire();
-    }
-}
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
index 1d4e92b..559da3a 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
@@ -52,7 +52,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * <pre>{@code
  * PulsarSink<String> sink = PulsarSink.builder()
  *      .setServiceUrl(operator().serviceUrl())
- *      .setAdminUrl(operator().adminUrl())
  *      .setTopic(topic)
  *      .setSerializationSchema(Schema.STRING)
  *      .build();
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index e3e7b1b..ee6e7db 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -46,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
@@ -75,15 +74,14 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * <pre>{@code
  * PulsarSink<String> sink = PulsarSink.builder()
  *     .setServiceUrl(operator().serviceUrl())
- *     .setAdminUrl(operator().adminUrl())
  *     .setTopics(topic)
  *     .setSerializationSchema(Schema.STRING)
  *     .build();
  * }</pre>
  *
- * <p>The service url, admin url, and the record serializer are required 
fields that must be set. If
- * you don't set the topics, make sure you have provided a custom {@link 
TopicRouter}. Otherwise,
- * you must provide the topics to produce.
+ * <p>The service url and the record serializer are required fields that must 
be set. If you don't
+ * set the topics, make sure you have provided a custom {@link TopicRouter}. 
Otherwise, you must
+ * provide the topics to produce.
  *
  * <p>To specify the delivery guarantees of PulsarSink, one can call {@link
  * #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the 
delivery guarantee is {@link
@@ -93,7 +91,6 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <pre>{@code
  * PulsarSink<String> sink = PulsarSink.builder()
  *     .setServiceUrl(operator().serviceUrl())
- *     .setAdminUrl(operator().adminUrl())
  *     .setTopics(topic)
  *     .setSerializationSchema(Schema.STRING)
  *     .setDeliveryGuarantee(deliveryGuarantee)
@@ -126,9 +123,11 @@ public class PulsarSinkBuilder<IN> {
      *
      * @param adminUrl The url for the PulsarAdmin.
      * @return this PulsarSinkBuilder.
+     * @deprecated this method will return builder directly
      */
+    @Deprecated
     public PulsarSinkBuilder<IN> setAdminUrl(String adminUrl) {
-        return setConfig(PULSAR_ADMIN_URL, adminUrl);
+        return this;
     }
 
     /**
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
index 9d0a6c1..da03ff2 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -31,7 +31,6 @@ import java.util.UUID;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
@@ -62,7 +61,6 @@ public final class PulsarSinkConfigUtils {
     public static final PulsarConfigValidator SINK_CONFIG_VALIDATOR =
             PulsarConfigValidator.builder()
                     .requiredOption(PULSAR_SERVICE_URL)
-                    .requiredOption(PULSAR_ADMIN_URL)
                     .conflictOptions(PULSAR_AUTH_PARAMS, PULSAR_AUTH_PARAM_MAP)
                     .build();
 
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
index 6799677..cbc7b00 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
@@ -22,8 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
 
-import org.apache.pulsar.client.admin.PulsarAdminException;
-
 import java.util.Optional;
 
 /** This context provides information on the pulsar record target location. */
@@ -56,5 +54,5 @@ public interface PulsarSinkContext {
      *
      * @return Return {@link Optional#empty()} if the topic doesn't exist.
      */
-    Optional<TopicMetadata> topicMetadata(String topic) throws 
PulsarAdminException;
+    Optional<TopicMetadata> topicMetadata(String topic);
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
index 5df0026..8c827db 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
@@ -25,8 +25,6 @@ import 
org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
 
-import org.apache.pulsar.client.admin.PulsarAdminException;
-
 import java.util.Optional;
 
 /** An implementation that would contain all the required context. */
@@ -71,7 +69,7 @@ public class PulsarSinkContextImpl implements 
PulsarSinkContext {
     }
 
     @Override
-    public Optional<TopicMetadata> topicMetadata(String topic) throws 
PulsarAdminException {
+    public Optional<TopicMetadata> topicMetadata(String topic) {
         return metadataListener.queryTopicMetadata(topic);
     }
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java
index 9745dc6..b324efe 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java
@@ -25,13 +25,10 @@ import 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.shade.com.google.common.cache.CacheBuilder;
@@ -52,7 +49,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.Collections.emptyList;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition;
 import static 
org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 
@@ -72,7 +69,7 @@ public class MetadataListener implements Serializable, 
Closeable {
     private ImmutableList<TopicPartition> availablePartitions;
 
     // Dynamic fields.
-    private transient PulsarAdmin pulsarAdmin;
+    private transient PulsarClientImpl clientImpl;
     private transient Long topicMetadataRefreshInterval;
     private transient ProcessingTimeService timeService;
     private transient LoadingCache<String, Optional<Integer>> 
topicPartitionCache;
@@ -102,7 +99,7 @@ public class MetadataListener implements Serializable, 
Closeable {
     public void open(SinkConfiguration sinkConfiguration, 
ProcessingTimeService timeService)
             throws PulsarClientException {
         // Initialize listener properties.
-        this.pulsarAdmin = createAdmin(sinkConfiguration);
+        this.clientImpl = (PulsarClientImpl) createClient(sinkConfiguration);
         this.topicMetadataRefreshInterval = 
sinkConfiguration.getTopicMetadataRefreshInterval();
         this.timeService = timeService;
         this.topicPartitionCache =
@@ -113,25 +110,15 @@ public class MetadataListener implements Serializable, 
Closeable {
                                     @Override
                                     @ParametersAreNonnullByDefault
                                     public Optional<Integer> load(String topic)
-                                            throws PulsarAdminException {
-                                        try {
-                                            PartitionedTopicMetadata metadata =
-                                                    pulsarAdmin
-                                                            .topics()
-                                                            
.getPartitionedTopicMetadata(topic);
-                                            return 
Optional.of(metadata.partitions);
-                                        } catch (NotFoundException e) {
-                                            return Optional.empty();
-                                        }
+                                            throws ExecutionException, 
InterruptedException {
+                                        PartitionedTopicMetadata metadata =
+                                                
clientImpl.getPartitionedTopicMetadata(topic).get();
+                                        return 
Optional.of(metadata.partitions);
                                     }
                                 });
 
         // Initialize the topic metadata. Quit if fail to connect to Pulsar.
-        try {
-            updateTopicMetadata();
-        } catch (PulsarAdminException e) {
-            throw new FlinkRuntimeException(e);
-        }
+        updateTopicMetadata();
 
         // Register time service for update the topic metadata.
         if (topics.isEmpty()) {
@@ -156,7 +143,7 @@ public class MetadataListener implements Serializable, 
Closeable {
      *
      * @return Return {@link Optional#empty()} if the topic doesn't exist.
      */
-    public Optional<TopicMetadata> queryTopicMetadata(String topic) throws 
PulsarAdminException {
+    public Optional<TopicMetadata> queryTopicMetadata(String topic) {
         if (isPartition(topic)) {
             return Optional.of(new TopicMetadata(topic, NON_PARTITIONED));
         }
@@ -164,13 +151,7 @@ public class MetadataListener implements Serializable, 
Closeable {
         try {
             return topicPartitionCache.get(topic).map(size -> new 
TopicMetadata(topic, size));
         } catch (ExecutionException e) {
-            Optional<PulsarAdminException> optional =
-                    ExceptionUtils.findThrowable(e, 
PulsarAdminException.class);
-            if (optional.isPresent()) {
-                throw optional.get();
-            } else {
-                throw new FlinkRuntimeException(e);
-            }
+            throw new FlinkRuntimeException(e);
         }
     }
 
@@ -181,8 +162,8 @@ public class MetadataListener implements Serializable, 
Closeable {
 
     @Override
     public void close() throws IOException {
-        if (pulsarAdmin != null) {
-            pulsarAdmin.close();
+        if (clientImpl != null) {
+            clientImpl.close();
         }
     }
 
@@ -194,10 +175,10 @@ public class MetadataListener implements Serializable, 
Closeable {
     }
 
     private void triggerNextTopicMetadataUpdate() {
-        // Try to update the topic metadata, ignore the pulsar admin exception.
+        // Try to update the topic metadata.
         try {
             updateTopicMetadata();
-        } catch (PulsarAdminException e) {
+        } catch (FlinkRuntimeException e) {
             LOG.warn("", e);
         }
 
@@ -205,7 +186,7 @@ public class MetadataListener implements Serializable, 
Closeable {
         registerNextTopicMetadataUpdateTimer();
     }
 
-    private void updateTopicMetadata() throws PulsarAdminException {
+    private void updateTopicMetadata() throws FlinkRuntimeException {
         ImmutableList.Builder<TopicPartition> parititonsBuilder = 
ImmutableList.builder();
 
         for (String topic : topics) {
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index c038f32..447867c 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -55,7 +55,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
  *     .builder()
  *     .setTopics(TOPIC1, TOPIC2)
  *     .setServiceUrl(getServiceUrl())
- *     .setAdminUrl(getAdminUrl())
  *     .setSubscriptionName("test")
  *     .setDeserializationSchema(new SimpleStringSchema())
  *     .setBounded(StopCursor::defaultStopCursor)
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index a31f185..687e9b8 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -59,7 +59,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
@@ -86,15 +85,14 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * PulsarSource<String> source = PulsarSource
  *     .builder()
  *     .setServiceUrl(PULSAR_BROKER_URL)
- *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
  *     .setSubscriptionName("flink-source-1")
  *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
  *     .setDeserializationSchema(new SimpleStringSchema())
  *     .build();
  * }</pre>
  *
- * <p>The service url, admin url, subscription name, topics to consume, and 
the record deserializer
- * are required fields that must be set.
+ * <p>The service url, subscription name, topics to consume, and the record 
deserializer are
+ * required fields that must be set.
  *
  * <p>To specify the starting position of PulsarSource, one can call {@link
  * #setStartCursor(StartCursor)}.
@@ -115,7 +113,6 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * PulsarSource<String> source = PulsarSource
  *     .builder()
  *     .setServiceUrl(PULSAR_BROKER_URL)
- *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
  *     .setSubscriptionName("flink-source-1")
  *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
  *     .setDeserializationSchema(new SimpleStringSchema())
@@ -152,9 +149,11 @@ public final class PulsarSourceBuilder<OUT> {
      *
      * @param adminUrl the url for the PulsarAdmin.
      * @return this PulsarSourceBuilder.
+     * @deprecated this method will return builder directly
      */
+    @Deprecated
     public PulsarSourceBuilder<OUT> setAdminUrl(String adminUrl) {
-        return setConfig(PULSAR_ADMIN_URL, adminUrl);
+        return this;
     }
 
     /**
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index 5f379d3..eba6175 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
@@ -84,7 +83,6 @@ public final class PulsarSourceConfigUtils {
     public static final PulsarConfigValidator SOURCE_CONFIG_VALIDATOR =
             PulsarConfigValidator.builder()
                     .requiredOption(PULSAR_SERVICE_URL)
-                    .requiredOption(PULSAR_ADMIN_URL)
                     .requiredOption(PULSAR_SUBSCRIPTION_NAME)
                     .conflictOptions(PULSAR_AUTH_PARAMS, PULSAR_AUTH_PARAM_MAP)
                     .build();
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 819f9cd..57bb72b 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -33,8 +33,6 @@ import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
 import org.apache.flink.util.FlinkRuntimeException;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
@@ -47,7 +45,6 @@ import java.util.List;
 import java.util.Set;
 
 import static java.util.Collections.singletonList;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner.createAssigner;
@@ -60,7 +57,6 @@ public class PulsarSourceEnumerator
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSourceEnumerator.class);
 
     private final PulsarClient pulsarClient;
-    private final PulsarAdmin pulsarAdmin;
     private final PulsarSubscriber subscriber;
     private final StartCursor startCursor;
     private final RangeGenerator rangeGenerator;
@@ -97,7 +93,6 @@ public class PulsarSourceEnumerator
             PulsarSourceEnumState enumState)
             throws PulsarClientException {
         this.pulsarClient = createClient(sourceConfiguration);
-        this.pulsarAdmin = createAdmin(sourceConfiguration);
         this.subscriber = subscriber;
         this.startCursor = startCursor;
         this.rangeGenerator = rangeGenerator;
@@ -109,7 +104,7 @@ public class PulsarSourceEnumerator
 
     @Override
     public void start() {
-        subscriber.open(pulsarClient, pulsarAdmin);
+        subscriber.open(pulsarClient);
         rangeGenerator.open(sourceConfiguration);
 
         // Expose the split assignment metrics if Flink has supported.
@@ -178,9 +173,6 @@ public class PulsarSourceEnumerator
         if (pulsarClient != null) {
             pulsarClient.close();
         }
-        if (pulsarAdmin != null) {
-            pulsarAdmin.close();
-        }
     }
 
     // ----------------- private methods -------------------
@@ -228,12 +220,8 @@ public class PulsarSourceEnumerator
                     startCursor.position(partition.getTopic(), 
partition.getPartitionId());
 
             try {
-                if (sourceConfiguration.isResetSubscriptionCursor()) {
-                    position.seekPosition(pulsarAdmin, topic, 
subscriptionName);
-                } else {
-                    position.createInitialPosition(pulsarAdmin, topic, 
subscriptionName);
-                }
-            } catch (PulsarAdminException e) {
+                position.setupSubPosition(pulsarClient, topic, 
subscriptionName);
+            } catch (PulsarClientException e) {
                 throw new FlinkRuntimeException(e);
             }
         }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
index 8d8ee2c..72fc614 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
@@ -21,15 +21,18 @@ package 
org.apache.flink.connector.pulsar.source.enumerator.cursor;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
-import 
org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 
 import java.io.Serializable;
-import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -78,51 +81,43 @@ public final class CursorPosition implements Serializable {
 
     /** This method is used to create the initial position in {@link 
PulsarSourceEnumerator}. */
     @Internal
-    public boolean createInitialPosition(
-            PulsarAdmin pulsarAdmin, String topicName, String subscriptionName)
-            throws PulsarAdminException {
-        List<String> subscriptions = 
pulsarAdmin.topics().getSubscriptions(topicName);
-
-        if (!subscriptions.contains(subscriptionName)) {
-            pulsarAdmin
-                    .topics()
-                    .createSubscription(topicName, subscriptionName, 
MessageId.earliest);
-
+    public void setupSubPosition(PulsarClient client, String topicName, String 
subscriptionName)
+            throws PulsarClientException {
+        try (Consumer<GenericRecord> consumer =
+                client.newConsumer(new AutoConsumeSchema())
+                        .topic(topicName)
+                        .subscriptionName(subscriptionName)
+                        .subscribe()) {
             // Reset cursor to desired position.
-            MessageId initialPosition = getMessageId(pulsarAdmin, topicName);
-            pulsarAdmin
-                    .topics()
-                    .resetCursor(topicName, subscriptionName, initialPosition, 
!include);
-
-            return true;
+            if (type == Type.TIMESTAMP) {
+                consumer.seek(getActualTimestamp(this.timestamp));
+            } else if (messageId instanceof ChunkMessageIdImpl) {
+                MessageIdAdv msgId = ((ChunkMessageIdImpl) 
messageId).getFirstChunkMessageId();
+                consumer.seek(getActualMessageId(msgId));
+            } else {
+                consumer.seek(getActualMessageId((MessageIdAdv) messageId));
+            }
         }
-
-        return false;
     }
 
-    /**
-     * This method is used to reset the consuming position in {@link 
PulsarPartitionSplitReader}.
-     */
-    @Internal
-    public void seekPosition(PulsarAdmin pulsarAdmin, String topicName, String 
subscriptionName)
-            throws PulsarAdminException {
-        if (!createInitialPosition(pulsarAdmin, topicName, subscriptionName)) {
-            // Reset cursor to desired position.
-            MessageId initialPosition = getMessageId(pulsarAdmin, topicName);
-            pulsarAdmin
-                    .topics()
-                    .resetCursor(topicName, subscriptionName, initialPosition, 
!include);
+    private MessageId getActualMessageId(MessageIdAdv messageIdImpl) {
+        if (include) {
+            return messageIdImpl;
+        } else {
+            // if the (ledgerId, entryId + 1) is not valid
+            // pulsar broker will automatically set the cursor to the next 
valid message
+            return new MessageIdImpl(
+                    messageIdImpl.getLedgerId(),
+                    messageIdImpl.getEntryId() + 1,
+                    messageIdImpl.getPartitionIndex());
         }
     }
 
-    private MessageId getMessageId(PulsarAdmin pulsarAdmin, String topicName)
-            throws PulsarAdminException {
-        if (type == Type.TIMESTAMP) {
-            return pulsarAdmin.topics().getMessageIdByTimestamp(topicName, 
timestamp);
-        } else if (messageId instanceof ChunkMessageIdImpl) {
-            return ((ChunkMessageIdImpl) messageId).getFirstChunkMessageId();
+    private long getActualTimestamp(long timestamp) {
+        if (include) {
+            return timestamp;
         } else {
-            return messageId;
+            return timestamp + 1;
         }
     }
 
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
index 1af875e..3ba4ad2 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
@@ -26,9 +26,9 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStop
 import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.PublishTimestampStopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
 
 import java.io.Serializable;
 
@@ -42,7 +42,7 @@ import java.io.Serializable;
 public interface StopCursor extends Serializable {
 
     /** The open method for the cursor initializer. This method could be 
executed multiple times. */
-    default void open(PulsarAdmin admin, TopicPartition partition) throws 
Exception {}
+    default void open(PulsarClient client, TopicPartition partition) throws 
Exception {}
 
     /** Determine whether to pause consumption on the current message by the 
returned enum. */
     StopCondition shouldStop(Message<?> message);
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
index b180262..fb0a916 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
@@ -36,7 +36,7 @@ public class MessageIdStartCursor implements StartCursor {
     /**
      * The default {@code inclusive} behavior should be controlled in {@link
      * ConsumerBuilder#startMessageIdInclusive}. But pulsar has a bug and 
doesn't support this
-     * feature currently. We have to use admin API to reset the cursor instead.
+     * feature currently. So we have to implement this feature by ourselves.
      *
      * @param messageId The message id for start position.
      * @param inclusive Whether we include the start message id in the 
consuming result. This works
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
index def12b1..4c86977 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
@@ -21,10 +21,13 @@ package 
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 
 import java.util.Objects;
 
@@ -36,6 +39,8 @@ import java.util.Objects;
 public class LatestMessageStopCursor implements StopCursor {
     private static final long serialVersionUID = 1702059838323965723L;
 
+    private static final String SUBSCRIPTION_NAME = 
LatestMessageStopCursor.class.getSimpleName();
+
     private MessageId messageId;
     private final boolean inclusive;
 
@@ -50,10 +55,22 @@ public class LatestMessageStopCursor implements StopCursor {
     }
 
     @Override
-    public void open(PulsarAdmin admin, TopicPartition partition) throws 
PulsarAdminException {
+    public void open(PulsarClient client, TopicPartition partition) throws 
PulsarClientException {
         if (messageId == null) {
-            String topic = partition.getFullTopicName();
-            this.messageId = admin.topics().getLastMessageId(topic);
+            Consumer<GenericRecord> consumer = null;
+            try {
+                consumer =
+                        client.newConsumer(new AutoConsumeSchema())
+                                .topic(partition.getFullTopicName())
+                                .subscriptionName(SUBSCRIPTION_NAME)
+                                .subscribe();
+                this.messageId = consumer.getLastMessageId();
+            } finally {
+                if (consumer != null) {
+                    consumer.unsubscribe();
+                    consumer.close();
+                }
+            }
         }
     }
 
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
index 653a9ec..a7b08b7 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl.Topic
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 
@@ -63,9 +62,8 @@ public interface PulsarSubscriber extends Serializable {
      * Initialize the topic subscriber.
      *
      * @param client The client interface for querying the topics by regex 
pattern.
-     * @param admin The admin interface used to retrieve subscribed topic 
partitions.
      */
-    void open(PulsarClient client, PulsarAdmin admin);
+    void open(PulsarClient client);
 
     // ----------------- factory methods --------------
 
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
index 28ee9d8..30a24df 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
@@ -24,19 +24,19 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 
 import static 
org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 
-/** PulsarSubscriber abstract class to simplify Pulsar admin related 
operations. */
+/** PulsarSubscriber abstract class to simplify Pulsar metadata related 
operations. */
 public abstract class BasePulsarSubscriber implements PulsarSubscriber {
     private static final long serialVersionUID = 2053021503331058888L;
 
@@ -45,33 +45,24 @@ public abstract class BasePulsarSubscriber implements 
PulsarSubscriber {
     private static final Set<String> NON_PARTITIONED_TOPICS = 
ConcurrentHashMap.newKeySet();
 
     protected transient PulsarClient client;
-    protected transient PulsarAdmin admin;
 
-    protected TopicMetadata queryTopicMetadata(String topic) throws 
PulsarAdminException {
+    protected TopicMetadata queryTopicMetadata(String topic)
+            throws ExecutionException, InterruptedException {
         if (NON_PARTITIONED_TOPICS.contains(topic)) {
             return new TopicMetadata(topic, NON_PARTITIONED);
         }
 
-        try {
-            PartitionedTopicMetadata metadata = 
admin.topics().getPartitionedTopicMetadata(topic);
-            if (metadata.partitions == NON_PARTITIONED) {
-                NON_PARTITIONED_TOPICS.add(topic);
-            }
-            return new TopicMetadata(topic, metadata.partitions);
-        } catch (PulsarAdminException e) {
-            if (e.getStatusCode() == 404) {
-                // Return null for skipping the topic metadata query.
-                return null;
-            } else {
-                // This method would cause failure for subscribers.
-                throw e;
-            }
+        PulsarClientImpl clientImpl = (PulsarClientImpl) client;
+        PartitionedTopicMetadata metadata = 
clientImpl.getPartitionedTopicMetadata(topic).get();
+        if (metadata.partitions == NON_PARTITIONED) {
+            NON_PARTITIONED_TOPICS.add(topic);
         }
+        return new TopicMetadata(topic, metadata.partitions);
     }
 
     protected Set<TopicPartition> createTopicPartitions(
             Set<String> topics, RangeGenerator generator, int parallelism)
-            throws PulsarAdminException {
+            throws ExecutionException, InterruptedException {
         Set<TopicPartition> results = new HashSet<>();
 
         for (String topic : topics) {
@@ -94,8 +85,7 @@ public abstract class BasePulsarSubscriber implements 
PulsarSubscriber {
     }
 
     @Override
-    public void open(PulsarClient client, PulsarAdmin admin) {
+    public void open(PulsarClient client) {
         this.client = client;
-        this.admin = admin;
     }
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
index b1923df..920e831 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
@@ -23,12 +23,12 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicName;
 
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition;
 
@@ -54,7 +54,8 @@ public class TopicListSubscriber extends BasePulsarSubscriber 
{
 
     @Override
     public Set<TopicPartition> getSubscribedTopicPartitions(
-            RangeGenerator generator, int parallelism) throws 
PulsarAdminException {
+            RangeGenerator generator, int parallelism)
+            throws ExecutionException, InterruptedException {
 
         // Query topics from Pulsar.
         Set<TopicPartition> results = createTopicPartitions(fullTopicNames, 
generator, parallelism);
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index 4f4403f..38a7adb 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -38,8 +38,6 @@ import 
org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerStats;
@@ -94,7 +92,6 @@ public class PulsarPartitionSplitReader
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarPartitionSplitReader.class);
 
     private final PulsarClient pulsarClient;
-    private final PulsarAdmin pulsarAdmin;
     private final SourceConfiguration sourceConfiguration;
     private final Schema<byte[]> schema;
     private final PulsarCrypto pulsarCrypto;
@@ -105,13 +102,11 @@ public class PulsarPartitionSplitReader
 
     public PulsarPartitionSplitReader(
             PulsarClient pulsarClient,
-            PulsarAdmin pulsarAdmin,
             SourceConfiguration sourceConfiguration,
             Schema<byte[]> schema,
             PulsarCrypto pulsarCrypto,
             SourceReaderMetricGroup metricGroup) {
         this.pulsarClient = pulsarClient;
-        this.pulsarAdmin = pulsarAdmin;
         this.sourceConfiguration = sourceConfiguration;
         this.schema = schema;
         this.pulsarCrypto = pulsarCrypto;
@@ -192,7 +187,7 @@ public class PulsarPartitionSplitReader
 
         // Open stop cursor.
         try {
-            registeredSplit.open(pulsarAdmin);
+            registeredSplit.open(pulsarClient);
         } catch (Exception e) {
             throw new FlinkRuntimeException(e);
         }
@@ -215,11 +210,8 @@ public class PulsarPartitionSplitReader
                 String topicName = 
registeredSplit.getPartition().getFullTopicName();
                 String subscriptionName = 
sourceConfiguration.getSubscriptionName();
 
-                // Remove Consumer.seek() here for waiting for 
pulsar-client-all 2.12.0
-                // See https://github.com/apache/pulsar/issues/16757 for more 
details.
-
-                cursorPosition.seekPosition(pulsarAdmin, topicName, 
subscriptionName);
-            } catch (PulsarAdminException e) {
+                cursorPosition.setupSubPosition(pulsarClient, topicName, 
subscriptionName);
+            } catch (PulsarClientException e) {
                 if (sourceConfiguration.getVerifyInitialOffsets() == 
FAIL_ON_MISMATCH) {
                     throw new IllegalArgumentException(e);
                 } else {
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
index e5243a8..014d303 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
@@ -39,7 +39,6 @@ import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.util.FlinkRuntimeException;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -62,7 +61,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
 
 /**
@@ -79,7 +77,6 @@ public class PulsarSourceReader<OUT>
 
     private final SourceConfiguration sourceConfiguration;
     private final PulsarClient pulsarClient;
-    private final PulsarAdmin pulsarAdmin;
     @VisibleForTesting final SortedMap<Long, Map<TopicPartition, MessageId>> 
cursorsToCommit;
     private final ConcurrentMap<TopicPartition, MessageId> 
cursorsOfFinishedSplits;
     private final AtomicReference<Throwable> cursorCommitThrowable;
@@ -92,7 +89,6 @@ public class PulsarSourceReader<OUT>
             PulsarDeserializationSchema<OUT> deserializationSchema,
             SourceConfiguration sourceConfiguration,
             PulsarClient pulsarClient,
-            PulsarAdmin pulsarAdmin,
             SourceReaderContext context) {
         super(
                 elementsQueue,
@@ -103,7 +99,6 @@ public class PulsarSourceReader<OUT>
 
         this.sourceConfiguration = sourceConfiguration;
         this.pulsarClient = pulsarClient;
-        this.pulsarAdmin = pulsarAdmin;
 
         this.cursorsToCommit = Collections.synchronizedSortedMap(new 
TreeMap<>());
         this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
@@ -221,7 +216,6 @@ public class PulsarSourceReader<OUT>
 
         // Close shared pulsar resources.
         pulsarClient.shutdown();
-        pulsarAdmin.close();
     }
 
     // ----------------- helper methods --------------
@@ -264,7 +258,6 @@ public class PulsarSourceReader<OUT>
                 new FutureCompletingBlockingQueue<>(queueCapacity);
 
         PulsarClient pulsarClient = createClient(sourceConfiguration);
-        PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
 
         // Initialize the deserialization schema before creating the pulsar 
reader.
         PulsarDeserializationSchemaInitializationContext initializationContext 
=
@@ -287,7 +280,6 @@ public class PulsarSourceReader<OUT>
                 () ->
                         new PulsarPartitionSplitReader(
                                 pulsarClient,
-                                pulsarAdmin,
                                 sourceConfiguration,
                                 schema,
                                 pulsarCrypto,
@@ -303,7 +295,6 @@ public class PulsarSourceReader<OUT>
                 deserializationSchema,
                 sourceConfiguration,
                 pulsarClient,
-                pulsarAdmin,
                 readerContext);
     }
 }
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
index 3044e9c..2e21fa8 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
@@ -24,8 +24,8 @@ import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
 
 import javax.annotation.Nullable;
@@ -94,8 +94,8 @@ public class PulsarPartitionSplit implements SourceSplit, 
Serializable {
     }
 
     /** Open stop cursor. */
-    public void open(PulsarAdmin admin) throws Exception {
-        stopCursor.open(admin, partition);
+    public void open(PulsarClient client) throws Exception {
+        stopCursor.open(client, partition);
     }
 
     @Override
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableFactory.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableFactory.java
index b867e18..ec64ec8 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableFactory.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableFactory.java
@@ -53,7 +53,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.createKeyFormatProjection;
@@ -70,7 +69,6 @@ import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.get
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getTopicRoutingMode;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueEncodingFormat;
-import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.EXPLICIT;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FIELDS;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FORMAT;
@@ -121,7 +119,6 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory, DynamicTab
         // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not 
part of the validation.
         helper.validateExcept(
                 PulsarOptions.CLIENT_CONFIG_PREFIX,
-                PulsarOptions.ADMIN_CONFIG_PREFIX,
                 PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
                 PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
                 PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
@@ -140,7 +137,6 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory, DynamicTab
 
         // Forward source configs
         final Properties properties = getPulsarProperties(tableOptions);
-        properties.setProperty(PULSAR_ADMIN_URL.key(), 
tableOptions.get(ADMIN_URL));
         properties.setProperty(PULSAR_SERVICE_URL.key(), 
tableOptions.get(SERVICE_URL));
         // Set random subscriptionName if not provided
         properties.setProperty(
@@ -194,7 +190,6 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory, DynamicTab
         // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not 
part of the validation.
         helper.validateExcept(
                 PulsarOptions.CLIENT_CONFIG_PREFIX,
-                PulsarOptions.ADMIN_CONFIG_PREFIX,
                 PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
                 PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
                 PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
@@ -215,7 +210,6 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory, DynamicTab
 
         // Forward sink configs
         final Properties properties = getPulsarProperties(tableOptions);
-        properties.setProperty(PULSAR_ADMIN_URL.key(), 
tableOptions.get(ADMIN_URL));
         properties.setProperty(PULSAR_SERVICE_URL.key(), 
tableOptions.get(SERVICE_URL));
 
         // Retrieve physical DataType (not including computed or metadata 
fields)
@@ -254,7 +248,7 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory, DynamicTab
 
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
-        return Stream.of(TOPICS, ADMIN_URL, 
SERVICE_URL).collect(Collectors.toSet());
+        return Stream.of(TOPICS, SERVICE_URL).collect(Collectors.toSet());
     }
 
     @Override
@@ -284,7 +278,6 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     public Set<ConfigOption<?>> forwardOptions() {
         return Stream.of(
                         TOPICS,
-                        ADMIN_URL,
                         SERVICE_URL,
                         SOURCE_SUBSCRIPTION_TYPE,
                         SOURCE_SUBSCRIPTION_NAME,
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java
index 702f130..3db2bd4 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java
@@ -214,27 +214,6 @@ public final class PulsarTableOptions {
     // Pulsar Options
     // 
--------------------------------------------------------------------------------------------
 
-    /**
-     * Exactly same as {@link
-     * 
org.apache.flink.connector.pulsar.common.config.PulsarOptions#PULSAR_ADMIN_URL}.
 Copied here
-     * because it is a required config option and should not be included in 
the {@link
-     * 
org.apache.flink.table.factories.FactoryUtil.FactoryHelper#validateExcept(String...)}
 method.
-     *
-     * <p>By default all {@link 
org.apache.flink.connector.pulsar.common.config.PulsarOptions} are
-     * included in the validateExcept() method./p>
-     */
-    public static final ConfigOption<String> ADMIN_URL =
-            ConfigOptions.key("admin-url")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "The Pulsar service HTTP URL for 
the admin endpoint. For example, %s, or %s for TLS.",
-                                            
code("http://my-broker.example.com:8080";),
-                                            
code("https://my-broker.example.com:8443";))
-                                    .build());
-
     /**
      * Exactly same as {@link
      * 
org.apache.flink.connector.pulsar.common.config.PulsarOptions#PULSAR_SERVICE_URL}.
 Copied
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableFactory.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableFactory.java
index a972de0..7a769d0 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableFactory.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableFactory.java
@@ -55,7 +55,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.createKeyFormatProjection;
@@ -71,7 +70,6 @@ import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.get
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getTopicRouter;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueEncodingFormat;
-import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.EXPLICIT;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FIELDS;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FORMAT;
@@ -120,7 +118,6 @@ public class UpsertPulsarTableFactory
         // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not 
part of the validation.
         helper.validateExcept(
                 PulsarOptions.CLIENT_CONFIG_PREFIX,
-                PulsarOptions.ADMIN_CONFIG_PREFIX,
                 PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
                 PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
                 PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
@@ -134,7 +131,6 @@ public class UpsertPulsarTableFactory
 
         // Forward source configs
         final Properties properties = getPulsarProperties(tableOptions);
-        properties.setProperty(PULSAR_ADMIN_URL.key(), 
tableOptions.get(ADMIN_URL));
         properties.setProperty(PULSAR_SERVICE_URL.key(), 
tableOptions.get(SERVICE_URL));
         // Set random subscriptionName if not provided
         properties.setProperty(
@@ -196,7 +192,6 @@ public class UpsertPulsarTableFactory
         // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not 
part of the validation.
         helper.validateExcept(
                 PulsarOptions.CLIENT_CONFIG_PREFIX,
-                PulsarOptions.ADMIN_CONFIG_PREFIX,
                 PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
                 PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
                 PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
@@ -211,7 +206,6 @@ public class UpsertPulsarTableFactory
 
         // Forward sink configs
         final Properties properties = getPulsarProperties(tableOptions);
-        properties.setProperty(PULSAR_ADMIN_URL.key(), 
tableOptions.get(ADMIN_URL));
         properties.setProperty(PULSAR_SERVICE_URL.key(), 
tableOptions.get(SERVICE_URL));
 
         // Retrieve physical DataType (not including computed or metadata 
fields)
@@ -263,7 +257,7 @@ public class UpsertPulsarTableFactory
 
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
-        return Stream.of(TOPICS, ADMIN_URL, 
SERVICE_URL).collect(Collectors.toSet());
+        return Stream.of(TOPICS, SERVICE_URL).collect(Collectors.toSet());
     }
 
     @Override
@@ -293,7 +287,6 @@ public class UpsertPulsarTableFactory
     public Set<ConfigOption<?>> forwardOptions() {
         return Stream.of(
                         TOPICS,
-                        ADMIN_URL,
                         SERVICE_URL,
                         SOURCE_SUBSCRIPTION_NAME,
                         SOURCE_START_FROM_MESSAGE_ID,
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandlerTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandlerTest.java
deleted file mode 100644
index 3c76edb..0000000
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandlerTest.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.common.handler;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
-
-import org.apache.pulsar.client.admin.Bookies;
-import org.apache.pulsar.client.admin.BrokerStats;
-import org.apache.pulsar.client.admin.Brokers;
-import org.apache.pulsar.client.admin.Clusters;
-import org.apache.pulsar.client.admin.Functions;
-import org.apache.pulsar.client.admin.Lookup;
-import org.apache.pulsar.client.admin.Namespaces;
-import org.apache.pulsar.client.admin.NonPersistentTopics;
-import org.apache.pulsar.client.admin.Packages;
-import org.apache.pulsar.client.admin.Properties;
-import org.apache.pulsar.client.admin.ProxyStats;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
-import org.apache.pulsar.client.admin.ResourceGroups;
-import org.apache.pulsar.client.admin.ResourceQuotas;
-import org.apache.pulsar.client.admin.Schemas;
-import org.apache.pulsar.client.admin.Sink;
-import org.apache.pulsar.client.admin.Sinks;
-import org.apache.pulsar.client.admin.Source;
-import org.apache.pulsar.client.admin.Sources;
-import org.apache.pulsar.client.admin.Tenants;
-import org.apache.pulsar.client.admin.TopicPolicies;
-import org.apache.pulsar.client.admin.Topics;
-import org.apache.pulsar.client.admin.Transactions;
-import org.apache.pulsar.client.admin.Worker;
-import org.junit.jupiter.api.Test;
-
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RATES;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RETRIES;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_WAIT_MILLIS;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Unit test of {@link PulsarAdminInvocationHandler}. */
-class PulsarAdminInvocationHandlerTest {
-
-    @Test
-    void retryWithFixedTimesOnRequestFailure() {
-        int retryTimes = 5 + ThreadLocalRandom.current().nextInt(5);
-        String errorMsg = "Always failed with reason: " + 
randomAlphanumeric(10);
-
-        PulsarAdminTestImpl admin = new PulsarAdminTestImpl(errorMsg);
-
-        Configuration configuration = new Configuration();
-        configuration.set(PULSAR_ADMIN_REQUEST_RETRIES, retryTimes);
-        configuration.set(PULSAR_ADMIN_REQUEST_WAIT_MILLIS, 50L);
-        configuration.set(PULSAR_ADMIN_REQUEST_RATES, 1000);
-        SinkConfiguration sinkConfiguration = new 
SinkConfiguration(configuration);
-
-        PulsarAdminInvocationHandler handler =
-                new PulsarAdminInvocationHandler(admin, sinkConfiguration);
-
-        PulsarAdmin proxyAdmin =
-                (PulsarAdmin)
-                        Proxy.newProxyInstance(
-                                PulsarAdmin.class.getClassLoader(),
-                                new Class[] {PulsarAdmin.class},
-                                handler);
-
-        assertThatThrownBy(() -> 
proxyAdmin.lookups().lookupPartitionedTopic("aa"))
-                .isInstanceOf(PulsarAdminException.class)
-                .hasMessage(errorMsg);
-        assertThat(admin.lookup.callingTimes()).isEqualTo(retryTimes);
-    }
-
-    @Test
-    void didNotRetryOnNotFoundException() {
-        PulsarAdminTestImpl admin = new PulsarAdminTestImpl("not found");
-
-        PulsarAdminInvocationHandler handler =
-                new PulsarAdminInvocationHandler(admin, new 
SinkConfiguration(new Configuration()));
-
-        PulsarAdmin proxyAdmin =
-                (PulsarAdmin)
-                        Proxy.newProxyInstance(
-                                PulsarAdmin.class.getClassLoader(),
-                                new Class[] {PulsarAdmin.class},
-                                handler);
-
-        assertThatThrownBy(() -> proxyAdmin.lookups().lookupTopic("some"))
-                .isInstanceOf(NotFoundException.class)
-                .hasMessage("not found");
-        assertThat(admin.lookup.callingTimes()).isEqualTo(1);
-    }
-
-    /** Test implementation for PulsarAdmin. */
-    private static final class PulsarAdminTestImpl implements PulsarAdmin {
-
-        private final LookupTestImpl lookup;
-
-        private PulsarAdminTestImpl(String errorMsg) {
-            this.lookup = new LookupTestImpl(errorMsg);
-        }
-
-        @Override
-        public Clusters clusters() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Brokers brokers() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Tenants tenants() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public ResourceGroups resourcegroups() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Properties properties() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Namespaces namespaces() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Topics topics() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public TopicPolicies topicPolicies() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public TopicPolicies topicPolicies(boolean isGlobal) {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Bookies bookies() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public NonPersistentTopics nonPersistentTopics() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public ResourceQuotas resourceQuotas() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Lookup lookups() {
-            lookup.resetTimes();
-            return lookup;
-        }
-
-        @Override
-        public Functions functions() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Source source() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Sources sources() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Sink sink() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Sinks sinks() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Worker worker() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public BrokerStats brokerStats() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public ProxyStats proxyStats() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public String getServiceUrl() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Schemas schemas() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Packages packages() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Transactions transactions() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public void close() {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-    }
-
-    /** Test implementation for Lookup. */
-    private static final class LookupTestImpl implements Lookup {
-
-        private final String errorMsg;
-        private int times;
-
-        private LookupTestImpl(String errorMsg) {
-            this.errorMsg = errorMsg;
-        }
-
-        @Override
-        public String lookupTopic(String topic) throws PulsarAdminException {
-            times++;
-            throw new NotFoundException(
-                    new IllegalArgumentException("not found"), "not found", 
404);
-        }
-
-        @Override
-        public CompletableFuture<String> lookupTopicAsync(String topic) {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public Map<String, String> lookupPartitionedTopic(String topic)
-                throws PulsarAdminException {
-            times++;
-            throw new PulsarAdminException(errorMsg);
-        }
-
-        @Override
-        public CompletableFuture<Map<String, String>> 
lookupPartitionedTopicAsync(String topic) {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public String getBundleRange(String topic) {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        @Override
-        public CompletableFuture<String> getBundleRangeAsync(String topic) {
-            throw new UnsupportedOperationException("We didn't support this 
method in test.");
-        }
-
-        public int callingTimes() {
-            return times;
-        }
-
-        public void resetTimes() {
-            this.times = 0;
-        }
-    }
-}
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
index e7bbd78..af3e6b7 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
@@ -97,16 +97,13 @@ class PulsarSinkBuilderTest {
     }
 
     @Test
-    void serviceUrlAndAdminUrlMustBeProvided() {
+    void serviceUrlMustBeProvided() {
         PulsarSinkBuilder<String> builder = PulsarSink.builder();
         builder.setSerializationSchema(new SimpleStringSchema());
         builder.setTopics("a", "b");
         
assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class);
 
         builder.setServiceUrl("pulsar://127.0.0.1:8888");
-        
assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class);
-
-        builder.setAdminUrl("http://127.0.0.1:9999";);
         assertThatCode(builder::build).doesNotThrowAnyException();
     }
 }
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 6998929..18d035b 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -131,7 +131,6 @@ class PulsarSinkITCase {
             PulsarSink<String> sink =
                     PulsarSink.builder()
                             .setServiceUrl(operator().serviceUrl())
-                            .setAdminUrl(operator().adminUrl())
                             .setDeliveryGuarantee(guarantee)
                             .setTopics(topic)
                             .setSerializationSchema(new SimpleStringSchema())
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
index 043ae27..f8c517c 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
@@ -31,9 +31,6 @@ class PulsarSourceBuilderTest {
         PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
         fillRequiredFields(builder);
 
-        assertThatThrownBy(() -> builder.setAdminUrl("admin-url2"))
-                .isInstanceOf(IllegalArgumentException.class);
-
         assertThatThrownBy(() -> builder.setServiceUrl("service-url2"))
                 .isInstanceOf(IllegalArgumentException.class);
 
@@ -50,7 +47,6 @@ class PulsarSourceBuilderTest {
     }
 
     private void fillRequiredFields(PulsarSourceBuilder<String> builder) {
-        builder.setAdminUrl("admin-url");
         builder.setServiceUrl("service-url");
         builder.setSubscriptionName("subscription-name");
         builder.setTopics("topic");
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index 7cd685d..fc49073 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -56,7 +56,6 @@ class StopCursorTest extends PulsarTestSuiteBase {
         PulsarPartitionSplitReader splitReader =
                 new PulsarPartitionSplitReader(
                         operator().client(),
-                        operator().admin(),
                         sourceConfig(),
                         Schema.BYTES,
                         PulsarCrypto.disabled(),
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
index 22edb71..27803ac 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
@@ -101,7 +101,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
     @Test
     void topicListSubscriber() throws Exception {
         PulsarSubscriber subscriber = 
getTopicListSubscriber(Arrays.asList(topic1, topic2));
-        subscriber.open(operator().client(), operator().admin());
+        subscriber.open(operator().client());
 
         Set<TopicPartition> topicPartitions =
                 subscriber.getSubscribedTopicPartitions(new 
FullRangeGenerator(), NUM_PARALLELISM);
@@ -120,7 +120,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
         String partition = topicNameWithPartition(topic1, 2);
 
         PulsarSubscriber subscriber = 
getTopicListSubscriber(singletonList(partition));
-        subscriber.open(operator().client(), operator().admin());
+        subscriber.open(operator().client());
 
         Set<TopicPartition> partitions =
                 subscriber.getSubscribedTopicPartitions(new 
FullRangeGenerator(), NUM_PARALLELISM);
@@ -132,7 +132,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
     @Test
     void subscribeNonPartitionedTopicList() throws Exception {
         PulsarSubscriber subscriber = 
getTopicListSubscriber(singletonList(topic4));
-        subscriber.open(operator().client(), operator().admin());
+        subscriber.open(operator().client());
 
         Set<TopicPartition> partitions =
                 subscriber.getSubscribedTopicPartitions(new 
FullRangeGenerator(), NUM_PARALLELISM);
@@ -147,7 +147,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
                 getTopicPatternSubscriber(
                         
Pattern.compile("flink/regex/pulsar-subscriber-non-partitioned-topic-.*"),
                         AllTopics);
-        subscriber.open(operator().client(), operator().admin());
+        subscriber.open(operator().client());
 
         Set<TopicPartition> topicPartitions =
                 subscriber.getSubscribedTopicPartitions(new 
FullRangeGenerator(), NUM_PARALLELISM);
@@ -165,7 +165,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
         PulsarSubscriber subscriber =
                 getTopicPatternSubscriber(
                         
Pattern.compile("flink/regex/pulsar-subscriber-topic-.*"), AllTopics);
-        subscriber.open(operator().client(), operator().admin());
+        subscriber.open(operator().client());
 
         Set<TopicPartition> topicPartitions =
                 subscriber.getSubscribedTopicPartitions(new 
FullRangeGenerator(), NUM_PARALLELISM);
@@ -185,7 +185,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
         PulsarSubscriber subscriber =
                 getTopicPatternSubscriber(
                         Pattern.compile("pulsar-subscriber-simple-topic-.*"), 
PersistentOnly);
-        subscriber.open(operator().client(), operator().admin());
+        subscriber.open(operator().client());
 
         Set<TopicPartition> topicPartitions =
                 subscriber.getSubscribedTopicPartitions(new 
FullRangeGenerator(), NUM_PARALLELISM);
@@ -205,7 +205,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
         PulsarSubscriber subscriber =
                 getTopicPatternSubscriber(
                         Pattern.compile("pulsar-subscriber-simple-topic-.*"), 
NonPersistentOnly);
-        subscriber.open(operator().client(), operator().admin());
+        subscriber.open(operator().client());
 
         Set<TopicPartition> topicPartitions =
                 subscriber.getSubscribedTopicPartitions(new 
FullRangeGenerator(), NUM_PARALLELISM);
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
index ab0bad8..fa06190 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
@@ -267,7 +267,6 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
     private PulsarPartitionSplitReader splitReader() {
         return new PulsarPartitionSplitReader(
                 operator().client(),
-                operator().admin(),
                 sourceConfig(),
                 new BytesSchema(new PulsarSchema<>(STRING)),
                 PulsarCrypto.disabled(),
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchemaTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchemaTest.java
index 5c80d1c..fd4b4c0 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchemaTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchemaTest.java
@@ -82,7 +82,6 @@ class GenericRecordDeserializationSchemaTest extends 
PulsarTestSuiteBase {
                 PulsarSource.builder()
                         .setDeserializationSchema(new 
AvroGenericRecordDeserializer())
                         .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
                         .setTopics(topic)
                         .setStartCursor(StartCursor.earliest())
                         .setBoundedStopCursor(StopCursor.latest())
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
index 339ec99..de0a729 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
@@ -297,7 +297,6 @@ class PulsarDeserializationSchemaTest extends 
PulsarTestSuiteBase {
         return PulsarSource.builder()
                 .setDeserializationSchema(deserializationSchema)
                 .setServiceUrl(operator().serviceUrl())
-                .setAdminUrl(operator().adminUrl())
                 .setTopics(topicName)
                 .setSubscriptionName(topicName + "-subscription")
                 .setBoundedStopCursor(StopCursor.latest())
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
index 59d8fbd..da22ee8 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
@@ -80,14 +80,12 @@ public class PulsarChangelogTableITCase extends 
PulsarTableTestBase {
                                 + " 'connector' = '%s',"
                                 + " 'topics' = '%s',"
                                 + " 'service-url' = '%s',\n"
-                                + " 'admin-url' = '%s',\n"
                                 + " 'value.format' = '%s',"
                                 + " 'pulsar.source.fetchOneMessageTime' = 
'100'"
                                 + ")",
                         PulsarTableFactory.IDENTIFIER,
                         topic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         DebeziumJsonFormatFactory.IDENTIFIER);
         String sinkDDL =
                 "CREATE TABLE debezium_sink ("
@@ -216,14 +214,12 @@ public class PulsarChangelogTableITCase extends 
PulsarTableTestBase {
                                 + " 'connector' = '%s',"
                                 + " 'topics' = '%s',"
                                 + " 'service-url' = '%s',\n"
-                                + " 'admin-url' = '%s',\n"
                                 + " 'value.format' = 'canal-json',"
                                 + " 'pulsar.source.fetchOneMessageTime' = 
'100'"
                                 + ")",
                         PulsarTableFactory.IDENTIFIER,
                         topic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         CanalJsonFormatFactory.IDENTIFIER);
         String sinkDDL =
                 "CREATE TABLE canal_sink ("
@@ -356,14 +352,12 @@ public class PulsarChangelogTableITCase extends 
PulsarTableTestBase {
                                 + " 'connector' = '%s',"
                                 + " 'topics' = '%s',"
                                 + " 'service-url' = '%s',\n"
-                                + " 'admin-url' = '%s',\n"
                                 + " 'value.format' = '%s',"
                                 + " 'pulsar.source.fetchOneMessageTime' = 
'100'"
                                 + ")",
                         PulsarTableFactory.IDENTIFIER,
                         topic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         MaxwellJsonFormatFactory.IDENTIFIER);
         String sinkDDL =
                 "CREATE TABLE maxwell_sink ("
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableFactoryTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableFactoryTest.java
index 7625c63..10c46b5 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableFactoryTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableFactoryTest.java
@@ -59,11 +59,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableFactory.UPSERT_DISABLED;
-import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FIELDS;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FORMAT;
 import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SERVICE_URL;
@@ -82,7 +80,6 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 public class PulsarTableFactoryTest {
     private static final String TEST_TOPIC = "test-topic";
-    private static final String TEST_ADMIN_URL = 
"http://my-broker.example.com:8080";;
     private static final String TEST_SERVICE_URL = "pulsar://localhost:6650";
     private static final String TEST_SUBSCRIPTION_NAME = 
"default-subscription";
 
@@ -107,12 +104,10 @@ public class PulsarTableFactoryTest {
                     "%s.%s", TestFormatFactory.IDENTIFIER, 
TestFormatFactory.FAIL_ON_MISSING.key());
 
     static {
-        EXPECTED_PULSAR_SOURCE_PROPERTIES.setProperty(PULSAR_ADMIN_URL.key(), 
TEST_ADMIN_URL);
         
EXPECTED_PULSAR_SOURCE_PROPERTIES.setProperty(PULSAR_SERVICE_URL.key(), 
TEST_SERVICE_URL);
         EXPECTED_PULSAR_SOURCE_PROPERTIES.setProperty(
                 PULSAR_SUBSCRIPTION_NAME.key(), TEST_SUBSCRIPTION_NAME);
 
-        EXPECTED_PULSAR_SINK_PROPERTIES.setProperty(PULSAR_ADMIN_URL.key(), 
TEST_ADMIN_URL);
         EXPECTED_PULSAR_SINK_PROPERTIES.setProperty(PULSAR_SERVICE_URL.key(), 
TEST_SERVICE_URL);
     }
 
@@ -371,7 +366,6 @@ public class PulsarTableFactoryTest {
         Map<String, String> tableOptions = new HashMap<>();
         tableOptions.put(CONNECTOR.key(), PulsarTableFactory.IDENTIFIER);
         tableOptions.put(TOPICS.key(), TEST_TOPIC);
-        tableOptions.put(ADMIN_URL.key(), TEST_ADMIN_URL);
         tableOptions.put(SERVICE_URL.key(), TEST_SERVICE_URL);
         tableOptions.put(SOURCE_SUBSCRIPTION_NAME.key(), 
TEST_SUBSCRIPTION_NAME);
         // Format options.
@@ -385,7 +379,6 @@ public class PulsarTableFactoryTest {
         Map<String, String> tableOptions = new HashMap<>();
         tableOptions.put(CONNECTOR.key(), PulsarTableFactory.IDENTIFIER);
         tableOptions.put(TOPICS.key(), TEST_TOPIC);
-        tableOptions.put(ADMIN_URL.key(), TEST_ADMIN_URL);
         tableOptions.put(SERVICE_URL.key(), TEST_SERVICE_URL);
         tableOptions.put(SOURCE_SUBSCRIPTION_NAME.key(), 
TEST_SUBSCRIPTION_NAME);
         // Format options.
@@ -404,7 +397,6 @@ public class PulsarTableFactoryTest {
         Map<String, String> tableOptions = new HashMap<>();
         tableOptions.put(CONNECTOR.key(), PulsarTableFactory.IDENTIFIER);
         tableOptions.put(TOPICS.key(), TEST_TOPIC);
-        tableOptions.put(ADMIN_URL.key(), TEST_ADMIN_URL);
         tableOptions.put(SERVICE_URL.key(), TEST_SERVICE_URL);
         // Format options.
         tableOptions.put(FORMAT.key(), TestFormatFactory.IDENTIFIER);
@@ -417,7 +409,6 @@ public class PulsarTableFactoryTest {
         Map<String, String> tableOptions = new HashMap<>();
         tableOptions.put(CONNECTOR.key(), PulsarTableFactory.IDENTIFIER);
         tableOptions.put(TOPICS.key(), TEST_TOPIC);
-        tableOptions.put(ADMIN_URL.key(), TEST_ADMIN_URL);
         tableOptions.put(SERVICE_URL.key(), TEST_SERVICE_URL);
         // Format options.
         tableOptions.put(FORMAT.key(), TestFormatFactory.IDENTIFIER);
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java
index c1739bd..6538676 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java
@@ -96,14 +96,12 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
                         randomTableName,
                         PulsarTableFactory.IDENTIFIER,
                         topic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format);
 
         tableEnv.executeSql(createTable);
@@ -181,7 +179,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'format' = '%s',\n"
                                 + "  'key.format' = '%s',\n"
                                 + "  'key.fields' = 'user_id; event_id'\n"
@@ -190,7 +187,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                         PulsarTableFactory.IDENTIFIER,
                         topic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format,
                         format);
 
@@ -238,7 +234,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'pulsar.producer.producerName' = 
'pulsar-table-test',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
@@ -246,7 +241,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                         PulsarTableFactory.IDENTIFIER,
                         topic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format);
         tableEnv.executeSql(createTable);
 
@@ -312,14 +306,12 @@ public class PulsarTableITCase extends 
PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
                         sourceTableName,
                         PulsarTableFactory.IDENTIFIER,
                         sourceTopic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         RAW_FORMAT);
 
         tableEnv.executeSql(createSourceTable);
@@ -349,7 +341,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
                         sourceTableName,
@@ -357,7 +348,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                         PulsarTableFactory.IDENTIFIER,
                         sourceTopic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format);
 
         tableEnv.executeSql(createSourceTable);
@@ -382,7 +372,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
                         sinkTableName,
@@ -390,7 +379,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                         PulsarTableFactory.IDENTIFIER,
                         sinkTopic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format);
 
         tableEnv.executeSql(createSinkTable);
@@ -418,14 +406,12 @@ public class PulsarTableITCase extends 
PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
                         sourceTableName,
                         PulsarTableFactory.IDENTIFIER,
                         sourceTopic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format);
 
         tableEnv.executeSql(createSourceTable);
@@ -452,14 +438,12 @@ public class PulsarTableITCase extends 
PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
                         sourceTableName,
                         PulsarTableFactory.IDENTIFIER,
                         sourceTopic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format);
 
         tableEnv.executeSql(createSourceTable);
@@ -493,14 +477,12 @@ public class PulsarTableITCase extends 
PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
                         sourceTableName,
                         PulsarTableFactory.IDENTIFIER,
                         sourceTopic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format);
 
         tableEnv.executeSql(createSourceTable);
@@ -517,14 +499,12 @@ public class PulsarTableITCase extends 
PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
                         sinkTableName,
                         PulsarTableFactory.IDENTIFIER,
                         sinkTopic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format);
 
         tableEnv.executeSql(createSinkTable);
@@ -549,7 +529,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  
'pulsar.source.partitionDiscoveryIntervalMs' = '-1',\n"
                                 + "  'source.stop.at-message-id' = 'latest',\n"
                                 + "  'format' = '%s'\n"
@@ -558,7 +537,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                         PulsarTableFactory.IDENTIFIER,
                         sourceTopic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         RAW_FORMAT);
 
         tableEnv.executeSql(createSourceTable);
@@ -591,7 +569,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                                 + "  'connector' = '%s',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'source.stop.at-message-id' = 'latest',\n"
                                 + "  'format' = '%s'\n"
                                 + ")",
@@ -599,7 +576,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
                         PulsarTableFactory.IDENTIFIER,
                         sourceTopic,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         RAW_FORMAT);
 
         tableEnv.executeSql(createSourceTable);
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableOptionsTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableOptionsTest.java
index e01289c..e82ed8b 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableOptionsTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableOptionsTest.java
@@ -408,14 +408,10 @@ public class PulsarTableOptionsTest extends 
PulsarTableTestBase {
                                 + "  `physical_3` BOOLEAN\n"
                                 + ") WITH (\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  %s\n"
                                 + "  'connector' = 'pulsar'"
                                 + ")",
-                        topicName,
-                        pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
-                        testConfigString);
+                        topicName, pulsar.operator().serviceUrl(), 
testConfigString);
         tableEnv.executeSql(createTable);
     }
 
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableITCase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableITCase.java
index 0173192..208264c 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableITCase.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableITCase.java
@@ -108,14 +108,12 @@ public class UpsertPulsarTableITCase extends 
PulsarTableITCase {
                                 + "  'connector' = 'upsert-pulsar',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'key.format' = '%s',\n"
                                 + "  'value.format' = '%s'"
                                 + ")",
                         wordCountTable,
                         wordCountTable,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format,
                         format);
         tableEnv.executeSql(createSinkTable);
@@ -168,7 +166,6 @@ public class UpsertPulsarTableITCase extends 
PulsarTableITCase {
                                 + "  'connector' = 'pulsar',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'key.format' = '%s',\n"
                                 + "  'key.fields' = 'word',\n"
                                 + "  'value.format' = '%s'\n"
@@ -176,7 +173,6 @@ public class UpsertPulsarTableITCase extends 
PulsarTableITCase {
                         rawWordCountTable,
                         wordCountTable,
                         pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
                         format,
                         format));
 
@@ -343,16 +339,10 @@ public class UpsertPulsarTableITCase extends 
PulsarTableITCase {
                                 + "  'connector' = 'upsert-pulsar',\n"
                                 + "  'topics' = '%s',\n"
                                 + "  'service-url' = '%s',\n"
-                                + "  'admin-url' = '%s',\n"
                                 + "  'key.format' = '%s',\n"
                                 + "  'value.format' = '%s'"
                                 + ")",
-                        userTable,
-                        userTable,
-                        pulsar.operator().serviceUrl(),
-                        pulsar.operator().adminUrl(),
-                        format,
-                        format);
+                        userTable, userTable, pulsar.operator().serviceUrl(), 
format, format);
         tableEnv.executeSql(createSinkTable);
         String initialValues =
                 "INSERT INTO " + userTable + " " + "SELECT * " + "FROM 
users_changelog_" + format;
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index bd4b243..060eeea 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -59,7 +59,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.stream.Collectors.toList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
-import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.getTcClient;
@@ -426,7 +425,6 @@ public class PulsarRuntimeOperator implements Closeable {
     public Configuration config() {
         Configuration configuration = new Configuration();
         configuration.set(PULSAR_SERVICE_URL, serviceUrl());
-        configuration.set(PULSAR_ADMIN_URL, adminUrl());
         return configuration;
     }
 
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
index e96c3ac..3c384dd 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
@@ -69,7 +69,6 @@ public abstract class PulsarSinkTestContext extends 
PulsarTestContext<String>
         PulsarSinkBuilder<String> builder =
                 PulsarSink.builder()
                         .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
                         .setDeliveryGuarantee(guarantee)
                         .setSerializationSchema(schema)
                         .enableSchemaEvolution()
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
index 5a65a33..aece284 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
@@ -63,7 +63,6 @@ public abstract class PulsarSourceTestContext extends 
PulsarTestContext<String>
                 PulsarSource.builder()
                         .setDeserializationSchema(schema)
                         .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
                         .setTopicPattern(topicPattern(), AllTopics)
                         .setSubscriptionName(subscriptionName())
                         .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 
DISCOVERY_INTERVAL);


Reply via email to