This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 27ed13a6078 IGNITE-10516 Fixed duplicate index error after CREATE 
INDEX IF NOT EXISTS on different tables (#10940)
27ed13a6078 is described below

commit 27ed13a607815b385085b3d629e961265ff64b54
Author: Ilya Shishkov <[email protected]>
AuthorDate: Tue Oct 31 14:44:43 2023 +0300

    IGNITE-10516 Fixed duplicate index error after CREATE INDEX IF NOT EXISTS 
on different tables (#10940)
---
 docs/_docs/sql-reference/ddl.adoc                  |   4 +-
 .../calcite/IndexWithSameNameCalciteTest.java      |  30 ++
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 .../ignite/common/ComputeTaskPermissionsTest.java  |   7 +-
 .../processors/query/GridQueryProcessor.java       |  19 +-
 .../query/schema/SchemaOperationManager.java       |  35 +-
 .../message/SchemaFinishDiscoveryMessage.java      |  14 +-
 .../message/SchemaOperationStatusMessage.java      |  30 +-
 .../processors/compute/ComputeJobStatusTest.java   |   7 +-
 .../query/schema/IndexWithSameNameTestBase.java    | 355 +++++++++++++++++++++
 .../ignite/internal/util/IgniteUtilsSelfTest.java  |  22 +-
 .../ignite/internal/util/lang/ConsumerX.java       |  40 +++
 .../processors/query/IndexWithSameNameH2Test.java  |  30 ++
 .../testsuites/IgnitePdsWithIndexingTestSuite.java |   4 +-
 14 files changed, 538 insertions(+), 61 deletions(-)

diff --git a/docs/_docs/sql-reference/ddl.adoc 
b/docs/_docs/sql-reference/ddl.adoc
index 3f997738288..b5849da92c1 100644
--- a/docs/_docs/sql-reference/ddl.adoc
+++ b/docs/_docs/sql-reference/ddl.adoc
@@ -343,11 +343,11 @@ index_option := {INLINE_SIZE size | PARALLEL 
parallelism_level}
 
 Parameters:
 
-* `indexName` - the name of the index to be created.
+* `indexName` - the name of the index to be created. The index name must be 
unique per schema.
 * `ASC` - specifies ascending sort order (default).
 * `DESC` - specifies descending sort order.
 * `SPATIAL` - create the spatial index. Presently, only geometry types are 
supported.
-* `IF NOT EXISTS` - do not throw an error if an index with the same name 
already exists. The database checks indexes' names only, and does not consider 
columns types or count.
+* `IF NOT EXISTS` - do not throw an error if an index with the same name 
already exists. The database checks indexes' names only, and does not consider 
columns types or count. The index creation will be skipped if an index with the 
same name exists in the schema.
 * `index_option` - additional options for index creation:
 ** `INLINE_SIZE` - specifies index inline size in bytes. Depending on the 
size, Ignite will place the whole indexed value or a part of it directly into 
index pages, thus omitting extra calls to data pages and increasing queries' 
performance. Index inlining is enabled by default and the size is 
pre-calculated automatically based on the table structure. To disable inlining, 
set the size to 0 (not recommended). Refer to the 
link:SQL/sql-tuning#increasing-index-inline-size[Increasing Index I [...]
 ** `PARALLEL` - specifies the number of threads to be used in parallel for 
index creation. The greater number is set, the faster the index is created and 
built. If the value exceeds the number of CPUs, then it will be decreased to 
the number of cores. If the parameter is not specified, then the number of 
threads is calculated as 25% of the CPU cores available.
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexWithSameNameCalciteTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexWithSameNameCalciteTest.java
new file mode 100644
index 00000000000..d901d302d81
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexWithSameNameCalciteTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.QueryEngineConfiguration;
+import 
org.apache.ignite.internal.processors.query.schema.IndexWithSameNameTestBase;
+
+/** */
+public class IndexWithSameNameCalciteTest extends IndexWithSameNameTestBase {
+    /** {@inheritDoc} */
+    @Override protected QueryEngineConfiguration getEngineConfiguration() {
+        return new CalciteQueryEngineConfiguration().setDefault(true);
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index f6fc3c82b69..0d4d6122a3a 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
 import org.apache.ignite.internal.processors.query.calcite.CancelTest;
 import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
+import 
org.apache.ignite.internal.processors.query.calcite.IndexWithSameNameCalciteTest;
 import 
org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
 import 
org.apache.ignite.internal.processors.query.calcite.UnstableTopologyTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest;
@@ -129,6 +130,7 @@ import org.junit.runners.Suite;
     TimeoutIntegrationTest.class,
     PartitionPruneTest.class,
     JoinRehashIntegrationTest.class,
+    IndexWithSameNameCalciteTest.class,
 })
 public class IntegrationTestSuite {
 }
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
index 817536347d0..e3e337468ac 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java
@@ -59,6 +59,7 @@ import 
org.apache.ignite.internal.processors.security.SecurityContext;
 import 
org.apache.ignite.internal.processors.security.compute.ComputePermissionCheckTest;
 import org.apache.ignite.internal.processors.security.impl.TestSecurityData;
 import 
org.apache.ignite.internal.processors.security.impl.TestSecurityPluginProvider;
+import org.apache.ignite.internal.util.lang.ConsumerX;
 import org.apache.ignite.internal.util.lang.RunnableX;
 import 
org.apache.ignite.internal.util.lang.gridfunc.AtomicIntegerFactoryCallable;
 import org.apache.ignite.internal.util.lang.gridfunc.RunnableWrapperClosure;
@@ -776,12 +777,6 @@ public class ComputeTaskPermissionsTest extends 
AbstractSecurityTest {
         T get() throws Exception;
     }
 
-    /** */
-    private interface ConsumerX<T> {
-        /** */
-        void accept(T t) throws Exception;
-    }
-
     /** */
     private interface BiConsumerX<T, U> {
         /** */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index d5b53e0af6c..4e5d54763a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -874,7 +874,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             assert proposeMsg != null;
 
             // Apply changes to public cache schema if operation is successful 
and original cache is still there.
-            if (!msg.hasError()) {
+            if (!msg.hasError() && !msg.nop()) {
                 DynamicCacheDescriptor cacheDesc = 
ctx.cache().cacheDescriptor(msg.operation().cacheName());
 
                 if (cacheDesc != null && F.eq(cacheDesc.deploymentId(), 
proposeMsg.deploymentId())) {
@@ -1889,9 +1889,9 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
      * @param op Operation.
      * @param err Error (if any).
      */
-    public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable 
SchemaOperationException err) {
+    public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable 
SchemaOperationException err, boolean nop) {
         synchronized (stateMux) {
-            SchemaFinishDiscoveryMessage msg = new 
SchemaFinishDiscoveryMessage(op, err);
+            SchemaFinishDiscoveryMessage msg = new 
SchemaFinishDiscoveryMessage(op, err, nop);
 
             try {
                 ctx.discovery().sendCustomEvent(msg);
@@ -3865,16 +3865,17 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
      * @param destNodeId Destination node ID.
      * @param opId Operation ID.
      * @param err Error.
+     * @param nop No-op flag.
      */
-    public void sendStatusMessage(UUID destNodeId, UUID opId, 
SchemaOperationException err) {
+    public void sendStatusMessage(UUID destNodeId, UUID opId, 
SchemaOperationException err, boolean nop) {
         if (log.isDebugEnabled())
             log.debug("Sending schema operation status message [opId=" + opId 
+ ", crdNode=" + destNodeId +
-                ", err=" + err + ']');
+                ", err=" + err + ", nop=" + nop + ']');
 
         try {
             byte[] errBytes = marshalSchemaError(opId, err);
 
-            SchemaOperationStatusMessage msg = new 
SchemaOperationStatusMessage(opId, errBytes);
+            SchemaOperationStatusMessage msg = new 
SchemaOperationStatusMessage(opId, errBytes, nop);
 
             // Messages must go to dedicated schema pool. We cannot push them 
to query pool because in this case
             // they could be blocked with other query requests.
@@ -3883,7 +3884,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         catch (IgniteCheckedException e) {
             if (log.isDebugEnabled())
                 log.debug("Failed to send schema status response [opId=" + 
opId + ", destNodeId=" + destNodeId +
-                    ", err=" + e + ']');
+                    ", err=" + e + ", nop=" + nop + ']');
         }
     }
 
@@ -3915,7 +3916,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                         log.debug("Received status message [opId=" + 
msg.operationId() +
                             ", sndNodeId=" + msg.senderNodeId() + ']');
 
-                    op.manager().onNodeFinished(msg.senderNodeId(), 
unmarshalSchemaError(msg.errorBytes()));
+                    op.manager().onNodeFinished(msg.senderNodeId(), 
unmarshalSchemaError(msg.errorBytes()), msg.nop());
 
                     return;
                 }
@@ -3945,7 +3946,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             SchemaOperationStatusMessage msg = it.next();
 
             if (F.eq(msg.operationId(), opId)) {
-                mgr.onNodeFinished(msg.senderNodeId(), 
unmarshalSchemaError(msg.errorBytes()));
+                mgr.onNodeFinished(msg.senderNodeId(), 
unmarshalSchemaError(msg.errorBytes()), msg.nop());
 
                 it.remove();
             }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
index fbedc40212b..e9b252a8905 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
@@ -55,7 +56,7 @@ public class SchemaOperationManager {
     private Collection<UUID> nodeIds;
 
     /** Node results. */
-    private Map<UUID, SchemaOperationException> nodeRess;
+    private Map<UUID, T2<SchemaOperationException, Boolean>> nodeRess;
 
     /** Current coordinator node. */
     private ClusterNode crd;
@@ -132,9 +133,9 @@ public class SchemaOperationManager {
 
         synchronized (mux) {
             if (isLocalCoordinator())
-                onNodeFinished(ctx.localNodeId(), err);
+                onNodeFinished(ctx.localNodeId(), err, worker.nop());
             else
-                qryProc.sendStatusMessage(crd.id(), operationId(), err);
+                qryProc.sendStatusMessage(crd.id(), operationId(), err, 
worker.nop());
         }
     }
 
@@ -144,28 +145,29 @@ public class SchemaOperationManager {
      * @param nodeId Node ID.
      * @param err Error.
      */
-    public void onNodeFinished(UUID nodeId, @Nullable SchemaOperationException 
err) {
+    public void onNodeFinished(UUID nodeId, @Nullable SchemaOperationException 
err, boolean nop) {
         synchronized (mux) {
             assert isLocalCoordinator();
 
             if (nodeRess.containsKey(nodeId)) {
                 if (log.isDebugEnabled())
                     log.debug("Received duplicate result [opId=" + 
operationId() + ", nodeId=" + nodeId +
-                        ", err=" + err + ']');
+                        ", err=" + err + ", nop=" + nop + ']');
 
                 return;
             }
 
             if (nodeIds.contains(nodeId)) {
                 if (log.isDebugEnabled())
-                    log.debug("Received result [opId=" + operationId() + ", 
nodeId=" + nodeId + ", err=" + err + ']');
+                    log.debug("Received result [opId=" + operationId() + ", 
nodeId=" + nodeId + ", err=" + err +
+                        ", nop=" + nop + ']');
 
-                nodeRess.put(nodeId, err);
+                nodeRess.put(nodeId, new T2<>(err, nop));
             }
             else {
                 if (log.isDebugEnabled())
                     log.debug("Received result from non-tracked node (joined 
after operation started, will ignore) " +
-                        "[opId=" + operationId() + ", nodeId=" + nodeId + ", 
err=" + err + ']');
+                        "[opId=" + operationId() + ", nodeId=" + nodeId + ", 
err=" + err + ", nop=" + nop + ']');
             }
 
             checkFinished();
@@ -217,22 +219,27 @@ public class SchemaOperationManager {
             if (nodeIds.size() == nodeRess.size()) {
                 // Initiate finish request.
                 SchemaOperationException err = null;
+                boolean nop = false;
 
-                for (Map.Entry<UUID, SchemaOperationException> nodeRes : 
nodeRess.entrySet()) {
-                    if (nodeRes.getValue() != null) {
-                        err = nodeRes.getValue();
+                for (Map.Entry<UUID, T2<SchemaOperationException, Boolean>> 
nodeRes : nodeRess.entrySet()) {
+                    err = nodeRes.getValue().get1();
 
+                    if (err != null)
                         break;
-                    }
+
+                    nop |= nodeRes.getValue().get2();
                 }
 
                 if (log.isDebugEnabled())
                     log.debug("Collected all results, about to send finish 
message [opId=" + operationId() +
-                        ", err=" + err + ']');
+                        ", err=" + err + ", nop=" + nop + ']');
+
+                // In case of no-op operation results from all nodes must be 
the same.
+                assert err != null || !nop || 
nodeRess.entrySet().stream().allMatch(e -> e.getValue().get2()) : nodeRess;
 
                 crdFinished = true;
 
-                qryProc.onCoordinatorFinished(worker.operation(), err);
+                qryProc.onCoordinatorFinished(worker.operation(), err, nop);
             }
         }
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
index 2245b24667a..3ae253cb1d7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -36,16 +36,21 @@ public class SchemaFinishDiscoveryMessage extends 
SchemaAbstractDiscoveryMessage
     /** Original propose message. */
     private transient SchemaProposeDiscoveryMessage proposeMsg;
 
+    /** No-op flag. */
+    private final boolean nop;
+
     /**
      * Constructor.
      *
      * @param op Original operation.
      * @param err Error.
+     * @param nop No-op flag.
      */
-    public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, 
SchemaOperationException err) {
+    public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, 
SchemaOperationException err, boolean nop) {
         super(op);
 
         this.err = err;
+        this.nop = nop;
     }
 
     /** {@inheritDoc} */
@@ -91,6 +96,13 @@ public class SchemaFinishDiscoveryMessage extends 
SchemaAbstractDiscoveryMessage
         this.proposeMsg = proposeMsg;
     }
 
+    /**
+     * @return <code>True</code> if message is no-op.
+     */
+    public boolean nop() {
+        return nop;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", 
super.toString());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
index 2a27cfa3c8e..9ecbb9e8477 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
@@ -45,6 +45,9 @@ public class SchemaOperationStatusMessage implements Message {
     @GridDirectTransient
     private UUID sndNodeId;
 
+    /** No-op flag. */
+    private boolean nop;
+
     /**
      * Default constructor.
      */
@@ -57,10 +60,12 @@ public class SchemaOperationStatusMessage implements 
Message {
      *
      * @param opId Operation ID.
      * @param errBytes Error bytes.
+     * @param nop No-op flag.
      */
-    public SchemaOperationStatusMessage(UUID opId, byte[] errBytes) {
+    public SchemaOperationStatusMessage(UUID opId, byte[] errBytes, boolean 
nop) {
         this.opId = opId;
         this.errBytes = errBytes;
+        this.nop = nop;
     }
 
     /**
@@ -114,6 +119,12 @@ public class SchemaOperationStatusMessage implements 
Message {
                     return false;
 
                 writer.incrementState();
+
+            case 2:
+                if (!writer.writeBoolean("nop", nop))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -138,6 +149,14 @@ public class SchemaOperationStatusMessage implements 
Message {
             case 1:
                 errBytes = reader.readByteArray("errBytes");
 
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                nop = reader.readBoolean("nop");
+
                 if (!reader.isLastRead())
                     return false;
 
@@ -147,6 +166,13 @@ public class SchemaOperationStatusMessage implements 
Message {
         return reader.afterMessageRead(SchemaOperationStatusMessage.class);
     }
 
+    /**
+     * @return <code>True</code> if message is no-op.
+     */
+    public boolean nop() {
+        return nop;
+    }
+
     /** {@inheritDoc} */
     @Override public short directType() {
         return -53;
@@ -154,7 +180,7 @@ public class SchemaOperationStatusMessage implements 
Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 2;
+        return 3;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
index e6ece4a4b3f..f68e51e167d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobStatusTest.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
 import org.apache.ignite.internal.processors.job.GridJobProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.ConsumerX;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -374,12 +375,6 @@ public class ComputeJobStatusTest extends 
GridCommonAbstractTest {
         }
     }
 
-    /** */
-    private interface ConsumerX<T> {
-        /** */
-        void accept(T t) throws Exception;
-    }
-
     /** */
     private static class SimpleTask extends ComputeTaskAdapter<Void, Void> {
         /** */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
new file mode 100644
index 00000000000..fce48f3bbbc
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.QueryEngineConfiguration;
+import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import 
org.apache.ignite.internal.processors.query.schema.management.IndexDescriptor;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
+import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
+import org.apache.ignite.internal.util.lang.ConsumerX;
+import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static 
org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
+import static 
org.apache.ignite.internal.processors.query.schema.IndexWithSameNameTestBase.SchemaFinishListeningTcpDiscoverySpi.discoSpi;
+import static org.apache.ignite.internal.util.lang.GridFunc.asSet;
+import static org.apache.ignite.internal.util.lang.GridFunc.t;
+
+/** */
+@SuppressWarnings("deprecation")
+public abstract class IndexWithSameNameTestBase extends GridCommonAbstractTest 
{
+    /** Test index. */
+    public static final String TEST_INDEX = "TEST_IDX";
+
+    /** Baseline size. */
+    public static final int BASELINE_SIZE = 3;
+
+    /** All nodes count, including non-baseline nodes. */
+    public static final int NODES_COUNT = BASELINE_SIZE + 2;
+
+    /** Client node index. */
+    public static final int CLIENT_INDEX = NODES_COUNT - 1;
+
+    /** Correct index fields. */
+    public static final Set<String> CORRECT_FIELDS = asSet("K1", "V1");
+
+    /** Fields of duplicate index. */
+    public static final Set<String> DUPLICATE_FIELDS = asSet("K2", "V2");
+
+    /** Schema finish latch. */
+    public static CountDownLatch schemaFinishLatch;
+
+    /** Index create operation identifiers. */
+    public static Map<Set<String>, UUID> idxOps = new ConcurrentHashMap<>();
+
+    /** Schema status latch. */
+    private CountDownLatch schemaStatusLatch;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+
+        idxOps.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        SchemaFinishListeningTcpDiscoverySpi discoSpi = new 
SchemaFinishListeningTcpDiscoverySpi();
+        
discoSpi.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder());
+
+        TestRecordingCommunicationSpi commSpi = new 
TestRecordingCommunicationSpi();
+
+        commSpi.record((n, msg) -> {
+            if (msg instanceof SchemaOperationStatusMessage) {
+                schemaStatusLatch.countDown();
+
+                return true;
+            }
+
+            return false;
+        });
+
+        return cfg.setDiscoverySpi(discoSpi)
+            .setCommunicationSpi(commSpi)
+            .setSqlConfiguration(new SqlConfiguration()
+                .setQueryEnginesConfiguration(getEngineConfiguration()))
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration()
+                    .setPersistenceEnabled(true)));
+    }
+
+    /** */
+    protected abstract QueryEngineConfiguration getEngineConfiguration();
+
+    /** */
+    @Test
+    public void testSeparateRequests() throws Exception {
+        doTestWithRestart(qryProc -> {
+            checkIndexCreate(qryProc, t("TABLE1", CORRECT_FIELDS, /* nop */ 
false));
+            checkIndexCreate(qryProc, t("TABLE2", DUPLICATE_FIELDS, /* nop */ 
true));
+        });
+    }
+
+    /** */
+    @Test
+    public void testMultiLineRequest() throws Exception {
+        doTestWithRestart(qryProc -> checkIndexCreate(
+            qryProc,
+            t("TABLE1", CORRECT_FIELDS, /* nop */ false),
+            t("TABLE2", DUPLICATE_FIELDS, /* nop */ true)));
+    }
+
+    /** */
+    public void doTestWithRestart(ConsumerX<GridQueryProcessor> 
idxCreateAction) throws Exception {
+        // Ensure coordinator.
+        IgniteEx crd = startGrid(0);
+
+        startGridsMultiThreaded(1, BASELINE_SIZE - 1);
+
+        waitForTopology(BASELINE_SIZE);
+
+        crd.cluster().state(ClusterState.ACTIVE);
+        awaitPartitionMapExchange();
+
+        // Non-baseline nodes.
+        startGrid(BASELINE_SIZE);
+        startClientGrid(CLIENT_INDEX);
+
+        waitForTopology(NODES_COUNT);
+        assertEquals(BASELINE_SIZE, 
crd.cluster().currentBaselineTopology().size());
+
+        GridQueryProcessor qryProc = crd.context().query();
+
+        qryProc.querySqlFields(new SqlFieldsQuery("CREATE TABLE TABLE1 (K1 INT 
PRIMARY KEY, V1 INT)"), true).getAll();
+        qryProc.querySqlFields(new SqlFieldsQuery("CREATE TABLE TABLE2 (K2 INT 
PRIMARY KEY, V2 INT)"), true).getAll();
+
+        idxCreateAction.accept(qryProc);
+
+        assertSingleIndex(NODES_COUNT);
+
+        crd.cluster().state(ClusterState.INACTIVE);
+        awaitPartitionMapExchange();
+        stopAllGrids();
+
+        startGrids(BASELINE_SIZE);
+
+        assertSingleIndex(BASELINE_SIZE);
+    }
+
+    /**
+     * Builds one multi-statement query with a {@code CREATE INDEX IF NOT EXISTS} statement per given
+     * parameter (all statements use the {@code TEST_INDEX} name), executes it, waits on the schema status
+     * and schema finish latches and then checks the recorded messages with {@code checkNoOpMessages}.
+     *
+     * NOTE(review): results of both latch {@code await} calls are ignored, so a timeout is not reported
+     * here - presumably it is expected to surface in the subsequent message checks, confirm.
+     *
+     * @param qryProc Query processor.
+     * @param idxParams Expected index parameters: table name, index fields and expected no-op flag.
+     * @throws InterruptedException If interrupted while waiting on the latches.
+     */
+    private void checkIndexCreate(GridQueryProcessor qryProc, 
GridTuple3<String, Set<String>, Boolean>... idxParams)
+        throws InterruptedException {
+        List<GridTuple3<String, Set<String>, Boolean>> idxParams0 = 
Arrays.asList(idxParams);
+
+        // Multi-statement index create.
+        String idxCreateSql = idxParams0.stream()
+            .map(t -> String.format("CREATE INDEX IF NOT EXISTS %s ON %s 
(%s)", TEST_INDEX,
+                t.get1(),
+                String.join(",", t.get2())))
+            .collect(Collectors.joining("\n;"));
+
+        // SchemaFinishDiscoveryMessage is processed twice on coordinator.
+        schemaFinishLatch = new CountDownLatch((NODES_COUNT + 1) * 
idxParams0.size());
+
+        // SchemaOperationStatusMessage is not sent by coordinator and client.
+        schemaStatusLatch = new CountDownLatch((NODES_COUNT - 2) * 
idxParams0.size());
+
+        qryProc.querySqlFields(new SqlFieldsQuery(idxCreateSql), true, false)
+            .forEach(FieldsQueryCursor::getAll);
+
+        schemaStatusLatch.await(getTestTimeout(), MILLISECONDS);
+        schemaFinishLatch.await(getTestTimeout(), MILLISECONDS);
+
+        checkNoOpMessages(idxParams0);
+    }
+
+    /**
+     * For each of the {@code NODES_COUNT} nodes checks the number of recorded
+     * {@code SchemaOperationStatusMessage} and {@code SchemaFinishDiscoveryMessage} messages, and that the
+     * no-op flag of the messages belonging to each expected operation matches the expected value.
+     * The operation ID is looked up in {@code idxOps} by the set of index fields.
+     *
+     * NOTE(review): {@code allMatch} is also satisfied when no message passes the operation ID filter, and
+     * {@code idxOps.get(...)} presumably returns {@code null} when no finish message was seen for the
+     * field set - confirm that the message count assertions in this method rule both cases out.
+     *
+     * @param idxParams Expected index parameters: table name, index fields and expected no-op flag.
+     */
+    private void checkNoOpMessages(List<GridTuple3<String, Set<String>, 
Boolean>> idxParams) {
+        for (int i = 0; i < NODES_COUNT; i++) {
+            List<Object> commMsgs = spi(grid(i)).recordedMessages(false);
+
+            boolean crdOrClient = i == 0 || i == CLIENT_INDEX;
+
+            String commMsgErr = "Unexpected SchemaOperationStatusMessage count 
on node: igniteIndex=" + i +
+                ", commMsgsCnt=" + commMsgs.size();
+
+            int sqlCnt = idxParams.size();
+
+            // SchemaOperationStatusMessage is not sent by coordinator and 
client.
+            assertEquals(commMsgErr, crdOrClient ? 0 : sqlCnt, 
commMsgs.size());
+
+            List<SchemaFinishDiscoveryMessage> discoMsgs = 
discoSpi(grid(i)).recordedMessages();
+
+            String discoMsgErr = "Unexpected SchemaFinishDiscoveryMessage 
count on node: igniteIndex=" + i +
+                ", discoMsgsCnt=" + discoMsgs.size();
+
+            // SchemaFinishDiscoveryMessage is processed twice on coordinator.
+            assertEquals(discoMsgErr, i == 0 ? 2 * sqlCnt : sqlCnt, 
discoMsgs.size());
+
+            for (GridTuple3<String, Set<String>, Boolean> param : idxParams) {
+                Set<String> expIdxFields = param.get2();
+                boolean expNop = param.get3();
+
+                UUID opId = idxOps.get(expIdxFields);
+
+                String commNopErr = String.format(
+                    "Unexpected no-op flag in communication messages: opId=%s, 
igniteIndex=%d, expNop=%b, recordedMsgs=%s",
+                    opId, i, expNop, commMsgs);
+
+                assertTrue(commNopErr, commMsgs.stream()
+                    .map(msg -> ((SchemaOperationStatusMessage)msg))
+                    .filter(msg -> opId.equals(msg.operationId()))
+                    .allMatch(msg -> msg.nop() == expNop));
+
+                String discoNopErr = String.format(
+                    "Unexpected no-op flag in discovery messages: opId=%s, 
igniteIndex=%d, expNop=%b, recordedMsgs=%s",
+                    opId, i, expNop, discoMsgs);
+
+                assertTrue(discoNopErr, discoMsgs.stream()
+                    .filter(msg -> opId.equals(msg.operation().id()))
+                    .allMatch(msg -> msg.nop() == expNop));
+            }
+        }
+    }
+
+    /**
+     * Checks on nodes with indexes from 0 to {@code nodesCnt - 1} that there is exactly one index whose
+     * name equals {@code TEST_INDEX} ignoring case, and that its key definition fields, except for
+     * {@code KEY_FIELD_NAME}, are the same as {@code CORRECT_FIELDS} regardless of order.
+     *
+     * @param nodesCnt Number of nodes to check.
+     */
+    private void assertSingleIndex(int nodesCnt) {
+        for (int i = 0; i < nodesCnt; i++) {
+            Collection<IndexDescriptor> indexes = grid(i).context()
+                .query()
+                .schemaManager()
+                .allIndexes();
+
+            List<IndexDescriptor> filteredIdxs = indexes.stream()
+                .filter(idx -> TEST_INDEX.equalsIgnoreCase(idx.name()))
+                .collect(Collectors.toList());
+
+            assertEquals("There should be only one index", 1, 
filteredIdxs.size());
+
+            Set<String> actualFields = filteredIdxs.get(0)
+                .keyDefinitions()
+                .keySet()
+                .stream()
+                .filter(f -> !KEY_FIELD_NAME.equalsIgnoreCase(f))
+                .collect(Collectors.toSet());
+
+            assertEqualsCollectionsIgnoringOrder(CORRECT_FIELDS, actualFields);
+        }
+    }
+
+    /**
+     * Discovery SPI which records every {@code SchemaFinishDiscoveryMessage} passed to
+     * {@code startMessageProcess} so that a test can inspect the messages later.
+     */
+    public static class SchemaFinishListeningTcpDiscoverySpi extends 
TcpDiscoverySpi {
+        /** Recorded schema finish messages, cleared on each {@code recordedMessages()} call. */
+        private final List<SchemaFinishDiscoveryMessage> recordedMsgs = new 
CopyOnWriteArrayList<>();
+
+        /**
+         * {@inheritDoc}
+         *
+         * For a custom event message wrapping a {@code SchemaFinishDiscoveryMessage}: remembers the
+         * operation ID in {@code idxOps} by the set of index field names (first ID wins), records the
+         * message and counts down {@code schemaFinishLatch}. Any error is logged and reported via
+         * {@code fail}.
+         *
+         * NOTE(review): the super implementation is not called - presumably it is an empty test hook,
+         * confirm.
+         */
+        @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+            if (msg instanceof TcpDiscoveryCustomEventMessage) {
+                try {
+                    DiscoverySpiCustomMessage spiCustomMsg = 
((TcpDiscoveryCustomEventMessage)msg).message(marshaller(),
+                        U.resolveClassLoader(ignite().configuration()));
+
+                    DiscoveryCustomMessage discoCustomMsg = 
((CustomMessageWrapper)spiCustomMsg).delegate();
+
+                    if (discoCustomMsg instanceof 
SchemaFinishDiscoveryMessage) {
+                        SchemaFinishDiscoveryMessage finishMsg = 
(SchemaFinishDiscoveryMessage)discoCustomMsg;
+
+                        SchemaIndexCreateOperation op = 
(SchemaIndexCreateOperation)finishMsg.operation();
+
+                        idxOps.putIfAbsent(new 
HashSet<>(op.index().getFieldNames()), op.id());
+
+                        recordedMsgs.add(finishMsg);
+
+                        schemaFinishLatch.countDown();
+                    }
+                }
+                catch (Throwable e) {
+                    log.error("Unexpected error", e);
+
+                    fail(e.getMessage());
+                }
+            }
+        }
+
+        /**
+         * @param ignite Node.
+         * @return Discovery SPI from the configuration of the given node, cast to this class.
+         */
+        public static SchemaFinishListeningTcpDiscoverySpi discoSpi(Ignite 
ignite) {
+            return 
(SchemaFinishListeningTcpDiscoverySpi)ignite.configuration().getDiscoverySpi();
+        }
+
+        /**
+         * Returns a copy of the messages recorded so far and clears the internal list.
+         *
+         * NOTE(review): the copy and the clear are two separate steps, so a message recorded between
+         * them would be dropped without being returned - confirm this cannot happen when it is called.
+         *
+         * @return Copy of the recorded messages.
+         */
+        public List<SchemaFinishDiscoveryMessage> recordedMessages() {
+            List<SchemaFinishDiscoveryMessage> recordedMsgs0 = new 
ArrayList<>(recordedMsgs);
+
+            recordedMsgs.clear();
+
+            return recordedMsgs0;
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 86af58cf6fa..14913482dee 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.ConsumerX;
 import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
 import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1471,7 +1472,7 @@ public class IgniteUtilsSelfTest extends 
GridCommonAbstractTest {
      * @param consumer Consumer.
      * @throws Exception If failed.
      */
-    private void readLines(String rsrcName, ThrowableConsumer<String> 
consumer) throws Exception {
+    private void readLines(String rsrcName, ConsumerX<String> consumer) throws 
Exception {
         byte[] content = readResource(getClass().getClassLoader(), rsrcName);
 
         try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(new ByteArrayInputStream(content)))) {
@@ -1655,25 +1656,6 @@ public class IgniteUtilsSelfTest extends 
GridCommonAbstractTest {
     /** */
     private interface I5 extends I4 {}
 
-    /**
-     * Represents an operation that accepts a single input argument and returns
-     * no result. Unlike most other functional interfaces,
-     * {@code ThrowableConsumer} is expected to operate via side-effects.
-     *
-     * Also it is able to throw {@link Exception} unlike {@link Consumer}.
-     *
-     * @param <T> The type of the input to the operation.
-     */
-    @FunctionalInterface
-    private static interface ThrowableConsumer<T> {
-        /**
-         * Performs this operation on the given argument.
-         *
-         * @param t the input argument.
-         */
-        void accept(@Nullable T t) throws Exception;
-    }
-
     /**
      * Test to verify the {@link U#hashToIndex(int, int)}.
      */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
new file mode 100644
index 00000000000..9c3e4fed385
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import java.util.function.Consumer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents an operation that accepts a single input argument and returns
+ * no result. Unlike most other functional interfaces,
+ * {@code ConsumerX} is expected to operate via side-effects.
+ *
+ * Unlike {@link Consumer}, it is allowed to throw a checked {@link Exception}.
+ *
+ * @param <T> The type of the input to the operation.
+ */
+@FunctionalInterface
+public interface ConsumerX<T> {
+    /**
+     * Performs this operation on the given argument.
+     *
+     * @param t The input argument, may be {@code null}.
+     * @throws Exception If the operation failed.
+     */
+    public void accept(@Nullable T t) throws Exception;
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IndexWithSameNameH2Test.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IndexWithSameNameH2Test.java
new file mode 100644
index 00000000000..9211c4dcf86
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IndexWithSameNameH2Test.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.configuration.QueryEngineConfiguration;
+import org.apache.ignite.indexing.IndexingQueryEngineConfiguration;
+import 
org.apache.ignite.internal.processors.query.schema.IndexWithSameNameTestBase;
+
+/**
+ * Runs the tests of {@link IndexWithSameNameTestBase} with {@link IndexingQueryEngineConfiguration}
+ * used as the default query engine configuration.
+ */
+public class IndexWithSameNameH2Test extends IndexWithSameNameTestBase {
+    /** {@inheritDoc} */
+    @Override protected QueryEngineConfiguration getEngineConfiguration() {
+        return new IndexingQueryEngineConfiguration().setDefault(true);
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 0fcb2155431..e2d14f082f3 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.processors.database.IgniteTwoRegionsRebuildInd
 import org.apache.ignite.internal.processors.database.RebuildIndexTest;
 import 
org.apache.ignite.internal.processors.database.RebuildIndexWithHistoricalRebalanceTest;
 import 
org.apache.ignite.internal.processors.database.WalDisabledDuringIndexRecreateTest;
+import org.apache.ignite.internal.processors.query.IndexWithSameNameH2Test;
 import 
org.apache.ignite.internal.processors.query.h2.maintenance.MaintenanceRebuildIndexUtilsSelfTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -72,7 +73,8 @@ import org.junit.runners.Suite;
     ResumeCreateIndexTest.class,
     RenameIndexTreeTest.class,
     DropIndexTest.class,
-    MaintenanceRebuildIndexUtilsSelfTest.class
+    MaintenanceRebuildIndexUtilsSelfTest.class,
+    IndexWithSameNameH2Test.class,
 })
 public class IgnitePdsWithIndexingTestSuite {
 }


Reply via email to