This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bc9e8ee541 Upgrade Pulsar to 3.2.2 (#12967)
bc9e8ee541 is described below
commit bc9e8ee5413c8611fe2be3ed6c3d7073e750d608
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Apr 23 21:43:12 2024 -0700
Upgrade Pulsar to 3.2.2 (#12967)
---
.../pinot-stream-ingestion/pinot-pulsar/pom.xml | 134 +++------------------
.../pinot/plugin/stream/pulsar/PulsarUtils.java | 27 ++---
.../plugin/stream/pulsar/PulsarConsumerTest.java | 2 +-
pinot-tools/pom.xml | 38 ------
pom.xml | 31 ++---
5 files changed, 37 insertions(+), 195 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
index f742f52f45..cb13fb9bba 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml
@@ -37,137 +37,33 @@
<properties>
<shade.phase.prop>package</shade.phase.prop>
<pinot.root>${basedir}/../../..</pinot.root>
- <simpleclient_common.version>0.16.0</simpleclient_common.version>
- <grpc-context.version>1.63.0</grpc-context.version>
- <grpc-protobuf-lite.version>1.63.0</grpc-protobuf-lite.version>
- <caffeine.version>2.6.2</caffeine.version>
- <codehaus-annotations.version>1.17</codehaus-annotations.version>
+ <pulsar.version>3.2.2</pulsar.version>
+ <testcontainers.pulsar.version>1.19.7</testcontainers.pulsar.version>
</properties>
<dependencies>
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>pulsar</artifactId>
- <version>1.19.7</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-original</artifactId>
- <exclusions>
- <exclusion>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcpkix-jdk15on</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-ext-jdk15on</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- </exclusions>
+ <artifactId>pulsar-client</artifactId>
+ <version>${pulsar.version}</version>
</dependency>
+
<dependency>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-admin-original</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.core</groupId>
- <artifactId>jersey-server</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.containers</groupId>
- <artifactId>jersey-container-grizzly2-http</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.containers</groupId>
- <artifactId>jersey-container-servlet-core</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-resolver</artifactId>
- </dependency>
- <dependency>
- <groupId>io.prometheus</groupId>
- <artifactId>simpleclient_common</artifactId>
- <version>${simpleclient_common.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.api.grpc</groupId>
- <artifactId>proto-google-common-protos</artifactId>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-context</artifactId>
- <version>${grpc-context.version}</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-protobuf-lite</artifactId>
- <version>${grpc-protobuf-lite.version}</version>
- </dependency>
- <dependency>
- <groupId>io.prometheus</groupId>
- <artifactId>simpleclient</artifactId>
- <version>${simpleclient_common.version}</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- </dependency>
- <dependency>
- <groupId>com.squareup.okio</groupId>
- <artifactId>okio</artifactId>
- </dependency>
- <dependency>
- <groupId>io.prometheus</groupId>
- <artifactId>simpleclient_hotspot</artifactId>
- <version>${simpleclient_common.version}</version>
- </dependency>
- <dependency>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>animal-sniffer-annotations</artifactId>
- <version>${codehaus-annotations.version}</version>
- </dependency>
- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- <version>${caffeine.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-socks</artifactId>
- </dependency>
- <dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcpkix-jdk15to18</artifactId>
- </dependency>
- <dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-ext-jdk15to18</artifactId>
+ <artifactId>pulsar-client-admin</artifactId>
+ <version>${pulsar.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15to18</artifactId>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>${testcontainers.pulsar.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.pinot</groupId>
- <artifactId>pinot-spi</artifactId>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
index 0ccacc3047..e1b7b50c21 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.Base64;
+import java.util.BitSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
@@ -31,12 +32,11 @@ import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.impl.BatchMessageAcker;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.internal.DefaultImplementation;
public class PulsarUtils {
@@ -119,22 +119,21 @@ public class PulsarUtils {
* record in the new ledger.
*/
public static MessageId getNextMessageId(MessageId messageId) {
- MessageIdImpl messageIdImpl =
MessageIdImpl.convertToMessageIdImpl(messageId);
- long ledgerId = messageIdImpl.getLedgerId();
- long entryId = messageIdImpl.getEntryId();
- int partitionIndex = messageIdImpl.getPartitionIndex();
- if (messageIdImpl instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMessageIdImpl = (BatchMessageIdImpl)
messageIdImpl;
- int batchIndex = batchMessageIdImpl.getBatchIndex();
- int batchSize = batchMessageIdImpl.getBatchSize();
- BatchMessageAcker acker = batchMessageIdImpl.getAcker();
+ MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+ long ledgerId = messageIdAdv.getLedgerId();
+ long entryId = messageIdAdv.getEntryId();
+ int partitionIndex = messageIdAdv.getPartitionIndex();
+ int batchSize = messageIdAdv.getBatchSize();
+ if (batchSize > 0) {
+ int batchIndex = messageIdAdv.getBatchIndex();
+ BitSet ackSet = messageIdAdv.getAckSet();
if (batchIndex < batchSize - 1) {
- return new BatchMessageIdImpl(ledgerId, entryId, partitionIndex,
batchIndex + 1, batchSize, acker);
+ return new BatchMessageIdImpl(ledgerId, entryId, partitionIndex,
batchIndex + 1, batchSize, ackSet);
} else {
- return new BatchMessageIdImpl(ledgerId, entryId + 1, partitionIndex,
0, batchSize, acker);
+ return new BatchMessageIdImpl(ledgerId, entryId + 1, partitionIndex,
0, batchSize, ackSet);
}
} else {
- return
DefaultImplementation.getDefaultImplementation().newMessageId(ledgerId, entryId
+ 1, partitionIndex);
+ return new MessageIdImpl(ledgerId, entryId + 1, partitionIndex);
}
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 1baf212f17..0ee9eb0623 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -54,7 +54,7 @@ import static org.testng.Assert.assertTrue;
public class PulsarConsumerTest {
- private static final DockerImageName PULSAR_IMAGE =
DockerImageName.parse("apachepulsar/pulsar:2.11.4");
+ private static final DockerImageName PULSAR_IMAGE =
DockerImageName.parse("apachepulsar/pulsar:3.2.2");
public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
public static final String TEST_TOPIC = "test-topic";
public static final String TEST_TOPIC_BATCH = "test-topic-batch";
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 26c466ee0c..2e9dcf3b87 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -146,40 +146,6 @@
<artifactId>pinot-pulsar</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
- <exclusions>
- <exclusion>
- <groupId>com.google.errorprone</groupId>
- <artifactId>error_prone_annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>animal-sniffer-annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.api.grpc</groupId>
- <artifactId>proto-google-common-protos</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.glassfish.jersey.containers</groupId>
- <artifactId>jersey-container-servlet-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-protobuf-lite</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-context</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.typesafe.netty</groupId>
- <artifactId>netty-reactive-streams</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.beust</groupId>
- <artifactId>jcommander</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
@@ -208,10 +174,6 @@
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>
- <dependency>
- <groupId>io.airlift</groupId>
- <artifactId>aircompressor</artifactId>
- </dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
diff --git a/pom.xml b/pom.xml
index b52922f01e..6a75bfd73e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -240,6 +240,7 @@
<flink.version>1.14.6</flink.version>
+ <!-- Solve conflict across dependencies -->
<kotlin.stdlib.version>1.9.23</kotlin.stdlib.version>
<okio.version>3.9.0</okio.version>
<kerby.version>2.0.3</kerby.version>
@@ -249,8 +250,7 @@
<eclipse.jetty.version>9.4.54.v20240208</eclipse.jetty.version>
<nimbus-jose-jwt.version>9.37.3</nimbus-jose-jwt.version>
<bouncycastle.version>1.78</bouncycastle.version>
- <airlift.version>0.26</airlift.version>
- <pulsar.version>2.11.4</pulsar.version>
+ <aircompressor.version>0.26</aircompressor.version>
</properties>
<profiles>
@@ -1561,19 +1561,7 @@
<version>${sslcontext.kickstart.version}</version>
</dependency>
- <!-- Pulsar -->
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-original</artifactId>
- <version>${pulsar.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-admin-original</artifactId>
- <version>${pulsar.version}</version>
- </dependency>
-
- <!-- bouncycastle libraries are used by Kafka and Pulsar plugins -->
+ <!-- Bouncy Castle libraries are used by Kafka and Pulsar plugins -->
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
@@ -1586,27 +1574,24 @@
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
- <artifactId>bcpkix-jdk15to18</artifactId>
+ <artifactId>bcutil-jdk18on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-ext-jdk15to18</artifactId>
- <version>${bouncycastle.version}</version>
- </dependency>
- <dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15to18</artifactId>
+ <artifactId>bcprov-ext-jdk18on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>
+ <!-- Used by ORC, Parquet and Pulsar -->
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
- <version>${airlift.version}</version>
+ <version>${aircompressor.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
+
<build>
<defaultGoal>clean install</defaultGoal>
<extensions>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org