This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a835bdf IGNITE-12444: SQL: Query reduce can fail with NPE on retry.
This closes #7138.
a835bdf is described below
commit a835bdf7fce305df302241eea2d6b15ff7751db1
Author: Andrey V. Mashenkov <[email protected]>
AuthorDate: Mon Dec 16 15:10:59 2019 +0300
IGNITE-12444: SQL: Query reduce can fail with NPE on retry. This closes
#7138.
---
.../query/h2/twostep/GridReduceQueryExecutor.java | 14 ++--
.../h2/twostep/RetryCauseMessageSelfTest.java | 85 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 7 deletions(-)
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 5f3886c..d3e7772 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -394,11 +394,13 @@ public class GridReduceQueryExecutor {
}
if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() -
startTime > retryTimeout)) {
+ // There are a few cases where 'retryCause' can be undefined, so
we should throw an exception with a proper message here.
+ if (lastRun == null || lastRun.retryCause() == null)
+ throw new CacheException("Failed to map SQL query to
topology during timeout: " + retryTimeout + "ms");
+
UUID retryNodeId = lastRun.retryNodeId();
String retryCause = lastRun.retryCause();
- assert !F.isEmpty(retryCause);
-
throw new CacheException("Failed to map SQL query to topology
on data node [dataNodeId=" + retryNodeId +
", msg=" + retryCause + ']');
}
@@ -469,11 +471,9 @@ public class GridReduceQueryExecutor {
partsMap = nodesParts.partitionsMap();
qryMap = nodesParts.queryPartitionsMap();
- if (nodes == null)
+ if (F.isEmpty(nodes))
continue; // Retry.
- assert !nodes.isEmpty();
-
if (isReplicatedOnly || qry.explain()) {
ClusterNode locNode = ctx.discovery().localNode();
@@ -785,7 +785,7 @@ public class GridReduceQueryExecutor {
Collection<ClusterNode> nodes = nodesParts.nodes();
- if (nodes == null)
+ if (F.isEmpty(nodes))
throw new CacheException("Failed to determine nodes participating
in the update. " +
"Explanation (Retry update once topology recovers).");
@@ -855,7 +855,7 @@ public class GridReduceQueryExecutor {
U.error(log, "Error during update [localNodeId=" +
ctx.localNodeId() + "]", e);
- throw new CacheException("Failed to run update. " +
e.getMessage(), e);
+ throw new CacheException("Failed to run SQL update query. " +
e.getMessage(), e);
}
finally {
if (release)
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
index b2143fe..8f229c2 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.CacheException;
@@ -24,16 +26,20 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
@@ -64,6 +70,9 @@ public class RetryCauseMessageSelfTest extends
AbstractIndexingCommonTest {
private static final String ORG_SQL = "select * from Organization";
/** */
+ static final String UPDATE_SQL = "UPDATE Person SET name=lower(?) ";
+
+ /** */
private static final String ORG = "org";
/** */
@@ -296,6 +305,82 @@ public class RetryCauseMessageSelfTest extends
AbstractIndexingCommonTest {
fail();
}
+ /**
+ * Checks the error message reported when a query cannot be mapped to topology before the retry timeout expires.
+ */
+ @Test
+ public void testQueryMappingFailureMessage() {
+ final GridReduceQueryExecutor rdcQryExec =
GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "rdcQryExec");
+ final ReducePartitionMapper mapper =
GridTestUtils.getFieldValue(rdcQryExec, GridReduceQueryExecutor.class,
"mapper");
+
+ final IgniteLogger logger = GridTestUtils.getFieldValue(rdcQryExec,
GridReduceQueryExecutor.class, "log");
+ final GridKernalContext ctx = GridTestUtils.getFieldValue(rdcQryExec,
GridReduceQueryExecutor.class, "ctx");
+
+ GridTestUtils.setFieldValue(rdcQryExec, GridReduceQueryExecutor.class,
"mapper",
+ new ReducePartitionMapper(ctx, logger) {
+ @Override public ReducePartitionMapResult
nodesForPartitions(List<Integer> cacheIds,
+ AffinityTopologyVersion topVer, int[] parts, boolean
isReplicatedOnly, long qryId) {
+ final ReducePartitionMapResult res =
super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryId);
+
+ return new
ReducePartitionMapResult(Collections.emptyList(), res.partitionsMap(),
res.queryPartitionsMap());
+ }
+ });
+
+ try {
+ SqlFieldsQuery qry = new
SqlFieldsQuery(JOIN_SQL).setArgs("Organization #0");
+
+ final Throwable throwable = GridTestUtils.assertThrows(log, () -> {
+ return personCache.query(qry).getAll();
+ }, CacheException.class, "Failed to map SQL query to topology
during timeout:");
+
+ throwable.printStackTrace();
+ }
+ finally {
+ GridTestUtils.setFieldValue(rdcQryExec,
GridReduceQueryExecutor.class, "mapper", mapper);
+ }
+ }
+
+ /**
+ * Checks the error messages reported when an UPDATE query cannot be mapped to topology, with and without skipReducerOnUpdate.
+ */
+ @Test
+ public void testUpdateQueryMappingFailureMessage() {
+ final GridReduceQueryExecutor rdcQryExec =
GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "rdcQryExec");
+ final ReducePartitionMapper mapper =
GridTestUtils.getFieldValue(rdcQryExec, GridReduceQueryExecutor.class,
"mapper");
+
+ final IgniteLogger logger = GridTestUtils.getFieldValue(rdcQryExec,
GridReduceQueryExecutor.class, "log");
+ final GridKernalContext ctx = GridTestUtils.getFieldValue(rdcQryExec,
GridReduceQueryExecutor.class, "ctx");
+
+ GridTestUtils.setFieldValue(rdcQryExec, GridReduceQueryExecutor.class,
"mapper",
+ new ReducePartitionMapper(ctx, logger) {
+ @Override public ReducePartitionMapResult
nodesForPartitions(List<Integer> cacheIds,
+ AffinityTopologyVersion topVer, int[] parts, boolean
isReplicatedOnly, long qryId) {
+ final ReducePartitionMapResult res =
super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryId);
+
+ return new
ReducePartitionMapResult(Collections.emptyList(), res.partitionsMap(),
res.queryPartitionsMap());
+ }
+ });
+
+ try {
+ final SqlFieldsQueryEx qry = new SqlFieldsQueryEx(UPDATE_SQL,
false)
+ .setArgs("New Name");
+
+ GridTestUtils.assertThrows(log, () -> {
+ return personCache.query(qry).getAll();
+ }, CacheException.class, "Failed to map SQL query to topology
during timeout");
+
+ qry.setArgs("Another Name");
+ qry.setSkipReducerOnUpdate(true);
+
+ GridTestUtils.assertThrows(log, () -> {
+ return personCache.query(qry).getAll();
+ }, CacheException.class, "Failed to determine nodes participating
in the update. ");
+ }
+ finally {
+ GridTestUtils.setFieldValue(rdcQryExec,
GridReduceQueryExecutor.class, "mapper", mapper);
+ }
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName)
throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);