This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
The following commit(s) were added to refs/heads/main by this push:
new 78d00ea [FLINK-32938] Replace pulsar admin calls (#59)
78d00ea is described below
commit 78d00ea9e3e278d4ce2fbb0c8a8d380abef7b858
Author: Neng Lu <[email protected]>
AuthorDate: Wed Sep 20 20:14:43 2023 -0700
[FLINK-32938] Replace pulsar admin calls (#59)
---
.../f4d91193-72ba-4ce4-ad83-98f780dce581 | 14 +-
.../common/config/PulsarAdminProxyBuilder.java | 64 -----
.../pulsar/common/config/PulsarClientFactory.java | 52 +---
.../pulsar/common/config/PulsarOptions.java | 81 +-----
.../handler/PulsarAdminInvocationHandler.java | 145 ----------
.../flink/connector/pulsar/sink/PulsarSink.java | 1 -
.../connector/pulsar/sink/PulsarSinkBuilder.java | 13 +-
.../pulsar/sink/config/PulsarSinkConfigUtils.java | 2 -
.../sink/writer/context/PulsarSinkContext.java | 4 +-
.../sink/writer/context/PulsarSinkContextImpl.java | 4 +-
.../pulsar/sink/writer/topic/MetadataListener.java | 51 ++--
.../connector/pulsar/source/PulsarSource.java | 1 -
.../pulsar/source/PulsarSourceBuilder.java | 11 +-
.../source/config/PulsarSourceConfigUtils.java | 2 -
.../source/enumerator/PulsarSourceEnumerator.java | 18 +-
.../source/enumerator/cursor/CursorPosition.java | 77 +++--
.../source/enumerator/cursor/StopCursor.java | 4 +-
.../cursor/start/MessageIdStartCursor.java | 2 +-
.../cursor/stop/LatestMessageStopCursor.java | 27 +-
.../enumerator/subscriber/PulsarSubscriber.java | 4 +-
.../subscriber/impl/BasePulsarSubscriber.java | 34 +--
.../subscriber/impl/TopicListSubscriber.java | 5 +-
.../source/reader/PulsarPartitionSplitReader.java | 14 +-
.../pulsar/source/reader/PulsarSourceReader.java | 9 -
.../pulsar/source/split/PulsarPartitionSplit.java | 6 +-
.../connector/pulsar/table/PulsarTableFactory.java | 9 +-
.../connector/pulsar/table/PulsarTableOptions.java | 21 --
.../pulsar/table/UpsertPulsarTableFactory.java | 9 +-
.../handler/PulsarAdminInvocationHandlerTest.java | 309 ---------------------
.../pulsar/sink/PulsarSinkBuilderTest.java | 5 +-
.../connector/pulsar/sink/PulsarSinkITCase.java | 1 -
.../pulsar/source/PulsarSourceBuilderTest.java | 4 -
.../source/enumerator/cursor/StopCursorTest.java | 1 -
.../subscriber/PulsarSubscriberTest.java | 14 +-
.../reader/PulsarPartitionSplitReaderTest.java | 1 -
.../GenericRecordDeserializationSchemaTest.java | 1 -
.../PulsarDeserializationSchemaTest.java | 1 -
.../pulsar/table/PulsarChangelogTableITCase.java | 6 -
.../pulsar/table/PulsarTableFactoryTest.java | 9 -
.../connector/pulsar/table/PulsarTableITCase.java | 24 --
.../pulsar/table/PulsarTableOptionsTest.java | 6 +-
.../pulsar/table/UpsertPulsarTableITCase.java | 12 +-
.../testutils/runtime/PulsarRuntimeOperator.java | 2 -
.../testutils/sink/PulsarSinkTestContext.java | 1 -
.../testutils/source/PulsarSourceTestContext.java | 1 -
45 files changed, 131 insertions(+), 951 deletions(-)
diff --git
a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
index 1638c6a..7a71d25 100644
---
a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
+++
b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -9,16 +9,4 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase
does not satisfy: on
* reside outside of package 'org.apache.flink.runtime.*' and contain any
fields that are static, final, and of type MiniClusterExtension and annotated
with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithClientResource and public and
final and not static and annotated with @Rule
-org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only
one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that
are static, final, and of type InternalMiniClusterExtension and annotated with
@RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any
fields that are static, final, and of type MiniClusterExtension and annotated
with @RegisterExtension or are , and of type MiniClusterTestEnvironment and
annotated with @TestEnv\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithClientResource and public and
final and not static and annotated with @Rule
-org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy:
only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that
are static, final, and of type InternalMiniClusterExtension and annotated with
@RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any
fields that are static, final, and of type MiniClusterExtension and annotated
with @RegisterExtension or are , and of type MiniClusterTestEnvironment and
annotated with @TestEnv\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with
@ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithClientResource and public and
final and not static and annotated with @Rule
+ or contain any fields that are public, static, and of type
MiniClusterWithClientResource and final and annotated with @ClassRule or
contain any fields that is of type MiniClusterWithClientResource and public and
final and not static and annotated with @Rule
\ No newline at end of file
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarAdminProxyBuilder.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarAdminProxyBuilder.java
deleted file mode 100644
index 5c8da6b..0000000
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarAdminProxyBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.common.config;
-
-import
org.apache.flink.connector.pulsar.common.handler.PulsarAdminInvocationHandler;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
-import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-import java.lang.reflect.Proxy;
-
-/**
- * {@link org.apache.pulsar.client.admin.PulsarAdminBuilder} didn't expose all
the configurations to
- * end user. We have to extend the default builder method for adding extra
configurations.
- */
-public class PulsarAdminProxyBuilder extends PulsarAdminBuilderImpl {
-
- private final PulsarConfiguration configuration;
-
- public PulsarAdminProxyBuilder(PulsarConfiguration configuration) {
- this.configuration = configuration;
- }
-
- /**
- * This is used by the internal implementation of {@link
AsyncHttpConnector} to set the max
- * allowed number of request threads.
- */
- public void numIoThreads(int numIoThreads) {
- conf.setNumIoThreads(numIoThreads);
- }
-
- /**
- * Wrap the pulsar admin interface into a proxy instance which can retry
the request and limit
- * the request rate.
- */
- @Override
- public PulsarAdmin build() throws PulsarClientException {
- PulsarAdminInvocationHandler handler =
- new PulsarAdminInvocationHandler(super.build(), configuration);
- return (PulsarAdmin)
- Proxy.newProxyInstance(
- PulsarAdmin.class.getClassLoader(),
- new Class[] {PulsarAdmin.class},
- handler);
- }
-}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
index 7b45ef1..caaf05e 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.pulsar.common.config;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -39,16 +38,13 @@ import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTO_CERT_REFRESH_TIME;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_MAX_IDLE_SECONDS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECT_TIMEOUT;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_DNS_LOOKUP_BIND_ADDRESS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_BUSY_WAIT;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
@@ -66,8 +62,6 @@ import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_PROTOCOL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_SERVICE_URL;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_READ_TIMEOUT;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT_MS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SOCKS5_PROXY_ADDRESS;
@@ -187,50 +181,8 @@ public final class PulsarClientFactory {
}
/**
- * PulsarAdmin shares almost the same configuration with PulsarClient, but
we separate this
- * creating method for directly use it.
- */
- public static PulsarAdmin createAdmin(PulsarConfiguration configuration)
- throws PulsarClientException {
- PulsarAdminProxyBuilder builder = new
PulsarAdminProxyBuilder(configuration);
-
- // Create the authentication instance for the Pulsar client.
- builder.authentication(createAuthentication(configuration));
-
- configuration.useOption(PULSAR_ADMIN_URL, builder::serviceHttpUrl);
- configuration.useOption(PULSAR_TLS_KEY_FILE_PATH,
builder::tlsKeyFilePath);
- configuration.useOption(PULSAR_TLS_CERTIFICATE_FILE_PATH,
builder::tlsCertificateFilePath);
- configuration.useOption(PULSAR_TLS_TRUST_CERTS_FILE_PATH,
builder::tlsTrustCertsFilePath);
- configuration.useOption(
- PULSAR_TLS_ALLOW_INSECURE_CONNECTION,
builder::allowTlsInsecureConnection);
- configuration.useOption(
- PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE,
builder::enableTlsHostnameVerification);
- configuration.useOption(PULSAR_USE_KEY_STORE_TLS,
builder::useKeyStoreTls);
- configuration.useOption(PULSAR_SSL_PROVIDER, builder::sslProvider);
- configuration.useOption(PULSAR_TLS_KEY_STORE_TYPE,
builder::tlsKeyStoreType);
- configuration.useOption(PULSAR_TLS_KEY_STORE_PATH,
builder::tlsKeyStorePath);
- configuration.useOption(PULSAR_TLS_KEY_STORE_PASSWORD,
builder::tlsKeyStorePassword);
- configuration.useOption(PULSAR_TLS_TRUST_STORE_TYPE,
builder::tlsTrustStoreType);
- configuration.useOption(PULSAR_TLS_TRUST_STORE_PATH,
builder::tlsTrustStorePath);
- configuration.useOption(PULSAR_TLS_TRUST_STORE_PASSWORD,
builder::tlsTrustStorePassword);
- configuration.useOption(PULSAR_TLS_CIPHERS, TreeSet::new,
builder::tlsCiphers);
- configuration.useOption(PULSAR_TLS_PROTOCOLS, TreeSet::new,
builder::tlsProtocols);
- configuration.useOption(
- PULSAR_CONNECT_TIMEOUT, v -> builder.connectionTimeout(v,
MILLISECONDS));
- configuration.useOption(PULSAR_READ_TIMEOUT, v ->
builder.readTimeout(v, MILLISECONDS));
- configuration.useOption(
- PULSAR_REQUEST_TIMEOUT, v -> builder.requestTimeout(v,
MILLISECONDS));
- configuration.useOption(
- PULSAR_AUTO_CERT_REFRESH_TIME, v ->
builder.autoCertRefreshTime(v, MILLISECONDS));
- configuration.useOption(PULSAR_NUM_IO_THREADS, builder::numIoThreads);
-
- return builder.build();
- }
-
- /**
- * Create the {@link Authentication} instance for both {@code
PulsarClient} and {@code
- * PulsarAdmin}. If the user didn't provide configuration, a {@link
AuthenticationDisabled}
- * instance would be returned.
+ * Create the {@link Authentication} instance for {@code PulsarClient}. If
the user didn't
+ * provide configuration, a {@link AuthenticationDisabled} instance would
be returned.
*
* <p>This method behavior is the same as the pulsar command line tools.
*/
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
index 9fdc1cf..c27e30a 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.description.Description;
import org.apache.pulsar.client.api.ProxyProtocol;
-import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -35,7 +34,6 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.ADMIN_CONFIG_PREFIX;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIENT_CONFIG_PREFIX;
/**
@@ -43,17 +41,11 @@ import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIE
* table.
*/
@PublicEvolving
-@ConfigGroups(
- groups = {
- @ConfigGroup(name = "PulsarClient", keyPrefix =
CLIENT_CONFIG_PREFIX),
- @ConfigGroup(name = "PulsarAdmin", keyPrefix = ADMIN_CONFIG_PREFIX)
- })
+@ConfigGroups(groups = {@ConfigGroup(name = "PulsarClient", keyPrefix =
CLIENT_CONFIG_PREFIX)})
public final class PulsarOptions {
// Pulsar client API config prefix.
public static final String CLIENT_CONFIG_PREFIX = "pulsar.client.";
- // Pulsar admin API config prefix.
- public static final String ADMIN_CONFIG_PREFIX = "pulsar.admin.";
private PulsarOptions() {
// This is a constant class
@@ -582,75 +574,4 @@ public final class PulsarOptions {
.stringType()
.noDefaultValue()
.withDescription("Password of SOCKS5 proxy.");
-
-
///////////////////////////////////////////////////////////////////////////////
- //
- // The configuration for PulsarAdmin part.
- // All the configuration listed below should have the pulsar.admin prefix.
- //
-
///////////////////////////////////////////////////////////////////////////////
-
- public static final ConfigOption<String> PULSAR_ADMIN_URL =
- ConfigOptions.key(ADMIN_CONFIG_PREFIX + "adminUrl")
- .stringType()
- .noDefaultValue()
- .withDescription(
- Description.builder()
- .text(
- "The Pulsar service HTTP URL for
the admin endpoint. For example, %s, or %s for TLS.",
-
code("http://my-broker.example.com:8080"),
-
code("https://my-broker.example.com:8443"))
- .build());
-
- public static final ConfigOption<Integer> PULSAR_CONNECT_TIMEOUT =
- ConfigOptions.key(ADMIN_CONFIG_PREFIX + "connectTimeout")
- .intType()
- .defaultValue(60000)
- .withDescription("The connection time out (in ms) for the
PulsarAdmin client.");
-
- public static final ConfigOption<Integer> PULSAR_READ_TIMEOUT =
- ConfigOptions.key(ADMIN_CONFIG_PREFIX + "readTimeout")
- .intType()
- .defaultValue(60000)
- .withDescription(
- "The server response read timeout (in ms) for the
PulsarAdmin client for any request.");
-
- public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT =
- ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestTimeout")
- .intType()
- .defaultValue(300000)
- .withDescription(
- "The server request timeout (in ms) for the
PulsarAdmin client for any request.");
-
- public static final ConfigOption<Integer> PULSAR_AUTO_CERT_REFRESH_TIME =
- ConfigOptions.key(ADMIN_CONFIG_PREFIX + "autoCertRefreshTime")
- .intType()
- .defaultValue(300000)
- .withDescription(
- "The auto cert refresh time (in ms) if Pulsar
admin supports TLS authentication.");
-
- // These config options below are passing to PulsarAdminInvocationHandler.
- // A wrapper for PulsarAdmin.
-
- public static final ConfigOption<Integer> PULSAR_ADMIN_REQUEST_RETRIES =
- ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestRetries")
- .intType()
- .defaultValue(5)
- .withDescription(
- "For PulsarAdmin request, it will retry until we
get a success response,"
- + " fail if we exhausted retry count.");
-
- public static final ConfigOption<Long> PULSAR_ADMIN_REQUEST_WAIT_MILLIS =
- ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestWaitMillis")
- .longType()
- .defaultValue(Duration.ofSeconds(3).toMillis())
- .withDescription(
- "For PulsarAdmin request, We will sleep the given
time before retrying the failed request.");
-
- public static final ConfigOption<Integer> PULSAR_ADMIN_REQUEST_RATES =
- ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestRates")
- .intType()
- .defaultValue(5)
- .withDescription(
- "It will add ratelimit for PulsarAdmin metadata
requests, stands for requests per second.");
}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java
deleted file mode 100644
index 14d5ad2..0000000
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.common.handler;
-
-import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
-import org.apache.pulsar.shade.com.google.common.util.concurrent.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RATES;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RETRIES;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_WAIT_MILLIS;
-import static
org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
-/** A wrapper which wraps the {@link PulsarAdmin} with request retry and rate
limit support. */
-public class PulsarAdminInvocationHandler implements InvocationHandler {
- private static final Logger LOG =
LoggerFactory.getLogger(PulsarAdminInvocationHandler.class);
-
- @SuppressWarnings("java:S3077")
- private static volatile RateLimiter rateLimiter;
-
- private final PulsarAdmin admin;
- private final int retryTimes;
- private final long waitMillis;
- private final int requestRates;
- private final Map<String, Object> handlers;
-
- public PulsarAdminInvocationHandler(PulsarAdmin admin, PulsarConfiguration
configuration) {
- this.admin = admin;
- this.retryTimes = configuration.get(PULSAR_ADMIN_REQUEST_RETRIES);
- this.waitMillis = configuration.get(PULSAR_ADMIN_REQUEST_WAIT_MILLIS);
- this.requestRates = configuration.get(PULSAR_ADMIN_REQUEST_RATES);
- this.handlers = new ConcurrentHashMap<>();
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
- Class<?> returnType = method.getReturnType();
-
- // No need to proxy the void return type.
- // The non-interface type is not able to proxy.
- if (returnType.equals(Void.TYPE) || !returnType.isInterface()) {
- return method.invoke(admin, args);
- }
-
- String methodName = method.getName();
- if (handlers.containsKey(methodName)) {
- return handlers.get(methodName);
- }
-
- Object handler =
- Proxy.newProxyInstance(
- Thread.currentThread().getContextClassLoader(),
- new Class[] {returnType},
- new RequestHandler(method.invoke(admin, args)));
- this.handlers.put(methodName, handler);
-
- return handler;
- }
-
- /** A proxy handler with retry support for all the admin request. */
- @SuppressWarnings({"java:S1193", "java:S1181", "java:S3776", "java:S112"})
- private class RequestHandler implements InvocationHandler {
-
- private final Object handler;
-
- public RequestHandler(Object handler) {
- this.handler = handler;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
- return doInvoke(method, args, retryTimes);
- }
-
- private Object doInvoke(Method method, Object[] args, int
remainingTimes) throws Throwable {
- while (true) {
- // Make sure the request is allowed in the given rates.
- requestRateLimit(requestRates);
-
- try {
- return method.invoke(handler, args);
- } catch (InvocationTargetException e) {
- Throwable throwable = e.getTargetException();
- if (throwable instanceof NotFoundException) {
- // No need to retry on such exceptions.
- throw throwable;
- } else if (throwable instanceof PulsarAdminException) {
- remainingTimes--;
- LOG.warn("Request error in Admin API, remain times:
{}", remainingTimes, e);
- if (remainingTimes == 0) {
- throw throwable;
- } else {
- // Sleep for the given times before executing the
next query.
- sleepUninterruptibly(waitMillis, MILLISECONDS);
- }
- } else {
- throw throwable;
- }
- }
- }
- }
- }
-
- /** Request a global ratelimit which limits the total admin API request
rates. */
- private static void requestRateLimit(int requestRates) {
- if (rateLimiter == null) {
- synchronized (PulsarAdminInvocationHandler.class) {
- if (rateLimiter == null) {
- rateLimiter = RateLimiter.create(requestRates);
- }
- }
- }
-
- rateLimiter.acquire();
- }
-}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
index 1d4e92b..559da3a 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
@@ -52,7 +52,6 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
* <pre>{@code
* PulsarSink<String> sink = PulsarSink.builder()
* .setServiceUrl(operator().serviceUrl())
- * .setAdminUrl(operator().adminUrl())
* .setTopic(topic)
* .setSerializationSchema(Schema.STRING)
* .build();
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index e3e7b1b..ee6e7db 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -46,7 +46,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
@@ -75,15 +74,14 @@ import static
org.apache.flink.util.Preconditions.checkState;
* <pre>{@code
* PulsarSink<String> sink = PulsarSink.builder()
* .setServiceUrl(operator().serviceUrl())
- * .setAdminUrl(operator().adminUrl())
* .setTopics(topic)
* .setSerializationSchema(Schema.STRING)
* .build();
* }</pre>
*
- * <p>The service url, admin url, and the record serializer are required
fields that must be set. If
- * you don't set the topics, make sure you have provided a custom {@link
TopicRouter}. Otherwise,
- * you must provide the topics to produce.
+ * <p>The service url and the record serializer are required fields that must
be set. If you don't
+ * set the topics, make sure you have provided a custom {@link TopicRouter}.
Otherwise, you must
+ * provide the topics to produce.
*
* <p>To specify the delivery guarantees of PulsarSink, one can call {@link
* #setDeliveryGuarantee(DeliveryGuarantee)}. The default value of the
delivery guarantee is {@link
@@ -93,7 +91,6 @@ import static org.apache.flink.util.Preconditions.checkState;
* <pre>{@code
* PulsarSink<String> sink = PulsarSink.builder()
* .setServiceUrl(operator().serviceUrl())
- * .setAdminUrl(operator().adminUrl())
* .setTopics(topic)
* .setSerializationSchema(Schema.STRING)
* .setDeliveryGuarantee(deliveryGuarantee)
@@ -126,9 +123,11 @@ public class PulsarSinkBuilder<IN> {
*
* @param adminUrl The url for the PulsarAdmin.
* @return this PulsarSinkBuilder.
+ * @deprecated this method will return builder directly
*/
+ @Deprecated
public PulsarSinkBuilder<IN> setAdminUrl(String adminUrl) {
- return setConfig(PULSAR_ADMIN_URL, adminUrl);
+ return this;
}
/**
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
index 9d0a6c1..da03ff2 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -31,7 +31,6 @@ import java.util.UUID;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
@@ -62,7 +61,6 @@ public final class PulsarSinkConfigUtils {
public static final PulsarConfigValidator SINK_CONFIG_VALIDATOR =
PulsarConfigValidator.builder()
.requiredOption(PULSAR_SERVICE_URL)
- .requiredOption(PULSAR_ADMIN_URL)
.conflictOptions(PULSAR_AUTH_PARAMS, PULSAR_AUTH_PARAM_MAP)
.build();
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
index 6799677..cbc7b00 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContext.java
@@ -22,8 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-
import java.util.Optional;
/** This context provides information on the pulsar record target location. */
@@ -56,5 +54,5 @@ public interface PulsarSinkContext {
*
* @return Return {@link Optional#empty()} if the topic doesn't exist.
*/
- Optional<TopicMetadata> topicMetadata(String topic) throws
PulsarAdminException;
+ Optional<TopicMetadata> topicMetadata(String topic);
}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
index 5df0026..8c827db 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/context/PulsarSinkContextImpl.java
@@ -25,8 +25,6 @@ import
org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-
import java.util.Optional;
/** An implementation that would contain all the required context. */
@@ -71,7 +69,7 @@ public class PulsarSinkContextImpl implements
PulsarSinkContext {
}
@Override
- public Optional<TopicMetadata> topicMetadata(String topic) throws
PulsarAdminException {
+ public Optional<TopicMetadata> topicMetadata(String topic) {
return metadataListener.queryTopicMetadata(topic);
}
}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java
index 9745dc6..b324efe 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/MetadataListener.java
@@ -25,13 +25,10 @@ import
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.shade.com.google.common.cache.CacheBuilder;
@@ -52,7 +49,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
+import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
import static
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition;
import static
org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
@@ -72,7 +69,7 @@ public class MetadataListener implements Serializable,
Closeable {
private ImmutableList<TopicPartition> availablePartitions;
// Dynamic fields.
- private transient PulsarAdmin pulsarAdmin;
+ private transient PulsarClientImpl clientImpl;
private transient Long topicMetadataRefreshInterval;
private transient ProcessingTimeService timeService;
private transient LoadingCache<String, Optional<Integer>>
topicPartitionCache;
@@ -102,7 +99,7 @@ public class MetadataListener implements Serializable,
Closeable {
public void open(SinkConfiguration sinkConfiguration,
ProcessingTimeService timeService)
throws PulsarClientException {
// Initialize listener properties.
- this.pulsarAdmin = createAdmin(sinkConfiguration);
+ this.clientImpl = (PulsarClientImpl) createClient(sinkConfiguration);
this.topicMetadataRefreshInterval =
sinkConfiguration.getTopicMetadataRefreshInterval();
this.timeService = timeService;
this.topicPartitionCache =
@@ -113,25 +110,15 @@ public class MetadataListener implements Serializable,
Closeable {
@Override
@ParametersAreNonnullByDefault
public Optional<Integer> load(String topic)
- throws PulsarAdminException {
- try {
- PartitionedTopicMetadata metadata =
- pulsarAdmin
- .topics()
-
.getPartitionedTopicMetadata(topic);
- return
Optional.of(metadata.partitions);
- } catch (NotFoundException e) {
- return Optional.empty();
- }
+ throws ExecutionException,
InterruptedException {
+ PartitionedTopicMetadata metadata =
+
clientImpl.getPartitionedTopicMetadata(topic).get();
+ return
Optional.of(metadata.partitions);
}
});
// Initialize the topic metadata. Quit if fail to connect to Pulsar.
- try {
- updateTopicMetadata();
- } catch (PulsarAdminException e) {
- throw new FlinkRuntimeException(e);
- }
+ updateTopicMetadata();
// Register time service for update the topic metadata.
if (topics.isEmpty()) {
@@ -156,7 +143,7 @@ public class MetadataListener implements Serializable,
Closeable {
*
* @return Return {@link Optional#empty()} if the topic doesn't exist.
*/
- public Optional<TopicMetadata> queryTopicMetadata(String topic) throws
PulsarAdminException {
+ public Optional<TopicMetadata> queryTopicMetadata(String topic) {
if (isPartition(topic)) {
return Optional.of(new TopicMetadata(topic, NON_PARTITIONED));
}
@@ -164,13 +151,7 @@ public class MetadataListener implements Serializable,
Closeable {
try {
return topicPartitionCache.get(topic).map(size -> new
TopicMetadata(topic, size));
} catch (ExecutionException e) {
- Optional<PulsarAdminException> optional =
- ExceptionUtils.findThrowable(e,
PulsarAdminException.class);
- if (optional.isPresent()) {
- throw optional.get();
- } else {
- throw new FlinkRuntimeException(e);
- }
+ throw new FlinkRuntimeException(e);
}
}
@@ -181,8 +162,8 @@ public class MetadataListener implements Serializable,
Closeable {
@Override
public void close() throws IOException {
- if (pulsarAdmin != null) {
- pulsarAdmin.close();
+ if (clientImpl != null) {
+ clientImpl.close();
}
}
@@ -194,10 +175,10 @@ public class MetadataListener implements Serializable,
Closeable {
}
private void triggerNextTopicMetadataUpdate() {
- // Try to update the topic metadata, ignore the pulsar admin exception.
+ // Try to update the topic metadata.
try {
updateTopicMetadata();
- } catch (PulsarAdminException e) {
+ } catch (FlinkRuntimeException e) {
LOG.warn("", e);
}
@@ -205,7 +186,7 @@ public class MetadataListener implements Serializable,
Closeable {
registerNextTopicMetadataUpdateTimer();
}
- private void updateTopicMetadata() throws PulsarAdminException {
+ private void updateTopicMetadata() throws FlinkRuntimeException {
ImmutableList.Builder<TopicPartition> parititonsBuilder =
ImmutableList.builder();
for (String topic : topics) {
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index c038f32..447867c 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -55,7 +55,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
* .builder()
* .setTopics(TOPIC1, TOPIC2)
* .setServiceUrl(getServiceUrl())
- * .setAdminUrl(getAdminUrl())
* .setSubscriptionName("test")
* .setDeserializationSchema(new SimpleStringSchema())
* .setBounded(StopCursor::defaultStopCursor)
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index a31f185..687e9b8 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -59,7 +59,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
@@ -86,15 +85,14 @@ import static
org.apache.flink.util.Preconditions.checkState;
* PulsarSource<String> source = PulsarSource
* .builder()
* .setServiceUrl(PULSAR_BROKER_URL)
- * .setAdminUrl(PULSAR_BROKER_HTTP_URL)
* .setSubscriptionName("flink-source-1")
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
* .setDeserializationSchema(new SimpleStringSchema())
* .build();
* }</pre>
*
- * <p>The service url, admin url, subscription name, topics to consume, and
the record deserializer
- * are required fields that must be set.
+ * <p>The service url, subscription name, topics to consume, and the record
deserializer are
+ * required fields that must be set.
*
* <p>To specify the starting position of PulsarSource, one can call {@link
* #setStartCursor(StartCursor)}.
@@ -115,7 +113,6 @@ import static
org.apache.flink.util.Preconditions.checkState;
* PulsarSource<String> source = PulsarSource
* .builder()
* .setServiceUrl(PULSAR_BROKER_URL)
- * .setAdminUrl(PULSAR_BROKER_HTTP_URL)
* .setSubscriptionName("flink-source-1")
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
* .setDeserializationSchema(new SimpleStringSchema())
@@ -152,9 +149,11 @@ public final class PulsarSourceBuilder<OUT> {
*
* @param adminUrl the url for the PulsarAdmin.
* @return this PulsarSourceBuilder.
+ * @deprecated this method will return builder directly
*/
+ @Deprecated
public PulsarSourceBuilder<OUT> setAdminUrl(String adminUrl) {
- return setConfig(PULSAR_ADMIN_URL, adminUrl);
+ return this;
}
/**
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index 5f379d3..eba6175 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
@@ -84,7 +83,6 @@ public final class PulsarSourceConfigUtils {
public static final PulsarConfigValidator SOURCE_CONFIG_VALIDATOR =
PulsarConfigValidator.builder()
.requiredOption(PULSAR_SERVICE_URL)
- .requiredOption(PULSAR_ADMIN_URL)
.requiredOption(PULSAR_SUBSCRIPTION_NAME)
.conflictOptions(PULSAR_AUTH_PARAMS, PULSAR_AUTH_PARAM_MAP)
.build();
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 819f9cd..57bb72b 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -33,8 +33,6 @@ import
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
@@ -47,7 +45,6 @@ import java.util.List;
import java.util.Set;
import static java.util.Collections.singletonList;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
import static
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
import static
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner.createAssigner;
@@ -60,7 +57,6 @@ public class PulsarSourceEnumerator
private static final Logger LOG =
LoggerFactory.getLogger(PulsarSourceEnumerator.class);
private final PulsarClient pulsarClient;
- private final PulsarAdmin pulsarAdmin;
private final PulsarSubscriber subscriber;
private final StartCursor startCursor;
private final RangeGenerator rangeGenerator;
@@ -97,7 +93,6 @@ public class PulsarSourceEnumerator
PulsarSourceEnumState enumState)
throws PulsarClientException {
this.pulsarClient = createClient(sourceConfiguration);
- this.pulsarAdmin = createAdmin(sourceConfiguration);
this.subscriber = subscriber;
this.startCursor = startCursor;
this.rangeGenerator = rangeGenerator;
@@ -109,7 +104,7 @@ public class PulsarSourceEnumerator
@Override
public void start() {
- subscriber.open(pulsarClient, pulsarAdmin);
+ subscriber.open(pulsarClient);
rangeGenerator.open(sourceConfiguration);
// Expose the split assignment metrics if Flink has supported.
@@ -178,9 +173,6 @@ public class PulsarSourceEnumerator
if (pulsarClient != null) {
pulsarClient.close();
}
- if (pulsarAdmin != null) {
- pulsarAdmin.close();
- }
}
// ----------------- private methods -------------------
@@ -228,12 +220,8 @@ public class PulsarSourceEnumerator
startCursor.position(partition.getTopic(),
partition.getPartitionId());
try {
- if (sourceConfiguration.isResetSubscriptionCursor()) {
- position.seekPosition(pulsarAdmin, topic,
subscriptionName);
- } else {
- position.createInitialPosition(pulsarAdmin, topic,
subscriptionName);
- }
- } catch (PulsarAdminException e) {
+ position.setupSubPosition(pulsarClient, topic,
subscriptionName);
+ } catch (PulsarClientException e) {
throw new FlinkRuntimeException(e);
}
}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
index 8d8ee2c..72fc614 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
@@ -21,15 +21,18 @@ package
org.apache.flink.connector.pulsar.source.enumerator.cursor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
-import
org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import java.io.Serializable;
-import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -78,51 +81,43 @@ public final class CursorPosition implements Serializable {
/** This method is used to create the initial position in {@link
PulsarSourceEnumerator}. */
@Internal
- public boolean createInitialPosition(
- PulsarAdmin pulsarAdmin, String topicName, String subscriptionName)
- throws PulsarAdminException {
- List<String> subscriptions =
pulsarAdmin.topics().getSubscriptions(topicName);
-
- if (!subscriptions.contains(subscriptionName)) {
- pulsarAdmin
- .topics()
- .createSubscription(topicName, subscriptionName,
MessageId.earliest);
-
+ public void setupSubPosition(PulsarClient client, String topicName, String
subscriptionName)
+ throws PulsarClientException {
+ try (Consumer<GenericRecord> consumer =
+ client.newConsumer(new AutoConsumeSchema())
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscribe()) {
// Reset cursor to desired position.
- MessageId initialPosition = getMessageId(pulsarAdmin, topicName);
- pulsarAdmin
- .topics()
- .resetCursor(topicName, subscriptionName, initialPosition,
!include);
-
- return true;
+ if (type == Type.TIMESTAMP) {
+ consumer.seek(getActualTimestamp(this.timestamp));
+ } else if (messageId instanceof ChunkMessageIdImpl) {
+ MessageIdAdv msgId = ((ChunkMessageIdImpl)
messageId).getFirstChunkMessageId();
+ consumer.seek(getActualMessageId(msgId));
+ } else {
+ consumer.seek(getActualMessageId((MessageIdAdv) messageId));
+ }
}
-
- return false;
}
- /**
- * This method is used to reset the consuming position in {@link
PulsarPartitionSplitReader}.
- */
- @Internal
- public void seekPosition(PulsarAdmin pulsarAdmin, String topicName, String
subscriptionName)
- throws PulsarAdminException {
- if (!createInitialPosition(pulsarAdmin, topicName, subscriptionName)) {
- // Reset cursor to desired position.
- MessageId initialPosition = getMessageId(pulsarAdmin, topicName);
- pulsarAdmin
- .topics()
- .resetCursor(topicName, subscriptionName, initialPosition,
!include);
+ private MessageId getActualMessageId(MessageIdAdv messageIdImpl) {
+ if (include) {
+ return messageIdImpl;
+ } else {
+ // if the (ledgerId, entryId + 1) is not valid
+ // pulsar broker will automatically set the cursor to the next
valid message
+ return new MessageIdImpl(
+ messageIdImpl.getLedgerId(),
+ messageIdImpl.getEntryId() + 1,
+ messageIdImpl.getPartitionIndex());
}
}
- private MessageId getMessageId(PulsarAdmin pulsarAdmin, String topicName)
- throws PulsarAdminException {
- if (type == Type.TIMESTAMP) {
- return pulsarAdmin.topics().getMessageIdByTimestamp(topicName,
timestamp);
- } else if (messageId instanceof ChunkMessageIdImpl) {
- return ((ChunkMessageIdImpl) messageId).getFirstChunkMessageId();
+ private long getActualTimestamp(long timestamp) {
+ if (include) {
+ return timestamp;
} else {
- return messageId;
+ return timestamp + 1;
}
}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
index 1af875e..3ba4ad2 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
@@ -26,9 +26,9 @@ import
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStop
import
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.PublishTimestampStopCursor;
import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
import java.io.Serializable;
@@ -42,7 +42,7 @@ import java.io.Serializable;
public interface StopCursor extends Serializable {
/** The open method for the cursor initializer. This method could be
executed multiple times. */
- default void open(PulsarAdmin admin, TopicPartition partition) throws
Exception {}
+ default void open(PulsarClient client, TopicPartition partition) throws
Exception {}
/** Determine whether to pause consumption on the current message by the
returned enum. */
StopCondition shouldStop(Message<?> message);
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
index b180262..fb0a916 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
@@ -36,7 +36,7 @@ public class MessageIdStartCursor implements StartCursor {
/**
* The default {@code inclusive} behavior should be controlled in {@link
* ConsumerBuilder#startMessageIdInclusive}. But pulsar has a bug and
doesn't support this
- * feature currently. We have to use admin API to reset the cursor instead.
+ * feature currently. So we have to implement this feature by ourselves.
*
* @param messageId The message id for start position.
* @param inclusive Whether we include the start message id in the
consuming result. This works
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
index def12b1..4c86977 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
@@ -21,10 +21,13 @@ package
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import java.util.Objects;
@@ -36,6 +39,8 @@ import java.util.Objects;
public class LatestMessageStopCursor implements StopCursor {
private static final long serialVersionUID = 1702059838323965723L;
+ private static final String SUBSCRIPTION_NAME =
LatestMessageStopCursor.class.getSimpleName();
+
private MessageId messageId;
private final boolean inclusive;
@@ -50,10 +55,22 @@ public class LatestMessageStopCursor implements StopCursor {
}
@Override
- public void open(PulsarAdmin admin, TopicPartition partition) throws
PulsarAdminException {
+ public void open(PulsarClient client, TopicPartition partition) throws
PulsarClientException {
if (messageId == null) {
- String topic = partition.getFullTopicName();
- this.messageId = admin.topics().getLastMessageId(topic);
+ Consumer<GenericRecord> consumer = null;
+ try {
+ consumer =
+ client.newConsumer(new AutoConsumeSchema())
+ .topic(partition.getFullTopicName())
+ .subscriptionName(SUBSCRIPTION_NAME)
+ .subscribe();
+ this.messageId = consumer.getLastMessageId();
+ } finally {
+ if (consumer != null) {
+ consumer.unsubscribe();
+ consumer.close();
+ }
+ }
}
}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
index 653a9ec..a7b08b7 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
@@ -24,7 +24,6 @@ import
org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl.Topic
import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -63,9 +62,8 @@ public interface PulsarSubscriber extends Serializable {
* Initialize the topic subscriber.
*
* @param client The client interface for querying the topics by regex
pattern.
- * @param admin The admin interface used to retrieve subscribed topic
partitions.
*/
- void open(PulsarClient client, PulsarAdmin admin);
+ void open(PulsarClient client);
// ----------------- factory methods --------------
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
index 28ee9d8..30a24df 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
@@ -24,19 +24,19 @@ import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import static
org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
-/** PulsarSubscriber abstract class to simplify Pulsar admin related
operations. */
+/** PulsarSubscriber abstract class to simplify Pulsar metadata related
operations. */
public abstract class BasePulsarSubscriber implements PulsarSubscriber {
private static final long serialVersionUID = 2053021503331058888L;
@@ -45,33 +45,24 @@ public abstract class BasePulsarSubscriber implements
PulsarSubscriber {
private static final Set<String> NON_PARTITIONED_TOPICS =
ConcurrentHashMap.newKeySet();
protected transient PulsarClient client;
- protected transient PulsarAdmin admin;
- protected TopicMetadata queryTopicMetadata(String topic) throws
PulsarAdminException {
+ protected TopicMetadata queryTopicMetadata(String topic)
+ throws ExecutionException, InterruptedException {
if (NON_PARTITIONED_TOPICS.contains(topic)) {
return new TopicMetadata(topic, NON_PARTITIONED);
}
- try {
- PartitionedTopicMetadata metadata =
admin.topics().getPartitionedTopicMetadata(topic);
- if (metadata.partitions == NON_PARTITIONED) {
- NON_PARTITIONED_TOPICS.add(topic);
- }
- return new TopicMetadata(topic, metadata.partitions);
- } catch (PulsarAdminException e) {
- if (e.getStatusCode() == 404) {
- // Return null for skipping the topic metadata query.
- return null;
- } else {
- // This method would cause failure for subscribers.
- throw e;
- }
+ PulsarClientImpl clientImpl = (PulsarClientImpl) client;
+ PartitionedTopicMetadata metadata =
clientImpl.getPartitionedTopicMetadata(topic).get();
+ if (metadata.partitions == NON_PARTITIONED) {
+ NON_PARTITIONED_TOPICS.add(topic);
}
+ return new TopicMetadata(topic, metadata.partitions);
}
protected Set<TopicPartition> createTopicPartitions(
Set<String> topics, RangeGenerator generator, int parallelism)
- throws PulsarAdminException {
+ throws ExecutionException, InterruptedException {
Set<TopicPartition> results = new HashSet<>();
for (String topic : topics) {
@@ -94,8 +85,7 @@ public abstract class BasePulsarSubscriber implements
PulsarSubscriber {
}
@Override
- public void open(PulsarClient client, PulsarAdmin admin) {
+ public void open(PulsarClient client) {
this.client = client;
- this.admin = admin;
}
}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
index b1923df..920e831 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
@@ -23,12 +23,12 @@ import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import static
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition;
@@ -54,7 +54,8 @@ public class TopicListSubscriber extends BasePulsarSubscriber
{
@Override
public Set<TopicPartition> getSubscribedTopicPartitions(
- RangeGenerator generator, int parallelism) throws
PulsarAdminException {
+ RangeGenerator generator, int parallelism)
+ throws ExecutionException, InterruptedException {
// Query topics from Pulsar.
Set<TopicPartition> results = createTopicPartitions(fullTopicNames,
generator, parallelism);
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index 4f4403f..38a7adb 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -38,8 +38,6 @@ import
org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerStats;
@@ -94,7 +92,6 @@ public class PulsarPartitionSplitReader
private static final Logger LOG =
LoggerFactory.getLogger(PulsarPartitionSplitReader.class);
private final PulsarClient pulsarClient;
- private final PulsarAdmin pulsarAdmin;
private final SourceConfiguration sourceConfiguration;
private final Schema<byte[]> schema;
private final PulsarCrypto pulsarCrypto;
@@ -105,13 +102,11 @@ public class PulsarPartitionSplitReader
public PulsarPartitionSplitReader(
PulsarClient pulsarClient,
- PulsarAdmin pulsarAdmin,
SourceConfiguration sourceConfiguration,
Schema<byte[]> schema,
PulsarCrypto pulsarCrypto,
SourceReaderMetricGroup metricGroup) {
this.pulsarClient = pulsarClient;
- this.pulsarAdmin = pulsarAdmin;
this.sourceConfiguration = sourceConfiguration;
this.schema = schema;
this.pulsarCrypto = pulsarCrypto;
@@ -192,7 +187,7 @@ public class PulsarPartitionSplitReader
// Open stop cursor.
try {
- registeredSplit.open(pulsarAdmin);
+ registeredSplit.open(pulsarClient);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
@@ -215,11 +210,8 @@ public class PulsarPartitionSplitReader
String topicName =
registeredSplit.getPartition().getFullTopicName();
String subscriptionName =
sourceConfiguration.getSubscriptionName();
- // Remove Consumer.seek() here for waiting for
pulsar-client-all 2.12.0
- // See https://github.com/apache/pulsar/issues/16757 for more
details.
-
- cursorPosition.seekPosition(pulsarAdmin, topicName,
subscriptionName);
- } catch (PulsarAdminException e) {
+ cursorPosition.setupSubPosition(pulsarClient, topicName,
subscriptionName);
+ } catch (PulsarClientException e) {
if (sourceConfiguration.getVerifyInitialOffsets() ==
FAIL_ON_MISMATCH) {
throw new IllegalArgumentException(e);
} else {
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
index e5243a8..014d303 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
@@ -39,7 +39,6 @@ import
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
@@ -62,7 +61,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
import static
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
/**
@@ -79,7 +77,6 @@ public class PulsarSourceReader<OUT>
private final SourceConfiguration sourceConfiguration;
private final PulsarClient pulsarClient;
- private final PulsarAdmin pulsarAdmin;
@VisibleForTesting final SortedMap<Long, Map<TopicPartition, MessageId>>
cursorsToCommit;
private final ConcurrentMap<TopicPartition, MessageId>
cursorsOfFinishedSplits;
private final AtomicReference<Throwable> cursorCommitThrowable;
@@ -92,7 +89,6 @@ public class PulsarSourceReader<OUT>
PulsarDeserializationSchema<OUT> deserializationSchema,
SourceConfiguration sourceConfiguration,
PulsarClient pulsarClient,
- PulsarAdmin pulsarAdmin,
SourceReaderContext context) {
super(
elementsQueue,
@@ -103,7 +99,6 @@ public class PulsarSourceReader<OUT>
this.sourceConfiguration = sourceConfiguration;
this.pulsarClient = pulsarClient;
- this.pulsarAdmin = pulsarAdmin;
this.cursorsToCommit = Collections.synchronizedSortedMap(new
TreeMap<>());
this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
@@ -221,7 +216,6 @@ public class PulsarSourceReader<OUT>
// Close shared pulsar resources.
pulsarClient.shutdown();
- pulsarAdmin.close();
}
// ----------------- helper methods --------------
@@ -264,7 +258,6 @@ public class PulsarSourceReader<OUT>
new FutureCompletingBlockingQueue<>(queueCapacity);
PulsarClient pulsarClient = createClient(sourceConfiguration);
- PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
// Initialize the deserialization schema before creating the pulsar
reader.
PulsarDeserializationSchemaInitializationContext initializationContext
=
@@ -287,7 +280,6 @@ public class PulsarSourceReader<OUT>
() ->
new PulsarPartitionSplitReader(
pulsarClient,
- pulsarAdmin,
sourceConfiguration,
schema,
pulsarCrypto,
@@ -303,7 +295,6 @@ public class PulsarSourceReader<OUT>
deserializationSchema,
sourceConfiguration,
pulsarClient,
- pulsarAdmin,
readerContext);
}
}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
index 3044e9c..2e21fa8 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
@@ -24,8 +24,8 @@ import
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import javax.annotation.Nullable;
@@ -94,8 +94,8 @@ public class PulsarPartitionSplit implements SourceSplit,
Serializable {
}
/** Open stop cursor. */
- public void open(PulsarAdmin admin) throws Exception {
- stopCursor.open(admin, partition);
+ public void open(PulsarClient client) throws Exception {
+ stopCursor.open(client, partition);
}
@Override
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableFactory.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableFactory.java
index b867e18..ec64ec8 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableFactory.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableFactory.java
@@ -53,7 +53,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.createKeyFormatProjection;
@@ -70,7 +69,6 @@ import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.get
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getTopicRoutingMode;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueEncodingFormat;
-import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.ADMIN_URL;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.EXPLICIT;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FIELDS;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FORMAT;
@@ -121,7 +119,6 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
// PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not
part of the validation.
helper.validateExcept(
PulsarOptions.CLIENT_CONFIG_PREFIX,
- PulsarOptions.ADMIN_CONFIG_PREFIX,
PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
@@ -140,7 +137,6 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
// Forward source configs
final Properties properties = getPulsarProperties(tableOptions);
- properties.setProperty(PULSAR_ADMIN_URL.key(),
tableOptions.get(ADMIN_URL));
properties.setProperty(PULSAR_SERVICE_URL.key(),
tableOptions.get(SERVICE_URL));
// Set random subscriptionName if not provided
properties.setProperty(
@@ -194,7 +190,6 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
// PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not
part of the validation.
helper.validateExcept(
PulsarOptions.CLIENT_CONFIG_PREFIX,
- PulsarOptions.ADMIN_CONFIG_PREFIX,
PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
@@ -215,7 +210,6 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
// Forward sink configs
final Properties properties = getPulsarProperties(tableOptions);
- properties.setProperty(PULSAR_ADMIN_URL.key(),
tableOptions.get(ADMIN_URL));
properties.setProperty(PULSAR_SERVICE_URL.key(),
tableOptions.get(SERVICE_URL));
// Retrieve physical DataType (not including computed or metadata
fields)
@@ -254,7 +248,7 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
@Override
public Set<ConfigOption<?>> requiredOptions() {
- return Stream.of(TOPICS, ADMIN_URL,
SERVICE_URL).collect(Collectors.toSet());
+ return Stream.of(TOPICS, SERVICE_URL).collect(Collectors.toSet());
}
@Override
@@ -284,7 +278,6 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory, DynamicTab
public Set<ConfigOption<?>> forwardOptions() {
return Stream.of(
TOPICS,
- ADMIN_URL,
SERVICE_URL,
SOURCE_SUBSCRIPTION_TYPE,
SOURCE_SUBSCRIPTION_NAME,
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java
index 702f130..3db2bd4 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java
@@ -214,27 +214,6 @@ public final class PulsarTableOptions {
// Pulsar Options
//
--------------------------------------------------------------------------------------------
- /**
- * Exactly same as {@link
- *
org.apache.flink.connector.pulsar.common.config.PulsarOptions#PULSAR_ADMIN_URL}.
Copied here
- * because it is a required config option and should not be included in
the {@link
- *
org.apache.flink.table.factories.FactoryUtil.FactoryHelper#validateExcept(String...)}
method.
- *
- * <p>By default all {@link
org.apache.flink.connector.pulsar.common.config.PulsarOptions} are
- * included in the validateExcept() method./p>
- */
- public static final ConfigOption<String> ADMIN_URL =
- ConfigOptions.key("admin-url")
- .stringType()
- .noDefaultValue()
- .withDescription(
- Description.builder()
- .text(
- "The Pulsar service HTTP URL for
the admin endpoint. For example, %s, or %s for TLS.",
-
code("http://my-broker.example.com:8080"),
-
code("https://my-broker.example.com:8443"))
- .build());
-
/**
* Exactly same as {@link
*
org.apache.flink.connector.pulsar.common.config.PulsarOptions#PULSAR_SERVICE_URL}.
Copied
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableFactory.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableFactory.java
index a972de0..7a769d0 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableFactory.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableFactory.java
@@ -55,7 +55,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.createKeyFormatProjection;
@@ -71,7 +70,6 @@ import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.get
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getTopicRouter;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueEncodingFormat;
-import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.ADMIN_URL;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.EXPLICIT;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FIELDS;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FORMAT;
@@ -120,7 +118,6 @@ public class UpsertPulsarTableFactory
// PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not
part of the validation.
helper.validateExcept(
PulsarOptions.CLIENT_CONFIG_PREFIX,
- PulsarOptions.ADMIN_CONFIG_PREFIX,
PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
@@ -134,7 +131,6 @@ public class UpsertPulsarTableFactory
// Forward source configs
final Properties properties = getPulsarProperties(tableOptions);
- properties.setProperty(PULSAR_ADMIN_URL.key(),
tableOptions.get(ADMIN_URL));
properties.setProperty(PULSAR_SERVICE_URL.key(),
tableOptions.get(SERVICE_URL));
// Set random subscriptionName if not provided
properties.setProperty(
@@ -196,7 +192,6 @@ public class UpsertPulsarTableFactory
// PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not
part of the validation.
helper.validateExcept(
PulsarOptions.CLIENT_CONFIG_PREFIX,
- PulsarOptions.ADMIN_CONFIG_PREFIX,
PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
@@ -211,7 +206,6 @@ public class UpsertPulsarTableFactory
// Forward sink configs
final Properties properties = getPulsarProperties(tableOptions);
- properties.setProperty(PULSAR_ADMIN_URL.key(),
tableOptions.get(ADMIN_URL));
properties.setProperty(PULSAR_SERVICE_URL.key(),
tableOptions.get(SERVICE_URL));
// Retrieve physical DataType (not including computed or metadata
fields)
@@ -263,7 +257,7 @@ public class UpsertPulsarTableFactory
@Override
public Set<ConfigOption<?>> requiredOptions() {
- return Stream.of(TOPICS, ADMIN_URL,
SERVICE_URL).collect(Collectors.toSet());
+ return Stream.of(TOPICS, SERVICE_URL).collect(Collectors.toSet());
}
@Override
@@ -293,7 +287,6 @@ public class UpsertPulsarTableFactory
public Set<ConfigOption<?>> forwardOptions() {
return Stream.of(
TOPICS,
- ADMIN_URL,
SERVICE_URL,
SOURCE_SUBSCRIPTION_NAME,
SOURCE_START_FROM_MESSAGE_ID,
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandlerTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandlerTest.java
deleted file mode 100644
index 3c76edb..0000000
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandlerTest.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.common.handler;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
-
-import org.apache.pulsar.client.admin.Bookies;
-import org.apache.pulsar.client.admin.BrokerStats;
-import org.apache.pulsar.client.admin.Brokers;
-import org.apache.pulsar.client.admin.Clusters;
-import org.apache.pulsar.client.admin.Functions;
-import org.apache.pulsar.client.admin.Lookup;
-import org.apache.pulsar.client.admin.Namespaces;
-import org.apache.pulsar.client.admin.NonPersistentTopics;
-import org.apache.pulsar.client.admin.Packages;
-import org.apache.pulsar.client.admin.Properties;
-import org.apache.pulsar.client.admin.ProxyStats;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
-import org.apache.pulsar.client.admin.ResourceGroups;
-import org.apache.pulsar.client.admin.ResourceQuotas;
-import org.apache.pulsar.client.admin.Schemas;
-import org.apache.pulsar.client.admin.Sink;
-import org.apache.pulsar.client.admin.Sinks;
-import org.apache.pulsar.client.admin.Source;
-import org.apache.pulsar.client.admin.Sources;
-import org.apache.pulsar.client.admin.Tenants;
-import org.apache.pulsar.client.admin.TopicPolicies;
-import org.apache.pulsar.client.admin.Topics;
-import org.apache.pulsar.client.admin.Transactions;
-import org.apache.pulsar.client.admin.Worker;
-import org.junit.jupiter.api.Test;
-
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RATES;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_RETRIES;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_REQUEST_WAIT_MILLIS;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Unit test of {@link PulsarAdminInvocationHandler}. */
-class PulsarAdminInvocationHandlerTest {
-
- @Test
- void retryWithFixedTimesOnRequestFailure() {
- int retryTimes = 5 + ThreadLocalRandom.current().nextInt(5);
- String errorMsg = "Always failed with reason: " +
randomAlphanumeric(10);
-
- PulsarAdminTestImpl admin = new PulsarAdminTestImpl(errorMsg);
-
- Configuration configuration = new Configuration();
- configuration.set(PULSAR_ADMIN_REQUEST_RETRIES, retryTimes);
- configuration.set(PULSAR_ADMIN_REQUEST_WAIT_MILLIS, 50L);
- configuration.set(PULSAR_ADMIN_REQUEST_RATES, 1000);
- SinkConfiguration sinkConfiguration = new
SinkConfiguration(configuration);
-
- PulsarAdminInvocationHandler handler =
- new PulsarAdminInvocationHandler(admin, sinkConfiguration);
-
- PulsarAdmin proxyAdmin =
- (PulsarAdmin)
- Proxy.newProxyInstance(
- PulsarAdmin.class.getClassLoader(),
- new Class[] {PulsarAdmin.class},
- handler);
-
- assertThatThrownBy(() ->
proxyAdmin.lookups().lookupPartitionedTopic("aa"))
- .isInstanceOf(PulsarAdminException.class)
- .hasMessage(errorMsg);
- assertThat(admin.lookup.callingTimes()).isEqualTo(retryTimes);
- }
-
- @Test
- void didNotRetryOnNotFoundException() {
- PulsarAdminTestImpl admin = new PulsarAdminTestImpl("not found");
-
- PulsarAdminInvocationHandler handler =
- new PulsarAdminInvocationHandler(admin, new
SinkConfiguration(new Configuration()));
-
- PulsarAdmin proxyAdmin =
- (PulsarAdmin)
- Proxy.newProxyInstance(
- PulsarAdmin.class.getClassLoader(),
- new Class[] {PulsarAdmin.class},
- handler);
-
- assertThatThrownBy(() -> proxyAdmin.lookups().lookupTopic("some"))
- .isInstanceOf(NotFoundException.class)
- .hasMessage("not found");
- assertThat(admin.lookup.callingTimes()).isEqualTo(1);
- }
-
- /** Test implementation for PulsarAdmin. */
- private static final class PulsarAdminTestImpl implements PulsarAdmin {
-
- private final LookupTestImpl lookup;
-
- private PulsarAdminTestImpl(String errorMsg) {
- this.lookup = new LookupTestImpl(errorMsg);
- }
-
- @Override
- public Clusters clusters() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Brokers brokers() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Tenants tenants() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public ResourceGroups resourcegroups() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Properties properties() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Namespaces namespaces() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Topics topics() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public TopicPolicies topicPolicies() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public TopicPolicies topicPolicies(boolean isGlobal) {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Bookies bookies() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public NonPersistentTopics nonPersistentTopics() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public ResourceQuotas resourceQuotas() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Lookup lookups() {
- lookup.resetTimes();
- return lookup;
- }
-
- @Override
- public Functions functions() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Source source() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Sources sources() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Sink sink() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Sinks sinks() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Worker worker() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public BrokerStats brokerStats() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public ProxyStats proxyStats() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public String getServiceUrl() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Schemas schemas() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Packages packages() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Transactions transactions() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public void close() {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
- }
-
- /** Test implementation for Lookup. */
- private static final class LookupTestImpl implements Lookup {
-
- private final String errorMsg;
- private int times;
-
- private LookupTestImpl(String errorMsg) {
- this.errorMsg = errorMsg;
- }
-
- @Override
- public String lookupTopic(String topic) throws PulsarAdminException {
- times++;
- throw new NotFoundException(
- new IllegalArgumentException("not found"), "not found",
404);
- }
-
- @Override
- public CompletableFuture<String> lookupTopicAsync(String topic) {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public Map<String, String> lookupPartitionedTopic(String topic)
- throws PulsarAdminException {
- times++;
- throw new PulsarAdminException(errorMsg);
- }
-
- @Override
- public CompletableFuture<Map<String, String>>
lookupPartitionedTopicAsync(String topic) {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public String getBundleRange(String topic) {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- @Override
- public CompletableFuture<String> getBundleRangeAsync(String topic) {
- throw new UnsupportedOperationException("We didn't support this
method in test.");
- }
-
- public int callingTimes() {
- return times;
- }
-
- public void resetTimes() {
- this.times = 0;
- }
- }
-}
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
index e7bbd78..af3e6b7 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilderTest.java
@@ -97,16 +97,13 @@ class PulsarSinkBuilderTest {
}
@Test
- void serviceUrlAndAdminUrlMustBeProvided() {
+ void serviceUrlMustBeProvided() {
PulsarSinkBuilder<String> builder = PulsarSink.builder();
builder.setSerializationSchema(new SimpleStringSchema());
builder.setTopics("a", "b");
assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class);
builder.setServiceUrl("pulsar://127.0.0.1:8888");
-
assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class);
-
- builder.setAdminUrl("http://127.0.0.1:9999");
assertThatCode(builder::build).doesNotThrowAnyException();
}
}
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 6998929..18d035b 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -131,7 +131,6 @@ class PulsarSinkITCase {
PulsarSink<String> sink =
PulsarSink.builder()
.setServiceUrl(operator().serviceUrl())
- .setAdminUrl(operator().adminUrl())
.setDeliveryGuarantee(guarantee)
.setTopics(topic)
.setSerializationSchema(new SimpleStringSchema())
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
index 043ae27..f8c517c 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
@@ -31,9 +31,6 @@ class PulsarSourceBuilderTest {
PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
fillRequiredFields(builder);
- assertThatThrownBy(() -> builder.setAdminUrl("admin-url2"))
- .isInstanceOf(IllegalArgumentException.class);
-
assertThatThrownBy(() -> builder.setServiceUrl("service-url2"))
.isInstanceOf(IllegalArgumentException.class);
@@ -50,7 +47,6 @@ class PulsarSourceBuilderTest {
}
private void fillRequiredFields(PulsarSourceBuilder<String> builder) {
- builder.setAdminUrl("admin-url");
builder.setServiceUrl("service-url");
builder.setSubscriptionName("subscription-name");
builder.setTopics("topic");
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index 7cd685d..fc49073 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -56,7 +56,6 @@ class StopCursorTest extends PulsarTestSuiteBase {
PulsarPartitionSplitReader splitReader =
new PulsarPartitionSplitReader(
operator().client(),
- operator().admin(),
sourceConfig(),
Schema.BYTES,
PulsarCrypto.disabled(),
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
index 22edb71..27803ac 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
@@ -101,7 +101,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
@Test
void topicListSubscriber() throws Exception {
PulsarSubscriber subscriber =
getTopicListSubscriber(Arrays.asList(topic1, topic2));
- subscriber.open(operator().client(), operator().admin());
+ subscriber.open(operator().client());
Set<TopicPartition> topicPartitions =
subscriber.getSubscribedTopicPartitions(new
FullRangeGenerator(), NUM_PARALLELISM);
@@ -120,7 +120,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
String partition = topicNameWithPartition(topic1, 2);
PulsarSubscriber subscriber =
getTopicListSubscriber(singletonList(partition));
- subscriber.open(operator().client(), operator().admin());
+ subscriber.open(operator().client());
Set<TopicPartition> partitions =
subscriber.getSubscribedTopicPartitions(new
FullRangeGenerator(), NUM_PARALLELISM);
@@ -132,7 +132,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
@Test
void subscribeNonPartitionedTopicList() throws Exception {
PulsarSubscriber subscriber =
getTopicListSubscriber(singletonList(topic4));
- subscriber.open(operator().client(), operator().admin());
+ subscriber.open(operator().client());
Set<TopicPartition> partitions =
subscriber.getSubscribedTopicPartitions(new
FullRangeGenerator(), NUM_PARALLELISM);
@@ -147,7 +147,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
getTopicPatternSubscriber(
Pattern.compile("flink/regex/pulsar-subscriber-non-partitioned-topic-.*"),
AllTopics);
- subscriber.open(operator().client(), operator().admin());
+ subscriber.open(operator().client());
Set<TopicPartition> topicPartitions =
subscriber.getSubscribedTopicPartitions(new
FullRangeGenerator(), NUM_PARALLELISM);
@@ -165,7 +165,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
PulsarSubscriber subscriber =
getTopicPatternSubscriber(
Pattern.compile("flink/regex/pulsar-subscriber-topic-.*"), AllTopics);
- subscriber.open(operator().client(), operator().admin());
+ subscriber.open(operator().client());
Set<TopicPartition> topicPartitions =
subscriber.getSubscribedTopicPartitions(new
FullRangeGenerator(), NUM_PARALLELISM);
@@ -185,7 +185,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
PulsarSubscriber subscriber =
getTopicPatternSubscriber(
Pattern.compile("pulsar-subscriber-simple-topic-.*"),
PersistentOnly);
- subscriber.open(operator().client(), operator().admin());
+ subscriber.open(operator().client());
Set<TopicPartition> topicPartitions =
subscriber.getSubscribedTopicPartitions(new
FullRangeGenerator(), NUM_PARALLELISM);
@@ -205,7 +205,7 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
PulsarSubscriber subscriber =
getTopicPatternSubscriber(
Pattern.compile("pulsar-subscriber-simple-topic-.*"),
NonPersistentOnly);
- subscriber.open(operator().client(), operator().admin());
+ subscriber.open(operator().client());
Set<TopicPartition> topicPartitions =
subscriber.getSubscribedTopicPartitions(new
FullRangeGenerator(), NUM_PARALLELISM);
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
index ab0bad8..fa06190 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
@@ -267,7 +267,6 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
private PulsarPartitionSplitReader splitReader() {
return new PulsarPartitionSplitReader(
operator().client(),
- operator().admin(),
sourceConfig(),
new BytesSchema(new PulsarSchema<>(STRING)),
PulsarCrypto.disabled(),
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchemaTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchemaTest.java
index 5c80d1c..fd4b4c0 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchemaTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/GenericRecordDeserializationSchemaTest.java
@@ -82,7 +82,6 @@ class GenericRecordDeserializationSchemaTest extends
PulsarTestSuiteBase {
PulsarSource.builder()
.setDeserializationSchema(new
AvroGenericRecordDeserializer())
.setServiceUrl(operator().serviceUrl())
- .setAdminUrl(operator().adminUrl())
.setTopics(topic)
.setStartCursor(StartCursor.earliest())
.setBoundedStopCursor(StopCursor.latest())
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
index 339ec99..de0a729 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
@@ -297,7 +297,6 @@ class PulsarDeserializationSchemaTest extends
PulsarTestSuiteBase {
return PulsarSource.builder()
.setDeserializationSchema(deserializationSchema)
.setServiceUrl(operator().serviceUrl())
- .setAdminUrl(operator().adminUrl())
.setTopics(topicName)
.setSubscriptionName(topicName + "-subscription")
.setBoundedStopCursor(StopCursor.latest())
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
index 59d8fbd..da22ee8 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
@@ -80,14 +80,12 @@ public class PulsarChangelogTableITCase extends
PulsarTableTestBase {
+ " 'connector' = '%s',"
+ " 'topics' = '%s',"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'value.format' = '%s',"
+ " 'pulsar.source.fetchOneMessageTime' =
'100'"
+ ")",
PulsarTableFactory.IDENTIFIER,
topic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
DebeziumJsonFormatFactory.IDENTIFIER);
String sinkDDL =
"CREATE TABLE debezium_sink ("
@@ -216,14 +214,12 @@ public class PulsarChangelogTableITCase extends
PulsarTableTestBase {
+ " 'connector' = '%s',"
+ " 'topics' = '%s',"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'value.format' = 'canal-json',"
+ " 'pulsar.source.fetchOneMessageTime' =
'100'"
+ ")",
PulsarTableFactory.IDENTIFIER,
topic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
CanalJsonFormatFactory.IDENTIFIER);
String sinkDDL =
"CREATE TABLE canal_sink ("
@@ -356,14 +352,12 @@ public class PulsarChangelogTableITCase extends
PulsarTableTestBase {
+ " 'connector' = '%s',"
+ " 'topics' = '%s',"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'value.format' = '%s',"
+ " 'pulsar.source.fetchOneMessageTime' =
'100'"
+ ")",
PulsarTableFactory.IDENTIFIER,
topic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
MaxwellJsonFormatFactory.IDENTIFIER);
String sinkDDL =
"CREATE TABLE maxwell_sink ("
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableFactoryTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableFactoryTest.java
index 7625c63..10c46b5 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableFactoryTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableFactoryTest.java
@@ -59,11 +59,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static
org.apache.flink.connector.pulsar.table.PulsarTableFactory.UPSERT_DISABLED;
-import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.ADMIN_URL;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FIELDS;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FORMAT;
import static
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SERVICE_URL;
@@ -82,7 +80,6 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class PulsarTableFactoryTest {
private static final String TEST_TOPIC = "test-topic";
- private static final String TEST_ADMIN_URL =
"http://my-broker.example.com:8080";
private static final String TEST_SERVICE_URL = "pulsar://localhost:6650";
private static final String TEST_SUBSCRIPTION_NAME =
"default-subscription";
@@ -107,12 +104,10 @@ public class PulsarTableFactoryTest {
"%s.%s", TestFormatFactory.IDENTIFIER,
TestFormatFactory.FAIL_ON_MISSING.key());
static {
- EXPECTED_PULSAR_SOURCE_PROPERTIES.setProperty(PULSAR_ADMIN_URL.key(),
TEST_ADMIN_URL);
EXPECTED_PULSAR_SOURCE_PROPERTIES.setProperty(PULSAR_SERVICE_URL.key(),
TEST_SERVICE_URL);
EXPECTED_PULSAR_SOURCE_PROPERTIES.setProperty(
PULSAR_SUBSCRIPTION_NAME.key(), TEST_SUBSCRIPTION_NAME);
- EXPECTED_PULSAR_SINK_PROPERTIES.setProperty(PULSAR_ADMIN_URL.key(),
TEST_ADMIN_URL);
EXPECTED_PULSAR_SINK_PROPERTIES.setProperty(PULSAR_SERVICE_URL.key(),
TEST_SERVICE_URL);
}
@@ -371,7 +366,6 @@ public class PulsarTableFactoryTest {
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put(CONNECTOR.key(), PulsarTableFactory.IDENTIFIER);
tableOptions.put(TOPICS.key(), TEST_TOPIC);
- tableOptions.put(ADMIN_URL.key(), TEST_ADMIN_URL);
tableOptions.put(SERVICE_URL.key(), TEST_SERVICE_URL);
tableOptions.put(SOURCE_SUBSCRIPTION_NAME.key(),
TEST_SUBSCRIPTION_NAME);
// Format options.
@@ -385,7 +379,6 @@ public class PulsarTableFactoryTest {
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put(CONNECTOR.key(), PulsarTableFactory.IDENTIFIER);
tableOptions.put(TOPICS.key(), TEST_TOPIC);
- tableOptions.put(ADMIN_URL.key(), TEST_ADMIN_URL);
tableOptions.put(SERVICE_URL.key(), TEST_SERVICE_URL);
tableOptions.put(SOURCE_SUBSCRIPTION_NAME.key(),
TEST_SUBSCRIPTION_NAME);
// Format options.
@@ -404,7 +397,6 @@ public class PulsarTableFactoryTest {
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put(CONNECTOR.key(), PulsarTableFactory.IDENTIFIER);
tableOptions.put(TOPICS.key(), TEST_TOPIC);
- tableOptions.put(ADMIN_URL.key(), TEST_ADMIN_URL);
tableOptions.put(SERVICE_URL.key(), TEST_SERVICE_URL);
// Format options.
tableOptions.put(FORMAT.key(), TestFormatFactory.IDENTIFIER);
@@ -417,7 +409,6 @@ public class PulsarTableFactoryTest {
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put(CONNECTOR.key(), PulsarTableFactory.IDENTIFIER);
tableOptions.put(TOPICS.key(), TEST_TOPIC);
- tableOptions.put(ADMIN_URL.key(), TEST_ADMIN_URL);
tableOptions.put(SERVICE_URL.key(), TEST_SERVICE_URL);
// Format options.
tableOptions.put(FORMAT.key(), TestFormatFactory.IDENTIFIER);
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java
index c1739bd..6538676 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableITCase.java
@@ -96,14 +96,12 @@ public class PulsarTableITCase extends PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
randomTableName,
PulsarTableFactory.IDENTIFIER,
topic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format);
tableEnv.executeSql(createTable);
@@ -181,7 +179,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'format' = '%s',\n"
+ " 'key.format' = '%s',\n"
+ " 'key.fields' = 'user_id; event_id'\n"
@@ -190,7 +187,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
PulsarTableFactory.IDENTIFIER,
topic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format,
format);
@@ -238,7 +234,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'pulsar.producer.producerName' =
'pulsar-table-test',\n"
+ " 'format' = '%s'\n"
+ ")",
@@ -246,7 +241,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
PulsarTableFactory.IDENTIFIER,
topic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format);
tableEnv.executeSql(createTable);
@@ -312,14 +306,12 @@ public class PulsarTableITCase extends
PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
sourceTableName,
PulsarTableFactory.IDENTIFIER,
sourceTopic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
RAW_FORMAT);
tableEnv.executeSql(createSourceTable);
@@ -349,7 +341,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
sourceTableName,
@@ -357,7 +348,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
PulsarTableFactory.IDENTIFIER,
sourceTopic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format);
tableEnv.executeSql(createSourceTable);
@@ -382,7 +372,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
sinkTableName,
@@ -390,7 +379,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
PulsarTableFactory.IDENTIFIER,
sinkTopic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format);
tableEnv.executeSql(createSinkTable);
@@ -418,14 +406,12 @@ public class PulsarTableITCase extends
PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
sourceTableName,
PulsarTableFactory.IDENTIFIER,
sourceTopic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format);
tableEnv.executeSql(createSourceTable);
@@ -452,14 +438,12 @@ public class PulsarTableITCase extends
PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
sourceTableName,
PulsarTableFactory.IDENTIFIER,
sourceTopic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format);
tableEnv.executeSql(createSourceTable);
@@ -493,14 +477,12 @@ public class PulsarTableITCase extends
PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
sourceTableName,
PulsarTableFactory.IDENTIFIER,
sourceTopic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format);
tableEnv.executeSql(createSourceTable);
@@ -517,14 +499,12 @@ public class PulsarTableITCase extends
PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
sinkTableName,
PulsarTableFactory.IDENTIFIER,
sinkTopic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format);
tableEnv.executeSql(createSinkTable);
@@ -549,7 +529,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ "
'pulsar.source.partitionDiscoveryIntervalMs' = '-1',\n"
+ " 'source.stop.at-message-id' = 'latest',\n"
+ " 'format' = '%s'\n"
@@ -558,7 +537,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
PulsarTableFactory.IDENTIFIER,
sourceTopic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
RAW_FORMAT);
tableEnv.executeSql(createSourceTable);
@@ -591,7 +569,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
+ " 'connector' = '%s',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'source.stop.at-message-id' = 'latest',\n"
+ " 'format' = '%s'\n"
+ ")",
@@ -599,7 +576,6 @@ public class PulsarTableITCase extends PulsarTableTestBase {
PulsarTableFactory.IDENTIFIER,
sourceTopic,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
RAW_FORMAT);
tableEnv.executeSql(createSourceTable);
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableOptionsTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableOptionsTest.java
index e01289c..e82ed8b 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableOptionsTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableOptionsTest.java
@@ -408,14 +408,10 @@ public class PulsarTableOptionsTest extends
PulsarTableTestBase {
+ " `physical_3` BOOLEAN\n"
+ ") WITH (\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " %s\n"
+ " 'connector' = 'pulsar'"
+ ")",
- topicName,
- pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
- testConfigString);
+ topicName, pulsar.operator().serviceUrl(),
testConfigString);
tableEnv.executeSql(createTable);
}
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableITCase.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableITCase.java
index 0173192..208264c 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableITCase.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/UpsertPulsarTableITCase.java
@@ -108,14 +108,12 @@ public class UpsertPulsarTableITCase extends
PulsarTableITCase {
+ " 'connector' = 'upsert-pulsar',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'key.format' = '%s',\n"
+ " 'value.format' = '%s'"
+ ")",
wordCountTable,
wordCountTable,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format,
format);
tableEnv.executeSql(createSinkTable);
@@ -168,7 +166,6 @@ public class UpsertPulsarTableITCase extends
PulsarTableITCase {
+ " 'connector' = 'pulsar',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'key.format' = '%s',\n"
+ " 'key.fields' = 'word',\n"
+ " 'value.format' = '%s'\n"
@@ -176,7 +173,6 @@ public class UpsertPulsarTableITCase extends
PulsarTableITCase {
rawWordCountTable,
wordCountTable,
pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
format,
format));
@@ -343,16 +339,10 @@ public class UpsertPulsarTableITCase extends
PulsarTableITCase {
+ " 'connector' = 'upsert-pulsar',\n"
+ " 'topics' = '%s',\n"
+ " 'service-url' = '%s',\n"
- + " 'admin-url' = '%s',\n"
+ " 'key.format' = '%s',\n"
+ " 'value.format' = '%s'"
+ ")",
- userTable,
- userTable,
- pulsar.operator().serviceUrl(),
- pulsar.operator().adminUrl(),
- format,
- format);
+ userTable, userTable, pulsar.operator().serviceUrl(),
format, format);
tableEnv.executeSql(createSinkTable);
String initialValues =
"INSERT INTO " + userTable + " " + "SELECT * " + "FROM
users_changelog_" + format;
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index bd4b243..060eeea 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -59,7 +59,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
-import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.getTcClient;
@@ -426,7 +425,6 @@ public class PulsarRuntimeOperator implements Closeable {
public Configuration config() {
Configuration configuration = new Configuration();
configuration.set(PULSAR_SERVICE_URL, serviceUrl());
- configuration.set(PULSAR_ADMIN_URL, adminUrl());
return configuration;
}
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
index e96c3ac..3c384dd 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
@@ -69,7 +69,6 @@ public abstract class PulsarSinkTestContext extends
PulsarTestContext<String>
PulsarSinkBuilder<String> builder =
PulsarSink.builder()
.setServiceUrl(operator.serviceUrl())
- .setAdminUrl(operator.adminUrl())
.setDeliveryGuarantee(guarantee)
.setSerializationSchema(schema)
.enableSchemaEvolution()
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
index 5a65a33..aece284 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
@@ -63,7 +63,6 @@ public abstract class PulsarSourceTestContext extends
PulsarTestContext<String>
PulsarSource.builder()
.setDeserializationSchema(schema)
.setServiceUrl(operator.serviceUrl())
- .setAdminUrl(operator.adminUrl())
.setTopicPattern(topicPattern(), AllTopics)
.setSubscriptionName(subscriptionName())
.setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
DISCOVERY_INTERVAL);