This is an automated email from the ASF dual-hosted git repository.
dybyte pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5b708afd54 [Fix][Engine] Fix JDK 8 NoSuchMethodError in
StainTracePayload#append (#10994)
5b708afd54 is described below
commit 5b708afd54db6abbc0bfb67441f1ed9f578e0a75
Author: David Zollo <[email protected]>
AuthorDate: Thu Jun 4 20:30:24 2026 +0800
[Fix][Engine] Fix JDK 8 NoSuchMethodError in StainTracePayload#append
(#10994)
Co-authored-by: Daniel <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
.github/workflows/backend.yml | 3 +-
.../e2e/connector/pulsar/PulsarBatchIT.java | 20 ++--
.../connector/pulsar/PulsarContainerSupport.java | 103 +++++++++++++++++++++
.../e2e/connector/pulsar/PulsarMultiTableIT.java | 19 ++--
.../e2e/connector/pulsar/PulsarSinkIT.java | 20 ++--
.../engine/server/trace/StainTracePayload.java | 7 +-
6 files changed, 129 insertions(+), 43 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 930a4d4dec..b0b557cf87 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -1240,7 +1240,8 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 60
+ # Kudu E2E expands each @TestTemplate case across several PR test containers.
+ timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
index 374571d0c4..8c5794886e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
@@ -47,11 +47,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.PulsarContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
import lombok.extern.slf4j.Slf4j;
@@ -62,7 +58,6 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
@Slf4j
public class PulsarBatchIT extends TestSuiteBase implements TestResource {
@@ -113,15 +108,12 @@ public class PulsarBatchIT extends TestSuiteBase
implements TestResource {
@BeforeAll
public void startUp() throws Exception {
pulsarContainer =
- new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME))
- .withNetwork(NETWORK)
- .withNetworkAliases(PULSAR_HOST)
- .withStartupTimeout(Duration.ofMinutes(3))
- .withLogConsumer(
- new Slf4jLogConsumer(
-
DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));
-
- Startables.deepStart(Stream.of(pulsarContainer)).join();
+ PulsarContainerSupport.startPulsarContainer(
+ dockerClient,
+ PULSAR_IMAGE_NAME,
+ NETWORK,
+ PULSAR_HOST,
+ Duration.ofMinutes(3));
Awaitility.given()
.ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarContainerSupport.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarContainerSupport.java
new file mode 100644
index 0000000000..ad9c0bc492
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarContainerSupport.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.pulsar;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.exception.NotFoundException;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+final class PulsarContainerSupport {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PulsarContainerSupport.class);
+ private static final Duration IMAGE_PULL_TIMEOUT = Duration.ofMinutes(10);
+
+ private PulsarContainerSupport() {}
+
+ /**
+ * Pre-pulls the Pulsar image before Testcontainers starts the container. This keeps the first
+ * CI pull from failing when GitHub runners briefly make no download progress and Testcontainers
+ * aborts the pull attempt early.
+ */
+ static PulsarContainer startPulsarContainer(
+ DockerClient dockerClient,
+ String imageName,
+ Network network,
+ String networkAlias,
+ Duration startupTimeout)
+ throws InterruptedException {
+ ensureImageAvailable(dockerClient, imageName);
+
+ PulsarContainer pulsarContainer =
+ new PulsarContainer(DockerImageName.parse(imageName))
+ .withNetwork(network)
+ .withNetworkAliases(networkAlias)
+ .withStartupTimeout(startupTimeout)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName)));
+
+ Startables.deepStart(Stream.of(pulsarContainer)).join();
+ return pulsarContainer;
+ }
+
+ private static void ensureImageAvailable(DockerClient dockerClient, String
imageName)
+ throws InterruptedException {
+ if (isImageAvailable(dockerClient, imageName)) {
+ LOG.info("Reuse local Pulsar image {}", imageName);
+ return;
+ }
+
+ DockerImageName dockerImageName = DockerImageName.parse(imageName);
+ LOG.info(
+ "Pre-pulling Pulsar image {} with timeout {} to avoid flaky
first-pull failures",
+ imageName,
+ IMAGE_PULL_TIMEOUT);
+ boolean completed =
+ dockerClient
+ .pullImageCmd(dockerImageName.getUnversionedPart())
+ .withTag(dockerImageName.getVersionPart())
+ .start()
+ .awaitCompletion(IMAGE_PULL_TIMEOUT.getSeconds(),
TimeUnit.SECONDS);
+ if (!completed || !isImageAvailable(dockerClient, imageName)) {
+ throw new IllegalStateException(
+ String.format(
+ "Failed to pre-pull Pulsar image %s within %s",
+ imageName, IMAGE_PULL_TIMEOUT));
+ }
+ }
+
+ private static boolean isImageAvailable(DockerClient dockerClient, String
imageName) {
+ try {
+ dockerClient.inspectImageCmd(imageName).exec();
+ return true;
+ } catch (NotFoundException ignored) {
+ return false;
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarMultiTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarMultiTableIT.java
index 5eef343acd..ea6c59efd7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarMultiTableIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarMultiTableIT.java
@@ -31,11 +31,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.PulsarContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
import lombok.extern.slf4j.Slf4j;
@@ -43,7 +39,6 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
@Slf4j
public class PulsarMultiTableIT extends TestSuiteBase implements TestResource {
@@ -60,14 +55,12 @@ public class PulsarMultiTableIT extends TestSuiteBase
implements TestResource {
@BeforeAll
public void startUp() throws Exception {
pulsarContainer =
- new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME))
- .withNetwork(NETWORK)
- .withNetworkAliases(PULSAR_HOST)
- .withStartupTimeout(Duration.ofMinutes(3))
- .withLogConsumer(
- new Slf4jLogConsumer(
-
DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));
- Startables.deepStart(Stream.of(pulsarContainer)).join();
+ PulsarContainerSupport.startPulsarContainer(
+ dockerClient,
+ PULSAR_IMAGE_NAME,
+ NETWORK,
+ PULSAR_HOST,
+ Duration.ofMinutes(3));
Awaitility.given()
.ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java
index 829c9b7cc5..252fdcbd12 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java
@@ -35,11 +35,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.PulsarContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
import lombok.extern.slf4j.Slf4j;
@@ -49,7 +45,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
import static java.time.temporal.ChronoUnit.SECONDS;
@@ -65,15 +60,12 @@ public class PulsarSinkIT extends TestSuiteBase implements
TestResource {
@BeforeAll
public void startUp() throws Exception {
pulsarContainer =
- new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME))
- .withNetwork(NETWORK)
- .withNetworkAliases(PULSAR_HOST)
- .withStartupTimeout(Duration.of(400, SECONDS))
- .withLogConsumer(
- new Slf4jLogConsumer(
-
DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));
-
- Startables.deepStart(Stream.of(pulsarContainer)).join();
+ PulsarContainerSupport.startPulsarContainer(
+ dockerClient,
+ PULSAR_IMAGE_NAME,
+ NETWORK,
+ PULSAR_HOST,
+ Duration.of(400, SECONDS));
Awaitility.given()
.ignoreExceptions()
.atLeast(100, TimeUnit.MILLISECONDS)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/trace/StainTracePayload.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/trace/StainTracePayload.java
index 632c44cbed..e12c5b6c01 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/trace/StainTracePayload.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/trace/StainTracePayload.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.trace;
+import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
@@ -88,7 +89,11 @@ public final class StainTracePayload {
}
byte[] newPayload = Arrays.copyOf(payload, payload.length +
ENTRY_LENGTH);
ByteBuffer buffer =
ByteBuffer.wrap(newPayload).order(ByteOrder.BIG_ENDIAN);
- buffer.position(payload.length);
+ // Cast to Buffer (superclass) so that position(int) resolves to Buffer#position(int)
+ // returning Buffer, which exists on JDK 8. ByteBuffer overrode this method with a
+ // covariant ByteBuffer return type only in JDK 9+; calling the ByteBuffer override on
+ // JDK 8 would throw NoSuchMethodError at runtime.
+ ((Buffer) buffer).position(payload.length);
buffer.put(stage.getCode());
buffer.putLong(taskId);
buffer.putLong(tsMs);