This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 420d3baa4b0 IGNITE-26060 FailureHandler isn't called on TX coordinator 
node in certain scenarios (#12825)
420d3baa4b0 is described below

commit 420d3baa4b00d81042e1772a082d39228f971449
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Wed Mar 11 09:23:32 2026 +0300

    IGNITE-26060 FailureHandler isn't called on TX coordinator node in certain 
scenarios (#12825)
---
 .../query/calcite/exec/exp/IgniteRexBuilder.java   |   2 +-
 .../query/calcite/exec/exp/IgniteSqlFunctions.java |   2 +-
 .../calcite/prepare/IgniteConvertletTable.java     |   2 +-
 .../calcite/prepare/IgniteSqlToRelConvertor.java   |   2 +-
 .../query/calcite/prepare/MappingQueryContext.java |   2 +-
 .../query/calcite/prepare/PlanExtractor.java       |   2 +-
 .../query/calcite/sql/IgniteSqlCreateTable.java    |   2 +-
 .../sql/stat/IgniteSqlStatisticsAnalyze.java       |   2 +-
 .../calcite/sql/stat/IgniteSqlStatisticsTable.java |   2 +-
 .../tx/TxWithExceptionalInterceptorTest.java       | 496 +++++++++++++++++++++
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 .../cache/IgniteCacheOffheapManagerImpl.java       |   9 +-
 .../cache/distributed/near/GridNearTxLocal.java    |   8 +
 .../cache/transactions/IgniteTxStateImpl.java      |  14 +
 .../store/CacheStoreWithIgniteTxFailureTest.java   |  36 +-
 15 files changed, 563 insertions(+), 20 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteRexBuilder.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteRexBuilder.java
index fbf265f4f22..d4e2debbcad 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteRexBuilder.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteRexBuilder.java
@@ -27,7 +27,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.Nullable;
 
 /** */
 public class IgniteRexBuilder extends RexBuilder {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteSqlFunctions.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteSqlFunctions.java
index 63b27f3a8ac..cfca232e4c4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteSqlFunctions.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/IgniteSqlFunctions.java
@@ -35,7 +35,7 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.processors.query.calcite.util.IgniteMath.NUMERIC_ROUNDING_MODE;
 import static 
org.apache.ignite.internal.processors.query.calcite.util.IgniteMath.convertToBigDecimal;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteConvertletTable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteConvertletTable.java
index b6e27621b54..77cd8ad0a9f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteConvertletTable.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteConvertletTable.java
@@ -38,7 +38,7 @@ import org.apache.calcite.sql2rel.SqlRexConvertlet;
 import org.apache.calcite.sql2rel.StandardConvertletTable;
 import 
org.apache.ignite.internal.processors.query.calcite.sql.fun.IgniteOwnSqlOperatorTable;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite convertlet table.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlToRelConvertor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlToRelConvertor.java
index 86620510834..fa8b31d7eee 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlToRelConvertor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlToRelConvertor.java
@@ -52,7 +52,7 @@ import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.ControlFlowException;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.Nullable;
 
 import static java.util.Objects.requireNonNull;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
index 2ab7c6c0fcc..bb66cf83eaf 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MappingQueryContext.java
@@ -29,7 +29,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQ
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Query mapping context.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java
index e350895ac34..8119a383dbb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java
@@ -34,7 +34,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.bounds.SearchBounds;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.util.typedef.F;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Sensitive data aware plan extractor.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java
index f43e7ef7f88..48f7f4de60a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java
@@ -30,7 +30,7 @@ import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Parse tree for {@code CREATE TABLE} statement with Ignite specific features.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/stat/IgniteSqlStatisticsAnalyze.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/stat/IgniteSqlStatisticsAnalyze.java
index e9e79246394..5a57c8d4b2f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/stat/IgniteSqlStatisticsAnalyze.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/stat/IgniteSqlStatisticsAnalyze.java
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.util.ImmutableNullableList;
 import org.apache.ignite.internal.util.typedef.F;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.Nullable;
 
 /** */
 public class IgniteSqlStatisticsAnalyze extends IgniteSqlStatisticsCommand {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/stat/IgniteSqlStatisticsTable.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/stat/IgniteSqlStatisticsTable.java
index d81f959d309..59490751a74 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/stat/IgniteSqlStatisticsTable.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/stat/IgniteSqlStatisticsTable.java
@@ -30,7 +30,7 @@ import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.ignite.internal.util.typedef.F;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.Nullable;
 
 /** */
 public class IgniteSqlStatisticsTable extends SqlCall {
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxWithExceptionalInterceptorTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxWithExceptionalInterceptorTest.java
new file mode 100644
index 00000000000..b3aa281a82b
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxWithExceptionalInterceptorTest.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.tx;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.indexing.IndexingQueryEngineConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheLazyEntry;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/** Checks per-node data consistency after a cache interceptor method throws an unchecked exception. */
+@RunWith(Parameterized.class)
+public class TxWithExceptionalInterceptorTest extends GridCommonAbstractTest {
+    /** Role of the node that initiates the transaction (test parameter 0). */
+    @Parameterized.Parameter(0)
+    public TxCoordNodeRole txCoord;
+
+    /** Whether persistence is enabled for the default data region (test parameter 1). */
+    @Parameterized.Parameter(1)
+    public boolean persistence;
+
+    /** Whether read-through and write-through are enabled for {@link #PROC_CACHE_NAME} (test parameter 2). */
+    @Parameterized.Parameter(2)
+    public boolean writeThrough;
+
+    /** Name of the indexing query engine, used in the QUERY_ENGINE hint. */
+    private static final String INDEXING_ENGINE = 
IndexingQueryEngineConfiguration.ENGINE_NAME;
+
+    /** Name of the Calcite query engine, used in the QUERY_ENGINE hint. */
+    private static final String CALCITE_ENGINE = 
CalciteQueryEngineConfiguration.ENGINE_NAME;
+
+    /** Number of exceptions thrown by the interceptor; reset before each test. */
+    private static final AtomicInteger exceptionRaised = new AtomicInteger();
+
+    /** Name of the cache that has the interceptor and, optionally, write-through configured. */
+    private static final String PROC_CACHE_NAME = "PROC_CACHE";
+
+    /** Name of the second cache written in the same transactions; it has no interceptor. */
+    private static final String COMMON_CACHE_NAME = "COMMON_CACHE";
+
+    /** Instance name of the thick client node. */
+    private static final String CLIENT_NAME = "client";
+
+    /** Store strategy that emulates the external storage used for write-through. */
+    private static final MapCacheStoreStrategy strategy = new 
MapCacheStoreStrategy();
+
+    /** @return All combinations of tx coordinator role, persistence flag and write-through flag. */
+    @Parameterized.Parameters(name = "txCoordRole={0}, persistence={1}, writeThrough={2}")
+    public static Collection<?> parameters() {
+        List<TxCoordNodeRole> roles = List.of(
+            TxCoordNodeRole.PRIMARY,
+            TxCoordNodeRole.BACKUP,
+            TxCoordNodeRole.COORDINATOR_NO_DATA,
+            TxCoordNodeRole.THICK_CLIENT
+        );
+
+        // The same on/off pair is used for both boolean parameters.
+        List<Boolean> onOff = List.of(true, false);
+
+        return GridTestUtils.cartesianProduct(roles, onOff, onOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Every parameterized run starts from a stopped cluster.
+        stopAllGrids();
+
+        if (persistence)
+            cleanPersistenceDir();
+
+        // Static state is shared between runs, so it is reset here.
+        exceptionRaised.set(0);
+
+        strategy.resetStore();
+    }
+
+    /** Node filter that accepts every node except the one with the given consistent ID. */
+    private static class FilterDefinedNode implements IgnitePredicate<ClusterNode> {
+        /** Consistent ID of the node that must not pass the filter. */
+        private final Object excludedConsistentId;
+
+        /**
+         * @param excludedConsistentId Consistent ID of the node to exclude.
+         */
+        private FilterDefinedNode(Object excludedConsistentId) {
+            this.excludedConsistentId = excludedConsistentId;
+        }
+
+        /**
+         * @param node Node to check.
+         * @return {@code true} if the node's consistent ID differs from the excluded one.
+         */
+        @Override public boolean apply(ClusterNode node) {
+            assert excludedConsistentId != null;
+
+            return !node.consistentId().equals(excludedConsistentId);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // The instance name doubles as the consistent ID; the interceptor and the node filter compare against it.
+        cfg.setConsistentId(igniteInstanceName);
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        // Both engines are registered, Calcite is the default one.
+        cfg.getSqlConfiguration().setQueryEnginesConfiguration(
+            new IndexingQueryEngineConfiguration(),
+            new CalciteQueryEngineConfiguration().setDefault(true)
+        );
+
+        if (persistence) {
+            DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration().setPersistenceEnabled(true);
+
+            cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(dfltRegionCfg));
+        }
+
+        String zeroNodeName = getTestIgniteInstanceName(0);
+
+        CacheConfiguration<Integer, Integer> procCcfg = new CacheConfiguration<>(PROC_CACHE_NAME);
+
+        procCcfg.setAtomicityMode(TRANSACTIONAL);
+        procCcfg.setCacheMode(REPLICATED);
+        procCcfg.setInterceptor(new CustomInterceptor(writeThrough, txCoord, zeroNodeName));
+        procCcfg.setQueryEntities(List.of(queryEntity(PROC_CACHE_NAME)));
+
+        // The cache is deployed on all server nodes except the zero-indexed one.
+        if (txCoord == TxCoordNodeRole.COORDINATOR_NO_DATA)
+            procCcfg.setNodeFilter(new FilterDefinedNode(zeroNodeName));
+
+        if (writeThrough) {
+            procCcfg.setReadThrough(true);
+            procCcfg.setWriteThrough(true);
+            procCcfg.setCacheStoreFactory(strategy.getStoreFactory());
+        }
+
+        CacheConfiguration<Integer, Integer> commonCcfg = new CacheConfiguration<>(COMMON_CACHE_NAME);
+
+        commonCcfg.setAtomicityMode(TRANSACTIONAL);
+        commonCcfg.setCacheMode(REPLICATED);
+        commonCcfg.setQueryEntities(List.of(queryEntity(COMMON_CACHE_NAME)));
+
+        cfg.setCacheConfiguration(procCcfg, commonCcfg);
+
+        return cfg;
+    }
+
+    /**
+     * Interceptor that replaces every written value with the value multiplied by 100. It remembers the first value
+     * it was called with; a later put of a different value throws an unchecked exception when it is executed on
+     * the node whose consistent ID equals {@code actorNode}.
+     */
+    private static class CustomInterceptor implements CacheInterceptor<Integer, Integer> {
+        /** First value passed to {@link #onBeforePut}, {@code null} until then. */
+        private Integer initialVal;
+
+        /** Set once a put with a changed value was seen; with write-through enabled the interceptor is called twice. */
+        private final AtomicBoolean called = new AtomicBoolean();
+
+        /** Write-through flag. */
+        private final boolean writeThrough;
+
+        /** Tx coordinator node role. */
+        private final TxCoordNodeRole txCoord;
+
+        /** Consistent ID of the node on which the interceptor has to throw. */
+        private final String actorNode;
+
+        /**
+         * @param writeThrough Write-through flag.
+         * @param txCoord Tx coordinator node role.
+         * @param actorNode Consistent ID of the node on which the interceptor has to throw.
+         */
+        private CustomInterceptor(
+            boolean writeThrough,
+            TxCoordNodeRole txCoord,
+            String actorNode
+        ) {
+            this.writeThrough = writeThrough;
+            this.txCoord = txCoord;
+            this.actorNode = actorNode;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Integer onGet(Integer key, @Nullable Integer val) {
+            return val;
+        }
+
+        /**
+         * Throws an unchecked exception on the actor node when a previously seen value is being changed.
+         *
+         * @param entry Entry being updated.
+         * @param newVal New value.
+         * @return {@code newVal} multiplied by 100.
+         */
+        @Override public @Nullable Integer onBeforePut(Cache.Entry<Integer, Integer> entry, Integer newVal) {
+            if (initialVal == null)
+                initialVal = newVal;
+            else if (!initialVal.equals(newVal)) { // Value comparison: '!=' on boxed values compares references.
+                // With write-through the first call for the changed value is skipped, only the second one reacts.
+                if (!writeThrough || called.get()) {
+                    assertTrue(entry instanceof CacheLazyEntry);
+
+                    GridCacheContext<?, ?> ctx = GridTestUtils.getFieldValue(entry, CacheLazyEntry.class, "cctx");
+
+                    if (ctx.localNode().consistentId().equals(actorNode)) {
+                        // The actor node holds no data in this mode, so the interceptor must not run there.
+                        if (txCoord == TxCoordNodeRole.COORDINATOR_NO_DATA)
+                            fail("Need to be called only on primary or backup nodes.");
+                        else
+                            raiseException();
+                    }
+                }
+
+                called.set(true);
+            }
+
+            return newVal * 100;
+        }
+
+        /** Counts the raised exception and throws it. */
+        private void raiseException() {
+            exceptionRaised.incrementAndGet();
+
+            throw new RuntimeException("Interceptor unchecked exception");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAfterPut(Cache.Entry<Integer, Integer> entry) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable IgniteBiTuple<Boolean, Integer> onBeforeRemove(Cache.Entry<Integer, Integer> entry) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAfterRemove(Cache.Entry<Integer, Integer> entry) {
+            // No-op.
+        }
+    }
+
+    /**
+     * @param cacheName Cache name, also used as the SQL table name.
+     * @return Query entity with Integer key column {@code ID} and Integer value column {@code VAL}.
+     */
+    private QueryEntity queryEntity(String cacheName) {
+        String intType = Integer.class.getName();
+
+        QueryEntity res = new QueryEntity();
+
+        res.setKeyType(intType);
+        res.setValueType(intType);
+
+        res.addQueryField("ID", intType, null);
+        res.addQueryField("VAL", intType, null);
+
+        res.setKeyFieldName("ID");
+        res.setValueFieldName("VAL");
+
+        res.setTableName(cacheName);
+
+        return res;
+    }
+
+    /**
+     * Commits one transaction over both caches, runs a second one that changes the values (and may hit the
+     * throwing interceptor), then compares what the store, the key-value API and both SQL engines return per node.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTxWithExceptionInterceptor() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        startGrid(1);
+        startGrid(2);
+
+        Ignite client = startClientGrid(CLIENT_NAME);
+
+        Ignite txNode;
+
+        if (txCoord == TxCoordNodeRole.THICK_CLIENT)
+            txNode = client;
+        else
+            txNode = srv0;
+
+        srv0.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Integer, Integer> procCache = txNode.cache(PROC_CACHE_NAME);
+        IgniteCache<Integer, Integer> commonCache = txNode.cache(COMMON_CACHE_NAME);
+
+        Integer procKey = primaryKeyCoordAware(PROC_CACHE_NAME);
+
+        // NOTE(review): the key for the common cache is also derived from PROC_CACHE_NAME, and the checks below
+        // read the common cache by the processed cache key - confirm this is intended and not a copy-paste.
+        Integer commonKey = primaryKeyCoordAware(PROC_CACHE_NAME);
+
+        // Initial values, the interceptor only remembers the first value here.
+        try (Transaction tx = txNode.transactions().txStart()) {
+            procCache.put(procKey, 1);
+            commonCache.put(commonKey, 10);
+
+            tx.commit();
+        }
+
+        // Changed values; a failure of this transaction is an expected outcome and is deliberately ignored.
+        //noinspection EmptyCatchBlock
+        try (Transaction tx = txNode.transactions().txStart()) {
+            procCache.put(procKey, 2);
+            commonCache.put(commonKey, 20);
+
+            tx.commit();
+        }
+        catch (Throwable ignored) {
+            // No op.
+        }
+
+        boolean srvTxCoord = txCoord == TxCoordNodeRole.BACKUP || txCoord == TxCoordNodeRole.PRIMARY;
+        boolean clientWithoutStore = !writeThrough && txCoord == TxCoordNodeRole.THICK_CLIENT;
+
+        // 2 server nodes + 1 thick client
+        if (srvTxCoord || clientWithoutStore)
+            waitForTopology(3);
+
+        checkExceptionRaised();
+
+        // Result stored in the external storage.
+        Object storeVal = writeThrough ? strategy.getFromStore(procKey) : null;
+
+        // Reference key-value result for the processed cache.
+        Object refKvVal = null;
+
+        List<Ignite> grids = new ArrayList<>(G.allGrids());
+
+        grids.sort(Comparator.comparing(Ignite::name));
+
+        // After sorting by name the client goes first.
+        assertTrue(grids.get(0).name().contains(CLIENT_NAME));
+
+        for (Ignite node : grids) {
+            String nodeTag = "node: " + node.name();
+
+            if (txCoord == TxCoordNodeRole.PRIMARY) {
+                if (!writeThrough) {
+                    getSqlResultByKey(node, PROC_CACHE_NAME, procKey, true);
+                    getKVResultByKey(node, PROC_CACHE_NAME, procKey, true);
+                }
+
+                getSqlResultByKey(node, COMMON_CACHE_NAME, procKey, true);
+                getKVResultByKey(node, COMMON_CACHE_NAME, procKey, true);
+
+                continue;
+            }
+
+            // SQL results are obtained first: the kv api can eventually recover results, so checking sql first
+            // keeps the test clearer.
+            Object sqlVal = getSqlResultByKey(node, PROC_CACHE_NAME, procKey, false);
+
+            if (refKvVal == null)
+                refKvVal = getKVResultByKey(grid(1), PROC_CACHE_NAME, procKey, false);
+
+            if (writeThrough) {
+                refKvVal = getKVResultByKey(node, PROC_CACHE_NAME, procKey, false);
+
+                // TODO: IGNITE-28005 Interceptor is not called if coordinator is not a primary or backup node
+                if (txCoord == TxCoordNodeRole.BACKUP) {
+                    assertEquals(nodeTag + ", storeVal=" + storeVal + ", cacheVal=" + refKvVal, storeVal, refKvVal);
+                    assertEquals(nodeTag + ", storeVal=" + storeVal + ", sqlVal=" + sqlVal, storeVal, sqlVal);
+                }
+            }
+            else {
+                Object cacheVal = getKVResultByKey(node, PROC_CACHE_NAME, procKey, false);
+
+                assertEquals(nodeTag + ", refVal=" + refKvVal + ", cacheVal=" + cacheVal, refKvVal, cacheVal);
+                assertEquals(nodeTag + ", refVal=" + refKvVal + ", sqlVal=" + sqlVal, refKvVal, sqlVal);
+            }
+
+            Object commonSqlRes = getSqlResultByKey(node, COMMON_CACHE_NAME, procKey, false);
+            Object commonKvRes = getKVResultByKey(node, COMMON_CACHE_NAME, procKey, false);
+
+            assertEquals(nodeTag + ", commonSqlRes=" + commonSqlRes + ", commonKvRes=" + commonKvRes,
+                commonSqlRes, commonKvRes);
+        }
+    }
+
+    /** */
+    private void checkExceptionRaised() {
+        if (txCoord == TxCoordNodeRole.PRIMARY || txCoord == 
TxCoordNodeRole.BACKUP)
+            assertEquals(1, exceptionRaised.get());
+        else if (txCoord == TxCoordNodeRole.THICK_CLIENT) {
+            if (writeThrough) {
+                // TODO: IGNITE-28005 Interceptor is not called if coordinator 
is not a primary or backup node
+                assertEquals(0, exceptionRaised.get());
+            }
+            else
+                assertEquals(1, exceptionRaised.get());
+        }
+        else if (txCoord == TxCoordNodeRole.COORDINATOR_NO_DATA) {
+            // TODO: IGNITE-28005 Interceptor is not called if coordinator is 
not a primary or backup node
+            assertEquals(0, exceptionRaised.get());
+        }
+    }
+
+    /**
+     * Reads a value through the key-value API inside a transaction started on the given node.
+     *
+     * @param node Node to read from.
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param checkResIsEmpty If {@code true}, asserts that no value is found; otherwise asserts that a value exists.
+     * @return Found value, or {@code null} when {@code checkResIsEmpty} is {@code true}.
+     */
+    private Object getKVResultByKey(Ignite node, String cacheName, Integer key, boolean checkResIsEmpty) {
+        Object val;
+
+        try (Transaction tx = node.transactions().txStart()) {
+            val = node.cache(cacheName).get(key);
+
+            tx.commit();
+        }
+
+        if (!checkResIsEmpty) {
+            assertNotNull("Value is empty on: " + node.name(), val);
+
+            return val;
+        }
+
+        assertNull(val);
+
+        return null;
+    }
+
+    /**
+     * Reads a value by key through SQL with both query engines and checks that the engines agree.
+     *
+     * @param node Node to query.
+     * @param cacheName Cache name, also used as the table name.
+     * @param key Key.
+     * @param checkResIsEmpty If {@code true}, asserts that both engines return no rows.
+     * @return Found value, or {@code null} when {@code checkResIsEmpty} is {@code true}.
+     */
+    private Integer getSqlResultByKey(Ignite node, String cacheName, Integer key, boolean checkResIsEmpty) {
+        List<List<?>> resCalcite = queryValByKey(node, cacheName, key, CALCITE_ENGINE);
+        List<List<?>> resIdx = queryValByKey(node, cacheName, key, INDEXING_ENGINE);
+
+        if (checkResIsEmpty) {
+            assertTrue("Expect empty result", resCalcite.isEmpty());
+            assertTrue("Expect empty result", resIdx.isEmpty());
+
+            return null;
+        }
+
+        // Fail with a readable message instead of an IndexOutOfBoundsException when a row is missing.
+        String where = " [node=" + node.name() + ", cache=" + cacheName + ", key=" + key + ']';
+
+        assertFalse("Expect non-empty result from " + CALCITE_ENGINE + where, resCalcite.isEmpty());
+        assertFalse("Expect non-empty result from " + INDEXING_ENGINE + where, resIdx.isEmpty());
+
+        Object firstResCalcite = resCalcite.get(0).get(0);
+        Object firstResIdx = resIdx.get(0).get(0);
+
+        assertEquals(firstResCalcite, firstResIdx);
+
+        return (Integer)firstResIdx;
+    }
+
+    /** Selects the {@code val} column by {@code id} from the cache table using the given query engine. */
+    private List<List<?>> queryValByKey(Ignite node, String cacheName, Integer key, String engine) {
+        SqlFieldsQuery qry = new SqlFieldsQuery("SELECT /*+ QUERY_ENGINE('" + engine + "') */ val FROM " +
+            cacheName + " WHERE id = ?").setArgs(key);
+
+        return node.cache(cacheName).query(qry).getAll();
+    }
+
+    /**
+     * Picks a key according to the tx initiator role.
+     *
+     * @param cacheName Cache name.
+     * @return Key obtained from {@code primaryKey} for the cache of grid 0 (PRIMARY, THICK_CLIENT) or grid 1
+     *      (COORDINATOR_NO_DATA, BACKUP).
+     */
+    protected Integer primaryKeyCoordAware(String cacheName) {
+        int primaryIdx;
+
+        switch (txCoord) {
+            case PRIMARY:
+            case THICK_CLIENT:
+                primaryIdx = 0;
+
+                break;
+
+            case COORDINATOR_NO_DATA:
+            case BACKUP:
+                primaryIdx = 1;
+
+                break;
+
+            default:
+                throw new IllegalArgumentException(txCoord.name());
+        }
+
+        IgniteCache<Integer, Integer> cache = grid(primaryIdx).cache(cacheName);
+
+        return primaryKey(cache);
+    }
+
+    /**
+     * Role of the node that initiates the transaction.
+     */
+    private enum TxCoordNodeRole {
+        /** Tx initiator is the primary node for the given key. */
+        PRIMARY,
+
+        /** Tx initiator differs from the primary node for the given key. */
+        BACKUP,
+
+        /** Tx initiator is a server node excluded from the cache by a node filter, so it holds no data.
+         *
+         * @see CacheConfiguration#setNodeFilter
+         */
+        COORDINATOR_NO_DATA,
+
+        /** Thick client as tx initiator. */
+        THICK_CLIENT
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index dfaca9b7827..9326c647854 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -91,6 +91,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rules.JoinOrderOptimi
 import 
org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
 import 
org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
 import 
org.apache.ignite.internal.processors.query.calcite.thin.MultiLineQueryTest;
+import 
org.apache.ignite.internal.processors.tx.TxWithExceptionalInterceptorTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -173,6 +174,7 @@ import org.junit.runners.Suite;
     QueryEntityValueColumnAliasTest.class,
     CacheStoreTest.class,
     MultiDcQueryMappingTest.class,
+    TxWithExceptionalInterceptorTest.class
 })
 public class IntegrationTestSuite {
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 47671d01a46..3d41f3ad095 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -392,7 +392,14 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         GridDhtLocalPartition part,
         OffheapInvokeClosure c)
         throws IgniteCheckedException {
-        dataStore(part).invoke(cctx, key, c);
+        ctx.database().checkpointReadLock();
+
+        try {
+            dataStore(part).invoke(cctx, key, c);
+        }
+        finally {
+            cctx.shared().database().checkpointReadUnlock();
+        }
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 3e15df5f20a..3c79a2f9083 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -37,6 +37,8 @@ import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.ReadRepairStrategy;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -76,6 +78,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.TransactionProxy
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.tracing.MTC;
+import 
org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import 
org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
@@ -91,6 +94,7 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -3211,6 +3215,10 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
 
                 U.warn(log, "Set transaction invalidation flag to true due to 
error [tx=" + this + ", err=" + err + ']');
             }
+
+            // Treat heuristic exception as critical.
+            if (X.hasCause(e, IgniteTxHeuristicCheckedException.class))
+                cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
         }
 
         if (err != null) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 319a9c38166..3577c114add 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -74,6 +74,10 @@ public class IgniteTxStateImpl extends 
IgniteTxLocalStateAdapter {
     @GridToStringInclude
     private Boolean recovery;
 
+    /** Cached flag, {@code True} if {@link 
CacheStoreManager#isWriteThrough()} enabled for all caches involved. */
+    @GridToStringInclude
+    private Boolean storeWriteThrough;
+
     /** Async future. */
     @GridToStringExclude
     private final GridCacheAdapter.FutureHolder lastAsyncFut = new 
GridCacheAdapter.FutureHolder();
@@ -290,6 +294,16 @@ public class IgniteTxStateImpl extends 
IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Override public boolean storeWriteThrough(GridCacheSharedContext sctx) {
+        if (storeWriteThrough != null) // A previous call already computed the flag: reuse it.
+            return storeWriteThrough;
+
+        storeWriteThrough = checkStoreWriteThrough(sctx); // NOTE(review): never recomputed here - confirm the set of active caches is final by now.
+
+        return storeWriteThrough;
+    }
+
+    /** */
+    private boolean checkStoreWriteThrough(GridCacheSharedContext sctx) {
         if (!activeCacheIds.isEmpty()) {
             for (int i = 0; i < activeCacheIds.size(); i++) {
                 int cacheId = activeCacheIds.get(i);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWithIgniteTxFailureTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWithIgniteTxFailureTest.java
index 6f8495d8042..55e9b1c6d07 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWithIgniteTxFailureTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWithIgniteTxFailureTest.java
@@ -91,7 +91,7 @@ public class CacheStoreWithIgniteTxFailureTest extends 
GridCacheAbstractSelfTest
 
     /** */
     @Parameterized.Parameter(2)
-    public boolean withFaulireHnd;
+    public boolean withFailureHnd;
 
     /** */
     @Parameterized.Parameters(name = "faultyNodeType={0}, faultyNodeRole={1}, 
withFaulireHandler={2}")
@@ -140,7 +140,7 @@ public class CacheStoreWithIgniteTxFailureTest extends 
GridCacheAbstractSelfTest
 
     /** {@inheritDoc} */
     @Override protected FailureHandler getFailureHandler(String 
igniteInstanceName) {
-        return withFaulireHnd ? new StopNodeFailureHandler() : 
super.getFailureHandler(igniteInstanceName);
+        return withFailureHnd ? new StopNodeFailureHandler() : 
super.getFailureHandler(igniteInstanceName);
     }
 
     /** {@inheritDoc} */
@@ -182,17 +182,16 @@ public class CacheStoreWithIgniteTxFailureTest extends 
GridCacheAbstractSelfTest
         else
             updateKeysInTx(txCoordinator, keysOnFaultyNode);
 
-        if (withFaulireHnd) {
-            // FH doesn't fail TX coordinator node now, this behavior is wrong 
and should be fixed here:
-            // TODO https://issues.apache.org/jira/browse/IGNITE-26060
+        if (withFailureHnd) {
             if (faultyNodeRole != FaultyNodeRole.TX_COORDINATOR) {
                 waitForTopology(3);
 
-                assertTrue("Client node should survive test scenario",
-                    G.allGrids()
-                        .stream()
-                        .filter(ignite -> 
((IgniteEx)ignite).context().clientNode())
-                        .count() == 1);
+                checkClientIsAvailable();
+            }
+            else {
+                waitForTopology(2);
+
+                checkFаultyNodeLeftTopology(FAULTY_NODE_IDX);
             }
         }
         else
@@ -201,6 +200,23 @@ public class CacheStoreWithIgniteTxFailureTest extends 
GridCacheAbstractSelfTest
         checkKeysOnHealthyNodes(keysOnFaultyNode);
     }
 
+    /** Asserts that exactly one of the grids returned by {@code G.allGrids()} reports itself as a client node. */
+    private void checkClientIsAvailable() {
+        assertTrue("Client node should survive test scenario",
+            G.allGrids()
+                .stream()
+                .filter(ignite -> ((IgniteEx)ignite).context().clientNode())
+                .count() == 1);
+    }
+
+    /** Asserts that no grid returned by {@code G.allGrids()} has the test instance name built from {@code faultyNodeIdx}. */
+    private void checkFаultyNodeLeftTopology(int faultyNodeIdx) { // NOTE(review): the letter after 'F' in the name looks like Cyrillic U+0430, not Latin 'a' - confirm and rename together with the call site.
+        assertTrue("Faulty node should not survive test scenario, idx=" + 
faultyNodeIdx,
+            G.allGrids()
+                .stream()
+                .noneMatch(ignite -> 
ignite.name().equals(getTestIgniteInstanceName(faultyNodeIdx))));
+    }
+
     /** */
     private void fillCache(IgniteCache<Integer, Integer> cache, int numOfKeys) 
{
         for (int i = 0; i < numOfKeys; i++)


Reply via email to