This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1602e4b7d26cf52cea993c410769b7b15a672aff Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Wed Feb 9 13:00:53 2022 +0800 [FLINK-24246][connector/pulsar] Bump PulsarClient version to latest 2.9.1 1. Bump the pulsar-client-all version in pom file. 2. Exclude useless dependencies for pulsar-client-all. 3. Bump the Pulsar docker version. 4. Change the dependencies to pass the tests. 5. Drop PulsarTransactionUtils and fix compile issues in tests. 6. Add bouncycastle to Pulsar e2e tests. --- flink-connectors/flink-connector-pulsar/pom.xml | 74 +++++++++++-- .../common/utils/PulsarTransactionUtils.java | 118 --------------------- .../split/PulsarUnorderedPartitionSplitReader.java | 3 +- .../PulsarDeserializationSchemaTest.java | 2 +- .../src/main/resources/META-INF/NOTICE | 16 +-- .../flink-end-to-end-tests-pulsar/pom.xml | 43 +++++++- .../FlinkContainerWithPulsarEnvironment.java | 5 + .../org/apache/flink/util/DockerImageVersions.java | 2 +- 8 files changed, 124 insertions(+), 139 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml index 87b6ba0..45047eb 100644 --- a/flink-connectors/flink-connector-pulsar/pom.xml +++ b/flink-connectors/flink-connector-pulsar/pom.xml @@ -36,12 +36,14 @@ under the License. <packaging>jar</packaging> <properties> - <pulsar.version>2.8.0</pulsar.version> + <pulsar.version>2.9.1</pulsar.version> <!-- Test Libraries --> <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version> - <commons-lang3.version>3.11</commons-lang3.version> - <grpc.version>1.33.0</grpc.version> + <pulsar-commons-lang3.version>3.11</pulsar-commons-lang3.version> + <pulsar-zookeeper.version>3.6.3</pulsar-zookeeper.version> + <pulsar-netty.version>4.1.72.Final</pulsar-netty.version> + <pulsar-grpc.version>1.33.0</pulsar-grpc.version> </properties> <dependencies> @@ -138,12 +140,22 @@ under the License. <version>${pulsar.version}</version> <scope>test</scope> </dependency> + <!-- Pulsar use a newer commons-lang3 in broker. --> <!-- Bump the version only for testing. --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> - <version>${commons-lang3.version}</version> + <version>${pulsar-commons-lang3.version}</version> + <scope>test</scope> + </dependency> + + <!-- Pulsar use a newer zookeeper in broker. --> + <!-- Bump the version only for testing. --> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>${pulsar-zookeeper.version}</version> <scope>test</scope> </dependency> @@ -156,9 +168,41 @@ under the License. <version>${pulsar.version}</version> <exclusions> <exclusion> + <groupId>com.sun.activation</groupId> + <artifactId>javax.activation</artifactId> + </exclusion> + <exclusion> + <groupId>jakarta.activation</groupId> + <artifactId>jakarta.activation-api</artifactId> + </exclusion> + <exclusion> + <groupId>jakarta.ws.rs</groupId> + <artifactId>jakarta.ws.rs-api</artifactId> + </exclusion> + <exclusion> + <groupId>jakarta.xml.bind</groupId> + <artifactId>jakarta.xml.bind-api</artifactId> + </exclusion> + <exclusion> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + </exclusion> + <exclusion> + <groupId>javax.xml.bind</groupId> + <artifactId>jaxb-api</artifactId> + </exclusion> + <exclusion> + <groupId>net.jcip</groupId> + <artifactId>jcip-annotations</artifactId> + </exclusion> + <exclusion> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-package-core</artifactId> </exclusion> + <exclusion> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + </exclusion> </exclusions> </dependency> @@ -171,13 +215,23 @@ under the License. </dependency> </dependencies> - <!-- gRPC use version range which don't support by flink ci. --> + <dependencyManagement> <dependencies> + <!-- Pulsar use higher gRPC version. --> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-bom</artifactId> - <version>${grpc.version}</version> + <version>${pulsar-grpc.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + + <!-- Pulsar use higher netty version. --> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-bom</artifactId> + <version>${pulsar-netty.version}</version> <type>pom</type> <scope>import</scope> </dependency> @@ -200,7 +254,9 @@ under the License. <configuration> <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> <forkCount>1</forkCount> - <argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en</argLine> + <argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} + -XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en + </argLine> </configuration> </plugin> <plugin> @@ -222,7 +278,9 @@ under the License. <outputDirectory> ${project.build.directory}/generated-test-sources/protobuf/java </outputDirectory> - <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact> + <protocArtifact> + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + </protocArtifact> </configuration> <executions> <execution> diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java deleted file mode 100644 index ef54779..0000000 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.pulsar.common.utils; - -import org.apache.flink.annotation.Internal; - -import org.apache.pulsar.client.api.transaction.Transaction; -import org.apache.pulsar.client.api.transaction.TxnID; -import org.apache.pulsar.client.impl.transaction.TransactionImpl; - -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Transaction was introduced into pulsar since 2.7.0, but the interface {@link Transaction} didn't - * provide a id method until 2.8.1. We have to add this util for acquiring the {@link TxnID} for - * compatible consideration. - * - * <p>TODO Remove this hack after pulsar 2.8.1 release. - */ -@Internal -@SuppressWarnings("java:S3011") -public final class PulsarTransactionUtils { - - private static volatile Field mostBitsField; - private static volatile Field leastBitsField; - - private PulsarTransactionUtils() { - // No public constructor - } - - public static TxnID getId(Transaction transaction) { - // 2.8.1 and after. - try { - Method getId = Transaction.class.getDeclaredMethod("getTxnID"); - return (TxnID) getId.invoke(transaction); - } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { - // 2.8.0 and before. - TransactionImpl impl = (TransactionImpl) transaction; - Long txnIdMostBits = getTxnIdMostBits(impl); - Long txnIdLeastBits = getTxnIdLeastBits(impl); - - checkNotNull(txnIdMostBits, "Failed to get txnIdMostBits"); - checkNotNull(txnIdLeastBits, "Failed to get txnIdLeastBits"); - - return new TxnID(txnIdMostBits, txnIdLeastBits); - } - } - - private static Long getTxnIdMostBits(TransactionImpl transaction) { - if (mostBitsField == null) { - synchronized (PulsarTransactionUtils.class) { - if (mostBitsField == null) { - try { - mostBitsField = TransactionImpl.class.getDeclaredField("txnIdMostBits"); - mostBitsField.setAccessible(true); - } catch (NoSuchFieldException e) { - // Nothing to do for this exception. - } - } - } - } - - if (mostBitsField != null) { - try { - return (Long) mostBitsField.get(transaction); - } catch (IllegalAccessException e) { - // Nothing to do for this exception. - } - } - - return null; - } - - private static Long getTxnIdLeastBits(TransactionImpl transaction) { - if (leastBitsField == null) { - synchronized (PulsarTransactionUtils.class) { - if (leastBitsField == null) { - try { - leastBitsField = TransactionImpl.class.getDeclaredField("txnIdLeastBits"); - leastBitsField.setAccessible(true); - } catch (NoSuchFieldException e) { - // Nothing to do for this exception. - } - } - } - } - - if (leastBitsField != null) { - try { - return (Long) leastBitsField.get(transaction); - } catch (IllegalAccessException e) { - // Nothing to do for this exception. - } - } - - return null; - } -} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java index 846101d..7262863 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java @@ -20,7 +20,6 @@ package org.apache.flink.connector.pulsar.source.reader.split; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader; @@ -155,7 +154,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl // Avoiding NP problem when Pulsar don't get the message before Flink checkpoint. if (uncommittedTransaction != null) { - TxnID txnID = PulsarTransactionUtils.getId(uncommittedTransaction); + TxnID txnID = uncommittedTransaction.getTxnID(); this.uncommittedTransaction = newTransaction(); state.setUncommittedTransactionId(txnID); } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java index aa4bcee..48e6e7a 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java @@ -113,7 +113,7 @@ class PulsarDeserializationSchemaTest { MessageMetadata metadata = new MessageMetadata(); ByteBuffer payload = ByteBuffer.wrap(bytes); - return MessageImpl.create(metadata, payload, Schema.BYTES); + return MessageImpl.create(metadata, payload, Schema.BYTES, ""); } /** This collector is used for collecting only one message. Used for test purpose. */ diff --git a/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE index 79ebbfc..56ad187 100644 --- a/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE +++ b/flink-connectors/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE @@ -6,12 +6,12 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) -- org.apache.pulsar:bouncy-castle-bc:pkg:2.8.0 -- org.apache.pulsar:pulsar-client-admin-api:2.8.0 -- org.apache.pulsar:pulsar-client-all:2.8.0 -- org.apache.pulsar:pulsar-client-api:2.8.0 -- org.bouncycastle:bcpkix-jdk15on:1.68 -- org.bouncycastle:bcprov-ext-jdk15on:1.68 -- org.bouncycastle:bcprov-jdk15on:1.68 -- org.bouncycastle:bcutil-jdk15on:1.68 +- org.apache.pulsar:bouncy-castle-bc:pkg:2.9.1 +- org.apache.pulsar:pulsar-client-admin-api:2.9.1 +- org.apache.pulsar:pulsar-client-all:2.9.1 +- org.apache.pulsar:pulsar-client-api:2.9.1 +- org.bouncycastle:bcpkix-jdk15on:1.69 +- org.bouncycastle:bcprov-ext-jdk15on:1.69 +- org.bouncycastle:bcprov-jdk15on:1.69 +- org.bouncycastle:bcutil-jdk15on:1.69 - org.slf4j:jul-to-slf4j:1.7.25 diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml index e7caf8b..7c87ec7 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml @@ -31,7 +31,8 @@ under the License. <name>Flink : E2E Tests : Pulsar</name> <properties> - <pulsar.version>2.8.0</pulsar.version> + <pulsar.version>2.9.1</pulsar.version> + <bouncycastle.version>1.69</bouncycastle.version> </properties> <dependencies> @@ -105,6 +106,46 @@ under the License. <type>jar</type> <outputDirectory>${project.build.directory}/dependencies</outputDirectory> </artifactItem> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>bouncy-castle-bc</artifactId> + <version>${pulsar.version}</version> + <destFileName>bouncy-castle-bc.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpkix-jdk15on</artifactId> + <version>${bouncycastle.version}</version> + <destFileName>bcpkix-jdk15on.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-jdk15on</artifactId> + <version>${bouncycastle.version}</version> + <destFileName>bcprov-jdk15on.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcutil-jdk15on</artifactId> + <version>${bouncycastle.version}</version> + <destFileName>bcutil-jdk15on.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-ext-jdk15on</artifactId> + <version>${bouncycastle.version}</version> + <destFileName>bcprov-ext-jdk15on.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </dependency> <artifactItem> <groupId>org.slf4j</groupId> <artifactId>jul-to-slf4j</artifactId> diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java index 52957fc..ccfe277 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java @@ -37,6 +37,11 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir resourcePath("pulsar-client-all.jar"), resourcePath("pulsar-client-api.jar"), resourcePath("pulsar-admin-api.jar"), + resourcePath("bouncy-castle-bc.jar"), + resourcePath("bcpkix-jdk15on.jar"), + resourcePath("bcprov-jdk15on.jar"), + resourcePath("bcutil-jdk15on.jar"), + resourcePath("bcprov-ext-jdk15on.jar"), resourcePath("jul-to-slf4j.jar")); } diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java index 04298b4..273cee8 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java @@ -42,7 +42,7 @@ public class DockerImageVersions { public static final String LOCALSTACK = "localstack/localstack:0.13.3"; - public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; + public static final String PULSAR = "apachepulsar/pulsar:2.9.1"; public static final String CASSANDRA_3 = "cassandra:3.0";