This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f07b3500d38 IGNITE-25715 Add Java client compute compatibility tests (#6638)
f07b3500d38 is described below
commit f07b3500d3891e63748d46788f9592ea7b171076
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Sep 24 12:45:04 2025 +0300
IGNITE-25715 Add Java client compute compatibility tests (#6638)
---
modules/compatibility-tests/build.gradle | 4 +
modules/compatibility-tests/jobs.gradle | 44 +++++
.../internal/client/ClientCompatibilityTests.java | 187 +++++++++++++++++++--
...urrentClientWithOldServerCompatibilityTest.java | 2 +-
.../ignite/internal/client/DeploymentUtils.java | 60 +++++++
...ldClientWithCurrentServerCompatibilityTest.java | 44 ++++-
.../org/apache/ignite/internal/compute/Echo.java | 32 ++++
.../ignite/internal/compute/EchoReceiver.java | 42 +++++
8 files changed, 398 insertions(+), 17 deletions(-)
diff --git a/modules/compatibility-tests/build.gradle
b/modules/compatibility-tests/build.gradle
index e667a72ab31..6488fa7e8a5 100644
--- a/modules/compatibility-tests/build.gradle
+++ b/modules/compatibility-tests/build.gradle
@@ -22,6 +22,7 @@ apply from: "$rootDir/buildscripts/java-core.gradle"
apply from: "$rootDir/buildscripts/java-integration-test.gradle"
apply from: "$rootDir/buildscripts/java-junit5.gradle"
apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
+apply from: 'jobs.gradle'
description = 'ignite-compatibility-tests'
@@ -33,11 +34,14 @@ repositories {
dependencies {
testImplementation libs.swagger.parser
+ integrationTestImplementation libs.netty.common
+
integrationTestImplementation
testFixtures(project(':ignite-compatibility-tests'))
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation project(':ignite-api')
integrationTestImplementation project(':ignite-client')
integrationTestImplementation project(':ignite-runner')
+ integrationTestImplementation project(':ignite-cli')
testFixturesImplementation libs.gradle.tooling.api
testFixturesImplementation libs.awaitility
diff --git a/modules/compatibility-tests/jobs.gradle
b/modules/compatibility-tests/jobs.gradle
new file mode 100644
index 00000000000..8697193e40d
--- /dev/null
+++ b/modules/compatibility-tests/jobs.gradle
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Dedicated source set (src/jobs) for the compute job and streamer receiver classes
+// that the compatibility tests deploy to the cluster as a deployment unit.
+sourceSets {
+ jobs
+}
+
+// Registers a Jar task, named after the source set's jarTaskName, that packages the
+// compiled output of the given source set using the given archive base name and the
+// fixed version '1.0-SNAPSHOT'.
+def registerJarTask(SourceSet sourceSet, String baseName) {
+ tasks.register(sourceSet.jarTaskName, Jar) {
+ group = 'build'
+ archiveBaseName = baseName
+ archiveVersion = '1.0-SNAPSHOT'
+ from sourceSet.output
+ }
+}
+
+registerJarTask(sourceSets.jobs, 'ignite-integration-test-jobs')
+
+// Adds the jobs jar to the integration-test resources under the 'units' directory.
+// NOTE(review): 'jobsJar' is presumably the task registered above for the 'jobs' source set - confirm.
+processIntegrationTestResources {
+ into('units') {
+ from jobsJar
+ }
+}
+
+dependencies {
+ jobsImplementation project(':ignite-api')
+ jobsImplementation project(':ignite-core')
+
+ // Makes the compiled job classes visible to the integration tests as well.
+ integrationTestImplementation sourceSets.jobs.output
+}
diff --git
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/ClientCompatibilityTests.java
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/ClientCompatibilityTests.java
index ad9cfcb6ec0..f50d0599aa4 100644
---
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/ClientCompatibilityTests.java
+++
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/ClientCompatibilityTests.java
@@ -36,10 +36,13 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -47,11 +50,11 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.BroadcastJobTarget;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.deployment.DeploymentUnit;
-import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.CompatibilityTestCommon;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.BatchedArguments;
@@ -71,14 +74,17 @@ import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
/**
* Client compatibility tests. Interface to allow "multiple inheritance" of
test methods.
*/
@SuppressWarnings({"resource", "DataFlowIssue"})
public interface ClientCompatibilityTests {
+ DeploymentUnit JOBS_UNIT = new DeploymentUnit("compat-test-jobs", "1.0");
+
IgniteClient client();
AtomicInteger idGen();
@@ -525,6 +531,57 @@ public interface ClientCompatibilityTests {
assertThat(ex.getMessage(), containsString("Cannot load job class by
name 'test'"));
}
+    /**
+     * Runs the echo job on any cluster node with the given argument and checks that the same value comes back.
+     *
+     * @param arg Job argument supplied by {@code jobArgs}; may be {@code null}.
+     */
+    @ParameterizedTest
+    @MethodSource("jobArgs")
+    default void testComputeArgs(Object arg) {
+        JobTarget anyNode = JobTarget.anyNode(clusterNodes());
+        JobDescriptor<Object, Object> echoJob = echoJobDescriptor();
+
+        Object echoed = client().compute().execute(anyNode, echoJob, arg);
+
+        // Everything except byte[] is compared with equals(); arrays need a content comparison.
+        if (!(arg instanceof byte[])) {
+            assertEquals(arg, echoed);
+            return;
+        }
+
+        assertArrayEquals((byte[]) arg, (byte[]) echoed);
+    }
+
+    /** Executes the echo job on any cluster node and expects the string argument to be returned unchanged. */
+    @Test
+    default void testComputeExecute() {
+        JobTarget anyNode = JobTarget.anyNode(clusterNodes());
+        JobDescriptor<Object, Object> echoJob = echoJobDescriptor();
+
+        assertEquals("test", client().compute().execute(anyNode, echoJob, "test"));
+    }
+
+    /** Executes the echo job colocated with the key {@code id = 1} of the test table and expects the argument back. */
+    @Test
+    default void testComputeExecuteColocated() {
+        Tuple key = Tuple.create().set("id", 1);
+        JobTarget colocated = JobTarget.colocated(TABLE_NAME_TEST, key);
+        JobDescriptor<Object, Object> echoJob = echoJobDescriptor();
+
+        assertEquals("test", client().compute().execute(colocated, echoJob, "test"));
+    }
+
+    /** Broadcasts the echo job to the nodes returned by {@code clusterNodes()} and checks the first result. */
+    @Test
+    default void testComputeExecuteBroadcast() {
+        BroadcastJobTarget allNodes = BroadcastJobTarget.nodes(clusterNodes());
+        JobDescriptor<Object, Object> echoJob = echoJobDescriptor();
+
+        Collection<Object> results = client().compute().execute(allNodes, echoJob, "test");
+        Object firstResult = results.iterator().next();
+
+        assertEquals("test", firstResult);
+    }
+
+    /** Broadcasts the echo job using a table-based target for the test table and checks the first result. */
+    @Test
+    default void testComputeExecuteBroadcastTable() {
+        BroadcastJobTarget tableTarget = BroadcastJobTarget.table(TABLE_NAME_TEST);
+        JobDescriptor<Object, Object> echoJob = echoJobDescriptor();
+
+        Collection<Object> results = client().compute().execute(tableTarget, echoJob, "test");
+        Object firstResult = results.iterator().next();
+
+        assertEquals("test", firstResult);
+    }
+
@Test
default void testStreamer() {
RecordView<Tuple> view = table(TABLE_NAME_TEST).recordView();
@@ -550,41 +607,83 @@ public interface ClientCompatibilityTests {
}
@Test
- @Disabled("IGNITE-25715")
default void testStreamerWithReceiver() {
RecordView<Tuple> view = table(TABLE_NAME_TEST).recordView();
CompletableFuture<Void> streamFut;
- DataStreamerReceiverDescriptor<Tuple, String, Integer> desc =
DataStreamerReceiverDescriptor
- .<Tuple, String, Integer>builder("my-receiver")
- .units(new DeploymentUnit("my-unit", Version.LATEST))
+ DataStreamerReceiverDescriptor<Integer, Object, Integer> desc =
DataStreamerReceiverDescriptor
+ .<Integer, Object,
Integer>builder("org.apache.ignite.internal.compute.EchoReceiver")
+ .units(JOBS_UNIT)
.build();
- try (var publisher = new SubmissionPublisher<Tuple>()) {
+ var subscriber = new TestSubscriber<Integer>();
+ List<Integer> expected = new ArrayList<>();
+
+ try (var publisher = new SubmissionPublisher<Integer>()) {
streamFut = view.streamData(
publisher,
desc,
- x -> Tuple.create().set("id", x.intValue("a")),
+ x -> Tuple.create().set("id", x),
Function.identity(),
- "arg",
null,
- DataStreamerOptions.builder().pageSize(5).build());
+ subscriber,
+ DataStreamerOptions.builder().pageSize(3).build());
- for (int i = 0; i < 100; i++) {
- Tuple item = Tuple.create().set("a", i).set("b", "b_" + i);
- publisher.submit(item);
+ for (int i = 0; i < 10; i++) {
+ publisher.submit(i);
+ expected.add(i);
+ }
+ }
+
+ streamFut.join();
+
+ List<Integer> sortedResults = subscriber.items.stream()
+ .sorted()
+ .collect(Collectors.toList());
+
+ assertEquals(expected, sortedResults);
+ }
+
+ @Test
+ default void testStreamerWithReceiverArg() {
+ RecordView<Tuple> view = table(TABLE_NAME_TEST).recordView();
+
+ CompletableFuture<Void> streamFut;
+
+ DataStreamerReceiverDescriptor<Integer, String, String> desc =
DataStreamerReceiverDescriptor
+ .<Integer, String,
String>builder("org.apache.ignite.internal.compute.EchoReceiver")
+ .units(JOBS_UNIT)
+ .build();
+
+ var subscriber = new TestSubscriber<String>();
+
+ try (var publisher = new SubmissionPublisher<Integer>()) {
+ streamFut = view.streamData(
+ publisher,
+ desc,
+ x -> Tuple.create().set("id", x),
+ Function.identity(),
+ "arg",
+ subscriber,
+ DataStreamerOptions.builder().pageSize(2).build());
+
+ for (int i = 0; i < 10; i++) {
+ publisher.submit(i);
}
}
streamFut.join();
+
+ assertEquals("arg", subscriber.items.iterator().next());
}
/**
- * Creates default tables for testing.
+ * Initialize test data in the given Ignite instance.
*/
- default void createDefaultTables(Ignite ignite) {
+ default void initTestData(Ignite ignite) {
CompatibilityTestCommon.createDefaultTables(ignite);
+ DeploymentUtils.deployJobs();
}
default void close() {
@@ -602,4 +701,62 @@ public interface ClientCompatibilityTests {
private Table table(String tableName) {
return client().tables().table(tableName);
}
+
+    /**
+     * Supplies the argument values for the parameterized job execution test: one value per supported
+     * primitive wrapper, decimal, temporal, UUID, string and byte array type, plus {@code null}.
+     *
+     * @return Array of arguments, one element per test invocation.
+     */
+    static Object[] jobArgs() {
+        Object[] args = {
+                true,
+                (byte) 1,
+                (short) 2,
+                3,
+                4L,
+                5.5f,
+                6.6d,
+                new BigDecimal("7.7"),
+                LocalDate.now(),
+                LocalTime.now(),
+                LocalDateTime.now(),
+                Instant.ofEpochSecond(123456),
+                UUID.randomUUID(),
+                "test",
+                new byte[]{1, 2, 3, 4},
+                null
+        };
+
+        return args;
+    }
+
+    /** Builds a descriptor for the {@code Echo} job, referenced by class name and loaded from {@code JOBS_UNIT}. */
+    private static JobDescriptor<Object, Object> echoJobDescriptor() {
+        String echoJobClassName = "org.apache.ignite.internal.compute.Echo";
+
+        var descriptorBuilder = JobDescriptor.builder(echoJobClassName);
+        descriptorBuilder.units(JOBS_UNIT);
+
+        return descriptorBuilder.build();
+    }
+
+    /**
+     * Test subscriber that requests an unbounded number of items on subscription and collects every
+     * received item into {@link #items}. Error and completion signals are ignored.
+     *
+     * @param <T> Item type.
+     */
+    class TestSubscriber<T> implements Subscriber<T> {
+        /** Received items in arrival order, wrapped in a synchronized list. Never reassigned. */
+        final List<T> items = Collections.synchronizedList(new ArrayList<>());
+
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            // No back-pressure: ask for everything up front.
+            subscription.request(Long.MAX_VALUE);
+        }
+
+        @Override
+        public void onNext(T item) {
+            items.add(item);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            // Intentionally ignored by this subscriber.
+        }
+
+        @Override
+        public void onComplete() {
+            // Nothing to do on completion.
+        }
+    }
}
diff --git
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/CurrentClientWithOldServerCompatibilityTest.java
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/CurrentClientWithOldServerCompatibilityTest.java
index 57f7114aed6..1bf0cbd7538 100644
---
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/CurrentClientWithOldServerCompatibilityTest.java
+++
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/CurrentClientWithOldServerCompatibilityTest.java
@@ -41,7 +41,7 @@ public class CurrentClientWithOldServerCompatibilityTest
extends CompatibilityTe
@Override
protected void setupBaseVersion(Ignite baseIgnite) {
- createDefaultTables(baseIgnite);
+ initTestData(baseIgnite);
}
@Override
diff --git
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/DeploymentUtils.java
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/DeploymentUtils.java
new file mode 100644
index 00000000000..18cff1c293a
--- /dev/null
+++
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/DeploymentUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import static
org.apache.ignite.internal.client.ClientCompatibilityTests.JOBS_UNIT;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getResourcePath;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.sneakyThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetector.Level;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.ignite.internal.cli.call.cluster.unit.DeployUnitClient;
+import org.apache.ignite.rest.client.invoker.ApiClient;
+import org.apache.ignite.rest.client.invoker.ApiException;
+import org.apache.ignite.rest.client.model.DeployMode;
+
+/**
+ * Utility class for deploying the compatibility-test jobs unit ({@code JOBS_UNIT}) through the REST deployment client.
+ */
+final class DeploymentUtils {
+    /** Location of the jobs jar, resolved against the path returned by {@code getResourcePath}. */
+    private static final String JOBS_JAR_RELATIVE_PATH = "../../../libs/ignite-integration-test-jobs-1.0-SNAPSHOT.jar";
+
+    private DeploymentUtils() {
+        // Static helpers only; not meant to be instantiated.
+    }
+
+    /**
+     * Deploys the jobs jar as the {@code JOBS_UNIT} deployment unit.
+     * Fails with an assertion error if the jar file is missing or the deployment is not confirmed.
+     */
+    static void deployJobs() {
+        File jobsJar = Path.of(getResourcePath(ClientCompatibilityTests.class, ""), JOBS_JAR_RELATIVE_PATH).toFile();
+
+        // Fail early with the resolved path instead of an opaque REST error when the jar has not been built.
+        assertTrue(jobsJar.isFile(), "Jobs jar not found: " + jobsJar);
+
+        deployUnit(List.of(jobsJar), JOBS_UNIT.name(), JOBS_UNIT.version().render());
+    }
+
+    /**
+     * Deploys the given files as a single unit with {@link DeployMode#ALL}.
+     *
+     * @param unitFiles Files that make up the unit.
+     * @param unitName Unit name.
+     * @param unitVersion Unit version string.
+     */
+    private static void deployUnit(List<File> unitFiles, String unitName, String unitVersion) {
+        // TODO IGNITE-26418 Netty buffer leaks in REST API
+        ResourceLeakDetector.setLevel(Level.DISABLED);
+
+        DeployUnitClient deployUnitClient = new DeployUnitClient(new ApiClient());
+
+        try {
+            Boolean deployRes = deployUnitClient.deployUnit(unitName, unitFiles, unitVersion, DeployMode.ALL, List.of());
+
+            // Boolean.TRUE.equals(...) reports a null response as an assertion failure rather than an unboxing NPE.
+            assertTrue(Boolean.TRUE.equals(deployRes), "Failed to deploy unit " + unitName + ":" + unitVersion);
+        } catch (ApiException e) {
+            sneakyThrow(e);
+        }
+    }
+}
diff --git
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/OldClientWithCurrentServerCompatibilityTest.java
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/OldClientWithCurrentServerCompatibilityTest.java
index 82aee9e2c4d..72c1da66454 100644
---
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/OldClientWithCurrentServerCompatibilityTest.java
+++
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/OldClientWithCurrentServerCompatibilityTest.java
@@ -45,6 +45,7 @@ import
org.junit.jupiter.params.AfterParameterizedClassInvocation;
import org.junit.jupiter.params.BeforeParameterizedClassInvocation;
import org.junit.jupiter.params.Parameter;
import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
/**
@@ -70,7 +71,7 @@ public class OldClientWithCurrentServerCompatibilityTest
extends BaseIgniteAbstr
cluster = CompatibilityTestBase.createCluster(testInfo, workDir,
CompatibilityTestBase.NODE_BOOTSTRAP_CFG_TEMPLATE);
cluster.startEmbedded(1, true);
- createDefaultTables(cluster.node(0));
+ initTestData(cluster.node(0));
delegate = createTestInstanceWithOldClient(clientVersion);
}
@@ -206,12 +207,53 @@ public class OldClientWithCurrentServerCompatibilityTest
extends BaseIgniteAbstr
delegate.testComputeMissingJob();
}
+ @Override
+ @ParameterizedTest
+ @MethodSource("jobArgs")
+ public void testComputeArgs(Object arg) {
+ delegate.testComputeArgs(arg);
+ }
+
+ @Test
+ @Override
+ public void testComputeExecute() {
+ delegate.testComputeExecute();
+ }
+
+ @Test
+ @Override
+ public void testComputeExecuteColocated() {
+ delegate.testComputeExecuteColocated();
+ }
+
+ @Test
+ @Override
+ public void testComputeExecuteBroadcast() {
+ delegate.testComputeExecuteBroadcast();
+ }
+
+ @Test
+ @Override
+ public void testComputeExecuteBroadcastTable() {
+ delegate.testComputeExecuteBroadcastTable();
+ }
+
@Test
@Override
public void testStreamer() {
delegate.testStreamer();
}
+    /** Runs the streamer-with-receiver test through the old-client delegate. */
+    // Annotated explicitly, like the other delegating overrides in this class.
+    @Test
+    @Override
+    public void testStreamerWithReceiver() {
+        delegate.testStreamerWithReceiver();
+    }
+
+    /** Runs the streamer-with-receiver-argument test through the old-client delegate. */
+    // Annotated explicitly, like the other delegating overrides in this class.
+    @Test
+    @Override
+    public void testStreamerWithReceiverArg() {
+        delegate.testStreamerWithReceiverArg();
+    }
+
private static ClientCompatibilityTests
createTestInstanceWithOldClient(String igniteVersion)
throws Exception {
var loader = OldClientLoader.getIsolatedClassLoader(igniteVersion);
diff --git
a/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/Echo.java
b/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/Echo.java
new file mode 100644
index 00000000000..b45c11bb2b4
--- /dev/null
+++
b/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/Echo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * A simple echo job that returns the input argument as the result.
+ *
+ * <p>The compatibility tests reference this class by its fully qualified name when building the job descriptor.
+ * The job does not use the execution context and completes immediately.
+ */
+public class Echo implements ComputeJob<Object, Object> {
+ /** Returns an already completed future holding {@code arg} unchanged (including {@code null}). */
+ @Override
+ public CompletableFuture<Object> executeAsync(JobExecutionContext context,
Object arg) {
+ return CompletableFuture.completedFuture(arg);
+ }
+}
diff --git
a/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/EchoReceiver.java
b/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/EchoReceiver.java
new file mode 100644
index 00000000000..cc073686aef
--- /dev/null
+++
b/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/EchoReceiver.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.table.DataStreamerReceiver;
+import org.apache.ignite.table.DataStreamerReceiverContext;
+
+/**
+ * Test receiver that echoes its input: when a receiver argument is supplied, the result for the page is a
+ * single-element list holding that argument; otherwise the received page itself is returned.
+ */
+public class EchoReceiver implements DataStreamerReceiver<Object, Object, Object> {
+    @Override
+    public CompletableFuture<List<Object>> receive(List<Object> page, DataStreamerReceiverContext ctx, Object arg) {
+        List<Object> response = arg == null ? page : List.of(arg);
+
+        return completedFuture(response);
+    }
+}