This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new dad18c29490 IGNITE-26452 Skip read-through in DML operations (#12446)
dad18c29490 is described below
commit dad18c29490d12aecf530782b25201214f83c46c
Author: Nikita Amelchev <[email protected]>
AuthorDate: Sat Oct 25 09:35:41 2025 +0300
IGNITE-26452 Skip read-through in DML operations (#12446)
---
.../query/calcite/exec/rel/ModifyNode.java | 3 +-
.../AbstractMultiEngineIntegrationTest.java | 3 +-
.../query/calcite/integration/CacheStoreTest.java | 202 +++++++++++++++++++++
.../ignite/testsuites/IntegrationTestSuite.java | 2 +
.../ignite/configuration/CacheConfiguration.java | 10 +-
.../processors/cache/CacheOperationContext.java | 39 ++++
.../processors/cache/GridCacheAdapter.java | 26 +++
.../processors/cache/GridCacheContext.java | 12 +-
.../processors/cache/GridCacheProxyImpl.java | 30 +++
.../processors/cache/IgniteInternalCache.java | 3 +
.../distributed/GridDistributedLockRequest.java | 21 +++
.../cache/distributed/dht/GridDhtLockFuture.java | 9 +-
.../cache/distributed/dht/GridDhtLockRequest.java | 2 +
.../dht/GridDhtTransactionalCacheAdapter.java | 7 +
.../distributed/dht/GridDhtTxLocalAdapter.java | 6 +
.../distributed/dht/GridDhtTxPrepareFuture.java | 2 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 3 +
.../distributed/dht/atomic/GridDhtAtomicCache.java | 9 +-
.../atomic/GridNearAtomicAbstractUpdateFuture.java | 5 +
.../GridNearAtomicAbstractUpdateRequest.java | 41 ++++-
.../atomic/GridNearAtomicFullUpdateRequest.java | 2 +-
.../GridNearAtomicSingleUpdateFilterRequest.java | 2 +-
.../atomic/GridNearAtomicSingleUpdateFuture.java | 8 +-
.../GridNearAtomicSingleUpdateInvokeRequest.java | 2 +-
.../atomic/GridNearAtomicSingleUpdateRequest.java | 2 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 12 +-
.../dht/colocated/GridDhtColocatedCache.java | 10 +
.../dht/colocated/GridDhtColocatedLockFuture.java | 7 +
.../distributed/near/GridNearAtomicCache.java | 1 +
.../distributed/near/GridNearCacheAdapter.java | 4 +-
.../cache/distributed/near/GridNearCacheEntry.java | 1 +
.../cache/distributed/near/GridNearLockFuture.java | 6 +
.../distributed/near/GridNearLockRequest.java | 2 +
.../near/GridNearTransactionalCache.java | 4 +
.../cache/distributed/near/GridNearTxLocal.java | 25 +++
.../cache/distributed/near/GridNearTxRemote.java | 3 +
.../cache/transactions/IgniteTxEntry.java | 25 +++
.../cache/transactions/IgniteTxHandler.java | 1 +
.../cache/transactions/IgniteTxLocalAdapter.java | 3 +
.../datastructures/GridCacheQueueAdapter.java | 1 +
.../internal/processors/query/QueryUtils.java | 9 +
.../processors/query/h2/dml/DmlBatchSender.java | 3 +-
.../internal/processors/query/h2/dml/DmlUtils.java | 9 +-
43 files changed, 540 insertions(+), 37 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index 5a9f3dac4ef..b84f5fe0b2e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import static
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.CONCURRENT_UPDATE;
import static
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
+import static
org.apache.ignite.internal.processors.query.QueryUtils.cacheForDML;
/**
*
@@ -218,7 +219,7 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
GridCacheProxyImpl<Object, Object> cache
) throws IgniteCheckedException {
Map<Object, EntryProcessor<Object, Object, Long>> map =
invokeMap(tuples);
- Map<Object, EntryProcessorResult<Long>> res = cache.invokeAll(map);
+ Map<Object, EntryProcessorResult<Long>> res =
cacheForDML(cache).invokeAll(map);
long updated =
res.values().stream().mapToLong(EntryProcessorResult::get).sum();
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractMultiEngineIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractMultiEngineIntegrationTest.java
index e2dd32874fe..119bebfce4a 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractMultiEngineIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractMultiEngineIntegrationTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
@@ -36,7 +37,7 @@ public class AbstractMultiEngineIntegrationTest extends
AbstractBasicIntegration
/** */
@Parameterized.Parameters(name = "Query engine={0}")
- public static Iterable<Object> params() {
+ public static Collection<?> params() {
return Arrays.asList(CalciteQueryEngineConfiguration.ENGINE_NAME,
IndexingQueryEngineConfiguration.ENGINE_NAME);
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CacheStoreTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CacheStoreTest.java
new file mode 100644
index 00000000000..948a34f9f35
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CacheStoreTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.cache.Cache;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.indexing.IndexingQueryEngineConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static java.util.Arrays.asList;
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+
+/** */
+public class CacheStoreTest extends AbstractMultiEngineIntegrationTest {
+ /** */
+ @Parameterized.Parameter(1)
+ public CacheAtomicityMode atomicityMode;
+
+ /** */
+ @Parameterized.Parameter(2)
+ public CacheMode cacheMode;
+
+ /** */
+ @Parameterized.Parameter(3)
+ public int backups;
+
+ /** */
+ @Parameterized.Parameter(4)
+ public boolean loadPreviousValue;
+
+ /** */
+ private static final List<Object> writeThroughEntries = new
CopyOnWriteArrayList<>();
+
+ /** */
+ private static final List<Object> readThroughEntries = new
CopyOnWriteArrayList<>();
+
+ /** */
+ @Parameterized.Parameters(name = "Engine={0}, atomicityMode={1},
cacheMode={2}, backups={3}, loadPreviousValue={4}")
+ public static Collection<?> params() {
+ return cartesianProduct(
+ asList(CalciteQueryEngineConfiguration.ENGINE_NAME,
IndexingQueryEngineConfiguration.ENGINE_NAME),
+ asList(CacheAtomicityMode.ATOMIC,
CacheAtomicityMode.TRANSACTIONAL),
+ asList(CacheMode.PARTITIONED, CacheMode.REPLICATED),
+ asList(0, 1),
+ asList(false, true)
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setBackups(backups)
+ .setAtomicityMode(atomicityMode)
+ .setCacheMode(cacheMode)
+ .setCacheStoreFactory(FactoryBuilder.factoryOf(new
TestCacheStore<>(loadPreviousValue)))
+ .setReadThrough(true)
+ .setWriteThrough(true)
+ .setLoadPreviousValue(loadPreviousValue)
+ .setSqlSchema(QueryUtils.DFLT_SCHEMA)
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class,
String.class)
+ .setTableName("tbl")
+ .setKeyFieldName("id")
+ .setValueFieldName("val")
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("val", String.class.getName(), null)
+ )));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ writeThroughEntries.clear();
+ readThroughEntries.clear();
+ }
+
+ /** */
+ @Test
+ public void testCacheStoreOnDML() throws Exception {
+ IgniteEx srv = startGrids(3);
+
+ awaitPartitionMapExchange();
+
+ sql(srv, "INSERT INTO tbl(id, val) VALUES (?, ?)", 1, "val1");
+ checkWriteThrough(1);
+ checkReadThrough(1);
+
+ sql(srv, "INSERT INTO tbl(id, val) VALUES (?, ?),(?, ?)", 2, "val2",
3, "val3");
+ checkWriteThrough(2, 3);
+ checkReadThrough(2, 3);
+
+ if (CalciteQueryEngineConfiguration.ENGINE_NAME.equals(engine)) {
+ sql(srv, "MERGE INTO tbl dst USING (VALUES (?, ?),(?, ?)) AS
src(id, val) ON dst.id = src.id " +
+ "WHEN MATCHED THEN UPDATE SET dst.val = src.val " +
+ "WHEN NOT MATCHED THEN INSERT (id, val) VALUES (src.id,
src.val)", 3, "val3new", 4, "val4");
+ checkReadThrough(4);
+ }
+ else {
+ sql(srv, "MERGE INTO tbl(id, val) VALUES (?, ?),(?, ?)", 3,
"val3new", 4, "val4");
+ checkReadThrough();
+ }
+
+ checkWriteThrough(3, 4);
+
+ sql(srv, "UPDATE tbl SET val='newVal'");
+ checkWriteThrough(1, 2, 3, 4);
+ checkReadThrough();
+
+ sql(srv, "DELETE FROM tbl WHERE id=1");
+ checkWriteThrough(1);
+ checkReadThrough();
+
+ sql(srv, "INSERT INTO tbl(id, val) SELECT id+1000, val FROM tbl");
+ checkWriteThrough(1002, 1003, 1004);
+ checkReadThrough(1002, 1003, 1004);
+ }
+
+ /** */
+ private void checkWriteThrough(Object... keys) {
+ assertEqualsCollectionsIgnoringOrder(F.asList(keys),
writeThroughEntries);
+
+ writeThroughEntries.clear();
+ }
+
+ /** */
+ private void checkReadThrough(Object... keys) {
+ if (loadPreviousValue)
+ assertEqualsCollectionsIgnoringOrder(F.asList(keys),
readThroughEntries);
+ else
+ assertEquals(0, readThroughEntries.size());
+
+ readThroughEntries.clear();
+ }
+
+ /** Test cache store. */
+ private static class TestCacheStore<K, V> extends CacheStoreAdapter<K, V>
implements Serializable {
+ /** */
+ private final boolean shouldLoadPrevVal;
+
+ /** */
+ private TestCacheStore(boolean value) {
+ shouldLoadPrevVal = value;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V load(K key) {
+ if (!shouldLoadPrevVal)
+ throw new RuntimeException("CacheStore.load should not be
called");
+
+ readThroughEntries.add(key);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends K, ? extends V> e)
throws CacheWriterException {
+ assertNotNull(e.getValue());
+
+ writeThroughEntries.add(e.getKey());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object k) throws CacheWriterException {
+ writeThroughEntries.add(k);
+ }
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 7491933d644..ce775611114 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -26,6 +26,7 @@ import
org.apache.ignite.internal.processors.query.calcite.IndexWithSameNameCalc
import
org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.AuthorizationIntegrationTest;
+import
org.apache.ignite.internal.processors.query.calcite.integration.CacheStoreTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.CalciteBasicSecondaryIndexIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.CalciteErrorHandlilngIntegrationTest;
import
org.apache.ignite.internal.processors.query.calcite.integration.CalcitePlanningDumpTest;
@@ -171,6 +172,7 @@ import org.junit.runners.Suite;
CalcitePlanningDumpTest.class,
KeyClassChangeIntegrationTest.class,
QueryEntityValueColumnAliasTest.class,
+ CacheStoreTest.class,
})
public class IntegrationTestSuite {
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index a35b230741d..c0c5100248f 100644
---
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -943,8 +943,8 @@ public class CacheConfiguration<K, V> extends
MutableConfiguration<K, V> {
}
/**
- * Gets flag indicating whether value should be loaded from store if it is
not in the cache
- * for following cache operations:
+ * Indicates whether a value should be loaded from storage if it's missing
in the cache during
+ * SQL Data Manipulation Language (DML) operations or specific cache
methods:
* <ul>
* <li>{@link IgniteCache#putIfAbsent(Object, Object)}</li>
* <li>{@link IgniteCache#replace(Object, Object)}</li>
@@ -955,6 +955,7 @@ public class CacheConfiguration<K, V> extends
MutableConfiguration<K, V> {
* <li>{@link IgniteCache#getAndReplace(Object, Object)}</li>
* <li>{@link IgniteCache#getAndPutIfAbsent(Object, Object)}</li>
*</ul>
+ * Default value is {@link #DFLT_LOAD_PREV_VAL}.
*
* @return Load previous value flag.
*/
@@ -963,8 +964,8 @@ public class CacheConfiguration<K, V> extends
MutableConfiguration<K, V> {
}
/**
- * Sets flag indicating whether value should be loaded from store if it is
not in the cache
- * for following cache operations:
+ * Indicates whether a value should be loaded from storage if it's missing
in the cache during
+ * SQL Data Manipulation Language (DML) operations or specific cache
methods:
* <ul>
* <li>{@link IgniteCache#putIfAbsent(Object, Object)}</li>
* <li>{@link IgniteCache#replace(Object, Object)}</li>
@@ -975,6 +976,7 @@ public class CacheConfiguration<K, V> extends
MutableConfiguration<K, V> {
* <li>{@link IgniteCache#getAndReplace(Object, Object)}</li>
* <li>{@link IgniteCache#getAndPutIfAbsent(Object, Object)}</li>
*</ul>
+ *
* When not set, default value is {@link #DFLT_LOAD_PREV_VAL}.
*
* @param loadPrevVal Load previous value flag.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
index febda399d9f..e701ae5a93f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
@@ -37,6 +37,10 @@ public class CacheOperationContext implements Serializable {
@GridToStringInclude
private final boolean skipStore;
+ /** Skip store. */
+ @GridToStringInclude
+ private final boolean skipReadThrough;
+
/** No retries flag. */
@GridToStringInclude
private final boolean noRetries;
@@ -64,6 +68,7 @@ public class CacheOperationContext implements Serializable {
*/
public CacheOperationContext() {
skipStore = false;
+ skipReadThrough = false;
keepBinary = false;
expiryPlc = null;
noRetries = false;
@@ -75,6 +80,7 @@ public class CacheOperationContext implements Serializable {
/**
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param keepBinary Keep binary flag.
* @param expiryPlc Expiry policy.
* @param dataCenterId Data center id.
@@ -83,6 +89,7 @@ public class CacheOperationContext implements Serializable {
*/
public CacheOperationContext(
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
@Nullable ExpiryPolicy expiryPlc,
boolean noRetries,
@@ -92,6 +99,7 @@ public class CacheOperationContext implements Serializable {
@Nullable Map<String, String> appAttrs
) {
this.skipStore = skipStore;
+ this.skipReadThrough = skipReadThrough;
this.keepBinary = keepBinary;
this.expiryPlc = expiryPlc;
this.noRetries = noRetries;
@@ -123,6 +131,7 @@ public class CacheOperationContext implements Serializable {
public CacheOperationContext keepBinary() {
return new CacheOperationContext(
skipStore,
+ skipReadThrough,
true,
expiryPlc,
noRetries,
@@ -157,6 +166,30 @@ public class CacheOperationContext implements Serializable
{
public CacheOperationContext setSkipStore(boolean skipStore) {
return new CacheOperationContext(
skipStore,
+ skipReadThrough,
+ keepBinary,
+ expiryPlc,
+ noRetries,
+ dataCenterId,
+ recovery,
+ readRepairStrategy,
+ appAttrs);
+ }
+
+ /** @return Skip read-through cache store. */
+ public boolean skipReadThrough() {
+ return skipReadThrough;
+ }
+
+ /**
+ * See {@link IgniteInternalCache#withSkipReadThrough()}.
+ *
+ * @return New instance of CacheOperationContext with skip store flag.
+ */
+ public CacheOperationContext withSkipReadThrough() {
+ return new CacheOperationContext(
+ skipStore,
+ true,
keepBinary,
expiryPlc,
noRetries,
@@ -182,6 +215,7 @@ public class CacheOperationContext implements Serializable {
public CacheOperationContext withExpiryPolicy(ExpiryPolicy plc) {
return new CacheOperationContext(
skipStore,
+ skipReadThrough,
keepBinary,
plc,
noRetries,
@@ -198,6 +232,7 @@ public class CacheOperationContext implements Serializable {
public CacheOperationContext setNoRetries(boolean noRetries) {
return new CacheOperationContext(
skipStore,
+ skipReadThrough,
keepBinary,
expiryPlc,
noRetries,
@@ -214,6 +249,7 @@ public class CacheOperationContext implements Serializable {
public CacheOperationContext setDataCenterId(byte dataCenterId) {
return new CacheOperationContext(
skipStore,
+ skipReadThrough,
keepBinary,
expiryPlc,
noRetries,
@@ -230,6 +266,7 @@ public class CacheOperationContext implements Serializable {
public CacheOperationContext setRecovery(boolean recovery) {
return new CacheOperationContext(
skipStore,
+ skipReadThrough,
keepBinary,
expiryPlc,
noRetries,
@@ -246,6 +283,7 @@ public class CacheOperationContext implements Serializable {
public CacheOperationContext setReadRepairStrategy(ReadRepairStrategy
readRepairStrategy) {
return new CacheOperationContext(
skipStore,
+ skipReadThrough,
keepBinary,
expiryPlc,
noRetries,
@@ -262,6 +300,7 @@ public class CacheOperationContext implements Serializable {
public CacheOperationContext setApplicationAttributes(Map<String, String>
appAttrs) {
return new CacheOperationContext(
skipStore,
+ skipReadThrough,
keepBinary,
expiryPlc,
noRetries,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 6f2ebe7ade6..e0fd3c3ff42 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -471,6 +471,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
CacheOperationContext opCtx = new CacheOperationContext(
true,
false,
+ false,
null,
false,
null,
@@ -481,9 +482,32 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalCache<K, V> withSkipReadThrough() {
+ CacheOperationContext opCtx = this.ctx.operationContextPerCall();
+
+ if (opCtx == null) {
+ opCtx = new CacheOperationContext(
+ false,
+ true,
+ false,
+ null,
+ false,
+ null,
+ false,
+ null,
+ null);
+ }
+ else
+ opCtx = opCtx.withSkipReadThrough();
+
+ return new GridCacheProxyImpl<>(this.ctx, this, opCtx);
+ }
+
/** {@inheritDoc} */
@Override public final <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() {
CacheOperationContext opCtx = new CacheOperationContext(
+ false,
false,
true,
null,
@@ -506,6 +530,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
assert !CU.isUtilityCache(ctx.name());
CacheOperationContext opCtx = new CacheOperationContext(
+ false,
false,
false,
plc,
@@ -521,6 +546,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public final IgniteInternalCache<K, V> withNoRetries() {
CacheOperationContext opCtx = new CacheOperationContext(
+ false,
false,
false,
null,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index b900f5e1bf3..2cd06663ae0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1376,6 +1376,16 @@ public class GridCacheContext<K, V> implements
Externalizable {
return (opCtx != null && opCtx.skipStore());
}
+ /** @return {@code true} if the skip read-through cache store flag is set.
*/
+ public boolean skipReadThrough() {
+ if (nearContext())
+ return dht().near().context().skipReadThrough();
+
+ CacheOperationContext opCtx = opCtxPerCall.get();
+
+ return (opCtx != null && opCtx.skipReadThrough());
+ }
+
/**
* @return {@code True} if need check near cache context.
*/
@@ -1429,7 +1439,7 @@ public class GridCacheContext<K, V> implements
Externalizable {
* @return {@code True} if store read-through mode is enabled.
*/
public boolean readThrough() {
- return config().isReadThrough() && !skipStore();
+ return config().isReadThrough() && !skipStore() && !skipReadThrough();
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index e23b3019e3a..bf22e0aa722 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -263,6 +263,33 @@ public class GridCacheProxyImpl<K, V> implements
IgniteInternalCache<K, V>, Exte
new CacheOperationContext(
skipStore,
false,
+ false,
+ null,
+ false,
+ null,
+ false,
+ null,
+ null));
+ }
+ finally {
+ gate.leave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalCache<K, V> withSkipReadThrough() {
+ CacheOperationContext prev = gate.enter(opCtx);
+
+ try {
+ if (opCtx != null && opCtx.skipReadThrough())
+ return this;
+
+ return new GridCacheProxyImpl<>(ctx, delegate,
+ opCtx != null ? opCtx.withSkipReadThrough() :
+ new CacheOperationContext(
+ false,
+ true,
+ false,
null,
false,
null,
@@ -284,6 +311,7 @@ public class GridCacheProxyImpl<K, V> implements
IgniteInternalCache<K, V>, Exte
(GridCacheAdapter<K1, V1>)delegate,
opCtx != null ? opCtx.keepBinary() :
new CacheOperationContext(false,
+ false,
true,
null,
false,
@@ -1533,6 +1561,7 @@ public class GridCacheProxyImpl<K, V> implements
IgniteInternalCache<K, V>, Exte
return new GridCacheProxyImpl<>(ctx, delegate,
opCtx != null ? opCtx.withExpiryPolicy(plc) :
new CacheOperationContext(
+ false,
false,
false,
plc,
@@ -1554,6 +1583,7 @@ public class GridCacheProxyImpl<K, V> implements
IgniteInternalCache<K, V>, Exte
try {
return new GridCacheProxyImpl<>(ctx, delegate,
new CacheOperationContext(
+ false,
false,
false,
null,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index b7fb897959d..f5b50d8f035 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -227,6 +227,9 @@ public interface IgniteInternalCache<K, V> extends
Iterable<Cache.Entry<K, V>> {
*/
public IgniteInternalCache<K, V> setSkipStore(boolean skipStore);
+ /** @return New internal cache instance based on this one, but with skip
read-through cache store flag enabled. */
+ public IgniteInternalCache<K, V> withSkipReadThrough();
+
/**
* Creates projection that will operate with binary objects.
* <p>
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index fe522e94bda..bc1d37828ec 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -47,6 +47,9 @@ public class GridDistributedLockRequest extends
GridDistributedBaseMessage {
/** */
private static final int STORE_USED_FLAG_MASK = 0x04;
+ /** */
+ private static final int SKIP_READ_THROUGH_FLAG_MASK = 0x08;
+
/** Sender node ID. */
@Order(7)
private UUID nodeId;
@@ -142,6 +145,7 @@ public class GridDistributedLockRequest extends
GridDistributedBaseMessage {
int keyCnt,
int txSize,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean addDepInfo
) {
@@ -166,6 +170,7 @@ public class GridDistributedLockRequest extends
GridDistributedBaseMessage {
retVals = new boolean[keyCnt];
skipStore(skipStore);
+ skipReadThrough(skipReadThrough);
keepBinary(keepBinary);
}
@@ -305,6 +310,22 @@ public class GridDistributedLockRequest extends
GridDistributedBaseMessage {
return (flags & SKIP_STORE_FLAG_MASK) == 1;
}
+ /**
+ * Sets skip store flag value.
+ *
+ * @param skipReadThrough Skip read-through cache store flag.
+ */
+ private void skipReadThrough(boolean skipReadThrough) {
+ flags = skipReadThrough ? (byte)(flags | SKIP_READ_THROUGH_FLAG_MASK)
: (byte)(flags & ~SKIP_READ_THROUGH_FLAG_MASK);
+ }
+
+ /**
+ * @return Skip store flag.
+ */
+ public boolean skipReadThrough() {
+ return (flags & SKIP_READ_THROUGH_FLAG_MASK) != 0;
+ }
+
/**
* @param keepBinary Keep binary flag.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index d139f6523cb..3da4ef43ed5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -184,6 +184,9 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
/** Skip store flag. */
private final boolean skipStore;
+ /** Skip read-through cache store flag. */
+ private final boolean skipReadThrough;
+
/** Keep binary. */
private final boolean keepBinary;
@@ -215,6 +218,7 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
long createTtl,
long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary) {
super(CU.boolReducer());
@@ -234,6 +238,7 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
this.createTtl = createTtl;
this.accessTtl = accessTtl;
this.skipStore = skipStore;
+ this.skipReadThrough = skipReadThrough;
this.keepBinary = keepBinary;
if (tx != null)
@@ -916,6 +921,7 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
inTx() ? tx.taskNameHash() : 0,
read ? accessTtl : -1L,
skipStore,
+ skipReadThrough,
cctx.store().configured(),
keepBinary,
cctx.deploymentEnabled(),
@@ -1047,7 +1053,8 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
*
*/
private void loadMissingFromStore() {
- if (!skipStore && (read || cctx.loadPreviousValue()) &&
cctx.readThrough() && (needReturnVal || read)) {
+ if (!skipStore && !skipReadThrough
+ && (read || cctx.loadPreviousValue()) && cctx.readThrough() &&
(needReturnVal || read)) {
final Map<KeyCacheObject, GridDhtCacheEntry> loadMap = new
LinkedHashMap<>();
final GridCacheVersion ver = version();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 4c1a601c1a0..321417ed44e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -132,6 +132,7 @@ public class GridDhtLockRequest extends
GridDistributedLockRequest {
int taskNameHash,
long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean storeUsed,
boolean keepBinary,
boolean addDepInfo,
@@ -151,6 +152,7 @@ public class GridDhtLockRequest extends
GridDistributedLockRequest {
dhtCnt,
txSize,
skipStore,
+ skipReadThrough,
keepBinary,
addDepInfo);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 0b187c481ab..f6348b142c7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -259,6 +259,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
null,
req.accessTtl(),
req.skipStore(),
+ req.skipReadThrough(),
req.keepBinary());
}
@@ -738,6 +739,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
createTtl,
accessTtl,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
opCtx != null && opCtx.isKeepBinary());
}
@@ -754,6 +756,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
* @param createTtl TTL for create operation.
* @param accessTtl TTL for read operation.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @return Lock future.
*/
public GridDhtFuture<Boolean> lockAllAsyncInternal(@Nullable
Collection<KeyCacheObject> keys,
@@ -766,6 +769,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
long createTtl,
long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary) {
if (keys == null || keys.isEmpty())
return new GridDhtFinishedFuture<>(true);
@@ -788,6 +792,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
createTtl,
accessTtl,
skipStore,
+ skipReadThrough,
keepBinary);
if (fut.isDone()) // Possible in case of cancellation or timeout or
rollback.
@@ -970,6 +975,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
req.createTtl(),
req.accessTtl(),
req.skipStore(),
+ req.skipReadThrough(),
req.keepBinary());
// Add before mapping.
@@ -1042,6 +1048,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
req.createTtl(),
req.accessTtl(),
req.skipStore(),
+ req.skipReadThrough(),
req.keepBinary(),
req.nearCache());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 0dd3c27a5c8..e8dfb2b1f6f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -544,6 +544,7 @@ public abstract class GridDhtTxLocalAdapter extends
IgniteTxLocalAdapter {
long createTtl,
long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean nearCache
) {
@@ -616,6 +617,7 @@ public abstract class GridDhtTxLocalAdapter extends
IgniteTxLocalAdapter {
-1L,
null,
skipStore,
+ skipReadThrough,
keepBinary,
nearCache);
@@ -659,6 +661,7 @@ public abstract class GridDhtTxLocalAdapter extends
IgniteTxLocalAdapter {
createTtl,
accessTtl,
skipStore,
+ skipReadThrough,
keepBinary);
}
catch (IgniteCheckedException e) {
@@ -677,6 +680,7 @@ public abstract class GridDhtTxLocalAdapter extends
IgniteTxLocalAdapter {
* @param createTtl TTL for create operation.
* @param accessTtl TTL for read operation.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @return Future for lock acquisition.
*/
private IgniteInternalFuture<GridCacheReturn> obtainLockAsync(
@@ -688,6 +692,7 @@ public abstract class GridDhtTxLocalAdapter extends
IgniteTxLocalAdapter {
final long createTtl,
final long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary) {
if (log.isDebugEnabled())
log.debug("Before acquiring transaction lock on keys [keys=" +
passedKeys + ']');
@@ -718,6 +723,7 @@ public abstract class GridDhtTxLocalAdapter extends
IgniteTxLocalAdapter {
createTtl,
accessTtl,
skipStore,
+ skipReadThrough,
keepBinary);
return new GridEmbeddedFuture<>(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 5d4cb463892..36cfb822380 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -399,7 +399,7 @@ public final class GridDhtTxPrepareFuture extends
GridCacheCompoundFuture<Ignite
tx.nearOnOriginatingNode() || tx.hasInterceptor();
if (readOld) {
- boolean readThrough = !txEntry.skipStore() &&
+ boolean readThrough = !txEntry.skipStore() &&
!txEntry.skipReadThrough() &&
(txEntry.op() == TRANSFORM || ((retVal || hasFilters)
&& cacheCtx.config().isLoadPreviousValue()));
boolean evt = retVal || txEntry.op() == TRANSFORM;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 4685034a601..e60041e34ed 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -300,6 +300,7 @@ public class GridDhtTxRemote extends
GridDistributedTxRemoteAdapter {
* @param entryProcessors Entry processors.
* @param ttl TTL.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
*/
public void addWrite(GridCacheContext cacheCtx,
GridCacheOperation op,
@@ -308,6 +309,7 @@ public class GridDhtTxRemote extends
GridDistributedTxRemoteAdapter {
@Nullable Collection<T2<EntryProcessor<Object, Object, Object>,
Object[]>> entryProcessors,
long ttl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary) {
checkInternal(key);
@@ -325,6 +327,7 @@ public class GridDhtTxRemote extends
GridDistributedTxRemoteAdapter {
cached,
null,
skipStore,
+ skipReadThrough,
keepBinary);
txEntry.entryProcessors(entryProcessors);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d14cca8e63d..a309b7fad0a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1107,6 +1107,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
CU.filterArray(null),
taskNameHash,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
opCtx != null && opCtx.isKeepBinary(),
opCtx != null && opCtx.recovery(),
opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
@@ -1294,6 +1295,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
filters,
ctx.kernalContext().job().currentTaskNameHash(),
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
opCtx != null && opCtx.isKeepBinary(),
opCtx != null && opCtx.recovery(),
opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
@@ -1316,6 +1318,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
filters,
ctx.kernalContext().job().currentTaskNameHash(),
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
opCtx != null && opCtx.isKeepBinary(),
opCtx != null && opCtx.recovery(),
opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
@@ -1374,6 +1377,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
CU.filterArray(null),
taskNameHash,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
opCtx != null && opCtx.isKeepBinary(),
opCtx != null && opCtx.recovery(),
opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
@@ -2174,7 +2178,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
CacheObject old = entry.innerGet(
ver,
null,
- /*read through*/true,
+ /*read through*/!req.skipReadThrough(),
/*metrics*/true,
/*event*/true,
entryProc,
@@ -2575,7 +2579,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
writeVal,
req.invokeArguments(),
writeThrough() && !req.skipStore(),
- !req.skipStore(),
+ !req.skipStore() && !req.skipReadThrough(),
sndPrevVal || req.returnValue(),
req.keepBinary(),
expiry,
@@ -3168,6 +3172,7 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
req.filter(),
req.taskNameHash(),
req.skipStore(),
+ req.skipReadThrough(),
req.keepBinary(),
req.recovery(),
MAX_RETRIES,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 838c7efecdb..6c941ae6869 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -105,6 +105,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture
extends GridCacheFuture
/** Skip store flag. */
protected final boolean skipStore;
+ /** Skip read-through cache store flag. */
+ protected final boolean skipReadThrough;
+
/** Keep binary flag. */
protected final boolean keepBinary;
@@ -176,6 +179,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture
extends GridCacheFuture
CacheEntryPredicate[] filter,
int taskNameHash,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean recovery,
int remapCnt,
@@ -196,6 +200,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture
extends GridCacheFuture
this.filter = filter;
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
+ this.skipReadThrough = skipReadThrough;
this.keepBinary = keepBinary;
this.recovery = recovery;
deploymentLdrId =
U.contextDeploymentClassLoaderId(cctx.kernalContext());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 615361ac977..cb80feb7e2e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -74,6 +74,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
/** */
private static final int AFFINITY_MAPPING_FLAG_MASK = 0x80;
+ /** */
+ private static final int SKIP_READ_THROUGH_FLAG_MASK = 0x100;
+
/** Target node ID. */
protected UUID nodeId;
@@ -100,7 +103,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
/** Compressed boolean flags. Make sure 'toString' is updated when add new
flag. */
@GridToStringExclude
@Order(9)
- protected byte flags;
+ protected short flags;
/** */
private GridNearAtomicUpdateResponse res;
@@ -133,7 +136,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
CacheWriteSynchronizationMode syncMode,
GridCacheOperation op,
int taskNameHash,
- byte flags,
+ short flags,
boolean addDepInfo
) {
this.cacheId = cacheId;
@@ -159,7 +162,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
* @param recovery Recovery mode flag.
* @return Flags.
*/
- static byte flags(
+ static short flags(
boolean nearCache,
boolean topLocked,
boolean retval,
@@ -167,8 +170,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
- boolean recovery) {
- byte flags = 0;
+ boolean recovery,
+ boolean skipReadThrough
+ ) {
+ short flags = 0;
if (nearCache)
flags |= NEAR_CACHE_FLAG_MASK;
@@ -194,6 +199,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
if (recovery)
flags |= RECOVERY_FLAG_MASK;
+ if (skipReadThrough)
+ flags |= SKIP_READ_THROUGH_FLAG_MASK;
+
return flags;
}
@@ -284,14 +292,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
/**
* @return Compressed boolean flags.
*/
- public byte flags() {
+ public short flags() {
return flags;
}
/**
* @param flags New compressed boolean flags.
*/
- public void flags(byte flags) {
+ public void flags(short flags) {
this.flags = flags;
}
@@ -420,6 +428,11 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
return isFlag(SKIP_STORE_FLAG_MASK);
}
+ /** */
+ public final boolean skipReadThrough() {
+ return isFlag(SKIP_READ_THROUGH_FLAG_MASK);
+ }
+
/**
* @param val Skip store flag.
*/
@@ -462,7 +475,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
* @param mask Mask.
*/
private void setFlag(boolean flag, int mask) {
- flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ flags = flag ? (short)(flags | mask) : (short)(flags & ~mask);
}
/**
@@ -589,7 +602,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
switch (writer.state()) {
case 4:
- if (!writer.writeByte(flags))
+ if (!writer.writeShort(flags))
return false;
writer.incrementState();
@@ -639,7 +652,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
switch (reader.state()) {
case 4:
- flags = reader.readByte();
+ flags = reader.readShort();
if (!reader.isLastRead())
return false;
@@ -695,16 +708,24 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
@Override public String toString() {
StringBuilder flags = new StringBuilder();
+ if (nearCache())
+ appendFlag(flags, "nearCache");
if (needPrimaryResponse())
appendFlag(flags, "needRes");
if (topologyLocked())
appendFlag(flags, "topLock");
+ if (affinityMapping())
+ appendFlag(flags, "affMapping");
if (skipStore())
appendFlag(flags, "skipStore");
if (keepBinary())
appendFlag(flags, "keepBinary");
if (returnValue())
appendFlag(flags, "retVal");
+ if (recovery())
+ appendFlag(flags, "recovery");
+ if (skipReadThrough())
+ appendFlag(flags, "skipReadThrough");
return S.toString(GridNearAtomicAbstractUpdateRequest.class, this,
"flags", flags.toString());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 7ff682f3cbc..5105db3b9e3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -140,7 +140,7 @@ public class GridNearAtomicFullUpdateRequest extends
GridNearAtomicAbstractUpdat
@Nullable Object[] invokeArgs,
@Nullable CacheEntryPredicate[] filter,
int taskNameHash,
- byte flags,
+ short flags,
boolean addDepInfo,
int maxEntryCnt
) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index edb273bc565..df39c01b127 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -69,7 +69,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends
GridNearAtomicSingl
GridCacheOperation op,
@Nullable CacheEntryPredicate[] filter,
int taskNameHash,
- byte flags,
+ short flags,
boolean addDepInfo
) {
super(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 52535038cc7..cdf72bab921 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -78,6 +78,7 @@ public class GridNearAtomicSingleUpdateFuture extends
GridNearAtomicAbstractUpda
* @param filter Entry filter.
* @param taskNameHash Task name hash code.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param keepBinary Keep binary flag.
* @param recovery {@code True} if cache operation is called in recovery
mode.
* @param remapCnt Maximum number of retries.
@@ -95,6 +96,7 @@ public class GridNearAtomicSingleUpdateFuture extends
GridNearAtomicAbstractUpda
final CacheEntryPredicate[] filter,
int taskNameHash,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean recovery,
int remapCnt,
@@ -110,6 +112,7 @@ public class GridNearAtomicSingleUpdateFuture extends
GridNearAtomicAbstractUpda
filter,
taskNameHash,
skipStore,
+ skipReadThrough,
keepBinary,
recovery,
remapCnt,
@@ -542,14 +545,15 @@ public class GridNearAtomicSingleUpdateFuture extends
GridNearAtomicAbstractUpda
GridNearAtomicAbstractUpdateRequest req;
- byte flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled,
+ short flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled,
topLocked,
retval,
mappingKnown,
needPrimaryRes,
skipStore,
keepBinary,
- recovery);
+ recovery,
+ skipReadThrough);
if (canUseSingleRequest()) {
if (op == TRANSFORM) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 9400d7bf14f..8fb464ccfab 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -91,7 +91,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends
GridNearAtomicSingl
GridCacheOperation op,
@Nullable Object[] invokeArgs,
int taskNameHash,
- byte flags,
+ short flags,
boolean addDepInfo
) {
super(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index d6cc7552e6c..ecf017cce00 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -87,7 +87,7 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractUpd
CacheWriteSynchronizationMode syncMode,
GridCacheOperation op,
int taskNameHash,
- byte flags,
+ short flags,
boolean addDepInfo
) {
super(cacheId,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index e2ac139b1e4..ccde3cca66f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -127,6 +127,7 @@ public class GridNearAtomicUpdateFuture extends
GridNearAtomicAbstractUpdateFutu
final CacheEntryPredicate[] filter,
int taskNameHash,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean recovery,
int remapCnt,
@@ -143,6 +144,7 @@ public class GridNearAtomicUpdateFuture extends
GridNearAtomicAbstractUpdateFutu
filter,
taskNameHash,
skipStore,
+ skipReadThrough,
keepBinary,
recovery,
remapCnt,
@@ -991,14 +993,15 @@ public class GridNearAtomicUpdateFuture extends
GridNearAtomicAbstractUpdateFutu
PrimaryRequestState mapped = pendingMappings.get(nodeId);
if (mapped == null) {
- byte flags =
GridNearAtomicAbstractUpdateRequest.flags(nearEnabled,
+ short flags =
GridNearAtomicAbstractUpdateRequest.flags(nearEnabled,
topLocked,
retval,
mappingKnown,
needPrimaryRes,
skipStore,
keepBinary,
- recovery);
+ recovery,
+ skipReadThrough);
GridNearAtomicFullUpdateRequest req = new
GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
@@ -1104,14 +1107,15 @@ public class GridNearAtomicUpdateFuture extends
GridNearAtomicAbstractUpdateFutu
boolean needPrimaryRes = !mappingKnown || primary.isLocal() ||
nodes.size() == 1 || nearEnabled;
- byte flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled,
+ short flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled,
topLocked,
retval,
mappingKnown,
needPrimaryRes,
skipStore,
keepBinary,
- recovery);
+ recovery,
+ skipReadThrough);
GridNearAtomicFullUpdateRequest req = new
GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 6f247a8384d..85f2066f040 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -204,6 +204,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
skipVals,
false,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
recovery,
readRepairStrategy,
needVer);
@@ -306,6 +307,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
skipVals,
false,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
recovery,
readRepairStrategy,
needVer);
@@ -658,6 +660,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
createTtl,
accessTtl,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
opCtx != null && opCtx.isKeepBinary(),
opCtx != null && opCtx.recovery());
@@ -901,6 +904,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
* @param createTtl TTL for create operation.
* @param accessTtl TTL for read operation.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @return Lock future.
*/
IgniteInternalFuture<Exception> lockAllAsync(
@@ -916,6 +920,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
final long createTtl,
final long accessTtl,
final boolean skipStore,
+ final boolean skipReadThrough,
final boolean keepBinary
) {
assert keys != null;
@@ -940,6 +945,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
createTtl,
accessTtl,
skipStore,
+ skipReadThrough,
keepBinary);
}
else {
@@ -961,6 +967,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
createTtl,
accessTtl,
skipStore,
+ skipReadThrough,
keepBinary);
}
}
@@ -996,6 +1003,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
final long createTtl,
final long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary) {
int cnt = keys.size();
@@ -1013,6 +1021,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
createTtl,
accessTtl,
skipStore,
+ skipReadThrough,
keepBinary);
// Add before mapping.
@@ -1081,6 +1090,7 @@ public class GridDhtColocatedCache<K, V> extends
GridDhtTransactionalCacheAdapte
createTtl,
accessTtl,
skipStore,
+ skipReadThrough,
keepBinary);
return new GridDhtEmbeddedFuture<>(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 1059edb4b43..6ffdec36725 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -167,6 +167,9 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
/** Skip store flag. */
private final boolean skipStore;
+ /** Skip read-through cache store flag. */
+ private final boolean skipReadThrough;
+
/** */
private Deque<GridNearLockMapping> mappings;
@@ -206,6 +209,7 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
long createTtl,
long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean recovery
) {
@@ -222,6 +226,7 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
this.createTtl = createTtl;
this.accessTtl = accessTtl;
this.skipStore = skipStore;
+ this.skipReadThrough = skipReadThrough;
this.keepBinary = keepBinary;
this.recovery = recovery;
@@ -1080,6 +1085,7 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
read ? createTtl : -1L,
read ? accessTtl : -1L,
skipStore,
+ skipReadThrough,
keepBinary,
clientFirst,
false,
@@ -1253,6 +1259,7 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
createTtl,
accessTtl,
skipStore,
+ skipReadThrough,
keepBinary);
// Add new future.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index ac361269221..0fb4f1b5372 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -441,6 +441,7 @@ public class GridNearAtomicCache<K, V> extends
GridNearCacheAdapter<K, V> {
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
needVer);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index ee849168ec9..26e95bf5776 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -208,6 +208,7 @@ public abstract class GridNearCacheAdapter<K, V> extends
GridDistributedCacheAda
* @param expiryPlc Expiry policy.
* @param skipVal Skip value flag.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param needVer Need version.
* @return Loaded values.
*/
@@ -221,6 +222,7 @@ public abstract class GridNearCacheAdapter<K, V> extends
GridDistributedCacheAda
@Nullable ExpiryPolicy expiryPlc,
boolean skipVal,
boolean skipStore,
+ boolean skipReadThrough,
boolean needVer
) {
if (F.isEmpty(keys))
@@ -232,7 +234,7 @@ public abstract class GridNearCacheAdapter<K, V> extends
GridDistributedCacheAda
GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
keys,
- !skipStore,
+ !skipStore && !skipReadThrough,
forcePrimary,
txx,
taskName,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 419a00a8fb3..5e98920b60c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -360,6 +360,7 @@ public class GridNearCacheEntry extends
GridDistributedCacheEntry {
null,
false,
/*skip store*/false,
+ /*Skip read-through*/false,
false
).get().get(keyValue(false));
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 27c01c51658..51a5eaafbcf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -157,6 +157,9 @@ public final class GridNearLockFuture extends
GridCacheCompoundIdentityFuture<Bo
/** Skip store flag. */
private final boolean skipStore;
+ /** Skip read-through cache store flag. */
+ private final boolean skipReadThrough;
+
/** Mappings to proceed. */
@GridToStringExclude
private Queue<GridNearLockMapping> mappings;
@@ -193,6 +196,7 @@ public final class GridNearLockFuture extends
GridCacheCompoundIdentityFuture<Bo
long createTtl,
long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean recovery
) {
@@ -210,6 +214,7 @@ public final class GridNearLockFuture extends
GridCacheCompoundIdentityFuture<Bo
this.createTtl = createTtl;
this.accessTtl = accessTtl;
this.skipStore = skipStore;
+ this.skipReadThrough = skipReadThrough;
this.keepBinary = keepBinary;
this.recovery = recovery;
@@ -1064,6 +1069,7 @@ public final class GridNearLockFuture extends
GridCacheCompoundIdentityFuture<Bo
read ? createTtl : -1L,
read ? accessTtl : -1L,
skipStore,
+ skipReadThrough,
keepBinary,
clientFirst,
true,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index a06f059e32c..4cb217dd9cf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -130,6 +130,7 @@ public class GridNearLockRequest extends
GridDistributedLockRequest {
long createTtl,
long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean firstClientReq,
boolean nearCache,
@@ -151,6 +152,7 @@ public class GridNearLockRequest extends
GridDistributedLockRequest {
keyCnt,
txSize,
skipStore,
+ skipReadThrough,
keepBinary,
addDepInfo);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index d084ea3f335..68c4bc9331f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -132,6 +132,7 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
CacheOperationContext opCtx = ctx.operationContextPerCall();
final boolean skipStore = opCtx != null && opCtx.skipStore();
+ final boolean skipReadThrough = opCtx != null &&
opCtx.skipReadThrough();
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
@@ -143,6 +144,7 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
skipVals,
false,
skipStore,
+ skipReadThrough,
recovery,
readRepairStrategy,
needVer);
@@ -159,6 +161,7 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
skipStore,
+ skipReadThrough,
needVer);
}
@@ -303,6 +306,7 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
createTtl,
accessTtl,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
opCtx != null && opCtx.isKeepBinary(),
opCtx != null && opCtx.recovery());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 5552517d51f..3e15df5f20a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -606,6 +606,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter
implements GridTimeou
filters,
ret,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
keepBinary,
opCtx != null && opCtx.recovery(),
dataCenterId);
@@ -781,6 +782,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter
implements GridTimeou
drMap,
null,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
false,
keepBinary,
opCtx != null && opCtx.recovery(),
@@ -880,6 +882,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter
implements GridTimeou
* @param filter User filters.
* @param ret Return value.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param recovery Recovery flag.
* @param dataCenterId Optional data center Id.
* @return Future for entry values loading.
@@ -896,6 +899,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter
implements GridTimeou
final CacheEntryPredicate[] filter,
final GridCacheReturn ret,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean recovery,
Byte dataCenterId) {
@@ -933,6 +937,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter
implements GridTimeou
ret,
/*enlisted*/null,
skipStore,
+ skipReadThrough,
false,
hasFilters,
needVal,
@@ -999,6 +1004,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter
implements GridTimeou
* @param drPutMap DR put map (optional).
* @param drRmvMap DR remove map (optional).
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param singleRmv {@code True} for single key remove operation ({@link
Cache#remove(Object)}.
* @param keepBinary Keep binary flag.
* @param recovery Recovery flag.
@@ -1020,6 +1026,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
@Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
@Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
boolean skipStore,
+ boolean skipReadThrough,
final boolean singleRmv,
final boolean keepBinary,
final boolean recovery,
@@ -1124,6 +1131,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
ret,
enlisted,
skipStore,
+ skipReadThrough,
singleRmv,
hasFilters,
needVal,
@@ -1195,6 +1203,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
* @param ret Return value.
* @param enlisted Enlisted keys collection.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param singleRmv {@code True} for single remove operation.
* @param hasFilters {@code True} if filters not empty.
* @param needVal {@code True} if value is needed.
@@ -1217,6 +1226,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
final GridCacheReturn ret,
@Nullable final Collection<KeyCacheObject> enlisted,
boolean skipStore,
+ boolean skipReadThrough,
boolean singleRmv,
boolean hasFilters,
final boolean needVal,
@@ -1336,6 +1346,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
drExpireTime,
drVer,
skipStore,
+ skipReadThrough,
keepBinary,
CU.isNearEnabled(cacheCtx));
}
@@ -1352,6 +1363,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
-1L,
null,
skipStore,
+ skipReadThrough,
keepBinary,
CU.isNearEnabled(cacheCtx));
}
@@ -1388,6 +1400,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
drExpireTime,
drVer,
skipStore,
+ skipReadThrough,
keepBinary,
CU.isNearEnabled(cacheCtx));
@@ -1507,6 +1520,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
drExpireTime,
drVer,
skipStore,
+ skipReadThrough,
keepBinary,
CU.isNearEnabled(cacheCtx));
@@ -1658,6 +1672,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
null,
drMap,
opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.skipReadThrough(),
singleRmv,
keepBinary,
opCtx != null && opCtx.recovery(),
@@ -1793,6 +1808,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
* @param skipVals Skip values flag.
* @param keepCacheObjects Keep cache objects
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param readRepairStrategy Read Repair strategy.
* @return Future for this get.
*/
@@ -1805,6 +1821,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
final boolean skipVals,
final boolean keepCacheObjects,
final boolean skipStore,
+ final boolean skipReadThrough,
final boolean recovery,
final ReadRepairStrategy readRepairStrategy,
final boolean needVer) {
@@ -1847,6 +1864,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
skipVals,
keepCacheObjects,
skipStore,
+ skipReadThrough,
recovery,
readRepairStrategy,
needVer);
@@ -2034,6 +2052,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
null,
null,
skipStore,
+ skipReadThrough,
!deserializeBinary,
recovery,
null);
@@ -2167,6 +2186,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
* @param skipVals Skip values flag.
* @param keepCacheObjects Keep cache objects flag.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param recovery Recovery flag.
* @return Enlisted keys.
* @throws IgniteCheckedException If failed.
@@ -2184,6 +2204,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
boolean skipVals,
boolean keepCacheObjects,
boolean skipStore,
+ boolean skipReadThrough,
boolean recovery,
ReadRepairStrategy readRepairStrategy,
final boolean needVer
@@ -2431,6 +2452,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
-1L,
null,
skipStore,
+ skipReadThrough,
!deserializeBinary,
CU.isNearEnabled(cacheCtx));
@@ -3855,6 +3877,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
* @param accessTtl Access ttl.
* @param <K> Key type.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param keepBinary Keep binary flag.
* @return Future with respond.
*/
@@ -3865,6 +3888,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
long createTtl,
long accessTtl,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary) {
assert pessimistic();
@@ -3900,6 +3924,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
createTtl,
accessTtl,
skipStore,
+ skipReadThrough,
keepBinary);
return new GridEmbeddedFuture<>(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index a1293ed4030..936033d8b38 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -256,6 +256,7 @@ public class GridNearTxRemote extends
GridDistributedTxRemoteAdapter {
* @param val Value.
* @param drVer Data center replication version.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @throws IgniteCheckedException If failed.
* @return {@code True} if entry has been enlisted.
*/
@@ -266,6 +267,7 @@ public class GridNearTxRemote extends
GridDistributedTxRemoteAdapter {
CacheObject val,
@Nullable GridCacheVersion drVer,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary
) throws IgniteCheckedException {
checkInternal(key);
@@ -300,6 +302,7 @@ public class GridNearTxRemote extends
GridDistributedTxRemoteAdapter {
cached,
drVer,
skipStore,
+ skipReadThrough,
keepBinary);
txState.addWriteEntry(key, txEntry);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 37ad1ff8466..2dd67952f08 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -98,6 +98,9 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
/** Flag indicating that 'invoke' operation was no-op on primary. */
private static final int TX_ENTRY_NOOP_ON_PRIMARY = 1 << 4;
+ /** Skip read-through cache store flag bit mask. */
+ private static final int TX_ENTRY_SKIP_READ_THROUGH_FLAG_MASK = 1 << 5;
+
/** Prepared flag updater. */
private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD
=
AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared");
@@ -243,6 +246,7 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
* @param entry Cache entry.
* @param conflictVer Data center replication version.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
*/
public IgniteTxEntry(GridCacheContext<?, ?> ctx,
IgniteInternalTx tx,
@@ -253,6 +257,7 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
GridCacheEntryEx entry,
@Nullable GridCacheVersion conflictVer,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary
) {
assert ctx != null;
@@ -269,6 +274,7 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
this.conflictVer = conflictVer;
skipStore(skipStore);
+ skipReadThrough(skipReadThrough);
keepBinary(keepBinary);
key = entry.key();
@@ -290,6 +296,7 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
* @param filters Put filters.
* @param conflictVer Data center replication version.
* @param skipStore Skip store flag.
+ * @param skipReadThrough Skip read-through cache store flag.
* @param addReader Add reader flag.
*/
public IgniteTxEntry(GridCacheContext<?, ?> ctx,
@@ -303,6 +310,7 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
CacheEntryPredicate[] filters,
GridCacheVersion conflictVer,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean addReader
) {
@@ -320,6 +328,7 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
this.conflictVer = conflictVer;
skipStore(skipStore);
+ skipReadThrough(skipReadThrough);
keepBinary(keepBinary);
addReader(addReader);
@@ -518,6 +527,22 @@ public class IgniteTxEntry implements GridPeerDeployAware,
Message {
return isFlag(TX_ENTRY_SKIP_STORE_FLAG_MASK);
}
+ /**
+ * Sets skip store flag value.
+ *
+ * @param skipReadThrough Skip read-through cache store flag.
+ */
+ public void skipReadThrough(boolean skipReadThrough) {
+ setFlag(skipReadThrough, TX_ENTRY_SKIP_READ_THROUGH_FLAG_MASK);
+ }
+
+ /**
+ * @return Skip store flag.
+ */
+ public boolean skipReadThrough() {
+ return isFlag(TX_ENTRY_SKIP_READ_THROUGH_FLAG_MASK);
+ }
+
/**
* @param oldValOnPrimary {@code True} If old value for was non null on
primary node.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 989065fa15b..10f38a3d105 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1789,6 +1789,7 @@ public class IgniteTxHandler {
if (cacheCtx.readThroughConfigured() &&
!entry.skipStore() &&
+ !entry.skipReadThrough() &&
entry.op() == TRANSFORM &&
entry.oldValueOnPrimary() &&
!entry.hasValue()) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b45173c8c70..b027ff10719 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1369,6 +1369,7 @@ public abstract class IgniteTxLocalAdapter extends
IgniteTxAdapter implements Ig
long drExpireTime,
@Nullable GridCacheVersion drVer,
boolean skipStore,
+ boolean skipReadThrough,
boolean keepBinary,
boolean addReader
) {
@@ -1410,6 +1411,7 @@ public abstract class IgniteTxLocalAdapter extends
IgniteTxAdapter implements Ig
// Keep old skipStore and keepBinary flags.
old.skipStore(skipStore);
+ old.skipReadThrough(skipReadThrough);
old.keepBinary(keepBinary);
// Update ttl if specified.
@@ -1440,6 +1442,7 @@ public abstract class IgniteTxLocalAdapter extends
IgniteTxAdapter implements Ig
filter,
drVer,
skipStore,
+ skipReadThrough,
keepBinary,
addReader);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 6edb0f55643..fd144b035c6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -434,6 +434,7 @@ public abstract class GridCacheQueueAdapter<T> extends
AbstractCollection<T> imp
return (GridCacheQueueAdapter<V1>)this;
opCtx = opCtx == null ? new CacheOperationContext(
+ false,
false,
true,
null,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
index 172e94713bd..2c6c0769949 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
@@ -56,6 +56,7 @@ import
org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import
org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
@@ -1786,6 +1787,14 @@ public class QueryUtils {
return false;
}
+ /** */
+ public static <K, V> IgniteInternalCache<K, V>
cacheForDML(IgniteInternalCache<K, V> c) {
+ if (!c.configuration().isReadThrough() ||
c.configuration().isLoadPreviousValue())
+ return c;
+ else
+ return c.withSkipReadThrough();
+ }
+
/**
* Private constructor.
*/
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
index 5e4ad2830ff..ecc7185222f 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import static
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
+import static
org.apache.ignite.internal.processors.query.QueryUtils.cacheForDML;
import static org.apache.ignite.internal.processors.tracing.SpanTags.ERROR;
import static
org.apache.ignite.internal.processors.tracing.SpanTags.SQL_CACHE_UPDATES;
import static
org.apache.ignite.internal.processors.tracing.SpanType.SQL_CACHE_UPDATE;
@@ -237,7 +238,7 @@ public class DmlBatchSender {
Map<Object, EntryProcessorResult<Boolean>> res;
try {
- res = cctx.cache().invokeAll(batch.rowProcessors());
+ res = cacheForDML(cctx.cache()).invokeAll(batch.rowProcessors());
}
catch (IgniteCheckedException e) {
for (Integer rowNum : batch.rowNumbers().values()) {
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
index 44bc49b07be..95122b2e429 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
@@ -59,6 +59,7 @@ import org.h2.value.ValueTimestamp;
import static
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
import static
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
+import static
org.apache.ignite.internal.processors.query.QueryUtils.cacheForDML;
import static
org.apache.ignite.internal.processors.tracing.SpanTags.SQL_CACHE_UPDATES;
import static
org.apache.ignite.internal.processors.tracing.SpanType.SQL_CACHE_UPDATE;
@@ -205,7 +206,7 @@ public class DmlUtils {
.create(SQL_CACHE_UPDATE, MTC.span())
.addTag(SQL_CACHE_UPDATES, () -> "1"))
) {
- if (cctx.cache().putIfAbsent(t.getKey(), t.getValue()))
+ if (cacheForDML(cctx.cache()).putIfAbsent(t.getKey(),
t.getValue()))
return 1;
else
throw new IgniteSQLException(errMsg + '[' + t.getKey() +
"]]", DUPLICATE_KEY);
@@ -315,7 +316,7 @@ public class DmlUtils {
.create(SQL_CACHE_UPDATE, MTC.span())
.addTag(SQL_CACHE_UPDATES, () -> "1"))
) {
- cctx.cache().put(t.getKey(), t.getValue());
+ cacheForDML(cctx.cache()).put(t.getKey(), t.getValue());
}
return 1;
@@ -338,7 +339,7 @@ public class DmlUtils {
.create(SQL_CACHE_UPDATE, MTC.span())
.addTag(SQL_CACHE_UPDATES, () ->
Integer.toString(rows.size())))
) {
- cctx.cache().putAll(rows);
+ cacheForDML(cctx.cache()).putAll(rows);
resCnt += rows.size();
@@ -554,7 +555,7 @@ public class DmlUtils {
if (opCtx == null)
// Mimics behavior of GridCacheAdapter#keepBinary and
GridCacheProxyImpl#keepBinary
- newOpCtx = new CacheOperationContext(false, true, null, false,
null, false, null, null);
+ newOpCtx = new CacheOperationContext(false, false, true, null,
false, null, false, null, null);
else if (!opCtx.isKeepBinary())
newOpCtx = opCtx.keepBinary();