This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 02f8f5d9a68 IGNITE-27594 Optimize observable timestamp handling in
compute jobs (#7611)
02f8f5d9a68 is described below
commit 02f8f5d9a6895446574cff2c6c24b0066223a917
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Mar 24 06:00:29 2026 +0300
IGNITE-27594 Optimize observable timestamp handling in compute jobs (#7611)
---
modules/compute/build.gradle | 1 +
modules/compute/jobs.gradle | 3 +
.../ignite/internal/compute/ItComputeBaseTest.java | 32 +--
.../jobs/embedded/ObservableTimestampJob.java | 40 ++++
.../jobs/embedded/ObservableTimestampResult.java | 34 +++
.../jobs/standalone/ObservableTimestampJob.java | 40 ++++
.../jobs/standalone/ObservableTimestampResult.java | 34 +++
.../internal/compute/ComputeComponentImpl.java | 9 +-
.../internal/compute/ComputeIgniteFactory.java | 40 ++++
.../ignite/internal/compute/ExecutionContext.java | 3 +-
.../ignite/internal/compute/IgniteComputeImpl.java | 5 +-
.../compute/executor/ComputeExecutorImpl.java | 22 +-
.../internal/compute/ComputeComponentImplTest.java | 13 +-
.../compute/executor/ComputeExecutorTest.java | 9 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 11 +-
.../ignite/internal/app/JobScopedIgnite.java | 103 +++++++++
.../ignite/internal/sql/api/IgniteSqlImpl.java | 64 +++++-
.../internal/sql/api/JobScopedIgniteSql.java | 231 +++++++++++++++++++++
.../internal/tx/impl/IgniteTransactionsImpl.java | 5 +
19 files changed, 658 insertions(+), 41 deletions(-)
diff --git a/modules/compute/build.gradle b/modules/compute/build.gradle
index 0be2d5189ff..f874b6f5c2b 100644
--- a/modules/compute/build.gradle
+++ b/modules/compute/build.gradle
@@ -66,6 +66,7 @@ dependencies {
integrationTestImplementation project(':ignite-system-view-api')
integrationTestImplementation project(':ignite-client-common')
integrationTestImplementation project(':ignite-eventlog')
+ integrationTestImplementation project(':ignite-transactions')
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
}
diff --git a/modules/compute/jobs.gradle b/modules/compute/jobs.gradle
index b897fafdb22..50e758e70b8 100644
--- a/modules/compute/jobs.gradle
+++ b/modules/compute/jobs.gradle
@@ -50,6 +50,9 @@ processIntegrationTestResources {
dependencies {
jobsImplementation project(':ignite-api')
jobsImplementation project(':ignite-core')
+ jobsImplementation project(':ignite-transactions')
+ jobsImplementation project(':ignite-runner')
+ jobsImplementation testFixtures(project(':ignite-runner'))
unit1Implementation project(':ignite-api')
unit2Implementation project(':ignite-api')
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 68aee720172..63147f204cf 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.compute;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.compute.JobStatus.CANCELED;
@@ -50,6 +49,7 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -73,11 +73,9 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.compute.BroadcastExecution;
import org.apache.ignite.compute.BroadcastJobTarget;
import org.apache.ignite.compute.ComputeException;
-import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
-import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.TaskDescriptor;
@@ -96,6 +94,7 @@ import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.table.partition.Partition;
+import org.example.jobs.embedded.ObservableTimestampResult;
import org.hamcrest.Matcher;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
@@ -892,16 +891,28 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
HybridTimestamp localObservableTs = currentObservableTimestamp();
assertNotNull(localObservableTs);
- JobExecution<Long> execution = submit(
+ // Capture the target node's global tracker before the job runs.
+ HybridTimestamp targetNodeTsBefore =
unwrapIgniteImpl(node(1)).observableTimeTracker().get();
+
+ JobExecution<ObservableTimestampResult> execution = submit(
JobTarget.node(clusterNode(node(1))),
-
JobDescriptor.builder(ObservableTimestampJob.class).units(units()).build(),
+ JobDescriptor.<Void,
ObservableTimestampResult>builder(jobClassName("ObservableTimestampJob"))
+ .resultClass(ObservableTimestampResult.class)
+ .units(units())
+ .build(),
null
);
- Long jobRes = execution.resultAsync().join();
- HybridTimestamp jobObservableTs =
HybridTimestamp.nullableHybridTimestamp(jobRes);
+ ObservableTimestampResult jobRes = execution.resultAsync().join();
+ // The per-job tracker should have the client's observable timestamp.
+ HybridTimestamp jobObservableTs =
HybridTimestamp.nullableHybridTimestamp(jobRes.perJobTimestamp);
assertThat(jobObservableTs, is(localObservableTs));
+
+ // The node's global tracker should NOT be updated by the compute job.
+ HybridTimestamp targetNodeTsAfter =
HybridTimestamp.nullableHybridTimestamp(jobRes.nodeGlobalTimestamp);
+ assertThat(targetNodeTsAfter, is(targetNodeTsBefore));
+ assertThat(targetNodeTsAfter, not(jobObservableTs));
}
protected @Nullable HybridTimestamp currentObservableTimestamp() {
@@ -980,11 +991,4 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
.or(instanceOf(CancellationException.class))
);
}
-
- private static class ObservableTimestampJob implements ComputeJob<Object,
Long> {
- @Override
- public CompletableFuture<Long> executeAsync(JobExecutionContext
context, Object arg) {
- return
completedFuture(unwrapIgniteImpl(context.ignite()).observableTimeTracker().getLong());
- }
- }
}
diff --git
a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampJob.java
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampJob.java
new file mode 100644
index 00000000000..716c4b15868
--- /dev/null
+++
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.TestWrappers.unwrapIgniteTransactionsImpl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * Compute job that returns both the per-job scoped observable timestamp and
the node's global observable timestamp.
+ * Used to verify that compute jobs receive the client's observable timestamp
without polluting the node's global tracker.
+ */
+public class ObservableTimestampJob implements ComputeJob<Void,
ObservableTimestampResult> {
+ @Override
+ public CompletableFuture<ObservableTimestampResult>
executeAsync(JobExecutionContext context, Void arg) {
+ long perJobTs =
unwrapIgniteTransactionsImpl(context.ignite().transactions()).observableTimestampTracker().getLong();
+ long nodeGlobalTs =
unwrapIgniteImpl(context.ignite()).observableTimeTracker().getLong();
+
+ return completedFuture(new ObservableTimestampResult(perJobTs,
nodeGlobalTs));
+ }
+}
diff --git
a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampResult.java
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampResult.java
new file mode 100644
index 00000000000..73e1df6299b
--- /dev/null
+++
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+/**
+ * Result of {@link ObservableTimestampJob} containing both per-job and node
global observable timestamps.
+ */
+public class ObservableTimestampResult {
+ public long perJobTimestamp;
+ public long nodeGlobalTimestamp;
+
+ public ObservableTimestampResult() {
+ }
+
+ public ObservableTimestampResult(long perJobTimestamp, long
nodeGlobalTimestamp) {
+ this.perJobTimestamp = perJobTimestamp;
+ this.nodeGlobalTimestamp = nodeGlobalTimestamp;
+ }
+}
diff --git
a/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampJob.java
b/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampJob.java
new file mode 100644
index 00000000000..508a01245e5
--- /dev/null
+++
b/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.TestWrappers.unwrapIgniteTransactionsImpl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * Compute job that returns both the per-job scoped observable timestamp and
the node's global observable timestamp.
+ * Used to verify that compute jobs receive the client's observable timestamp
without polluting the node's global tracker.
+ */
+public class ObservableTimestampJob implements ComputeJob<Void,
ObservableTimestampResult> {
+ @Override
+ public CompletableFuture<ObservableTimestampResult>
executeAsync(JobExecutionContext context, Void arg) {
+ long perJobTs =
unwrapIgniteTransactionsImpl(context.ignite().transactions()).observableTimestampTracker().getLong();
+ long nodeGlobalTs =
unwrapIgniteImpl(context.ignite()).observableTimeTracker().getLong();
+
+ return completedFuture(new ObservableTimestampResult(perJobTs,
nodeGlobalTs));
+ }
+}
diff --git
a/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampResult.java
b/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampResult.java
new file mode 100644
index 00000000000..975e3a1bc08
--- /dev/null
+++
b/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+/**
+ * Result of {@link ObservableTimestampJob} containing both per-job and node
global observable timestamps.
+ */
+public class ObservableTimestampResult {
+ public long perJobTimestamp;
+ public long nodeGlobalTimestamp;
+
+ public ObservableTimestampResult() {
+ }
+
+ public ObservableTimestampResult(long perJobTimestamp, long
nodeGlobalTimestamp) {
+ this.perJobTimestamp = perJobTimestamp;
+ this.nodeGlobalTimestamp = nodeGlobalTimestamp;
+ }
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index ab6e966b019..1eea4f59e18 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -47,7 +47,6 @@ import
org.apache.ignite.internal.deployunit.loader.UnitsClassLoaderContext;
import org.apache.ignite.internal.deployunit.loader.UnitsContextManager;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.future.InFlightFutures;
-import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -90,8 +89,6 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
private final EventLog eventLog;
- private final HybridTimestampTracker observableTimestampTracker;
-
private final ComputeMessaging messaging;
private final ExecutionManager executionManager;
@@ -111,15 +108,13 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
UnitsContextManager jobContextManager,
ComputeExecutor executor,
ComputeConfiguration computeConfiguration,
- EventLog eventLog,
- HybridTimestampTracker observableTimestampTracker
+ EventLog eventLog
) {
this.topologyService = topologyService;
this.logicalTopologyService = logicalTopologyService;
this.jobContextManager = jobContextManager;
this.executor = executor;
this.eventLog = eventLog;
- this.observableTimestampTracker = observableTimestampTracker;
executionManager = new ExecutionManager(computeConfiguration,
topologyService);
messaging = new ComputeMessaging(executionManager, messagingService,
topologyService);
failoverExecutor = Executors.newSingleThreadExecutor(
@@ -137,8 +132,6 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
}
try {
-
observableTimestampTracker.update(executionContext.observableTimestamp());
-
CompletableFuture<UnitsClassLoaderContext> classLoaderFut =
jobContextManager.acquireClassLoader(executionContext.units(),
executionContext.jobClassName());
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeIgniteFactory.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeIgniteFactory.java
new file mode 100644
index 00000000000..6ae7f9fb796
--- /dev/null
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeIgniteFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+
+/**
+ * Factory for creating per-job scoped {@link Ignite} instances.
+ *
+ * <p>Each compute job carries an observable timestamp from its initiator
client.
+ * Instead of updating a global tracker (which would cause "timestamp
pollution"
+ * across unrelated jobs), a per-job {@link HybridTimestampTracker} is created
+ * and used to scope the {@link Ignite} instance given to the job.
+ */
+@FunctionalInterface
+public interface ComputeIgniteFactory {
+ /**
+ * Creates an {@link Ignite} instance scoped to a specific compute job.
+ *
+ * @param tracker Per-job hybrid timestamp tracker.
+ * @return An Ignite instance that uses the given tracker for transactions
and SQL.
+ */
+ Ignite createForJob(HybridTimestampTracker tracker);
+}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
index a65a610eb00..606958ccb49 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
@@ -121,9 +121,10 @@ public class ExecutionContext {
* Gets the observable timestamp from the job initiator client.
* This ensures that the job sees the changes made by the client up to the
point of job submission.
*
+ * @param arg Job argument.
* @return Observable timestamp or {@link
HybridTimestamp#NULL_HYBRID_TIMESTAMP} if not set.
*/
- public long observableTimestamp() {
+ public static long observableTimestamp(@Nullable ComputeJobDataHolder arg)
{
if (arg == null) {
return HybridTimestamp.NULL_HYBRID_TIMESTAMP;
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 533f0b927a4..b5e7f1c4e46 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -403,13 +403,12 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
ExecutionContext executionContext,
@Nullable CancellationToken cancellationToken
) {
- Set<InternalClusterNode> candidates1 = new HashSet<>();
+ Set<InternalClusterNode> candidates = new HashSet<>();
for (InternalClusterNode node : nodes) {
if (topologyService.getByConsistentId(node.name()) != null) {
- candidates1.add(node);
+ candidates.add(node);
}
}
- Set<InternalClusterNode> candidates = candidates1;
if (candidates.isEmpty()) {
Set<String> nodeNames =
nodes.stream().map(InternalClusterNode::name).collect(Collectors.toSet());
return failedFuture(new NodeNotFoundException(nodeNames));
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index da589a47294..8f4911b577b 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -21,6 +21,8 @@ import static
org.apache.ignite.internal.compute.ComputeUtils.getJobExecuteArgum
import static org.apache.ignite.internal.compute.ComputeUtils.jobClass;
import static org.apache.ignite.internal.compute.ComputeUtils.taskClass;
import static
org.apache.ignite.internal.compute.ComputeUtils.unmarshalOrNotIfNull;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
+import static
org.apache.ignite.internal.hlc.HybridTimestampTracker.atomicTracker;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
@@ -34,9 +36,11 @@ import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobExecutorType;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.ComputeIgniteFactory;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.ComputeJobDataType;
import org.apache.ignite.internal.compute.ComputeUtils;
+import org.apache.ignite.internal.compute.ExecutionContext;
import org.apache.ignite.internal.compute.ExecutionOptions;
import org.apache.ignite.internal.compute.JobExecutionContextImpl;
import org.apache.ignite.internal.compute.SharedComputeUtils;
@@ -53,6 +57,7 @@ import
org.apache.ignite.internal.compute.task.TaskExecutionInternal;
import org.apache.ignite.internal.deployunit.loader.UnitsClassLoader;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.TopologyService;
@@ -68,6 +73,8 @@ public class ComputeExecutorImpl implements ComputeExecutor {
private final Ignite ignite;
+ private final ComputeIgniteFactory igniteFactory;
+
private final ComputeConfiguration configuration;
private final ComputeStateMachine stateMachine;
@@ -86,6 +93,7 @@ public class ComputeExecutorImpl implements ComputeExecutor {
* Constructor.
*
* @param ignite Ignite instance for public API access.
+ * @param igniteFactory Factory for creating per-job scoped Ignite
instances.
* @param stateMachine Compute jobs state machine.
* @param configuration Compute configuration.
* @param topologyService Topology service.
@@ -93,6 +101,7 @@ public class ComputeExecutorImpl implements ComputeExecutor {
*/
public ComputeExecutorImpl(
Ignite ignite,
+ ComputeIgniteFactory igniteFactory,
ComputeStateMachine stateMachine,
ComputeConfiguration configuration,
TopologyService topologyService,
@@ -100,6 +109,7 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
EventLog eventLog
) {
this.ignite = ignite;
+ this.igniteFactory = igniteFactory;
this.configuration = configuration;
this.stateMachine = stateMachine;
this.topologyService = topologyService;
@@ -121,8 +131,9 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
) {
assert executorService != null;
+ Ignite scopedIgnite = createIgniteForJob(arg);
AtomicBoolean isInterrupted = new AtomicBoolean();
- JobExecutionContext context = new JobExecutionContextImpl(ignite,
isInterrupted, classLoader, options.partition());
+ JobExecutionContext context = new
JobExecutionContextImpl(scopedIgnite, isInterrupted, classLoader,
options.partition());
metadataBuilder
.jobClassName(jobClassName)
@@ -143,6 +154,15 @@ public class ComputeExecutorImpl implements
ComputeExecutor {
return new JobExecutionInternal<>(execution, isInterrupted, null,
false, topologyService.localMember());
}
+ /**
+ * Extracts observable timestamp from the client payload, creates a
per-job tracker and a wrapper for the Ignite instance.
+ */
+ private Ignite createIgniteForJob(@Nullable ComputeJobDataHolder arg) {
+ long obsTs = ExecutionContext.observableTimestamp(arg);
+ HybridTimestampTracker jobTracker =
atomicTracker(nullableHybridTimestamp(obsTs));
+ return igniteFactory.createForJob(jobTracker);
+ }
+
private static Callable<CompletableFuture<ComputeJobDataHolder>>
addObservableTimestamp(
Callable<CompletableFuture<ComputeJobDataHolder>> jobCallable,
ClockService clockService) {
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 27d6f640250..bb07bfc6875 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -94,7 +94,6 @@ import
org.apache.ignite.internal.deployunit.loader.UnitsClassLoaderContext;
import org.apache.ignite.internal.deployunit.loader.UnitsContextManager;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -172,7 +171,14 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
InMemoryComputeStateMachine stateMachine = new
InMemoryComputeStateMachine(computeConfiguration, INSTANCE_NAME);
ComputeExecutor computeExecutor = new ComputeExecutorImpl(
- ignite, stateMachine, computeConfiguration, topologyService,
new TestClockService(new HybridClockImpl()), EventLog.NOOP);
+ ignite,
+ tracker -> ignite,
+ stateMachine,
+ computeConfiguration,
+ topologyService,
+ new TestClockService(new HybridClockImpl()),
+ EventLog.NOOP
+ );
computeComponent = new ComputeComponentImpl(
INSTANCE_NAME,
@@ -182,8 +188,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
unitsContextManager,
computeExecutor,
computeConfiguration,
- EventLog.NOOP,
- HybridTimestampTracker.emptyTracker()
+ EventLog.NOOP
);
assertThat(computeComponent.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index ab113c518c1..4792737a2c2 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -85,7 +85,14 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
void setUp() {
InMemoryComputeStateMachine stateMachine = new
InMemoryComputeStateMachine(computeConfiguration, "testNode");
computeExecutor = new ComputeExecutorImpl(
- ignite, stateMachine, computeConfiguration, topologyService,
new TestClockService(new HybridClockImpl()), EventLog.NOOP);
+ ignite,
+ tracker -> ignite,
+ stateMachine,
+ computeConfiguration,
+ topologyService,
+ new TestClockService(new HybridClockImpl()),
+ EventLog.NOOP
+ );
computeExecutor.start();
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a3ccf925804..56fd3527233 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -331,6 +331,8 @@ public class IgniteImpl implements Ignite {
private final Path workDir;
+ private final Executor asyncContinuationExecutor;
+
/** Lifecycle manager. */
private final LifecycleManager lifecycleManager;
@@ -553,6 +555,7 @@ public class IgniteImpl implements Ignite {
) {
this.name = node.name();
this.workDir = workDir;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
longJvmPauseDetector = new LongJvmPauseDetector(name);
@@ -1262,6 +1265,7 @@ public class IgniteImpl implements Ignite {
InMemoryComputeStateMachine stateMachine = new
InMemoryComputeStateMachine(computeCfg, name);
ComputeExecutorImpl computeExecutor = new ComputeExecutorImpl(
this,
+ this::createJobScopedIgnite,
stateMachine,
computeCfg,
clusterSvc.topologyService(),
@@ -1293,8 +1297,7 @@ public class IgniteImpl implements Ignite {
),
computeExecutor,
computeCfg,
- eventLog,
- observableTimestampTracker
+ eventLog
);
systemViewManager.register(computeComponent);
@@ -1362,6 +1365,10 @@ public class IgniteImpl implements Ignite {
publicCluster = new PublicApiThreadingIgniteCluster(new
IgniteClusterImpl(clusterSvc.topologyService(), clusterIdService));
}
+ private JobScopedIgnite createJobScopedIgnite(HybridTimestampTracker
tracker) {
+ return new JobScopedIgnite(this, tracker, txManager, sql,
asyncContinuationExecutor);
+ }
+
private GroupStoragesContextResolver createGroupStoragesContextResolver() {
Map<String, LogStorageManager> logStorageManagerByGroupName = Map.of(
PARTITION_GROUP_NAME, partitionsLogStorageManager,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/JobScopedIgnite.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/JobScopedIgnite.java
new file mode 100644
index 00000000000..74c6c85be72
--- /dev/null
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/JobScopedIgnite.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.app;
+
+import java.util.concurrent.Executor;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.catalog.IgniteCatalog;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
+import org.apache.ignite.internal.sql.api.JobScopedIgniteSql;
+import org.apache.ignite.internal.sql.api.PublicApiThreadingIgniteSql;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.tx.impl.PublicApiThreadingIgniteTransactions;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.network.IgniteCluster;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.IgniteTables;
+import org.apache.ignite.tx.IgniteTransactions;
+
+/**
+ * A lightweight wrapper around {@link IgniteImpl} that scopes {@link
#transactions()} and {@link #sql()} to a per-job
+ * {@link HybridTimestampTracker}. This prevents one compute job from
polluting another job's observable timestamp.
+ */
+class JobScopedIgnite implements Ignite, Wrapper {
+ private final Ignite delegate;
+
+ private final IgniteTransactions scopedTransactions;
+
+ private final IgniteSql scopedSql;
+
+ JobScopedIgnite(
+ Ignite delegate,
+ HybridTimestampTracker jobTracker,
+ TxManager txManager,
+ IgniteSqlImpl sql,
+ Executor asyncContinuationExecutor
+ ) {
+ this.delegate = delegate;
+ this.scopedTransactions = new PublicApiThreadingIgniteTransactions(
+ new IgniteTransactionsImpl(txManager, jobTracker),
asyncContinuationExecutor
+ );
+ this.scopedSql = new PublicApiThreadingIgniteSql(
+ new JobScopedIgniteSql(sql, jobTracker),
asyncContinuationExecutor
+ );
+ }
+
+ @Override
+ public String name() {
+ return delegate.name();
+ }
+
+ @Override
+ public IgniteTables tables() {
+ return delegate.tables();
+ }
+
+ @Override
+ public IgniteTransactions transactions() {
+ return scopedTransactions;
+ }
+
+ @Override
+ public IgniteSql sql() {
+ return scopedSql;
+ }
+
+ @Override
+ public IgniteCompute compute() {
+ return delegate.compute();
+ }
+
+ @Override
+ public IgniteCatalog catalog() {
+ return delegate.catalog();
+ }
+
+ @Override
+ public IgniteCluster cluster() {
+ return delegate.cluster();
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> classToUnwrap) {
+ return classToUnwrap.cast(delegate);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index 958a2b301f3..0d6c1f12adf 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -259,7 +259,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent, Wrapper {
@Nullable Object... arguments) {
Objects.requireNonNull(statement);
- CompletableFuture<AsyncResultSet<T>> future =
executeAsync(transaction, mapper, statement, arguments);
+ CompletableFuture<AsyncResultSet<T>> future =
executeAsync(transaction, mapper, cancellationToken, statement, arguments);
return new SyncResultSetAdapter<>(sync(future));
}
@@ -307,7 +307,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent, Wrapper {
String query,
@Nullable Object... arguments
) {
- return executeAsyncInternal(transaction, cancellationToken,
createStatement(query), arguments);
+ return executeAsync(transaction, cancellationToken,
createStatement(query), arguments);
}
/** {@inheritDoc} */
@@ -318,7 +318,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent, Wrapper {
Statement statement,
@Nullable Object... arguments
) {
- return executeAsyncInternal(transaction, cancellationToken, statement,
arguments);
+ return executeAsyncInternal(observableTimestampTracker, transaction,
cancellationToken, statement, arguments);
}
/** {@inheritDoc} */
@@ -346,7 +346,18 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent, Wrapper {
throw new UnsupportedOperationException("Not implemented yet.");
}
- private CompletableFuture<AsyncResultSet<SqlRow>> executeAsyncInternal(
+ /**
+ * Executes a single SQL statement asynchronously using the given
observable timestamp tracker.
+ *
+ * @param tracker Observable timestamp tracker to use.
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param cancellationToken Cancellation token or {@code null}.
+ * @param statement SQL statement to execute.
+ * @param arguments Arguments for the statement.
+ * @return Operation future.
+ */
+ protected CompletableFuture<AsyncResultSet<SqlRow>> executeAsyncInternal(
+ HybridTimestampTracker tracker,
@Nullable Transaction transaction,
@Nullable CancellationToken cancellationToken,
Statement statement,
@@ -369,7 +380,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent, Wrapper {
result = queryProcessor.queryAsync(
properties,
- observableTimestampTracker,
+ tracker,
(InternalTransaction) transaction,
cancellationToken,
statement.query(),
@@ -424,6 +435,27 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent, Wrapper {
@Nullable CancellationToken cancellationToken,
Statement statement,
BatchedArguments batch
+ ) {
+ return executeBatchAsyncInternal(observableTimestampTracker,
transaction, cancellationToken, statement, batch);
+ }
+
+ /**
+ * Executes a batched SQL statement asynchronously using the given
observable timestamp tracker.
+ *
+ * @param tracker Observable timestamp tracker to use.
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param cancellationToken Cancellation token or {@code null}.
+ * @param statement SQL statement to execute.
+ * @param batch List of batch rows, where each row is a list of statement
arguments.
+ * @return Operation Future completed with the number of rows affected by
each query in the batch
+ * (if the batch succeeds), future completed with the {@link
SqlBatchException} (if the batch fails).
+ */
+ protected CompletableFuture<long[]> executeBatchAsyncInternal(
+ HybridTimestampTracker tracker,
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ BatchedArguments batch
) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(nodeIsStoppingException());
@@ -434,7 +466,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent, Wrapper {
return executeBatchCore(
queryProcessor,
- observableTimestampTracker,
+ tracker,
(InternalTransaction) transaction,
cancellationToken,
statement.query(),
@@ -578,6 +610,24 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent, Wrapper {
public CompletableFuture<Void> executeScriptAsync(
@Nullable CancellationToken cancellationToken, String query,
@Nullable Object... arguments
+ ) {
+ return executeScriptAsyncInternal(observableTimestampTracker,
cancellationToken, query, arguments);
+ }
+
+ /**
+ * Executes a multi-statement SQL query using the given observable
timestamp tracker.
+ *
+ * @param tracker Observable timestamp tracker to use.
+ * @param cancellationToken Cancellation token or {@code null}.
+ * @param query SQL query template.
+ * @param arguments Arguments for the template (optional).
+ * @return Operation future.
+ */
+ protected CompletableFuture<Void> executeScriptAsyncInternal(
+ HybridTimestampTracker tracker,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(nodeIsStoppingException());
@@ -586,7 +636,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent, Wrapper {
try {
return executeScriptCore(
queryProcessor,
- observableTimestampTracker,
+ tracker,
busyLock::enterBusy,
busyLock::leaveBusy,
query,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/JobScopedIgniteSql.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/JobScopedIgniteSql.java
new file mode 100644
index 00000000000..a8e8421118a
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/JobScopedIgniteSql.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static org.apache.ignite.internal.util.IgniteUtils.getInterruptibly;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.sql.SyncResultSetAdapter;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A scoped view of {@link IgniteSqlImpl} that uses a per-job {@link
HybridTimestampTracker} instead of the node's global one.
+ * Delegates all operations to the original {@link IgniteSqlImpl} instance,
sharing its lifecycle, busy lock, and cursor tracking.
+ */
+public class JobScopedIgniteSql implements IgniteSql, Wrapper {
+ private final IgniteSqlImpl delegate;
+
+ private final HybridTimestampTracker jobTracker;
+
+ public JobScopedIgniteSql(IgniteSqlImpl delegate, HybridTimestampTracker
jobTracker) {
+ this.delegate = delegate;
+ this.jobTracker = jobTracker;
+ }
+
+ @Override
+ public Statement createStatement(String query) {
+ return delegate.createStatement(query);
+ }
+
+ @Override
+ public StatementBuilder statementBuilder() {
+ return delegate.statementBuilder();
+ }
+
+ @Override
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ Objects.requireNonNull(query);
+
+ return new SyncResultSetAdapter<>(sync(executeAsync(transaction,
cancellationToken, query, arguments)));
+ }
+
+ @Override
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ Objects.requireNonNull(statement);
+
+ return new SyncResultSetAdapter<>(sync(executeAsync(transaction,
cancellationToken, statement, arguments)));
+ }
+
+ @Override
+ public <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ Objects.requireNonNull(query);
+
+ return new SyncResultSetAdapter<>(sync(executeAsync(transaction,
mapper, cancellationToken, query, arguments)));
+ }
+
+ @Override
+ public <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ Objects.requireNonNull(statement);
+
+ return new SyncResultSetAdapter<>(sync(executeAsync(transaction,
mapper, cancellationToken, statement, arguments)));
+ }
+
+ @Override
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return executeAsync(transaction, cancellationToken,
createStatement(query), arguments);
+ }
+
+ @Override
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return delegate.executeAsyncInternal(jobTracker, transaction,
cancellationToken, statement, arguments);
+ }
+
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ // TODO: IGNITE-18695.
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ // TODO: IGNITE-18695.
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ @Override
+ public long[] executeBatch(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String dmlQuery,
+ BatchedArguments batch
+ ) {
+ return sync(executeBatchAsync(transaction, cancellationToken,
dmlQuery, batch));
+ }
+
+ @Override
+ public long[] executeBatch(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement dmlStatement,
+ BatchedArguments batch
+ ) {
+ return sync(executeBatchAsync(transaction, cancellationToken,
dmlStatement, batch));
+ }
+
+ @Override
+ public CompletableFuture<long[]> executeBatchAsync(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ BatchedArguments batch
+ ) {
+ return executeBatchAsync(transaction, cancellationToken,
createStatement(query), batch);
+ }
+
+ @Override
+ public CompletableFuture<long[]> executeBatchAsync(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ BatchedArguments batch
+ ) {
+ return delegate.executeBatchAsyncInternal(jobTracker, transaction,
cancellationToken, statement, batch);
+ }
+
+ @Override
+ public void executeScript(String query, @Nullable Object... arguments) {
+ executeScript(null, query, arguments);
+ }
+
+ @Override
+ public void executeScript(@Nullable CancellationToken cancellationToken,
String query, @Nullable Object... arguments) {
+ Objects.requireNonNull(query);
+
+ sync(executeScriptAsync(cancellationToken, query, arguments));
+ }
+
+ @Override
+ public CompletableFuture<Void> executeScriptAsync(String query, @Nullable
Object... arguments) {
+ return executeScriptAsync(null, query, arguments);
+ }
+
+ @Override
+ public CompletableFuture<Void> executeScriptAsync(
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return delegate.executeScriptAsyncInternal(jobTracker,
cancellationToken, query, arguments);
+ }
+
+ private static <T> T sync(CompletableFuture<T> future) {
+ return getInterruptibly(future);
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> classToUnwrap) {
+ return delegate.unwrap(classToUnwrap);
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
index 93b061c6569..14ca977acdc 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
@@ -80,4 +80,9 @@ public class IgniteTransactionsImpl implements
IgniteTransactions {
public Transaction beginWithPriority(boolean readOnly, TxPriority
priority) {
return txManager.beginExplicit(observableTimestampTracker, readOnly,
InternalTxOptions.defaultsWithPriority(priority));
}
+
+ @TestOnly
+ public HybridTimestampTracker observableTimestampTracker() {
+ return observableTimestampTracker;
+ }
}