This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f07b3500d38 IGNITE-25715 Add Java client compute compatibility tests (#6638)
f07b3500d38 is described below

commit f07b3500d3891e63748d46788f9592ea7b171076
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Sep 24 12:45:04 2025 +0300

    IGNITE-25715 Add Java client compute compatibility tests (#6638)
---
 modules/compatibility-tests/build.gradle           |   4 +
 modules/compatibility-tests/jobs.gradle            |  44 +++++
 .../internal/client/ClientCompatibilityTests.java  | 187 +++++++++++++++++++--
 ...urrentClientWithOldServerCompatibilityTest.java |   2 +-
 .../ignite/internal/client/DeploymentUtils.java    |  60 +++++++
 ...ldClientWithCurrentServerCompatibilityTest.java |  44 ++++-
 .../org/apache/ignite/internal/compute/Echo.java   |  32 ++++
 .../ignite/internal/compute/EchoReceiver.java      |  42 +++++
 8 files changed, 398 insertions(+), 17 deletions(-)

diff --git a/modules/compatibility-tests/build.gradle 
b/modules/compatibility-tests/build.gradle
index e667a72ab31..6488fa7e8a5 100644
--- a/modules/compatibility-tests/build.gradle
+++ b/modules/compatibility-tests/build.gradle
@@ -22,6 +22,7 @@ apply from: "$rootDir/buildscripts/java-core.gradle"
 apply from: "$rootDir/buildscripts/java-integration-test.gradle"
 apply from: "$rootDir/buildscripts/java-junit5.gradle"
 apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
+apply from: 'jobs.gradle'
 
 description = 'ignite-compatibility-tests'
 
@@ -33,11 +34,14 @@ repositories {
 dependencies {
     testImplementation libs.swagger.parser
 
+    integrationTestImplementation libs.netty.common
+
     integrationTestImplementation 
testFixtures(project(':ignite-compatibility-tests'))
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation project(':ignite-api')
     integrationTestImplementation project(':ignite-client')
     integrationTestImplementation project(':ignite-runner')
+    integrationTestImplementation project(':ignite-cli')
 
     testFixturesImplementation libs.gradle.tooling.api
     testFixturesImplementation libs.awaitility
diff --git a/modules/compatibility-tests/jobs.gradle 
b/modules/compatibility-tests/jobs.gradle
new file mode 100644
index 00000000000..8697193e40d
--- /dev/null
+++ b/modules/compatibility-tests/jobs.gradle
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+sourceSets {
+    jobs // Separate source set (src/jobs) holding the job classes that the jar task registered in this script packages.
+}
+
+def registerJarTask(SourceSet sourceSet, String baseName) { // Registers a Jar task that packages the output of the given source set.
+    tasks.register(sourceSet.jarTaskName, Jar) { // The task name is taken from the source set (sourceSet.jarTaskName).
+        group = 'build'
+        archiveBaseName = baseName
+        archiveVersion = '1.0-SNAPSHOT' // Fixed version: DeploymentUtils looks the jar up by the literal name "<baseName>-1.0-SNAPSHOT.jar".
+        from sourceSet.output
+    }
+}
+
+registerJarTask(sourceSets.jobs, 'ignite-integration-test-jobs')
+
+processIntegrationTestResources {
+    into('units') { // Places the jobs jar under "units" in the processed integration test resources.
+        from jobsJar // Presumably the task registered above for the jobs source set - confirm the generated task name.
+    }
+}
+
+dependencies {
+    jobsImplementation project(':ignite-api') // The jobs source set compiles against ignite-api and ignite-core.
+    jobsImplementation project(':ignite-core')
+
+    integrationTestImplementation sourceSets.jobs.output // Puts the compiled jobs output on the integration test classpath.
+}
diff --git 
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/ClientCompatibilityTests.java
 
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/ClientCompatibilityTests.java
index ad9cfcb6ec0..f50d0599aa4 100644
--- 
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/ClientCompatibilityTests.java
+++ 
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/ClientCompatibilityTests.java
@@ -36,10 +36,13 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.SubmissionPublisher;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -47,11 +50,11 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.BroadcastJobTarget;
 import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.deployment.DeploymentUnit;
-import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.CompatibilityTestCommon;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.sql.BatchedArguments;
@@ -71,14 +74,17 @@ import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionOptions;
 import org.hamcrest.Matchers;
 import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * Client compatibility tests. Interface to allow "multiple inheritance" of 
test methods.
  */
 @SuppressWarnings({"resource", "DataFlowIssue"})
 public interface ClientCompatibilityTests {
+    DeploymentUnit JOBS_UNIT = new DeploymentUnit("compat-test-jobs", "1.0");
+
     IgniteClient client();
 
     AtomicInteger idGen();
@@ -525,6 +531,57 @@ public interface ClientCompatibilityTests {
         assertThat(ex.getMessage(), containsString("Cannot load job class by 
name 'test'"));
     }
 
+    @ParameterizedTest
+    @MethodSource("jobArgs") // One invocation per element returned by jobArgs().
+    default void testComputeArgs(Object arg) {
+        JobTarget target = JobTarget.anyNode(clusterNodes());
+        JobDescriptor<Object, Object> desc = echoJobDescriptor();
+
+        Object jobRes = client().compute().execute(target, desc, arg); // The Echo job returns its argument, so the result must match the input.
+
+        if (arg instanceof byte[]) { // Arrays need a content comparison; equals() on an array compares identity.
+            assertArrayEquals((byte[]) arg, (byte[]) jobRes);
+        } else {
+            assertEquals(arg, jobRes);
+        }
+    }
+
+    @Test
+    default void testComputeExecute() { // Runs the Echo job on any of the cluster nodes and checks the echoed value.
+        JobTarget target = JobTarget.anyNode(clusterNodes());
+        JobDescriptor<Object, Object> desc = echoJobDescriptor();
+
+        Object jobRes = client().compute().execute(target, desc, "test");
+        assertEquals("test", jobRes);
+    }
+
+    @Test
+    default void testComputeExecuteColocated() { // Runs the Echo job with a target derived from the key id=1 of TABLE_NAME_TEST.
+        JobTarget target = JobTarget.colocated(TABLE_NAME_TEST, 
Tuple.create().set("id", 1));
+        JobDescriptor<Object, Object> desc = echoJobDescriptor();
+
+        Object jobRes = client().compute().execute(target, desc, "test");
+        assertEquals("test", jobRes);
+    }
+
+    @Test
+    default void testComputeExecuteBroadcast() { // Broadcasts the Echo job to the nodes returned by clusterNodes().
+        BroadcastJobTarget target = BroadcastJobTarget.nodes(clusterNodes());
+        JobDescriptor<Object, Object> desc = echoJobDescriptor();
+
+        Collection<Object> jobRes = client().compute().execute(target, desc, 
"test");
+        assertEquals("test", jobRes.iterator().next()); // Only the first result is checked; the number of results is not asserted.
+    }
+
+    @Test
+    default void testComputeExecuteBroadcastTable() { // Broadcasts the Echo job using a table-based broadcast target.
+        BroadcastJobTarget target = BroadcastJobTarget.table(TABLE_NAME_TEST);
+        JobDescriptor<Object, Object> desc = echoJobDescriptor();
+
+        Collection<Object> jobRes = client().compute().execute(target, desc, 
"test");
+        assertEquals("test", jobRes.iterator().next()); // Only the first result is checked; the number of results is not asserted.
+    }
+
     @Test
     default void testStreamer() {
         RecordView<Tuple> view = table(TABLE_NAME_TEST).recordView();
@@ -550,41 +607,83 @@ public interface ClientCompatibilityTests {
     }
 
     @Test
-    @Disabled("IGNITE-25715")
     default void testStreamerWithReceiver() {
         RecordView<Tuple> view = table(TABLE_NAME_TEST).recordView();
 
         CompletableFuture<Void> streamFut;
 
-        DataStreamerReceiverDescriptor<Tuple, String, Integer> desc = 
DataStreamerReceiverDescriptor
-                .<Tuple, String, Integer>builder("my-receiver")
-                .units(new DeploymentUnit("my-unit", Version.LATEST))
+        DataStreamerReceiverDescriptor<Integer, Object, Integer> desc = 
DataStreamerReceiverDescriptor
+                .<Integer, Object, 
Integer>builder("org.apache.ignite.internal.compute.EchoReceiver")
+                .units(JOBS_UNIT) // The receiver class is loaded from the deployed jobs unit.
                 .build();
 
-        try (var publisher = new SubmissionPublisher<Tuple>()) {
+        var subscriber = new TestSubscriber<Integer>(); // Collects the items returned by the receiver.
+        List<Integer> expected = new ArrayList<>();
+
+        try (var publisher = new SubmissionPublisher<Integer>()) {
             streamFut = view.streamData(
                     publisher,
                     desc,
-                    x -> Tuple.create().set("id", x.intValue("a")),
+                    x -> Tuple.create().set("id", x),
                     Function.identity(),
-                    "arg",
                     null,
-                    DataStreamerOptions.builder().pageSize(5).build());
+                    subscriber,
+                    DataStreamerOptions.builder().pageSize(3).build()); // Page size 3 with 10 items - presumably to produce several pages; confirm.
 
-            for (int i = 0; i < 100; i++) {
-                Tuple item = Tuple.create().set("a", i).set("b", "b_" + i);
-                publisher.submit(item);
+            for (int i = 0; i < 10; i++) {
+                publisher.submit(i);
+                expected.add(i);
+            }
+        }
+
+        streamFut.join(); // Blocks until streaming completes; a streaming failure surfaces here.
+
+        List<Integer> sortedResults = subscriber.items.stream() // Sorted before comparison, so the arrival order of results is not asserted.
+                .sorted()
+                .collect(Collectors.toList());
+
+        assertEquals(expected, sortedResults); // With a null receiver arg, EchoReceiver returns each page unchanged.
+    }
+
+    @Test
+    default void testStreamerWithReceiverArg() { // Streams integers through EchoReceiver with a non-null receiver argument.
+        RecordView<Tuple> view = table(TABLE_NAME_TEST).recordView();
+
+        CompletableFuture<Void> streamFut;
+
+        DataStreamerReceiverDescriptor<Integer, String, String> desc = 
DataStreamerReceiverDescriptor
+                .<Integer, String, 
String>builder("org.apache.ignite.internal.compute.EchoReceiver")
+                .units(JOBS_UNIT)
+                .build();
+
+        var subscriber = new TestSubscriber<String>();
+
+        try (var publisher = new SubmissionPublisher<Integer>()) {
+            streamFut = view.streamData(
+                    publisher,
+                    desc,
+                    x -> Tuple.create().set("id", x),
+                    Function.identity(),
+                    "arg", // Receiver argument; EchoReceiver returns it instead of the page when it is non-null.
+                    subscriber,
+                    DataStreamerOptions.builder().pageSize(2).build());
+
+            for (int i = 0; i < 10; i++) {
+                publisher.submit(i);
             }
         }
 
         streamFut.join();
+
+        assertEquals("arg", subscriber.items.iterator().next()); // Only the first received result is checked.
     }
 
     /**
-     * Creates default tables for testing.
+     * Initialize test data in the given Ignite instance.
+     * Creates the default tables and then deploys the compute jobs unit.
      */
-    default void createDefaultTables(Ignite ignite) {
+    default void initTestData(Ignite ignite) {
         CompatibilityTestCommon.createDefaultTables(ignite);
+        DeploymentUtils.deployJobs(); // Does not take the Ignite instance; DeploymentUtils builds its own REST client.
     }
 
     default void close() {
@@ -602,4 +701,62 @@ public interface ClientCompatibilityTests {
     private Table table(String tableName) {
         return client().tables().table(tableName);
     }
+
+    /**
+     * Arguments for job execution tests.
+     *
+     * <p>Referenced by name from {@code @MethodSource("jobArgs")}; each element is sent to the echo job
+     * and compared with the returned value. Covers boolean, integral, floating-point, decimal,
+     * date/time, UUID, string, byte array and {@code null} arguments.
+     *
+     * <p>NOTE(review): the {@code now()}-based values differ between runs and carry the sub-second
+     * precision of the system clock; the equality assertion assumes the round trip keeps that
+     * precision - confirm.
+     *
+     * @return Array of arguments.
+     */
+    static Object[] jobArgs() {
+        return new Object[]{
+                true,
+                (byte) 1,
+                (short) 2,
+                3,
+                4L,
+                5.5f,
+                6.6d,
+                new BigDecimal("7.7"),
+                LocalDate.now(),
+                LocalTime.now(),
+                LocalDateTime.now(),
+                Instant.ofEpochSecond(123456),
+                UUID.randomUUID(),
+                "test",
+                new byte[]{1, 2, 3, 4},
+                null
+        };
+    }
+
+    private static JobDescriptor<Object, Object> echoJobDescriptor() { // Builds the descriptor of the Echo job from the JOBS_UNIT deployment unit.
+        return JobDescriptor
+                .builder("org.apache.ignite.internal.compute.Echo") // Referenced by class name string rather than by class literal.
+                .units(JOBS_UNIT)
+                .build();
+    }
+
+    /**
+     * Test subscriber.
+     *
+     * <p>Requests an unbounded number of items as soon as it is subscribed and appends every
+     * received item to {@link #items}. Error and completion signals are ignored.
+     */
+    class TestSubscriber<T> implements Subscriber<T> {
+        List<T> items = Collections.synchronizedList(new ArrayList<>()); // Synchronized wrapper - presumably because onNext runs on a different thread than the test; confirm.
+
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            subscription.request(Long.MAX_VALUE); // Effectively unbounded demand; no back-pressure is applied.
+        }
+
+        @Override
+        public void onNext(T item) {
+            items.add(item);
+        }
+
+        @Override
+        public void onError(Throwable throwable) { // NOTE(review): the error is dropped; the tests appear to rely on streamFut.join() to surface failures - confirm.
+        }
+
+        @Override
+        public void onComplete() { // Nothing to do: the tests read the items after the streaming future completes.
+        }
+    }
 }
diff --git 
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/CurrentClientWithOldServerCompatibilityTest.java
 
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/CurrentClientWithOldServerCompatibilityTest.java
index 57f7114aed6..1bf0cbd7538 100644
--- 
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/CurrentClientWithOldServerCompatibilityTest.java
+++ 
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/CurrentClientWithOldServerCompatibilityTest.java
@@ -41,7 +41,7 @@ public class CurrentClientWithOldServerCompatibilityTest 
extends CompatibilityTe
 
     @Override
     protected void setupBaseVersion(Ignite baseIgnite) {
-        createDefaultTables(baseIgnite);
+        initTestData(baseIgnite); // Creates the default tables and deploys the compute jobs unit.
     }
 
     @Override
diff --git 
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/DeploymentUtils.java
 
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/DeploymentUtils.java
new file mode 100644
index 00000000000..18cff1c293a
--- /dev/null
+++ 
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/DeploymentUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import static 
org.apache.ignite.internal.client.ClientCompatibilityTests.JOBS_UNIT;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getResourcePath;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.sneakyThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetector.Level;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.ignite.internal.cli.call.cluster.unit.DeployUnitClient;
+import org.apache.ignite.rest.client.invoker.ApiClient;
+import org.apache.ignite.rest.client.invoker.ApiException;
+import org.apache.ignite.rest.client.model.DeployMode;
+
+/**
+ * Utility class for deploying jobs.
+ *
+ * <p>Deploys the jobs jar as the {@code JOBS_UNIT} deployment unit through {@link DeployUnitClient},
+ * using a default-constructed {@link ApiClient}.
+ */
+class DeploymentUtils {
+    static void deployJobs() { // Locates the jobs jar relative to the test resources and deploys it under the JOBS_UNIT name and version.
+        File jobsJar = Path.of(
+                getResourcePath(ClientCompatibilityTests.class, ""), // NOTE(review): the relative path below presumably resolves to the Gradle build "libs" directory - confirm.
+                
"../../../libs/ignite-integration-test-jobs-1.0-SNAPSHOT.jar").toFile();
+
+        deployUnit(List.of(jobsJar), JOBS_UNIT.name(), 
JOBS_UNIT.version().render());
+    }
+
+    private static void deployUnit(List<File> unitFiles, String unitName, 
String unitVersion) {
+        // TODO IGNITE-26418 Netty buffer leaks in REST API
+        ResourceLeakDetector.setLevel(Level.DISABLED); // Static setting: stays disabled after this call returns.
+
+        DeployUnitClient deployUnitClient = new DeployUnitClient(new 
ApiClient());
+
+        try {
+            Boolean deployRes = deployUnitClient.deployUnit(unitName, 
unitFiles, unitVersion, DeployMode.ALL, List.of());
+            assertTrue(deployRes); // NOTE(review): unboxes a Boolean; a null result would fail with NullPointerException instead of an assertion failure.
+        } catch (ApiException e) {
+            sneakyThrow(e); // Presumably rethrows the checked exception without declaring it - confirm against IgniteTestUtils.
+        }
+    }
+}
diff --git 
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/OldClientWithCurrentServerCompatibilityTest.java
 
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/OldClientWithCurrentServerCompatibilityTest.java
index 82aee9e2c4d..72c1da66454 100644
--- 
a/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/OldClientWithCurrentServerCompatibilityTest.java
+++ 
b/modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/client/OldClientWithCurrentServerCompatibilityTest.java
@@ -45,6 +45,7 @@ import 
org.junit.jupiter.params.AfterParameterizedClassInvocation;
 import org.junit.jupiter.params.BeforeParameterizedClassInvocation;
 import org.junit.jupiter.params.Parameter;
 import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 /**
@@ -70,7 +71,7 @@ public class OldClientWithCurrentServerCompatibilityTest 
extends BaseIgniteAbstr
         cluster = CompatibilityTestBase.createCluster(testInfo, workDir, 
CompatibilityTestBase.NODE_BOOTSTRAP_CFG_TEMPLATE);
         cluster.startEmbedded(1, true);
 
-        createDefaultTables(cluster.node(0));
+        initTestData(cluster.node(0));
 
         delegate = createTestInstanceWithOldClient(clientVersion);
     }
@@ -206,12 +207,53 @@ public class OldClientWithCurrentServerCompatibilityTest 
extends BaseIgniteAbstr
         delegate.testComputeMissingJob();
     }
 
+    @Override
+    @ParameterizedTest
+    @MethodSource("jobArgs") // Same argument source as the interface method being overridden.
+    public void testComputeArgs(Object arg) {
+        delegate.testComputeArgs(arg); // Forwards to the test instance created with the old client.
+    }
+
+    @Test
+    @Override
+    public void testComputeExecute() {
+        delegate.testComputeExecute(); // Forwards to the test instance created with the old client.
+    }
+
+    @Test
+    @Override
+    public void testComputeExecuteColocated() {
+        delegate.testComputeExecuteColocated(); // Forwards to the test instance created with the old client.
+    }
+
+    @Test
+    @Override
+    public void testComputeExecuteBroadcast() {
+        delegate.testComputeExecuteBroadcast(); // Forwards to the test instance created with the old client.
+    }
+
+    @Test
+    @Override
+    public void testComputeExecuteBroadcastTable() {
+        delegate.testComputeExecuteBroadcastTable(); // Forwards to the test instance created with the old client.
+    }
+
     @Test
     @Override
     public void testStreamer() {
         delegate.testStreamer();
     }
 
+    @Override // NOTE(review): the sibling overrides also carry @Test and this one does not - confirm JUnit still runs it, or that skipping it for the old client is intended.
+    public void testStreamerWithReceiver() {
+        delegate.testStreamerWithReceiver(); // Forwards to the test instance created with the old client.
+    }
+
+    @Override // NOTE(review): the sibling overrides also carry @Test and this one does not - confirm JUnit still runs it, or that skipping it for the old client is intended.
+    public void testStreamerWithReceiverArg() {
+        delegate.testStreamerWithReceiverArg(); // Forwards to the test instance created with the old client.
+    }
+
     private static ClientCompatibilityTests 
createTestInstanceWithOldClient(String igniteVersion)
             throws Exception {
         var loader = OldClientLoader.getIsolatedClassLoader(igniteVersion);
diff --git 
a/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/Echo.java
 
b/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/Echo.java
new file mode 100644
index 00000000000..b45c11bb2b4
--- /dev/null
+++ 
b/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/Echo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * A simple echo job that returns the input argument as the result.
+ *
+ * <p>The returned future is already completed and carries the argument object itself,
+ * including {@code null} when the argument is {@code null}. The execution context is not used.
+ */
+public class Echo implements ComputeJob<Object, Object> {
+    @Override
+    public CompletableFuture<Object> executeAsync(JobExecutionContext context, 
Object arg) {
+        return CompletableFuture.completedFuture(arg);
+    }
+}
diff --git 
a/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/EchoReceiver.java
 
b/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/EchoReceiver.java
new file mode 100644
index 00000000000..cc073686aef
--- /dev/null
+++ 
b/modules/compatibility-tests/src/jobs/java/org/apache/ignite/internal/compute/EchoReceiver.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.table.DataStreamerReceiver;
+import org.apache.ignite.table.DataStreamerReceiverContext;
+
+/**
+ * Test receiver.
+ *
+ * <p>When the receiver argument is non-null, the result is a single-element list holding that
+ * argument and the page content is ignored. When the argument is {@code null}, the received page
+ * is returned unchanged. The receiver context is not used.
+ */
+public class EchoReceiver implements DataStreamerReceiver<Object, Object, 
Object> {
+    @Override
+    public CompletableFuture<List<Object>> receive(
+            List<Object> page,
+            DataStreamerReceiverContext ctx,
+            Object arg) {
+        if (arg != null) {
+            return completedFuture(List.of(arg)); // One result per page, regardless of the page size.
+        }
+
+        return completedFuture(page);
+    }
+}

Reply via email to