This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1602e4b7d26cf52cea993c410769b7b15a672aff
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Feb 9 13:00:53 2022 +0800

    [FLINK-24246][connector/pulsar] Bump PulsarClient version to latest 2.9.1
    
    1. Bump the pulsar-client-all version in pom file.
    2. Exclude useless dependencies for pulsar-client-all.
    3. Bump the Pulsar docker version.
    4. Change the dependencies to pass the tests.
    5. Drop PulsarTransactionUtils and fix compile issues in tests.
    6. Add bouncycastle to Pulsar e2e tests.
---
 flink-connectors/flink-connector-pulsar/pom.xml    |  74 +++++++++++--
 .../common/utils/PulsarTransactionUtils.java       | 118 ---------------------
 .../split/PulsarUnorderedPartitionSplitReader.java |   3 +-
 .../PulsarDeserializationSchemaTest.java           |   2 +-
 .../src/main/resources/META-INF/NOTICE             |  16 +--
 .../flink-end-to-end-tests-pulsar/pom.xml          |  43 +++++++-
 .../FlinkContainerWithPulsarEnvironment.java       |   5 +
 .../org/apache/flink/util/DockerImageVersions.java |   2 +-
 8 files changed, 124 insertions(+), 139 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/pom.xml 
b/flink-connectors/flink-connector-pulsar/pom.xml
index 87b6ba0..45047eb 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -36,12 +36,14 @@ under the License.
        <packaging>jar</packaging>
 
        <properties>
-               <pulsar.version>2.8.0</pulsar.version>
+               <pulsar.version>2.9.1</pulsar.version>
 
                <!-- Test Libraries -->
                
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
-               <commons-lang3.version>3.11</commons-lang3.version>
-               <grpc.version>1.33.0</grpc.version>
+               
<pulsar-commons-lang3.version>3.11</pulsar-commons-lang3.version>
+               <pulsar-zookeeper.version>3.6.3</pulsar-zookeeper.version>
+               <pulsar-netty.version>4.1.72.Final</pulsar-netty.version>
+               <pulsar-grpc.version>1.33.0</pulsar-grpc.version>
        </properties>
 
        <dependencies>
@@ -138,12 +140,22 @@ under the License.
                        <version>${pulsar.version}</version>
                        <scope>test</scope>
                </dependency>
+
                <!-- Pulsar use a newer commons-lang3 in broker. -->
                <!-- Bump the version only for testing. -->
                <dependency>
                        <groupId>org.apache.commons</groupId>
                        <artifactId>commons-lang3</artifactId>
-                       <version>${commons-lang3.version}</version>
+                       <version>${pulsar-commons-lang3.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Pulsar use a newer zookeeper in broker. -->
+               <!-- Bump the version only for testing. -->
+               <dependency>
+                       <groupId>org.apache.zookeeper</groupId>
+                       <artifactId>zookeeper</artifactId>
+                       <version>${pulsar-zookeeper.version}</version>
                        <scope>test</scope>
                </dependency>
 
@@ -156,9 +168,41 @@ under the License.
                        <version>${pulsar.version}</version>
                        <exclusions>
                                <exclusion>
+                                       <groupId>com.sun.activation</groupId>
+                                       
<artifactId>javax.activation</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>jakarta.activation</groupId>
+                                       
<artifactId>jakarta.activation-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>jakarta.ws.rs</groupId>
+                                       
<artifactId>jakarta.ws.rs-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>jakarta.xml.bind</groupId>
+                                       
<artifactId>jakarta.xml.bind-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>javax.validation</groupId>
+                                       <artifactId>validation-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>javax.xml.bind</groupId>
+                                       <artifactId>jaxb-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>net.jcip</groupId>
+                                       
<artifactId>jcip-annotations</artifactId>
+                               </exclusion>
+                               <exclusion>
                                        <groupId>org.apache.pulsar</groupId>
                                        
<artifactId>pulsar-package-core</artifactId>
                                </exclusion>
+                               <exclusion>
+                                       <groupId>com.beust</groupId>
+                                       <artifactId>jcommander</artifactId>
+                               </exclusion>
                        </exclusions>
                </dependency>
 
@@ -171,13 +215,23 @@ under the License.
                </dependency>
        </dependencies>
 
-       <!-- gRPC use version range which don't support by flink ci. -->
+
        <dependencyManagement>
                <dependencies>
+                       <!-- Pulsar use higher gRPC version. -->
                        <dependency>
                                <groupId>io.grpc</groupId>
                                <artifactId>grpc-bom</artifactId>
-                               <version>${grpc.version}</version>
+                               <version>${pulsar-grpc.version}</version>
+                               <type>pom</type>
+                               <scope>import</scope>
+                       </dependency>
+
+                       <!-- Pulsar use higher netty version. -->
+                       <dependency>
+                               <groupId>io.netty</groupId>
+                               <artifactId>netty-bom</artifactId>
+                               <version>${pulsar-netty.version}</version>
                                <type>pom</type>
                                <scope>import</scope>
                        </dependency>
@@ -200,7 +254,9 @@ under the License.
                                <configuration>
                                        <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
                                        <forkCount>1</forkCount>
-                                       <argLine>-Xms256m -Xmx2048m 
-Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit 
-Duser.country=US -Duser.language=en</argLine>
+                                       <argLine>-Xms256m -Xmx2048m 
-Dmvn.forkNumber=${surefire.forkNumber}
+                                               -XX:-UseGCOverheadLimit 
-Duser.country=US -Duser.language=en
+                                       </argLine>
                                </configuration>
                        </plugin>
                        <plugin>
@@ -222,7 +278,9 @@ under the License.
                                        <outputDirectory>
                                                
${project.build.directory}/generated-test-sources/protobuf/java
                                        </outputDirectory>
-                                       
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+                                       <protocArtifact>
+                                               
com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
+                                       </protocArtifact>
                                </configuration>
                                <executions>
                                        <execution>
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
deleted file mode 100644
index ef54779..0000000
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.common.utils;
-
-import org.apache.flink.annotation.Internal;
-
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Transaction was introduced into pulsar since 2.7.0, but the interface 
{@link Transaction} didn't
- * provide a id method until 2.8.1. We have to add this util for acquiring the 
{@link TxnID} for
- * compatible consideration.
- *
- * <p>TODO Remove this hack after pulsar 2.8.1 release.
- */
-@Internal
-@SuppressWarnings("java:S3011")
-public final class PulsarTransactionUtils {
-
-    private static volatile Field mostBitsField;
-    private static volatile Field leastBitsField;
-
-    private PulsarTransactionUtils() {
-        // No public constructor
-    }
-
-    public static TxnID getId(Transaction transaction) {
-        // 2.8.1 and after.
-        try {
-            Method getId = Transaction.class.getDeclaredMethod("getTxnID");
-            return (TxnID) getId.invoke(transaction);
-        } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
-            // 2.8.0 and before.
-            TransactionImpl impl = (TransactionImpl) transaction;
-            Long txnIdMostBits = getTxnIdMostBits(impl);
-            Long txnIdLeastBits = getTxnIdLeastBits(impl);
-
-            checkNotNull(txnIdMostBits, "Failed to get txnIdMostBits");
-            checkNotNull(txnIdLeastBits, "Failed to get txnIdLeastBits");
-
-            return new TxnID(txnIdMostBits, txnIdLeastBits);
-        }
-    }
-
-    private static Long getTxnIdMostBits(TransactionImpl transaction) {
-        if (mostBitsField == null) {
-            synchronized (PulsarTransactionUtils.class) {
-                if (mostBitsField == null) {
-                    try {
-                        mostBitsField = 
TransactionImpl.class.getDeclaredField("txnIdMostBits");
-                        mostBitsField.setAccessible(true);
-                    } catch (NoSuchFieldException e) {
-                        // Nothing to do for this exception.
-                    }
-                }
-            }
-        }
-
-        if (mostBitsField != null) {
-            try {
-                return (Long) mostBitsField.get(transaction);
-            } catch (IllegalAccessException e) {
-                // Nothing to do for this exception.
-            }
-        }
-
-        return null;
-    }
-
-    private static Long getTxnIdLeastBits(TransactionImpl transaction) {
-        if (leastBitsField == null) {
-            synchronized (PulsarTransactionUtils.class) {
-                if (leastBitsField == null) {
-                    try {
-                        leastBitsField = 
TransactionImpl.class.getDeclaredField("txnIdLeastBits");
-                        leastBitsField.setAccessible(true);
-                    } catch (NoSuchFieldException e) {
-                        // Nothing to do for this exception.
-                    }
-                }
-            }
-        }
-
-        if (leastBitsField != null) {
-            try {
-                return (Long) leastBitsField.get(transaction);
-            } catch (IllegalAccessException e) {
-                // Nothing to do for this exception.
-            }
-        }
-
-        return null;
-    }
-}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 846101d..7262863 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.pulsar.source.reader.split;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import 
org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
@@ -155,7 +154,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> 
extends PulsarPartitionSpl
 
         // Avoiding NP problem when Pulsar don't get the message before Flink 
checkpoint.
         if (uncommittedTransaction != null) {
-            TxnID txnID = PulsarTransactionUtils.getId(uncommittedTransaction);
+            TxnID txnID = uncommittedTransaction.getTxnID();
             this.uncommittedTransaction = newTransaction();
             state.setUncommittedTransactionId(txnID);
         }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
index aa4bcee..48e6e7a 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
@@ -113,7 +113,7 @@ class PulsarDeserializationSchemaTest {
         MessageMetadata metadata = new MessageMetadata();
         ByteBuffer payload = ByteBuffer.wrap(bytes);
 
-        return MessageImpl.create(metadata, payload, Schema.BYTES);
+        return MessageImpl.create(metadata, payload, Schema.BYTES, "");
     }
 
     /** This collector is used for collecting only one message. Used for test 
purpose. */
diff --git 
a/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
 
b/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
index 79ebbfc..56ad187 100644
--- 
a/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
+++ 
b/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
@@ -6,12 +6,12 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software 
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.pulsar:bouncy-castle-bc:pkg:2.8.0
-- org.apache.pulsar:pulsar-client-admin-api:2.8.0
-- org.apache.pulsar:pulsar-client-all:2.8.0
-- org.apache.pulsar:pulsar-client-api:2.8.0
-- org.bouncycastle:bcpkix-jdk15on:1.68
-- org.bouncycastle:bcprov-ext-jdk15on:1.68
-- org.bouncycastle:bcprov-jdk15on:1.68
-- org.bouncycastle:bcutil-jdk15on:1.68
+- org.apache.pulsar:bouncy-castle-bc:pkg:2.9.1
+- org.apache.pulsar:pulsar-client-admin-api:2.9.1
+- org.apache.pulsar:pulsar-client-all:2.9.1
+- org.apache.pulsar:pulsar-client-api:2.9.1
+- org.bouncycastle:bcpkix-jdk15on:1.69
+- org.bouncycastle:bcprov-ext-jdk15on:1.69
+- org.bouncycastle:bcprov-jdk15on:1.69
+- org.bouncycastle:bcutil-jdk15on:1.69
 - org.slf4j:jul-to-slf4j:1.7.25
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
index e7caf8b..7c87ec7 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
@@ -31,7 +31,8 @@ under the License.
        <name>Flink : E2E Tests : Pulsar</name>
 
        <properties>
-               <pulsar.version>2.8.0</pulsar.version>
+               <pulsar.version>2.9.1</pulsar.version>
+               <bouncycastle.version>1.69</bouncycastle.version>
        </properties>
 
        <dependencies>
@@ -105,6 +106,46 @@ under the License.
                                                        <type>jar</type>
                                                        
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                                                </artifactItem>
+                                               <dependency>
+                                                       
<groupId>org.apache.pulsar</groupId>
+                                                       
<artifactId>bouncy-castle-bc</artifactId>
+                                                       
<version>${pulsar.version}</version>
+                                                       
<destFileName>bouncy-castle-bc.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </dependency>
+                                               <dependency>
+                                                       
<groupId>org.bouncycastle</groupId>
+                                                       
<artifactId>bcpkix-jdk15on</artifactId>
+                                                       
<version>${bouncycastle.version}</version>
+                                                       
<destFileName>bcpkix-jdk15on.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </dependency>
+                                               <dependency>
+                                                       
<groupId>org.bouncycastle</groupId>
+                                                       
<artifactId>bcprov-jdk15on</artifactId>
+                                                       
<version>${bouncycastle.version}</version>
+                                                       
<destFileName>bcprov-jdk15on.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </dependency>
+                                               <dependency>
+                                                       
<groupId>org.bouncycastle</groupId>
+                                                       
<artifactId>bcutil-jdk15on</artifactId>
+                                                       
<version>${bouncycastle.version}</version>
+                                                       
<destFileName>bcutil-jdk15on.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </dependency>
+                                               <dependency>
+                                                       
<groupId>org.bouncycastle</groupId>
+                                                       
<artifactId>bcprov-ext-jdk15on</artifactId>
+                                                       
<version>${bouncycastle.version}</version>
+                                                       
<destFileName>bcprov-ext-jdk15on.jar</destFileName>
+                                                       <type>jar</type>
+                                                       
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                               </dependency>
                                                <artifactItem>
                                                        
<groupId>org.slf4j</groupId>
                                                        
<artifactId>jul-to-slf4j</artifactId>
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index 52957fc..ccfe277 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -37,6 +37,11 @@ public class FlinkContainerWithPulsarEnvironment extends 
FlinkContainerTestEnvir
                 resourcePath("pulsar-client-all.jar"),
                 resourcePath("pulsar-client-api.jar"),
                 resourcePath("pulsar-admin-api.jar"),
+                resourcePath("bouncy-castle-bc.jar"),
+                resourcePath("bcpkix-jdk15on.jar"),
+                resourcePath("bcprov-jdk15on.jar"),
+                resourcePath("bcutil-jdk15on.jar"),
+                resourcePath("bcprov-ext-jdk15on.jar"),
                 resourcePath("jul-to-slf4j.jar"));
     }
 
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
index 04298b4..273cee8 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
@@ -42,7 +42,7 @@ public class DockerImageVersions {
 
     public static final String LOCALSTACK = "localstack/localstack:0.13.3";
 
-    public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
+    public static final String PULSAR = "apachepulsar/pulsar:2.9.1";
 
     public static final String CASSANDRA_3 = "cassandra:3.0";
 

Reply via email to