This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4eb1a2938d2 [improve][build] Drop deprecated pulsar-client-1x and
pulsar-client-2x-shaded modules (#23450)
4eb1a2938d2 is described below
commit 4eb1a2938d21650a972fd33791567707ae42c17d
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Oct 14 07:38:58 2024 +0300
[improve][build] Drop deprecated pulsar-client-1x and
pulsar-client-2x-shaded modules (#23450)
---
pom.xml | 1 -
pulsar-bom/pom.xml | 15 -
pulsar-client-1x-base/pom.xml | 80 ----
pulsar-client-1x-base/pulsar-client-1x/pom.xml | 93 ----
.../pulsar/client/api/ClientConfiguration.java | 388 -----------------
.../org/apache/pulsar/client/api/Consumer.java | 331 --------------
.../pulsar/client/api/ConsumerConfiguration.java | 411 ------------------
.../apache/pulsar/client/api/MessageBuilder.java | 139 ------
.../apache/pulsar/client/api/MessageListener.java | 56 ---
.../org/apache/pulsar/client/api/Producer.java | 199 ---------
.../pulsar/client/api/ProducerConfiguration.java | 474 ---------------------
.../org/apache/pulsar/client/api/PulsarClient.java | 273 ------------
.../java/org/apache/pulsar/client/api/Reader.java | 81 ----
.../pulsar/client/api/ReaderConfiguration.java | 175 --------
.../apache/pulsar/client/api/ReaderListener.java | 52 ---
.../org/apache/pulsar/client/api/package-info.java | 22 -
.../pulsar/client/impl/MessageBuilderImpl.java | 115 -----
.../apache/pulsar/client/impl/package-info.java | 22 -
.../pulsar/client/impl/v1/ConsumerV1Impl.java | 176 --------
.../pulsar/client/impl/v1/ProducerV1Impl.java | 90 ----
.../pulsar/client/impl/v1/PulsarClientV1Impl.java | 172 --------
.../apache/pulsar/client/impl/v1/ReaderV1Impl.java | 85 ----
.../apache/pulsar/client/impl/v1/package-info.java | 22 -
.../src/main/resources/findbugsExclude.xml | 48 ---
.../pulsar-client-2x-shaded/pom.xml | 97 -----
25 files changed, 3617 deletions(-)
diff --git a/pom.xml b/pom.xml
index b89dd1597cc..5734c6fddac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2475,7 +2475,6 @@ flexible messaging model and an intuitive client
API.</description>
<module>pulsar-client-api</module>
<module>pulsar-client</module>
<module>pulsar-client-shaded</module>
- <module>pulsar-client-1x-base</module>
<module>pulsar-client-admin-api</module>
<module>pulsar-client-admin</module>
<module>pulsar-client-admin-shaded</module>
diff --git a/pulsar-bom/pom.xml b/pulsar-bom/pom.xml
index e674301f18a..b4c271ff891 100644
--- a/pulsar-bom/pom.xml
+++ b/pulsar-bom/pom.xml
@@ -215,21 +215,6 @@
<artifactId>pulsar-cli-utils</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-1x-base</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-1x</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-2x-shaded</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin-api</artifactId>
diff --git a/pulsar-client-1x-base/pom.xml b/pulsar-client-1x-base/pom.xml
deleted file mode 100644
index fedbe80cc62..00000000000
--- a/pulsar-client-1x-base/pom.xml
+++ /dev/null
@@ -1,80 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar</artifactId>
- <version>4.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-client-1x-base</artifactId>
- <name>Pulsar Client 1.x Compatibility Base</name>
- <packaging>pom</packaging>
-
- <modules>
- <module>pulsar-client-2x-shaded</module>
- <module>pulsar-client-1x</module>
- </modules>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <release>${pulsar.client.compiler.release}</release>
- </configuration>
- </plugin>
- <plugin>
- <groupId>com.github.spotbugs</groupId>
- <artifactId>spotbugs-maven-plugin</artifactId>
- <version>${spotbugs-maven-plugin.version}</version>
- <executions>
- <execution>
- <id>spotbugs</id>
- <phase>verify</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <executions>
- <execution>
- <id>checkstyle</id>
- <phase>verify</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/pulsar-client-1x-base/pulsar-client-1x/pom.xml
b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
deleted file mode 100644
index b9c8fa7d3eb..00000000000
--- a/pulsar-client-1x-base/pulsar-client-1x/pom.xml
+++ /dev/null
@@ -1,93 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-1x-base</artifactId>
- <version>4.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-client-1x</artifactId>
- <name>Pulsar Client 1.x Compatibility API</name>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-2x-shaded</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- </dependencies>
-
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.gaul</groupId>
- <artifactId>modernizer-maven-plugin</artifactId>
- <configuration>
- <failOnViolations>true</failOnViolations>
- <javaVersion>8</javaVersion>
- </configuration>
- <executions>
- <execution>
- <id>modernizer</id>
- <phase>verify</phase>
- <goals>
- <goal>modernizer</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>com.github.spotbugs</groupId>
- <artifactId>spotbugs-maven-plugin</artifactId>
- <version>${spotbugs-maven-plugin.version}</version>
- <configuration>
-
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
- </configuration>
- <executions>
- <execution>
- <id>spotbugs</id>
- <phase>verify</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
deleted file mode 100644
index 3b0efe64cf5..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-
-/**
- * Class used to specify client side configuration like authentication, etc..
- *
- * @deprecated Use {@link PulsarClient#builder()} to construct and configure a
new {@link PulsarClient} instance
- */
-@Deprecated
-public class ClientConfiguration implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final ClientConfigurationData confData = new
ClientConfigurationData();
-
- /**
- * @return the authentication provider to be used
- */
- public Authentication getAuthentication() {
- return confData.getAuthentication();
- }
-
- /**
- * Set the authentication provider to use in the Pulsar client instance.
- * <p>
- * Example:
- * <p>
- *
- * <pre>
- * <code>
- * ClientConfiguration confData = new ClientConfiguration();
- * String authPluginClassName =
"org.apache.pulsar.client.impl.auth.MyAuthentication";
- * String authParamsString = "key1:val1,key2:val2";
- * Authentication auth = AuthenticationFactory.create(authPluginClassName,
authParamsString);
- * confData.setAuthentication(auth);
- * PulsarClient client = PulsarClient.create(serviceUrl, confData);
- * ....
- * </code>
- * </pre>
- *
- * @param authentication
- */
- public void setAuthentication(Authentication authentication) {
- confData.setAuthentication(authentication);
- }
-
- /**
- * Set the authentication provider to use in the Pulsar client instance.
- * <p>
- * Example:
- * <p>
- *
- * <pre>
- * <code>
- * ClientConfiguration confData = new ClientConfiguration();
- * String authPluginClassName =
"org.apache.pulsar.client.impl.auth.MyAuthentication";
- * String authParamsString = "key1:val1,key2:val2";
- * confData.setAuthentication(authPluginClassName, authParamsString);
- * PulsarClient client = PulsarClient.create(serviceUrl, confData);
- * ....
- * </code>
- * </pre>
- *
- * @param authPluginClassName
- * name of the Authentication-Plugin you want to use
- * @param authParamsString
- * string which represents parameters for the
Authentication-Plugin, e.g., "key1:val1,key2:val2"
- * @throws UnsupportedAuthenticationException
- * failed to instantiate specified Authentication-Plugin
- */
- public void setAuthentication(String authPluginClassName, String
authParamsString)
- throws UnsupportedAuthenticationException {
-
confData.setAuthentication(AuthenticationFactory.create(authPluginClassName,
authParamsString));
- }
-
- /**
- * Set the authentication provider to use in the Pulsar client instance.
- * <p>
- * Example:
- * <p>
- *
- * <pre>
- * <code>
- * ClientConfiguration confData = new ClientConfiguration();
- * String authPluginClassName =
"org.apache.pulsar.client.impl.auth.MyAuthentication";
- * Map<String, String> authParams = new HashMap<String, String>();
- * authParams.put("key1", "val1");
- * confData.setAuthentication(authPluginClassName, authParams);
- * PulsarClient client = PulsarClient.create(serviceUrl, confData);
- * ....
- * </code>
- * </pre>
- *
- * @param authPluginClassName
- * name of the Authentication-Plugin you want to use
- * @param authParams
- * map which represents parameters for the Authentication-Plugin
- * @throws UnsupportedAuthenticationException
- * failed to instantiate specified Authentication-Plugin
- */
- public void setAuthentication(String authPluginClassName, Map<String,
String> authParams)
- throws UnsupportedAuthenticationException {
-
confData.setAuthentication(AuthenticationFactory.create(authPluginClassName,
authParams));
- }
-
- /**
- * @return the operation timeout in ms
- */
- public long getOperationTimeoutMs() {
- return confData.getOperationTimeoutMs();
- }
-
- /**
- * Set the operation timeout <i>(default: 30 seconds)</i>.
- * <p>
- * Producer-create, subscribe and unsubscribe operations will be retried
until this interval, after which the
- * operation will be marked as failed
- *
- * @param operationTimeout
- * operation timeout
- * @param unit
- * time unit for {@code operationTimeout}
- */
- public void setOperationTimeout(int operationTimeout, TimeUnit unit) {
- checkArgument(operationTimeout >= 0);
- confData.setOperationTimeoutMs(unit.toMillis(operationTimeout));
- }
-
- /**
- * @return the number of threads to use for handling connections
- */
- public int getIoThreads() {
- return confData.getNumIoThreads();
- }
-
- /**
- * Set the number of threads to be used for handling connections to
brokers <i>(default: 1 thread)</i>.
- *
- * @param numIoThreads
- */
- public void setIoThreads(int numIoThreads) {
- checkArgument(numIoThreads > 0);
- confData.setNumIoThreads(numIoThreads);
- }
-
- /**
- * @return the number of threads to use for message listeners
- */
- public int getListenerThreads() {
- return confData.getNumListenerThreads();
- }
-
- /**
- * Set the number of threads to be used for message listeners <i>(default:
1 thread)</i>.
- *
- * @param numListenerThreads
- */
- public void setListenerThreads(int numListenerThreads) {
- checkArgument(numListenerThreads > 0);
- confData.setNumListenerThreads(numListenerThreads);
- }
-
- /**
- * @return the max number of connections per single broker
- */
- public int getConnectionsPerBroker() {
- return confData.getConnectionsPerBroker();
- }
-
- /**
- * Sets the max number of connection that the client library will open to
a single broker.
- * <p>
- * By default, the connection pool will use a single connection for all
the producers and consumers. Increasing this
- * parameter may improve throughput when using many producers over a high
latency connection.
- * <p>
- *
- * @param connectionsPerBroker
- * max number of connections per broker (needs to be greater
than 0)
- */
- public void setConnectionsPerBroker(int connectionsPerBroker) {
- checkArgument(connectionsPerBroker > 0, "Connections per broker need
to be greater than 0");
- confData.setConnectionsPerBroker(connectionsPerBroker);
- }
-
- /**
- * @return whether TCP no-delay should be set on the connections
- */
- public boolean isUseTcpNoDelay() {
- return confData.isUseTcpNoDelay();
- }
-
- /**
- * Configure whether to use TCP no-delay flag on the connection, to
disable Nagle algorithm.
- * <p>
- * No-delay features make sure packets are sent out on the network as soon
as possible, and it's critical to achieve
- * low latency publishes. On the other hand, sending out a huge number of
small packets might limit the overall
- * throughput, so if latency is not a concern, it's advisable to set the
<code>useTcpNoDelay</code> flag to false.
- * <p>
- * Default value is true
- *
- * @param useTcpNoDelay
- */
- public void setUseTcpNoDelay(boolean useTcpNoDelay) {
- confData.setUseTcpNoDelay(useTcpNoDelay);
- }
-
- /**
- * @return whether TLS encryption is used on the connection
- */
- public boolean isUseTls() {
- return confData.isUseTls();
- }
-
- /**
- * Configure whether to use TLS encryption on the connection <i>(default:
false)</i>.
- *
- * @param useTls
- */
- public void setUseTls(boolean useTls) {
- confData.setUseTls(useTls);
- }
-
- /**
- * @return path to the trusted TLS certificate file
- */
- public String getTlsTrustCertsFilePath() {
- return confData.getTlsTrustCertsFilePath();
- }
-
- /**
- * Set the path to the trusted TLS certificate file.
- *
- * @param tlsTrustCertsFilePath
- */
- public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
- confData.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
- }
-
- /**
- * @return whether the Pulsar client accept untrusted TLS certificate from
broker
- */
- public boolean isTlsAllowInsecureConnection() {
- return confData.isTlsAllowInsecureConnection();
- }
-
- /**
- * Configure whether the Pulsar client accept untrusted TLS certificate
from broker <i>(default: false)</i>.
- *
- * @param tlsAllowInsecureConnection
- */
- public void setTlsAllowInsecureConnection(boolean
tlsAllowInsecureConnection) {
- confData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
- }
-
- /**
- * Stats will be activated with positive statsIntervalSeconds.
- *
- * @return the interval between each stat info <i>(default: 60 seconds)</i>
- */
- public long getStatsIntervalSeconds() {
- return confData.getStatsIntervalSeconds();
- }
-
- /**
- * Set the interval between each stat info <i>(default: 60 seconds)</i>
Stats will be activated with positive.
- * statsIntervalSeconds It should be set to at least 1 second
- *
- * @param statsInterval
- * the interval between each stat info
- * @param unit
- * time unit for {@code statsInterval}
- */
- public void setStatsInterval(long statsInterval, TimeUnit unit) {
- confData.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
- }
-
- /**
- * Get configured total allowed concurrent lookup-request.
- *
- * @return
- */
- public int getConcurrentLookupRequest() {
- return confData.getConcurrentLookupRequest();
- }
-
- /**
- * Number of concurrent lookup-requests allowed on each broker-connection
to prevent overload on broker.
- * <i>(default: 50000)</i> It should be configured with higher value only
in case of it requires to
- * produce/subscribe on thousands of topic using created {@link
PulsarClient}
- *
- * @param concurrentLookupRequest
- */
- public void setConcurrentLookupRequest(int concurrentLookupRequest) {
- confData.setConcurrentLookupRequest(concurrentLookupRequest);
- }
-
- /**
- * Get configured max number of reject-request in a time-frame (60
seconds) after which connection will be closed.
- *
- * @return
- */
- public int getMaxNumberOfRejectedRequestPerConnection() {
- return confData.getMaxNumberOfRejectedRequestPerConnection();
- }
-
- /**
- * Set max number of broker-rejected requests in a certain time-frame (60
seconds) after which current connection.
- * will be closed and client creates a new connection that give chance to
connect a different broker <i>(default:
- * 50)</i>
- *
- * @param maxNumberOfRejectedRequestPerConnection
- */
- public void setMaxNumberOfRejectedRequestPerConnection(int
maxNumberOfRejectedRequestPerConnection) {
-
confData.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
- }
-
- public boolean isTlsHostnameVerificationEnable() {
- return confData.isTlsHostnameVerificationEnable();
- }
-
- /**
- * It allows to validate hostname verification when client connects to
broker over tls. It validates incoming x509
- * certificate and matches provided hostname(CN/SAN) with expected
broker's host name. It follows RFC 2818, 3.1.
- * Server Identity hostname verification.
- *
- * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
- *
- * @param tlsHostnameVerificationEnable
- */
- public void setTlsHostnameVerificationEnable(boolean
tlsHostnameVerificationEnable) {
-
confData.setTlsHostnameVerificationEnable(tlsHostnameVerificationEnable);
- }
-
- public ClientConfiguration setServiceUrl(String serviceUrl) {
- confData.setServiceUrl(serviceUrl);
- return this;
- }
-
- /**
- * Set the duration of time to wait for a connection to a broker to be
established. If the duration
- * passes without a response from the broker, the connection attempt is
dropped.
- *
- * @param duration the duration to wait
- * @param unit the time unit in which the duration is defined
- */
- public void setConnectionTimeout(int duration, TimeUnit unit) {
- confData.setConnectionTimeoutMs((int) unit.toMillis(duration));
- }
-
- /**
- * Get the duration of time for which the client will wait for a
connection to a broker to be
- * established before giving up.
- *
- * @return the duration, in milliseconds
- */
- public long getConnectionTimeoutMs() {
- return confData.getConnectionTimeoutMs();
- }
-
- public ClientConfigurationData getConfigurationData() {
- return confData;
- }
-
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
deleted file mode 100644
index d84b9981b03..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import java.io.Closeable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * An interface that abstracts behavior of Pulsar's consumer.
- */
-public interface Consumer extends Closeable {
-
- /**
- * Get a topic for the consumer.
- *
- * @return topic for the consumer
- */
- String getTopic();
-
- /**
- * Get a subscription for the consumer.
- *
- * @return subscription for the consumer
- */
- String getSubscription();
-
- /**
- * Unsubscribe the consumer
- * <p>
- * This call blocks until the consumer is unsubscribed.
- *
- * @throws PulsarClientException
- */
- void unsubscribe() throws PulsarClientException;
-
- /**
- * Asynchronously unsubscribe the consumer.
- *
- * @return {@link CompletableFuture} for this operation
- */
- CompletableFuture<Void> unsubscribeAsync();
-
- /**
- * Receives a single message.
- * <p>
- * This calls blocks until a message is available.
- *
- * @return the received message
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- * @throws PulsarClientException.InvalidConfigurationException
- * if a message listener was defined in the configuration
- */
- Message<byte[]> receive() throws PulsarClientException;
-
- /**
- * Receive a single message
- * <p>
- * Retrieves a message when it will be available and completes {@link
CompletableFuture} with received message.
- * </p>
- * <p>
- * {@code receiveAsync()} should be called subsequently once returned
{@code CompletableFuture} gets complete with
- * received message. Else it creates <i> backlog of receive requests </i>
in the application.
- * </p>
- *
- * @return {@link CompletableFuture}<{@link Message}> will be completed
when message is available
- */
- CompletableFuture<Message<byte[]>> receiveAsync();
-
- /**
- * Receive a single message
- * <p>
- * Retrieves a message, waiting up to the specified wait time if necessary.
- *
- * @param timeout
- * 0 or less means immediate rather than infinite
- * @param unit
- * @return the received {@link Message} or null if no message available
before timeout
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- * @throws PulsarClientException.InvalidConfigurationException
- * if a message listener was defined in the configuration
- */
- Message<byte[]> receive(int timeout, TimeUnit unit) throws
PulsarClientException;
-
- /**
- * Acknowledge the consumption of a single message.
- *
- * @param message
- * The {@code Message} to be acknowledged
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- */
- void acknowledge(Message<?> message) throws PulsarClientException;
-
- /**
- * Acknowledge the consumption of a single message, identified by its
MessageId.
- *
- * @param messageId
- * The {@code MessageId} to be acknowledged
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- */
- void acknowledge(MessageId messageId) throws PulsarClientException;
-
- /**
- * Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.
- *
- * This method will block until the acknowledge has been sent to the
broker. After that, the messages will not be
- * re-delivered to this consumer.
- *
- * Cumulative acknowledge cannot be used when the consumer type is set to
ConsumerShared.
- *
- * It's equivalent to calling asyncAcknowledgeCumulative(Message) and
waiting for the callback to be triggered.
- *
- * @param message
- * The {@code Message} to be cumulatively acknowledged
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- */
- void acknowledgeCumulative(Message<?> message) throws
PulsarClientException;
-
- /**
- * Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.
- *
- * This method will block until the acknowledge has been sent to the
broker. After that, the messages will not be
- * re-delivered to this consumer.
- *
- * Cumulative acknowledge cannot be used when the consumer type is set to
ConsumerShared.
- *
- * It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and
waiting for the callback to be triggered.
- *
- * @param messageId
- * The {@code MessageId} to be cumulatively acknowledged
- * @throws PulsarClientException.AlreadyClosedException
- * if the consumer was already closed
- */
- void acknowledgeCumulative(MessageId messageId) throws
PulsarClientException;
-
- /**
- * Asynchronously acknowledge the consumption of a single message.
- *
- * @param message
- * The {@code Message} to be acknowledged
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> acknowledgeAsync(Message<?> message);
-
- /**
- * Asynchronously acknowledge the consumption of a single message.
- *
- * @param messageId
- * The {@code MessageId} to be acknowledged
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> acknowledgeAsync(MessageId messageId);
-
- /**
- * Asynchronously Acknowledge the reception of all the messages in the
stream up to (and including) the provided
- * message.
- *
- * Cumulative acknowledge cannot be used when the consumer type is set to
ConsumerShared.
- *
- * @param message
- * The {@code Message} to be cumulatively acknowledged
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message);
-
- /**
- * Asynchronously Acknowledge the reception of all the messages in the
stream up to (and including) the provided
- * message.
- *
- * Cumulative acknowledge cannot be used when the consumer type is set to
ConsumerShared.
- *
- * @param messageId
- * The {@code MessageId} to be cumulatively acknowledged
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId);
-
- /**
- * Get statistics for the consumer.
- *
- * <ul>
- * <li>numMsgsReceived : Number of messages received in the current
interval
- * <li>numBytesReceived : Number of bytes received in the current interval
- * <li>numReceiveFailed : Number of messages failed to receive in the
current interval
- * <li>numAcksSent : Number of acks sent in the current interval
- * <li>numAcksFailed : Number of acks failed to send in the current
interval
- * <li>totalMsgsReceived : Total number of messages received
- * <li>totalBytesReceived : Total number of bytes received
- * <li>totalReceiveFailed : Total number of messages failed to receive
- * <li>totalAcksSent : Total number of acks sent
- * <li>totalAcksFailed : Total number of acks failed to sent
- * </ul>
- *
- * @return statistic for the consumer
- */
- ConsumerStats getStats();
-
- /**
- * Close the consumer and stop the broker to push more messages.
- */
- @Override
- void close() throws PulsarClientException;
-
- /**
- * Asynchronously close the consumer and stop the broker to push more
messages.
- *
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> closeAsync();
-
- /**
- * Return true if the topic was terminated and this consumer has already
consumed all the messages in the topic.
- *
- * Please note that this does not simply mean that the consumer is caught
up with the last message published by
- * producers, rather the topic needs to be explicitly "terminated".
- */
- boolean hasReachedEndOfTopic();
-
- /**
- * Redelivers all the unacknowledged messages. In Failover mode, the
request is ignored if the consumer is not
- * active for the given topic. In Shared mode, the consumers messages to
be redelivered are distributed across all
- * the connected consumers. This is a non blocking call and doesn't throw
an exception. In case the connection
- * breaks, the messages are redelivered after reconnect.
- */
- void redeliverUnacknowledgedMessages();
-
- /**
- * Reset the subscription associated with this consumer to a specific
message id.
- * <p>
- *
- * The message id can either be a specific message or represent the first
or last messages in the topic.
- * <p>
- * <ul>
- * <li><code>MessageId.earliest</code> : Reset the subscription on the
earliest message available in the topic
- * <li><code>MessageId.latest</code> : Reset the subscription on the
latest message in the topic
- * </ul>
- *
- * Note: this operation can only be done on non-partitioned topics. For
these, one can rather perform the seek() on
- * the individual partitions.
- *
- * @param messageId
- * the message id where to reposition the subscription
- */
- void seek(MessageId messageId) throws PulsarClientException;
-
- /**
- * Reset the subscription associated with this consumer to a specific
message publish time.
- *
- * Note: this operation can only be done on non-partitioned topics. For
these, one can rather perform the seek() on
- * the individual partitions.
- *
- * @param timestamp
- * the message publish time where to reposition the subscription
- */
- void seek(long timestamp) throws PulsarClientException;
-
- /**
- * Reset the subscription associated with this consumer to a specific
message id.
- * <p>
- *
- * The message id can either be a specific message or represent the first
or last messages in the topic.
- * <p>
- * <ul>
- * <li><code>MessageId.earliest</code> : Reset the subscription on the
earliest message available in the topic
- * <li><code>MessageId.latest</code> : Reset the subscription on the
latest message in the topic
- * </ul>
- *
- * Note: this operation can only be done on non-partitioned topics. For
these, one can rather perform the seek() on
- * the individual partitions.
- *
- * @param messageId
- * the message id where to reposition the subscription
- * @return a future to track the completion of the seek operation
- */
- CompletableFuture<Void> seekAsync(MessageId messageId);
-
- /**
- * Reset the subscription associated with this consumer to a specific
message publish time.
- *
- * Note: this operation can only be done on non-partitioned topics. For
these, one can rather perform the seek() on
- * the individual partitions.
- *
- * @param timestamp
- * the message publish time where to reposition the subscription
- * @return a future to track the completion of the seek operation
- */
- CompletableFuture<Void> seekAsync(long timestamp);
-
- /**
- * @return Whether the consumer is connected to the broker
- */
- boolean isConnected();
-
- /**
- * Get the name of consumer.
- * @return consumer name.
- */
- String getConsumerName();
-
- /**
- * Stop requesting new messages from the broker until {@link #resume()} is
called. Note that this might cause
- * {@link #receive()} to block until {@link #resume()} is called and new
messages are pushed by the broker.
- */
- void pause();
-
- /**
- * Resume requesting messages from the broker.
- */
- void resume();
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
deleted file mode 100644
index 81956db56f7..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.v1.ConsumerV1Impl;
-/**
- * Class specifying the configuration of a consumer. In Exclusive
subscription, only a single consumer is allowed to
- * attach to the subscription. Other consumers will get an error message. In
Shared subscription, multiple consumers
- * will be able to use the same subscription name and the messages will be
dispatched in a round robin fashion.
- *
- * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a
{@link Consumer} instance
- */
-@Deprecated
-public class ConsumerConfiguration implements Serializable {
-
- /**
- * Resend shouldn't be requested before minAckTimeoutMillis.
- */
- static long minAckTimeoutMillis = 1000;
-
- private static final long serialVersionUID = 1L;
-
- private final ConsumerConfigurationData<byte[]> conf = new
ConsumerConfigurationData<>();
-
- private MessageListener<byte[]> messageListener;
-
- public ConsumerConfiguration() {
- // Disable acknowledgment grouping when using v1 API
- conf.setAcknowledgementsGroupTimeMicros(0);
- }
-
- /**
- * @return the configured timeout in milliseconds for unacked messages.
- */
- public long getAckTimeoutMillis() {
- return conf.getAckTimeoutMillis();
- }
-
- /**
- * Set the timeout for unacked messages, truncated to the nearest
millisecond. The timeout needs to be greater than
- * 10 seconds.
- *
- * @param ackTimeout
- * for unacked messages.
- * @param timeUnit
- * unit in which the timeout is provided.
- * @return {@link ConsumerConfiguration}
- */
- public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit
timeUnit) {
- long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
- checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
- "Ack timeout should be should be greater than " +
minAckTimeoutMillis + " ms");
- conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
- return this;
- }
-
- /**
- * @return the configured subscription type
- */
- public SubscriptionType getSubscriptionType() {
- return conf.getSubscriptionType();
- }
-
- /**
- * Select the subscription type to be used when subscribing to the topic.
- * <p>
- * Default is {@link SubscriptionType#Exclusive}
- *
- * @param subscriptionType
- * the subscription type value
- */
- public ConsumerConfiguration setSubscriptionType(SubscriptionType
subscriptionType) {
- Objects.requireNonNull(subscriptionType);
- conf.setSubscriptionType(subscriptionType);
- return this;
- }
-
- /**
- * @return the configured {@link MessageListener} for the consumer
- */
- public MessageListener<byte[]> getMessageListener() {
- return messageListener;
- }
-
- /**
- * Sets a {@link MessageListener} for the consumer
- * <p>
- * When a {@link MessageListener} is set, application will receive
messages through it. Calls to
- * {@link Consumer#receive()} will not be allowed.
- *
- * @param messageListener
- * the listener object
- */
- public ConsumerConfiguration setMessageListener(MessageListener<byte[]>
messageListener) {
- Objects.requireNonNull(messageListener);
- this.messageListener = messageListener;
- conf.setMessageListener(new
org.apache.pulsar.shade.client.api.v2.MessageListener<byte[]>() {
-
- @Override
- public void
received(org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> consumer,
Message<byte[]> msg) {
- messageListener.received(new ConsumerV1Impl(consumer), msg);
- }
-
- @Override
- public void
reachedEndOfTopic(org.apache.pulsar.shade.client.api.v2.Consumer<byte[]>
consumer) {
- messageListener.reachedEndOfTopic(new
ConsumerV1Impl(consumer));
- }
- });
- return this;
- }
-
- /**
- * @return this configured {@link ConsumerEventListener} for the consumer.
- * @see #setConsumerEventListener(ConsumerEventListener)
- * @since 2.0
- */
- public ConsumerEventListener getConsumerEventListener() {
- return conf.getConsumerEventListener();
- }
-
- /**
- * Sets a {@link ConsumerEventListener} for the consumer.
- *
- * <p>
- * The consumer group listener is used for receiving consumer state change
in a consumer group for failover
- * subscription. Application can then react to the consumer state changes.
- *
- * <p>
- * This change is experimental. It is subject to changes coming in release
2.0.
- *
- * @param listener
- * the consumer group listener object
- * @return consumer configuration
- * @since 2.0
- */
- public ConsumerConfiguration
setConsumerEventListener(ConsumerEventListener listener) {
- Objects.requireNonNull(listener);
- conf.setConsumerEventListener(listener);
- return this;
- }
-
- /**
- * @return the configure receiver queue size value
- */
- public int getReceiverQueueSize() {
- return conf.getReceiverQueueSize();
- }
-
- /**
- * @return the configured max total receiver queue size across partitions
- */
- public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
- return conf.getMaxTotalReceiverQueueSizeAcrossPartitions();
- }
-
- /**
- * Set the max total receiver queue size across partitons.
- * <p>
- * This setting will be used to reduce the receiver queue size for
individual partitions
- * {@link #setReceiverQueueSize(int)} if the total exceeds this value
(default: 50000).
- *
- * @param maxTotalReceiverQueueSizeAcrossPartitions
- */
- public void setMaxTotalReceiverQueueSizeAcrossPartitions(int
maxTotalReceiverQueueSizeAcrossPartitions) {
- checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >=
conf.getReceiverQueueSize());
-
conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
- }
-
- /**
- * @return the CryptoKeyReader
- */
- public CryptoKeyReader getCryptoKeyReader() {
- return conf.getCryptoKeyReader();
- }
-
- /**
- * Sets a {@link CryptoKeyReader}.
- *
- * @param cryptoKeyReader
- * CryptoKeyReader object
- */
- public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader
cryptoKeyReader) {
- Objects.requireNonNull(cryptoKeyReader);
- conf.setCryptoKeyReader(cryptoKeyReader);
- return this;
- }
-
- /**
- * Sets the ConsumerCryptoFailureAction to the value specified.
- *
- * @param action
- * consumer action
- */
- public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
- conf.setCryptoFailureAction(action);
- }
-
- /**
- * @return The ConsumerCryptoFailureAction
- */
- public ConsumerCryptoFailureAction getCryptoFailureAction() {
- return conf.getCryptoFailureAction();
- }
-
- /**
- * Sets the size of the consumer receive queue.
- * <p>
- * The consumer receive queue controls how many messages can be
accumulated by the {@link Consumer} before the
- * application calls {@link Consumer#receive()}. Using a higher value
could potentially increase the consumer
- * throughput at the expense of bigger memory utilization.
- * </p>
- * <p>
- * <b>Setting the consumer queue size as zero</b>
- * <ul>
- * <li>Decreases the throughput of the consumer, by disabling pre-fetching
of messages. This approach improves the
- * message distribution on shared subscription, by pushing messages only
to the consumers that are ready to process
- * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned
Topics can be used if the consumer queue
- * size is zero. {@link Consumer#receive()} function call should not be
interrupted when the consumer queue size is
- * zero.</li>
- * <li>Doesn't support Batch-Message: if consumer receives any
batch-message then it closes consumer connection with
- * broker and {@link Consumer#receive()} call will remain blocked while
{@link Consumer#receiveAsync()} receives
- * exception in callback. <b> consumer will not be able receive any
further message unless batch-message in pipeline
- * is removed</b></li>
- * </ul>
- * </p>
- * Default value is {@code 1000} messages and should be good for most use
cases.
- *
- * @param receiverQueueSize
- * the new receiver queue size value
- */
- public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
- checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be
negative");
- conf.setReceiverQueueSize(receiverQueueSize);
- return this;
- }
-
- /**
- * @return the consumer name
- */
- public String getConsumerName() {
- return conf.getConsumerName();
- }
-
- /**
- * Set the consumer name.
- *
- * @param consumerName
- */
- public ConsumerConfiguration setConsumerName(String consumerName) {
- checkArgument(consumerName != null && !consumerName.equals(""));
- conf.setConsumerName(consumerName);
- return this;
- }
-
- public int getPriorityLevel() {
- return conf.getPriorityLevel();
- }
-
- /**
- * Sets priority level for the shared subscription consumers to which
broker gives more priority while dispatching
- * messages. Here, broker follows descending priorities. (eg:
0=max-priority, 1, 2,..) <br>
- * In Shared subscription mode, broker will first dispatch messages to max
priority-level consumers if they have
- * permits, else broker will consider next priority level consumers. <br>
- * If subscription has consumer-A with priorityLevel 0 and Consumer-B with
priorityLevel 1 then broker will dispatch
- * messages to only consumer-A until it runs out permit and then broker
starts dispatching messages to Consumer-B.
- *
- * <pre>
- * Consumer PriorityLevel Permits
- * C1 0 2
- * C2 0 1
- * C3 0 1
- * C4 1 2
- * C5 1 1
- * Order in which broker dispatches messages to consumers: C1, C2, C3, C1,
C4, C5, C4
- * </pre>
- *
- * @param priorityLevel
- */
- public void setPriorityLevel(int priorityLevel) {
- conf.setPriorityLevel(priorityLevel);
- }
-
- public boolean getReadCompacted() {
- return conf.isReadCompacted();
- }
-
- /**
- * If enabled, the consumer will read messages from the compacted topic
rather than reading the full message backlog
- * of the topic. This means that, if the topic has been compacted, the
consumer will only see the latest value for
- * each key in the topic, up until the point in the topic message backlog
that has been compacted. Beyond that
- * point, the messages will be sent as normal.
- *
- * readCompacted can only be enabled subscriptions to persistent topics,
which have a single active consumer (i.e.
- * failure or exclusive subscriptions). Attempting to enable it on
subscriptions to a non-persistent topics or on a
- * shared subscription, will lead to the subscription call throwing a
PulsarClientException.
- *
- * @param readCompacted
- * whether to read from the compacted topic
- */
- public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
- conf.setReadCompacted(readCompacted);
- return this;
- }
-
- /**
- * Set a name/value property with this consumer.
- *
- * @param key
- * @param value
- * @return
- */
- public ConsumerConfiguration setProperty(String key, String value) {
- checkArgument(key != null);
- checkArgument(value != null);
- conf.getProperties().put(key, value);
- return this;
- }
-
- /**
- * Add all the properties in the provided map.
- *
- * @param properties
- * @return
- */
- public ConsumerConfiguration setProperties(Map<String, String> properties)
{
- conf.getProperties().putAll(properties);
- return this;
- }
-
- public Map<String, String> getProperties() {
- return conf.getProperties();
- }
-
- public ConsumerConfigurationData<byte[]> getConfigurationData() {
- return conf;
- }
-
- /**
- * @param subscriptionInitialPosition the initial position at which to set
- * set cursor when subscribing to the topic first time
- * Default is {@value InitialPosition.Latest}
- */
- public ConsumerConfiguration setSubscriptionInitialPosition(
- SubscriptionInitialPosition subscriptionInitialPosition) {
- conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
- return this;
- }
-
- /**
- * @return the configured {@link subscriptionInitialPosition} for the
consumer
- */
- public SubscriptionInitialPosition getSubscriptionInitialPosition(){
- return conf.getSubscriptionInitialPosition();
- }
-
- /**
- * @return the configured {@link RedeliveryBackoff} for the consumer
- */
- public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
- return conf.getNegativeAckRedeliveryBackoff();
- }
-
- /**
- * @param negativeAckRedeliveryBackoff the negative ack redelivery backoff
policy.
- * Default value is: MultiplierRedeliveryBackoff
- * @return the {@link ConsumerConfiguration}
- */
- public ConsumerConfiguration
setNegativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff)
{
- conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
- return this;
- }
-
- /**
- * @return the configured {@link RedeliveryBackoff} for the consumer
- */
- public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() {
- return conf.getAckTimeoutRedeliveryBackoff();
- }
-
- /**
- * @param ackTimeoutRedeliveryBackoff redelivery backoff policy for ack
timeout.
- * Default value is: MultiplierRedeliveryBackoff
- * @return the {@link ConsumerConfiguration}
- */
- public ConsumerConfiguration
setAckTimeoutRedeliveryBackoff(RedeliveryBackoff ackTimeoutRedeliveryBackoff) {
- conf.setAckTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff);
- return this;
- }
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
deleted file mode 100644
index 084312ed28c..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import org.apache.pulsar.client.impl.MessageBuilderImpl;
-
-/**
- * Message builder factory. Use this class to create messages to be send to
the Pulsar producer
- *
- * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by
{@link Producer#newMessage()} to create a new
- * message builder.
- */
-@Deprecated
-public interface MessageBuilder {
-
- static MessageBuilder create() {
- return new MessageBuilderImpl();
- }
-
- /**
- * Finalize the immutable message.
- *
- * @return a {@link Message} ready to be sent through a {@link Producer}
- */
- Message<byte[]> build();
-
- /**
- * Set the content of the message.
- *
- * @param data
- * array containing the payload
- */
- MessageBuilder setContent(byte[] data);
-
- /**
- * Set the content of the message.
- *
- * @param data
- * array containing the payload
- * @param offset
- * offset into the data array
- * @param length
- * length of the payload starting from the above offset
- */
- MessageBuilder setContent(byte[] data, int offset, int length);
-
- /**
- * Set the content of the message.
- *
- * @param buf
- * a {@link ByteBuffer} with the payload of the message
- */
- MessageBuilder setContent(ByteBuffer buf);
-
- /**
- * Sets a new property on a message.
- *
- * @param name
- * the name of the property
- * @param value
- * the associated value
- */
- MessageBuilder setProperty(String name, String value);
-
- /**
- * Add all the properties in the provided map.
- */
- MessageBuilder setProperties(Map<String, String> properties);
-
- /**
- * Sets the key of the message for routing policy.
- *
- * @param key
- */
- MessageBuilder setKey(String key);
-
- /**
- * Set the event time for a given message.
- *
- * <p>
- * Applications can retrieve the event time by calling {@link
Message#getEventTime()}.
- *
- * <p>
- * Note: currently pulsar doesn't support event-time based index. so the
subscribers can't seek the messages by
- * event time.
- *
- * @since 1.20.0
- */
- MessageBuilder setEventTime(long timestamp);
-
- /**
- * Specify a custom sequence id for the message being published.
- * <p>
- * The sequence id can be used for deduplication purposes and it needs to
follow these rules:
- * <ol>
- * <li><code>sequenceId >= 0</code>
- * <li>Sequence id for a message needs to be greater than sequence id for
earlier messages:
- * <code>sequenceId(N+1) > sequenceId(N)</code>
- * <li>It's not necessary for sequence ids to be consecutive. There can be
holes between messages. Eg. the
- * <code>sequenceId</code> could represent an offset or a cumulative size.
- * </ol>
- *
- * @param sequenceId
- * the sequence id to assign to the current message
- * @since 1.20.0
- */
- MessageBuilder setSequenceId(long sequenceId);
-
- /**
- * Override the replication clusters for this message.
- *
- * @param clusters
- */
- MessageBuilder setReplicationClusters(List<String> clusters);
-
- /**
- * Disable replication for this message.
- */
- MessageBuilder disableReplication();
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageListener.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageListener.java
deleted file mode 100644
index 301740be398..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageListener.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import java.io.Serializable;
-
-/**
- * A listener that will be called in order for every message received.
- *
- *
- */
-public interface MessageListener<T> extends Serializable {
- /**
- * This method is called whenever a new message is received.
- *
- * Messages are guaranteed to be delivered in order and from the same
thread for a single consumer
- *
- * This method will only be called once for each message, unless either
application or broker crashes.
- *
- * Implementation should acknowledge messages by calling
consumer.acknowledge(msg).
- *
- * Application is responsible of handling any exception that could be
thrown while processing the message.
- *
- * @param consumer
- * the consumer that received the message
- * @param msg
- * the message object
- */
- void received(Consumer consumer, Message<T> msg);
-
- /**
- * Get the notification when a topic is terminated.
- *
- * @param consumer
- * the Consumer object associated with the terminated topic
- */
- default void reachedEndOfTopic(Consumer consumer) {
- // By default ignore the notification
- }
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
deleted file mode 100644
index 0b431050377..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import java.io.Closeable;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Producer object.
- *
- * The producer is used to publish messages on a topic
- *
- *
- */
-public interface Producer extends Closeable {
-
- /**
- * @return the topic which producer is publishing to
- */
- String getTopic();
-
- /**
- * @return the producer name which could have been assigned by the system
or specified by the client
- */
- String getProducerName();
-
- /**
- * Sends a message.
- * <p>
- * This call will be blocking until is successfully acknowledged by the
Pulsar broker.
- * <p>
- * Use {@link #newMessage()} to specify more properties than just the
value on the message to be sent.
- *
- * @param message
- * a message
- * @return the message id assigned to the published message
- * @throws PulsarClientException.TimeoutException
- * if the message was not correctly received by the system
within the timeout period
- * @throws PulsarClientException.AlreadyClosedException
- * if the producer was already closed
- */
- MessageId send(byte[] message) throws PulsarClientException;
-
- /**
- * Send a message asynchronously
- * <p>
- * When the producer queue is full, by default this method will complete
the future with an exception
- * {@link PulsarClientException.ProducerQueueIsFullError}
- * <p>
- * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the
producer queue size and
- * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the
blocking behavior.
- * <p>
- * Use {@link #newMessage()} to specify more properties than just the
value on the message to be sent.
- *
- * @param message
- * a byte array with the payload of the message
- * @return a future that can be used to track when the message will have
been safely persisted
- */
- CompletableFuture<MessageId> sendAsync(byte[] message);
-
- /**
- * Flush all the messages buffered in the client and wait until all
messages have been successfully persisted.
- *
- * @throws PulsarClientException
- * @since 2.1.0
- * @see #flushAsync()
- */
- void flush() throws PulsarClientException;
-
- /**
- * Flush all the messages buffered in the client asynchronously.
- *
- * @return a future that can be used to track when all the messages have
been safely persisted.
- * @since 2.1.0
- * @see #flush()
- */
- CompletableFuture<Void> flushAsync();
-
- /**
- * Send a message.
- *
- * @param message
- * a message
- * @return the message id assigned to the published message
- * @throws PulsarClientException.TimeoutException
- * if the message was not correctly received by the system
within the timeout period
- *
- * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by
{@link Producer.newMessage()} to create a
- * new message builder.
- */
- @Deprecated
- MessageId send(Message<byte[]> message) throws PulsarClientException;
-
- /**
- * Send a message asynchronously.
- * <p>
- * When the returned {@link CompletableFuture} is marked as completed
successfully, the provided message will
- * contain the {@link MessageId} assigned by the broker to the published
message.
- * <p>
- * Example:
- *
- * <pre>
- * <code>Message msg =
MessageBuilder.create().setContent(myContent).build();
- * producer.sendAsync(msg).thenRun(v -> {
- * System.out.println("Published message: " + msg.getMessageId());
- * }).exceptionally(e -> {
- * // Failed to publish
- * });</code>
- * </pre>
- * <p>
- * When the producer queue is full, by default this method will complete
the future with an exception
- * {@link PulsarClientException.ProducerQueueIsFullError}
- * <p>
- * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the
producer queue size and
- * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the
blocking behavior.
- *
- * @param message
- * a message
- * @return a future that can be used to track when the message will have
been safely persisted
- * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by
{@link Producer#newMessage()} to create a
- * new message builder.
- */
- @Deprecated
- CompletableFuture<MessageId> sendAsync(Message<byte[]> message);
-
- /**
- * Get the last sequence id that was published by this producer.
- * <p>
- * This represent either the automatically assigned or custom sequence id
(set on the {@link MessageBuilder}) that
- * was published and acknowledged by the broker.
- * <p>
- * After recreating a producer with the same producer name, this will
return the last message that was published in
- * the previous producer session, or -1 if there no message was ever
published.
- *
- * @return the last sequence id published by this producer
- */
- long getLastSequenceId();
-
- /**
- * Get statistics for the producer.
- *
- * <ul>
- * <li>numMsgsSent : Number of messages sent in the current interval
- * <li>numBytesSent : Number of bytes sent in the current interval
- * <li>numSendFailed : Number of messages failed to send in the current
interval
- * <li>numAcksReceived : Number of acks received in the current interval
- * <li>totalMsgsSent : Total number of messages sent
- * <li>totalBytesSent : Total number of bytes sent
- * <li>totalSendFailed : Total number of messages failed to send
- * <li>totalAcksReceived: Total number of acks received
- * </ul>
- *
- * @return statistic for the producer or null if ProducerStatsRecorderImpl
is disabled.
- */
- ProducerStats getStats();
-
- /**
- * Close the producer and releases resources allocated.
- *
- * No more writes will be accepted from this producer. Waits until all
pending write request are persisted. In case
- * of errors, pending writes will not be retried.
- *
- * @throws PulsarClientException.AlreadyClosedException
- * if the producer was already closed
- */
- @Override
- void close() throws PulsarClientException;
-
- /**
- * Close the producer and releases resources allocated.
- *
- * No more writes will be accepted from this producer. Waits until all
pending write request are persisted. In case
- * of errors, pending writes will not be retried.
- *
- * @return a future that can used to track when the producer has been
closed
- */
- CompletableFuture<Void> closeAsync();
-
- /**
- * @return Whether the producer is connected to the broker
- */
- boolean isConnected();
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
deleted file mode 100644
index 761c49ec242..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ /dev/null
@@ -1,474 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import lombok.EqualsAndHashCode;
-import
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-
-/**
- * Producer's configuration.
- *
- * @deprecated use {@link PulsarClient#newProducer()} to construct and
configure a {@link Producer} instance
- */
-@Deprecated
-@EqualsAndHashCode
-public class ProducerConfiguration implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final ProducerConfigurationData conf = new
ProducerConfigurationData();
-
- @Deprecated
- public enum MessageRoutingMode {
- SinglePartition, RoundRobinPartition, CustomPartition
- }
-
- @Deprecated
- public enum HashingScheme {
- JavaStringHash, Murmur3_32Hash
- }
-
- /**
- * @return the configured custom producer name or null if no custom name
was specified
- * @since 1.20.0
- */
- public String getProducerName() {
- return conf.getProducerName();
- }
-
- /**
- * Specify a name for the producer
- * <p>
- * If not assigned, the system will generate a globally unique name which
can be access with
- * {@link Producer#getProducerName()}.
- * <p>
- * When specifying a name, it is app to the user to ensure that, for a
given topic, the producer name is unique
- * across all Pulsar's clusters.
- * <p>
- * If a producer with the same name is already connected to a particular
topic, the
- * {@link PulsarClient#createProducer(String)} operation will fail with
{@link ProducerBusyException}.
- *
- * @param producerName
- * the custom name to use for the producer
- * @since 1.20.0
- */
- public void setProducerName(String producerName) {
- conf.setProducerName(producerName);
- }
-
- /**
- * @return the message send timeout in ms
- */
- public long getSendTimeoutMs() {
- return conf.getSendTimeoutMs();
- }
-
- /**
- * Set the send timeout <i>(default: 30 seconds)</i>
- * <p>
- * If a message is not acknowledged by the server before the sendTimeout
expires, an error will be reported.
- *
- * @param sendTimeout
- * the send timeout
- * @param unit
- * the time unit of the {@code sendTimeout}
- */
- public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit
unit) {
- conf.setSendTimeoutMs(sendTimeout, unit);
- return this;
- }
-
- /**
- * @return the maximum number of messages allowed in the outstanding
messages queue for the producer
- */
- public int getMaxPendingMessages() {
- return conf.getMaxPendingMessages();
- }
-
- /**
- * Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker.
- * <p>
- * When the queue is full, by default, all calls to {@link Producer#send}
and {@link Producer#sendAsync} will fail
- * unless blockIfQueueFull is set to true. Use {@link
#setBlockIfQueueFull} to change the blocking behavior.
- *
- * @param maxPendingMessages
- * @return
- */
- public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages)
{
- conf.setMaxPendingMessages(maxPendingMessages);
- return this;
- }
-
- public HashingScheme getHashingScheme() {
- return HashingScheme.valueOf(conf.getHashingScheme().toString());
- }
-
- public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme)
{
-
conf.setHashingScheme(org.apache.pulsar.client.api.HashingScheme.valueOf(hashingScheme.toString()));
- return this;
- }
-
- /**
- *
- * @return the maximum number of pending messages allowed across all the
partitions
- */
- public int getMaxPendingMessagesAcrossPartitions() {
- return conf.getMaxPendingMessagesAcrossPartitions();
- }
-
- /**
- * Set the number of max pending messages across all the partitions
- * <p>
- * This setting will be used to lower the max pending messages for each
partition
- * ({@link #setMaxPendingMessages(int)}), if the total exceeds the
configured value.
- *
- * @param maxPendingMessagesAcrossPartitions
- */
- public void setMaxPendingMessagesAcrossPartitions(int
maxPendingMessagesAcrossPartitions) {
-
conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
- }
-
- /**
- *
- * @return whether the producer will block {@link Producer#send} and
{@link Producer#sendAsync} operations when the
- * pending queue is full
- */
- public boolean getBlockIfQueueFull() {
- return conf.isBlockIfQueueFull();
- }
-
- /**
- * Set whether the {@link Producer#send} and {@link Producer#sendAsync}
operations should block when the outgoing
- * message queue is full.
- * <p>
- * Default is <code>false</code>. If set to <code>false</code>, send
operations will immediately fail with
- * {@link PulsarClientException.ProducerQueueIsFullError} when there is no
space left in pending queue.
- *
- * @param blockIfQueueFull
- * whether to block {@link Producer#send} and {@link
Producer#sendAsync} operations on queue full
- * @return
- */
- public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull)
{
- conf.setBlockIfQueueFull(blockIfQueueFull);
- return this;
- }
-
- /**
- * Set the message routing mode for the partitioned producer.
- *
- * @param messageRouteMode message routing mode.
- * @return producer configuration
- * @see MessageRoutingMode
- */
- public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode
messageRouteMode) {
- Objects.requireNonNull(messageRouteMode);
- conf.setMessageRoutingMode(
-
org.apache.pulsar.client.api.MessageRoutingMode.valueOf(messageRouteMode.toString()));
- return this;
- }
-
- /**
- * Get the message routing mode for the partitioned producer.
- *
- * @return message routing mode, default is round-robin routing.
- * @see MessageRoutingMode#RoundRobinPartition
- */
- public MessageRoutingMode getMessageRoutingMode() {
- return
MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
- }
-
- /**
- * Set the compression type for the producer.
- * <p>
- * By default, message payloads are not compressed. Supported compression
types are:
- * <ul>
- * <li><code>CompressionType.LZ4</code></li>
- * <li><code>CompressionType.ZLIB</code></li>
- * </ul>
- *
- * @param compressionType
- * @return
- *
- * @since 1.0.28 <br>
- * Make sure all the consumer applications have been updated to use
this client version, before starting to
- * compress messages.
- */
- public ProducerConfiguration setCompressionType(CompressionType
compressionType) {
- conf.setCompressionType(compressionType);
- return this;
- }
-
- /**
- * @return the configured compression type for this producer
- */
- public CompressionType getCompressionType() {
- return conf.getCompressionType();
- }
-
- /**
- * Set a custom message routing policy by passing an implementation of
MessageRouter.
- *
- *
- * @param messageRouter
- */
- public ProducerConfiguration setMessageRouter(MessageRouter messageRouter)
{
- Objects.requireNonNull(messageRouter);
- setMessageRoutingMode(MessageRoutingMode.CustomPartition);
- conf.setCustomMessageRouter(messageRouter);
- return this;
- }
-
- /**
- * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
- *
- * @return message router.
- * @deprecated since 1.22.0-incubating. <tt>numPartitions</tt> is already
passed as parameter in
- * {@link MessageRouter#choosePartition(Message,
TopicMetadata)}.
- * @see MessageRouter
- */
- @Deprecated
- public MessageRouter getMessageRouter(int numPartitions) {
- return conf.getCustomMessageRouter();
- }
-
- /**
- * Get the message router set by {@link #setMessageRouter(MessageRouter)}.
- *
- * @return message router set by {@link #setMessageRouter(MessageRouter)}.
- */
- public MessageRouter getMessageRouter() {
- return conf.getCustomMessageRouter();
- }
-
- /**
- * Return the flag whether automatic message batching is enabled or not.
- *
- * @return true if batch messages are enabled. otherwise false.
- * @since 2.0.0 <br>
- * It is enabled by default.
- */
- public boolean getBatchingEnabled() {
- return conf.isBatchingEnabled();
- }
-
- /**
- * Control whether automatic batching of messages is enabled for the
producer. <i>default: false [No batching]</i>
- *
- * When batching is enabled, multiple calls to Producer.sendAsync can
result in a single batch to be sent to the
- * broker, leading to better throughput, especially when publishing small
messages. If compression is enabled,
- * messages will be compressed at the batch level, leading to a much
better compression ratio for similar headers or
- * contents.
- *
- * When enabled default batch delay is set to 1 ms and default batch size
is 1000 messages
- *
- * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
- * @since 1.0.36 <br>
- * Make sure all the consumer applications have been updated to use
this client version, before starting to
- * batch messages.
- *
- */
- public ProducerConfiguration setBatchingEnabled(boolean
batchMessagesEnabled) {
- conf.setBatchingEnabled(batchMessagesEnabled);
- return this;
- }
-
- /**
- * @return the CryptoKeyReader
- */
- public CryptoKeyReader getCryptoKeyReader() {
- return conf.getCryptoKeyReader();
- }
-
- /**
- * Sets a {@link CryptoKeyReader}.
- *
- * @param cryptoKeyReader
- * CryptoKeyReader object
- */
- public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader
cryptoKeyReader) {
- Objects.requireNonNull(cryptoKeyReader);
- conf.setCryptoKeyReader(cryptoKeyReader);
- return this;
- }
-
- /**
- *
- * @return encryptionKeys
- *
- */
- public Set<String> getEncryptionKeys() {
- return conf.getEncryptionKeys();
- }
-
- /**
- *
- * Returns true if encryption keys are added.
- *
- */
- public boolean isEncryptionEnabled() {
- return conf.isEncryptionEnabled();
- }
-
- /**
- * Add public encryption key, used by producer to encrypt the data key.
- *
- * At the time of producer creation, Pulsar client checks if there are
keys added to encryptionKeys. If keys are
- * found, a callback getKey(String keyName) is invoked against each key to
load the values of the key. Application
- * should implement this callback to return the key in pkcs8 format. If
compression is enabled, message is encrypted
- * after compression. If batch messaging is enabled, the batched message
is encrypted.
- *
- */
- public void addEncryptionKey(String key) {
- conf.getEncryptionKeys().add(key);
- }
-
- public void removeEncryptionKey(String key) {
- conf.getEncryptionKeys().remove(key);
- }
-
- /**
- * Sets the ProducerCryptoFailureAction to the value specified.
- *
- * @param action
- * The producer action
- */
- public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
- conf.setCryptoFailureAction(action);
- }
-
- /**
- * @return The ProducerCryptoFailureAction
- */
- public ProducerCryptoFailureAction getCryptoFailureAction() {
- return conf.getCryptoFailureAction();
- }
-
- /**
- *
- * @return the batch time period in ms.
- * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
- */
- public long getBatchingMaxPublishDelayMs() {
- return
TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros());
- }
-
- /**
- * Set the time period within which the messages sent will be batched
<i>default: 1ms</i> if batch messages are
- * enabled. If set to a non zero value, messages will be queued until this
time interval or until
- *
- * @see ProducerConfiguration#batchingMaxMessages threshold is reached;
all messages will be published as a single
- * batch message. The consumer will be delivered individual messages
in the batch in the same order they were
- * enqueued
- * @since 1.0.36 <br>
- * Make sure all the consumer applications have been updated to use
this client version, before starting to
- * batch messages.
- * @param batchDelay
- * the batch delay
- * @param timeUnit
- * the time unit of the {@code batchDelay}
- * @return
- */
- public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay,
TimeUnit timeUnit) {
- conf.setBatchingMaxPublishDelayMicros(batchDelay, timeUnit);
- return this;
- }
-
- /**
- *
- * @return the maximum number of messages permitted in a batch.
- */
- public int getBatchingMaxMessages() {
- return conf.getBatchingMaxMessages();
- }
-
- /**
- * Set the maximum number of messages permitted in a batch. <i>default:
1000</i> If set to a value greater than 1,
- * messages will be queued until this threshold is reached or batch
interval has elapsed
- *
- * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
All messages in batch will be published as
- * a single batch message. The consumer will be delivered individual
messages in the batch in the same order
- * they were enqueued
- * @param batchMessagesMaxMessagesPerBatch
- * maximum number of messages in a batch
- * @return
- */
- public ProducerConfiguration setBatchingMaxMessages(int
batchMessagesMaxMessagesPerBatch) {
- conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
- return this;
- }
-
- public Optional<Long> getInitialSequenceId() {
- return Optional.ofNullable(conf.getInitialSequenceId());
- }
-
- /**
- * Set the baseline for the sequence ids for messages published by the
producer.
- * <p>
- * First message will be using (initialSequenceId + 1) as its sequence id
and subsequent messages will be assigned
- * incremental sequence ids, if not otherwise specified.
- *
- * @param initialSequenceId
- * @return
- */
- public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
- conf.setInitialSequenceId(initialSequenceId);
- return this;
- }
-
- /**
- * Set a name/value property with this producer.
- *
- * @param key
- * @param value
- * @return
- */
- public ProducerConfiguration setProperty(String key, String value) {
- checkArgument(key != null);
- checkArgument(value != null);
- conf.getProperties().put(key, value);
- return this;
- }
-
- /**
- * Add all the properties in the provided map.
- *
- * @param properties
- * @return
- */
- public ProducerConfiguration setProperties(Map<String, String> properties)
{
- conf.getProperties().putAll(properties);
- return this;
- }
-
- public Map<String, String> getProperties() {
- return conf.getProperties();
- }
-
- public ProducerConfigurationData getProducerConfigurationData() {
- return conf;
- }
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
deleted file mode 100644
index 8ac1dfb71e0..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import java.io.Closeable;
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.impl.v1.PulsarClientV1Impl;
-
-/**
- * Class that provides a client interface to Pulsar.
- * <p>
- * Client instances are thread-safe and can be reused for managing multiple
{@link Producer}, {@link Consumer} and
- * {@link Reader} instances.
- */
-public interface PulsarClient extends Closeable {
-
- /**
- * Create a new PulsarClient object using default client configuration.
- *
- * @param serviceUrl
- * the url of the Pulsar endpoint to be used
- * @return a new pulsar client object
- * @throws PulsarClientException.InvalidServiceURL
- * if the serviceUrl is invalid
- * @deprecated use {@link #builder()} to construct a client instance
- */
- @Deprecated
- static PulsarClient create(String serviceUrl) throws
PulsarClientException {
- return create(serviceUrl, new ClientConfiguration());
- }
-
- /**
- * Create a new PulsarClient object.
- *
- * @param serviceUrl
- * the url of the Pulsar endpoint to be used
- * @param conf
- * the client configuration
- * @return a new pulsar client object
- * @throws PulsarClientException.InvalidServiceURL
- * if the serviceUrl is invalid
- * @deprecated use {@link #builder()} to construct a client instance
- */
- @Deprecated
- static PulsarClient create(String serviceUrl, ClientConfiguration conf)
throws PulsarClientException {
- return new PulsarClientV1Impl(serviceUrl, conf);
- }
-
- /**
- * Create a producer with default {@link ProducerConfiguration} for
publishing on a specific topic.
- *
- * @param topic
- * The name of the topic where to produce
- * @return The producer object
- * @throws PulsarClientException.AlreadyClosedException
- * if the client was already closed
- * @throws PulsarClientException.InvalidTopicNameException
- * if the topic name is not valid
- * @throws PulsarClientException.AuthenticationException
- * if there was an error with the supplied credentials
- * @throws PulsarClientException.AuthorizationException
- * if the authorization to publish on topic was denied
- * @deprecated use {@link #newProducer()} to build a new producer
- */
- @Deprecated
- Producer createProducer(String topic) throws PulsarClientException;
-
- /**
- * Asynchronously create a producer with default {@link
ProducerConfiguration} for publishing on a specific topic.
- *
- * @param topic
- * The name of the topic where to produce
- * @return Future of the asynchronously created producer object
- * @deprecated use {@link #newProducer()} to build a new producer
- */
- @Deprecated
- CompletableFuture<Producer> createProducerAsync(String topic);
-
- /**
- * Create a producer with given {@code ProducerConfiguration} for
publishing on a specific topic.
- *
- * @param topic
- * The name of the topic where to produce
- * @param conf
- * The {@code ProducerConfiguration} object
- * @return The producer object
- * @throws PulsarClientException
- * if it was not possible to create the producer
- * @throws InterruptedException
- * @deprecated use {@link #newProducer()} to build a new producer
- */
- @Deprecated
- Producer createProducer(String topic, ProducerConfiguration conf) throws
PulsarClientException;
-
- /**
- * Asynchronously create a producer with given {@code
ProducerConfiguration} for publishing on a specific topic.
- *
- * @param topic
- * The name of the topic where to produce
- * @param conf
- * The {@code ProducerConfiguration} object
- * @return Future of the asynchronously created producer object
- * @deprecated use {@link #newProducer()} to build a new producer
- */
- @Deprecated
- CompletableFuture<Producer> createProducerAsync(String topic,
ProducerConfiguration conf);
-
- /**
- * Subscribe to the given topic and subscription combination with default
{@code ConsumerConfiguration}.
- *
- * @param topic
- * The name of the topic
- * @param subscription
- * The name of the subscription
- * @return The {@code Consumer} object
- * @throws PulsarClientException
- * @throws InterruptedException
- *
- * @deprecated Use {@link #newConsumer()} to build a new consumer
- */
- @Deprecated
- Consumer subscribe(String topic, String subscription) throws
PulsarClientException;
-
- /**
- * Asynchronously subscribe to the given topic and subscription
combination using default.
- * {@code ConsumerConfiguration}
- *
- * @param topic
- * The topic name
- * @param subscription
- * The subscription name
- * @return Future of the {@code Consumer} object
- * @deprecated Use {@link #newConsumer()} to build a new consumer
- */
- @Deprecated
- CompletableFuture<Consumer> subscribeAsync(String topic, String
subscription);
-
- /**
- * Subscribe to the given topic and subscription combination with given
{@code ConsumerConfiguration}.
- *
- * @param topic
- * The name of the topic
- * @param subscription
- * The name of the subscription
- * @param conf
- * The {@code ConsumerConfiguration} object
- * @return The {@code Consumer} object
- * @throws PulsarClientException
- * @deprecated Use {@link #newConsumer()} to build a new consumer
- */
- @Deprecated
- Consumer subscribe(String topic, String subscription,
ConsumerConfiguration conf) throws PulsarClientException;
-
- /**
- * Asynchronously subscribe to the given topic and subscription
combination using given.
- * {@code ConsumerConfiguration}
- *
- * @param topic
- * The name of the topic
- * @param subscription
- * The name of the subscription
- * @param conf
- * The {@code ConsumerConfiguration} object
- * @return Future of the {@code Consumer} object
- * @deprecated Use {@link #newConsumer()} to build a new consumer
- */
- @Deprecated
- CompletableFuture<Consumer> subscribeAsync(String topic, String
subscription, ConsumerConfiguration conf);
-
- /**
- * Create a topic reader with given {@code ReaderConfiguration} for
reading messages from the specified topic.
- * <p>
- * The Reader provides a low-level abstraction that allows for manual
positioning in the topic, without using a
- * subscription. Reader can only work on non-partitioned topics.
- * <p>
- * The initial reader positioning is done by specifying a message id. The
options are:
- * <ul>
- * <li><code>MessageId.earliest</code> : Start reading from the earliest
message available in the topic
- * <li><code>MessageId.latest</code> : Start reading from the end topic,
only getting messages published after the
- * reader was created
- * <li><code>MessageId</code> : When passing a particular message id, the
reader will position itself on that
- * specific position. The first message to be read will be the message
next to the specified messageId.
- * </ul>
- *
- * @param topic
- * The name of the topic where to read
- * @param startMessageId
- * The message id where the reader will position itself. The
first message returned will be the one after
- * the specified startMessageId
- * @param conf
- * The {@code ReaderConfiguration} object
- * @return The {@code Reader} object
- * @deprecated Use {@link #newReader()} to build a new reader
- */
- @Deprecated
- Reader createReader(String topic, MessageId startMessageId,
ReaderConfiguration conf) throws PulsarClientException;
-
- /**
- * Asynchronously create a topic reader with given {@code
ReaderConfiguration} for reading messages from the
- * specified topic.
- * <p>
- * The Reader provides a low-level abstraction that allows for manual
positioning in the topic, without using a
- * subscription. Reader can only work on non-partitioned topics.
- * <p>
- * The initial reader positioning is done by specifying a message id. The
options are:
- * <ul>
- * <li><code>MessageId.earliest</code> : Start reading from the earliest
message available in the topic
- * <li><code>MessageId.latest</code> : Start reading from the end topic,
only getting messages published after the
- * reader was created
- * <li><code>MessageId</code> : When passing a particular message id, the
reader will position itself on that
- * specific position. The first message to be read will be the message
next to the specified messageId.
- * </ul>
- *
- * @param topic
- * The name of the topic where to read
- * @param startMessageId
- * The message id where the reader will position itself. The
first message returned will be the one after
- * the specified startMessageId
- * @param conf
- * The {@code ReaderConfiguration} object
- * @return Future of the asynchronously created producer object
- * @deprecated Use {@link #newReader()} to build a new reader
- */
- @Deprecated
- CompletableFuture<Reader> createReaderAsync(String topic, MessageId
startMessageId, ReaderConfiguration conf);
-
- /**
- * Close the PulsarClient and release all the resources.
- *
- * All the producers and consumers will be orderly closed. Waits until all
pending write request are persisted.
- *
- * @throws PulsarClientException
- * if the close operation fails
- */
- @Override
- void close() throws PulsarClientException;
-
- /**
- * Asynchronously close the PulsarClient and release all the resources.
- *
- * All the producers and consumers will be orderly closed. Waits until all
pending write request are persisted.
- *
- * @throws PulsarClientException
- * if the close operation fails
- */
- CompletableFuture<Void> closeAsync();
-
- /**
- * Perform immediate shutdown of PulsarClient.
- *
- * Release all the resources and close all the producers without waiting
for ongoing operations to complete.
- *
- * @throws PulsarClientException
- * if the forceful shutdown fails
- */
- void shutdown() throws PulsarClientException;
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
deleted file mode 100644
index 98fcdb453bb..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import java.io.Closeable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A Reader can be used to scan through all the messages currently available
in a topic.
- *
- */
-public interface Reader extends Closeable {
-
- /**
- * @return the topic from which this reader is reading from
- */
- String getTopic();
-
- /**
- * Read the next message in the topic.
- *
- * @return the next message
- * @throws PulsarClientException
- */
- Message<byte[]> readNext() throws PulsarClientException;
-
- /**
- * Read the next message in the topic waiting for a maximum of timeout
- * time units. Returns null if no message is received in that time.
- *
- * @return the next message(Could be null if none received in time)
- * @throws PulsarClientException
- */
- Message<byte[]> readNext(int timeout, TimeUnit unit) throws
PulsarClientException;
-
- CompletableFuture<Message<byte[]>> readNextAsync();
-
- /**
- * Asynchronously close the reader and stop the broker to push more
messages.
- *
- * @return a future that can be used to track the completion of the
operation
- */
- CompletableFuture<Void> closeAsync();
-
- /**
- * Return true if the topic was terminated and this reader has reached the
end of the topic.
- */
- boolean hasReachedEndOfTopic();
-
- /**
- * Check if there is any message available to read from the current
position.
- */
- boolean hasMessageAvailable() throws PulsarClientException;
-
- /**
- * Asynchronously Check if there is message that has been published
successfully to the broker in the topic.
- */
- CompletableFuture<Boolean> hasMessageAvailableAsync();
-
- /**
- * @return Whether the reader is connected to the broker
- */
- boolean isConnected();
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
deleted file mode 100644
index 885436a1133..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import java.io.Serializable;
-import java.util.Objects;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
-import org.apache.pulsar.client.impl.v1.ReaderV1Impl;
-
-/**
- *
- * @deprecated Use {@link PulsarClient#newReader()} to construct and configure
a {@link Reader} instance
- */
-@Deprecated
-public class ReaderConfiguration implements Serializable {
-
- private final ReaderConfigurationData<byte[]> conf = new
ReaderConfigurationData<>();
-
- private ReaderListener<byte[]> readerListener;
-
- /**
- * @return the configured {@link ReaderListener} for the reader
- */
- public ReaderListener<byte[]> getReaderListener() {
- return readerListener;
- }
-
- /**
- * Sets a {@link ReaderListener} for the reader
- * <p>
- * When a {@link ReaderListener} is set, application will receive messages
through it. Calls to
- * {@link Reader#readNext()} will not be allowed.
- *
- * @param readerListener
- * the listener object
- */
- public ReaderConfiguration setReaderListener(ReaderListener<byte[]>
readerListener) {
- Objects.requireNonNull(readerListener);
- this.readerListener = readerListener;
- conf.setReaderListener(new
org.apache.pulsar.shade.client.api.v2.ReaderListener<byte[]>() {
-
- @Override
- public void
received(org.apache.pulsar.shade.client.api.v2.Reader<byte[]> v2Reader,
Message<byte[]> msg) {
- readerListener.received(new ReaderV1Impl(v2Reader), msg);
- }
-
- @Override
- public void
reachedEndOfTopic(org.apache.pulsar.shade.client.api.v2.Reader<byte[]> reader) {
- readerListener.reachedEndOfTopic(new ReaderV1Impl(reader));
- }
- });
- return this;
- }
-
- /**
- * @return the configure receiver queue size value
- */
- public int getReceiverQueueSize() {
- return conf.getReceiverQueueSize();
- }
-
- /**
- * @return the CryptoKeyReader
- */
- public CryptoKeyReader getCryptoKeyReader() {
- return conf.getCryptoKeyReader();
- }
-
- /**
- * Sets a {@link CryptoKeyReader}.
- *
- * @param cryptoKeyReader
- * CryptoKeyReader object
- */
- public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader
cryptoKeyReader) {
- Objects.requireNonNull(cryptoKeyReader);
- conf.setCryptoKeyReader(cryptoKeyReader);
- return this;
- }
-
- /**
- * Sets the ConsumerCryptoFailureAction to the value specified.
- *
- * @param action
- * The action to take when the decoding fails
- */
- public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
- conf.setCryptoFailureAction(action);
- }
-
- /**
- * @return The ConsumerCryptoFailureAction
- */
- public ConsumerCryptoFailureAction getCryptoFailureAction() {
- return conf.getCryptoFailureAction();
- }
-
- /**
- * Sets the size of the consumer receive queue.
- * <p>
- * The consumer receive queue controls how many messages can be
accumulated by the {@link Consumer} before the
- * application calls {@link Consumer#receive()}. Using a higher value
could potentially increase the consumer
- * throughput at the expense of bigger memory utilization.
- * </p>
- * Default value is {@code 1000} messages and should be good for most use
cases.
- *
- * @param receiverQueueSize
- * the new receiver queue size value
- */
- public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) {
- checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be
negative");
- conf.setReceiverQueueSize(receiverQueueSize);
- return this;
- }
-
- /**
- * @return the consumer name
- */
- public String getReaderName() {
- return conf.getReaderName();
- }
-
- /**
- * Set the consumer name.
- *
- * @param readerName
- */
- public ReaderConfiguration setReaderName(String readerName) {
- checkArgument(StringUtils.isNotBlank(readerName));
- conf.setReaderName(readerName);
- return this;
- }
-
- /**
- * @return the subscription role prefix for subscription auth
- */
- public String getSubscriptionRolePrefix() {
- return conf.getSubscriptionRolePrefix();
- }
-
- /**
- * Set the subscription role prefix for subscription auth. The default
prefix is "reader".
- *
- * @param subscriptionRolePrefix
- */
- public ReaderConfiguration setSubscriptionRolePrefix(String
subscriptionRolePrefix) {
- checkArgument(StringUtils.isNotBlank(subscriptionRolePrefix));
- conf.setSubscriptionRolePrefix(subscriptionRolePrefix);
- return this;
- }
-
- public ReaderConfigurationData<byte[]> getReaderConfigurationData() {
- return conf;
- }
-
- private static final long serialVersionUID = 1L;
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderListener.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderListener.java
deleted file mode 100644
index 26d694d589a..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderListener.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import java.io.Serializable;
-
-/**
- * A listener that will be called in order for every message received.
- */
-public interface ReaderListener<T> extends Serializable {
- /**
- * This method is called whenever a new message is received.
- *
- * Messages are guaranteed to be delivered in order and from the same
thread for a single consumer
- *
- * This method will only be called once for each message, unless either
application or broker crashes.
- *
- * Application is responsible of handling any exception that could be
thrown while processing the message.
- *
- * @param reader
- * the Reader object from where the message was received
- * @param msg
- * the message object
- */
- void received(Reader reader, Message<T> msg);
-
- /**
- * Get the notification when a topic is terminated.
- *
- * @param reader
- * the Reader object associated with the terminated topic
- */
- default void reachedEndOfTopic(Reader reader) {
- // By default ignore the notification
- }
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/package-info.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/package-info.java
deleted file mode 100644
index 57896b5e272..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Pulsar Client API.
- */
-package org.apache.pulsar.client.api;
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
deleted file mode 100644
index 6d6a08725d7..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-
-@SuppressWarnings("deprecation")
-public class MessageBuilderImpl implements MessageBuilder {
- private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0);
- private final MessageMetadata msgMetadataBuilder = new MessageMetadata();
- private ByteBuffer content = EMPTY_CONTENT;
-
- @Override
- public Message<byte[]> build() {
- return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES,
null);
- }
-
- @Override
- public MessageBuilder setContent(byte[] data) {
- setContent(data, 0, data.length);
- return this;
- }
-
- @Override
- public MessageBuilder setContent(byte[] data, int offet, int length) {
- this.content = ByteBuffer.wrap(data, offet, length);
- return this;
- }
-
- @Override
- public MessageBuilder setContent(ByteBuffer buf) {
- this.content = buf.duplicate();
- return this;
- }
-
- @Override
- public MessageBuilder setProperties(Map<String, String> properties) {
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- msgMetadataBuilder.addProperty()
- .setKey(entry.getKey())
- .setValue(entry.getValue());
- }
-
- return this;
- }
-
- @Override
- public MessageBuilder setProperty(String name, String value) {
- msgMetadataBuilder.addProperty()
- .setKey(name)
- .setValue(value);
- return this;
- }
-
- @Override
- public MessageBuilder setKey(String key) {
- msgMetadataBuilder.setPartitionKey(key);
- return this;
- }
-
- @Override
- public MessageBuilder setEventTime(long timestamp) {
- checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
- msgMetadataBuilder.setEventTime(timestamp);
- return this;
- }
-
- @Override
- public MessageBuilder setSequenceId(long sequenceId) {
- checkArgument(sequenceId >= 0);
- msgMetadataBuilder.setSequenceId(sequenceId);
- return this;
- }
-
- @Override
- public MessageBuilder setReplicationClusters(List<String> clusters) {
- Objects.requireNonNull(clusters);
- msgMetadataBuilder.clearReplicateTo();
- msgMetadataBuilder.addAllReplicateTos(clusters);
- return this;
- }
-
- @Override
- public MessageBuilder disableReplication() {
- msgMetadataBuilder.clearReplicateTo();
- msgMetadataBuilder.addReplicateTo("__local__");
- return this;
- }
-
-
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/package-info.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/package-info.java
deleted file mode 100644
index e429b403ec6..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Pulsar Client API.
- */
-package org.apache.pulsar.client.impl;
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
deleted file mode 100644
index ab17beee4f7..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.v1;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerStats;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-public class ConsumerV1Impl implements Consumer {
- private final org.apache.pulsar.shade.client.api.v2.Consumer<byte[]>
consumer;
-
- public
ConsumerV1Impl(org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> consumer)
{
- this.consumer = consumer;
- }
-
- @Override
- public void acknowledge(Message<?> arg0) throws PulsarClientException {
- consumer.acknowledge(arg0);
- }
-
- @Override
- public void acknowledge(MessageId arg0) throws PulsarClientException {
- consumer.acknowledge(arg0);
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeAsync(Message<?> arg0) {
- return consumer.acknowledgeAsync(arg0);
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeAsync(MessageId arg0) {
- return consumer.acknowledgeAsync(arg0);
- }
-
- @Override
- public void acknowledgeCumulative(Message<?> arg0) throws
PulsarClientException {
- consumer.acknowledgeCumulative(arg0);
- }
-
- @Override
- public void acknowledgeCumulative(MessageId arg0) throws
PulsarClientException {
- consumer.acknowledgeCumulative(arg0);
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> arg0)
{
- return consumer.acknowledgeCumulativeAsync(arg0);
- }
-
- @Override
- public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId arg0) {
- return consumer.acknowledgeCumulativeAsync(arg0);
- }
-
- @Override
- public void close() throws PulsarClientException {
- consumer.close();
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- return consumer.closeAsync();
- }
-
- @Override
- public String getConsumerName() {
- return consumer.getConsumerName();
- }
-
- @Override
- public ConsumerStats getStats() {
- return consumer.getStats();
- }
-
- public String getSubscription() {
- return consumer.getSubscription();
- }
-
- public String getTopic() {
- return consumer.getTopic();
- }
-
- public boolean hasReachedEndOfTopic() {
- return consumer.hasReachedEndOfTopic();
- }
-
- public boolean isConnected() {
- return consumer.isConnected();
- }
-
- public void pause() {
- consumer.pause();
- }
-
- public Message<byte[]> receive() throws PulsarClientException {
- return consumer.receive();
- }
-
- public Message<byte[]> receive(int arg0, TimeUnit arg1) throws
PulsarClientException {
- return consumer.receive(arg0, arg1);
- }
-
- public CompletableFuture<Message<byte[]>> receiveAsync() {
- return consumer.receiveAsync();
- }
-
- public void redeliverUnacknowledgedMessages() {
- consumer.redeliverUnacknowledgedMessages();
- }
-
- public void resume() {
- consumer.resume();
- }
-
- public void seek(MessageId arg0) throws PulsarClientException {
- consumer.seek(arg0);
- }
-
- public void seek(long arg0) throws PulsarClientException {
- consumer.seek(arg0);
- }
-
- public void seek(Function<String, Object> function) throws
PulsarClientException {
- consumer.seek(function);
- }
-
- public CompletableFuture<Void> seekAsync(long arg0) {
- return consumer.seekAsync(arg0);
- }
-
- public CompletableFuture<Void> seekAsync(MessageId arg0) {
- return consumer.seekAsync(arg0);
- }
-
- public CompletableFuture<Void> seekAsync(Function<String, Object>
function) {
- return consumer.seekAsync(function);
- }
-
- public void unsubscribe() throws PulsarClientException {
- consumer.unsubscribe();
- }
-
- public CompletableFuture<Void> unsubscribeAsync() {
- return consumer.unsubscribeAsync();
- }
-
- public MessageId getLastMessageId() throws PulsarClientException {
- return consumer.getLastMessageId();
- }
-
- public CompletableFuture<MessageId> getLastMessageIdAsync() {
- return consumer.getLastMessageIdAsync();
- }
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
deleted file mode 100644
index 12c8d0f1527..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.v1;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerStats;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ProducerImpl;
-
-public class ProducerV1Impl implements Producer {
-
- private final ProducerImpl<byte[]> producer;
-
- public ProducerV1Impl(ProducerImpl<byte[]> producer) {
- this.producer = producer;
- }
-
- public void close() throws PulsarClientException {
- producer.close();
- }
-
- public CompletableFuture<Void> closeAsync() {
- return producer.closeAsync();
- }
-
- public void flush() throws PulsarClientException {
- producer.flush();
- }
-
- public CompletableFuture<Void> flushAsync() {
- return producer.flushAsync();
- }
-
- public long getLastSequenceId() {
- return producer.getLastSequenceId();
- }
-
- public ProducerStats getStats() {
- return producer.getStats();
- }
-
- public boolean isConnected() {
- return producer.isConnected();
- }
-
- public MessageId send(byte[] value) throws PulsarClientException {
- return producer.send(value);
- }
-
- public MessageId send(Message<byte[]> value) throws PulsarClientException {
- return producer.send(value);
- }
-
- public CompletableFuture<MessageId> sendAsync(byte[] arg0) {
- return producer.sendAsync(arg0);
- }
-
- public CompletableFuture<MessageId> sendAsync(Message<byte[]> arg0) {
- return producer.sendAsync(arg0);
- }
-
- @Override
- public String getTopic() {
- return producer.getTopic();
- }
-
- @Override
- public String getProducerName() {
- return producer.getProducerName();
- }
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
deleted file mode 100644
index ca4373d37f4..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.v1;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderConfiguration;
-import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
-import org.apache.pulsar.common.util.FutureUtil;
-
-@SuppressWarnings("deprecation")
-public class PulsarClientV1Impl implements PulsarClient {
-
- private final PulsarClientImpl client;
-
- public PulsarClientV1Impl(String serviceUrl, ClientConfiguration conf)
throws PulsarClientException {
- this.client = new
PulsarClientImpl(conf.setServiceUrl(serviceUrl).getConfigurationData().clone());
- }
-
- @Override
- public void close() throws PulsarClientException {
- client.close();
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- return client.closeAsync();
- }
-
- @Override
- public Producer createProducer(final String topic, final
ProducerConfiguration conf) throws PulsarClientException {
- if (conf == null) {
- throw new
PulsarClientException.InvalidConfigurationException("Invalid null configuration
object");
- }
-
- try {
- return createProducerAsync(topic, conf).get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- public Producer createProducer(String topic)
- throws PulsarClientException {
- return createProducer(topic, new ProducerConfiguration());
- }
-
- @Override
- public CompletableFuture<Producer> createProducerAsync(final String topic,
final ProducerConfiguration conf) {
- ProducerConfigurationData confData =
conf.getProducerConfigurationData().clone();
- confData.setTopicName(topic);
- return client.createProducerAsync(confData).thenApply(p -> new
ProducerV1Impl((ProducerImpl<byte[]>) p));
- }
-
- @Override
- public CompletableFuture<Producer> createProducerAsync(String topic) {
- return createProducerAsync(topic, new ProducerConfiguration());
- }
-
- @Override
- public Reader createReader(String topic, MessageId startMessageId,
ReaderConfiguration conf)
- throws PulsarClientException {
- try {
- return createReaderAsync(topic, startMessageId, conf).get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- public CompletableFuture<Reader> createReaderAsync(String topic, MessageId
startMessageId,
- ReaderConfiguration conf) {
- ReaderConfigurationData<byte[]> confData =
conf.getReaderConfigurationData().clone();
- confData.setTopicName(topic);
- confData.setStartMessageId(startMessageId);
- return client.createReaderAsync(confData).thenApply(r -> new
ReaderV1Impl(r));
- }
-
- @Override
- public void shutdown() throws PulsarClientException {
- client.shutdown();
- }
-
- @Override
- public Consumer subscribe(String topic, String subscriptionName) throws
PulsarClientException {
- return subscribe(topic, subscriptionName, new ConsumerConfiguration());
- }
-
- @Override
- public CompletableFuture<Consumer> subscribeAsync(final String topic,
final String subscription,
- final ConsumerConfiguration conf) {
- if (conf == null) {
- return FutureUtil.failedFuture(
- new
PulsarClientException.InvalidConfigurationException("Invalid null
configuration"));
- }
-
- ConsumerConfigurationData<byte[]> confData =
conf.getConfigurationData().clone();
- confData.getTopicNames().add(topic);
- confData.setSubscriptionName(subscription);
- return client.subscribeAsync(confData).thenApply(c -> new
ConsumerV1Impl(c));
- }
-
- @Override
- public CompletableFuture<Consumer> subscribeAsync(String topic,
- String subscriptionName) {
- return subscribeAsync(topic, subscriptionName, new
ConsumerConfiguration());
- }
-
- @Override
- public Consumer subscribe(String topic, String subscription,
ConsumerConfiguration conf)
- throws PulsarClientException {
- try {
- return subscribeAsync(topic, subscription, conf).get();
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof PulsarClientException) {
- throw (PulsarClientException) t;
- } else {
- throw new PulsarClientException(t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
- }
- }
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
deleted file mode 100644
index 2e6384459e1..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.v1;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-
-public class ReaderV1Impl implements Reader {
-
- private final org.apache.pulsar.shade.client.api.v2.Reader<byte[]> reader;
-
- public ReaderV1Impl(org.apache.pulsar.shade.client.api.v2.Reader<byte[]>
reader) {
- this.reader = reader;
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- return reader.closeAsync();
- }
-
- @Override
- public String getTopic() {
- return reader.getTopic();
- }
-
- @Override
- public boolean hasMessageAvailable() throws PulsarClientException {
- return reader.hasMessageAvailable();
- }
-
- @Override
- public CompletableFuture<Boolean> hasMessageAvailableAsync() {
- return reader.hasMessageAvailableAsync();
- }
-
- @Override
- public boolean hasReachedEndOfTopic() {
- return reader.hasReachedEndOfTopic();
- }
-
- @Override
- public boolean isConnected() {
- return reader.isConnected();
- }
-
- @Override
- public Message<byte[]> readNext() throws PulsarClientException {
- return reader.readNext();
- }
-
- @Override
- public Message<byte[]> readNext(int arg0, TimeUnit arg1) throws
PulsarClientException {
- return reader.readNext(arg0, arg1);
- }
-
- @Override
- public CompletableFuture<Message<byte[]>> readNextAsync() {
- return reader.readNextAsync();
- }
-}
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/package-info.java
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/package-info.java
deleted file mode 100644
index 97705e7d567..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Pulsar Client API.
- */
-package org.apache.pulsar.client.impl.v1;
diff --git
a/pulsar-client-1x-base/pulsar-client-1x/src/main/resources/findbugsExclude.xml
b/pulsar-client-1x-base/pulsar-client-1x/src/main/resources/findbugsExclude.xml
deleted file mode 100644
index 7938e60bf43..00000000000
---
a/pulsar-client-1x-base/pulsar-client-1x/src/main/resources/findbugsExclude.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<FindBugsFilter>
- <!-- Ignore violations that were present when the rule was enabled -->
- <Match>
- <Class name="org.apache.pulsar.client.api.ClientConfiguration"/>
- <Method name="getConfigurationData"/>
- <Bug pattern="EI_EXPOSE_REP"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.client.api.ConsumerConfiguration"/>
- <Method name="getConfigurationData"/>
- <Bug pattern="EI_EXPOSE_REP"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.client.api.ProducerConfiguration"/>
- <Method name="getProducerConfigurationData"/>
- <Bug pattern="EI_EXPOSE_REP"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.client.api.ReaderConfiguration"/>
- <Method name="getReaderConfigurationData"/>
- <Bug pattern="EI_EXPOSE_REP"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.client.impl.v1.ProducerV1Impl"/>
- <Method name="<init>"/>
- <Bug pattern="EI_EXPOSE_REP2"/>
- </Match>
-</FindBugsFilter>
diff --git a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
deleted file mode 100644
index 2e316e8e5ee..00000000000
--- a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
+++ /dev/null
@@ -1,97 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-1x-base</artifactId>
- <version>4.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-client-2x-shaded</artifactId>
- <name>Pulsar Client 2.x Shaded API</name>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <!-- Shade all the dependencies to avoid conflicts -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>${shadePluginPhase}</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <createDependencyReducedPom>true</createDependencyReducedPom>
-
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
- <minimizeJar>false</minimizeJar>
-
- <artifactSet>
- <includes>
- <include>org.apache.pulsar:pulsar-client</include>
- <include>org.apache.pulsar:pulsar-client-api</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>org.apache.pulsar:pulsar-client</artifact>
- <includes>
- <include>**</include>
- </includes>
- <excludes>
- <!-- bouncycastle jars could not be shaded, or the
signatures will be wrong-->
- <exclude>org/bouncycastle/**</exclude>
- </excludes>
- </filter>
- </filters>
- <relocations>
- <relocation>
- <pattern>org.apache.pulsar.client.api</pattern>
-
<shadedPattern>org.apache.pulsar.shade.client.api.v2</shadedPattern>
- <includes>
-
<include>org.apache.pulsar.client.api.PulsarClient</include>
- <include>org.apache.pulsar.client.api.Producer</include>
- <include>org.apache.pulsar.client.api.Consumer</include>
- <include>org.apache.pulsar.client.api.Reader</include>
-
<include>org.apache.pulsar.client.api.MessageListener</include>
-
<include>org.apache.pulsar.client.api.ReaderListener</include>
- </includes>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>