This is an automated email from the ASF dual-hosted git repository.
ferenc-csaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
The following commit(s) were added to refs/heads/main by this push:
new de95253 [FLINK-38545] Fix weekly CI pipeline
de95253 is described below
commit de95253ae5d50493290ff7dc8e0ae5cf5694f15d
Author: Ferenc Csaky <[email protected]>
AuthorDate: Sun May 3 17:39:33 2026 +0200
[FLINK-38545] Fix weekly CI pipeline
* Fix IntelliJ link in README
* Spotless code format
* Exclude incompatible testcontainers versions for E2E tests
* Disable zookeeper HA
* Update deprecated `CheckpointingMode`
---
README.md | 2 +-
flink-connector-pulsar-e2e-tests/pom.xml | 10 ++++++++++
.../flink/tests/util/pulsar/PulsarSinkE2ECase.java | 14 ++++++++++----
.../tests/util/pulsar/PulsarSourceE2ECase.java | 12 +++++++++---
.../source/reader/PulsarPartitionSplitReader.java | 21 ++++++++++++++-------
.../connector/pulsar/sink/PulsarSinkITCase.java | 6 +++---
.../connector/pulsar/source/PulsarSourceITCase.java | 4 ++--
.../connector/pulsar/table/PulsarTableTestBase.java | 2 +-
8 files changed, 50 insertions(+), 21 deletions(-)
diff --git a/README.md b/README.md
index bd5b427..aa7a261 100644
--- a/README.md
+++ b/README.md
@@ -41,7 +41,7 @@ The IntelliJ IDE supports Maven out of the box and offers a
plugin for Scala dev
* IntelliJ download:
[https://www.jetbrains.com/idea/](https://www.jetbrains.com/idea/)
* IntelliJ Scala Plugin:
[https://plugins.jetbrains.com/plugin/?id=1347](https://plugins.jetbrains.com/plugin/?id=1347)
-Check out our [Setting up
IntelliJ](https://nightlies.apache.org/flink/flink-docs-master/flinkDev/ide_setup.html#intellij-idea)
guide for details.
+Check out our [Setting up
IntelliJ](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/maven/#importing-the-project-into-your-ide)
guide for details.
## Support
diff --git a/flink-connector-pulsar-e2e-tests/pom.xml
b/flink-connector-pulsar-e2e-tests/pom.xml
index 8fec5c7..5800eb5 100644
--- a/flink-connector-pulsar-e2e-tests/pom.xml
+++ b/flink-connector-pulsar-e2e-tests/pom.xml
@@ -49,6 +49,16 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
diff --git
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
index ef63b84..a1ac646 100644
---
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
+++
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
@@ -21,18 +21,19 @@ package org.apache.flink.tests.util.pulsar;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import
org.apache.flink.connector.pulsar.testutils.sink.cases.SingleTopicProducingContext;
import
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
-import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.tests.util.pulsar.common.FlinkContainerUtils;
import
org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
-import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE;
/** Pulsar sink E2E test based on the connector testing framework. */
@SuppressWarnings("unused")
@@ -45,7 +46,12 @@ public class PulsarSinkE2ECase extends
SinkTestSuiteBase<String> {
// Defines TestEnvironment
@TestEnv
FlinkContainerTestEnvironment flink =
- new
FlinkContainerTestEnvironment(FlinkContainerUtils.flinkConfiguration(), 1, 6);
+ FlinkContainerTestEnvironment.fromSettings(
+ FlinkContainersSettings.builder()
+ .basedOn(FlinkContainerUtils.flinkConfiguration())
+ .numTaskManagers(1)
+ .numSlotsPerTaskManager(6)
+ .build());
// Defines ConnectorExternalSystem.
@TestExternalSystem PulsarTestEnvironment pulsar = new
PulsarContainerTestEnvironment(flink);
diff --git
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
index ee362b8..e3a54f0 100644
---
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
+++
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
@@ -21,17 +21,18 @@ package org.apache.flink.tests.util.pulsar;
import
org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicsConsumingContext;
import
org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext;
import
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
+import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
-import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.tests.util.pulsar.common.FlinkContainerUtils;
import
org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE;
/** Pulsar source E2E test based on the connector testing framework. */
@SuppressWarnings("unused")
@@ -43,7 +44,12 @@ public class PulsarSourceE2ECase extends
SourceTestSuiteBase<String> {
// Defines TestEnvironment.
@TestEnv
FlinkContainerTestEnvironment flink =
- new
FlinkContainerTestEnvironment(FlinkContainerUtils.flinkConfiguration(), 1, 6);
+ FlinkContainerTestEnvironment.fromSettings(
+ FlinkContainersSettings.builder()
+ .basedOn(FlinkContainerUtils.flinkConfiguration())
+ .numTaskManagers(1)
+ .numSlotsPerTaskManager(6)
+ .build());
// Defines ConnectorExternalSystem.
@TestExternalSystem
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index c700f88..93d5d95 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -238,27 +238,34 @@ public class PulsarPartitionSplitReader
try {
this.pulsarConsumer =
createPulsarConsumer(registeredSplit.getPartition());
} catch (PulsarClientException e) {
- LOG.warn("Failed to create consumer on partition {} on first
attempt, will retry after 2 seconds",
- registeredSplit.getPartition(), e);
+ LOG.warn(
+ "Failed to create consumer on partition {} on first
attempt, will retry after 2 seconds",
+ registeredSplit.getPartition(),
+ e);
try {
Thread.sleep(2000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new FlinkRuntimeException(
- String.format("Interrupted while waiting to retry
consumer creation on partition %s",
- registeredSplit.getPartition()), ie);
+ String.format(
+ "Interrupted while waiting to retry consumer
creation on partition %s",
+ registeredSplit.getPartition()),
+ ie);
}
// Retry consumer creation
try {
this.pulsarConsumer =
createPulsarConsumer(registeredSplit.getPartition());
- LOG.info("Successfully created consumer on partition {} on
second attempt",
+ LOG.info(
+ "Successfully created consumer on partition {} on
second attempt",
registeredSplit.getPartition());
} catch (PulsarClientException retryException) {
throw new FlinkRuntimeException(
- String.format("Failed to create consumer on partition
%s after retry",
- registeredSplit.getPartition()),
retryException);
+ String.format(
+ "Failed to create consumer on partition %s
after retry",
+ registeredSplit.getPartition()),
+ retryException);
}
}
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 18d035b..0db4432 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -35,9 +35,9 @@ import
org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
@@ -52,8 +52,8 @@ import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
-import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE;
import static org.apache.pulsar.client.api.Schema.STRING;
import static org.assertj.core.api.Assertions.assertThat;
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index c8a6b33..63b2da7 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -31,11 +31,11 @@ import
org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
-import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.pulsar.client.api.SubscriptionType;
-import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE;
/**
* Unit test class for {@link PulsarSource}. Used for {@link
SubscriptionType#Exclusive}
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableTestBase.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableTestBase.java
index 58e2a35..cecd3e2 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableTestBase.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarTableTestBase.java
@@ -25,7 +25,7 @@ import
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
-import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;