This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 970489eb928 IGNITE-26755 Allow to create table for already existing 
cache with defined schema - Fixes #12453.
970489eb928 is described below

commit 970489eb928fd59c52d69e0743a2af6d69f8cfb3
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Sat Nov 1 16:47:50 2025 +0300

    IGNITE-26755 Allow to create table for already existing cache with defined 
schema - Fixes #12453.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../integration/AbstractDdlIntegrationTest.java    |  2 +
 .../integration/TableDdlIntegrationTest.java       | 82 ++++++++++++++++++++--
 .../cache/query/IgniteQueryErrorCode.java          |  3 +
 .../processors/query/GridQueryProcessor.java       | 49 ++++++++-----
 .../internal/processors/query/QueryUtils.java      |  5 ++
 .../query/schema/SchemaOperationException.java     |  6 ++
 6 files changed, 124 insertions(+), 23 deletions(-)

diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
index 5496b9c448e..2a4a6c5b280 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.internal.processors.query.calcite.integration;
 
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -57,6 +58,7 @@ public class AbstractDdlIntegrationTest extends 
AbstractBasicIntegrationTest {
         return super.getConfiguration(igniteInstanceName)
             .setSqlConfiguration(
                 new SqlConfiguration().setSqlSchemas("MY_SCHEMA")
+                    .setQueryEnginesConfiguration(new 
CalciteQueryEngineConfiguration())
             )
             .setDataStorageConfiguration(
                 new DataStorageConfiguration()
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
index 052d7fe2d2b..cba85ba7e9a 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
@@ -20,6 +20,7 @@ import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -31,9 +32,9 @@ import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -392,15 +393,84 @@ public class TableDdlIntegrationTest extends 
AbstractDdlIntegrationTest {
      */
     @Test
     public void createTableOnExistingCache() {
-        IgniteCache<Object, Object> cache = 
client.getOrCreateCache("my_cache");
+        // Cache without SQL configuration.
+        IgniteCache<Object, Object> cache = 
client.getOrCreateCache("my_cache0");
 
-        sql("create table my_schema.my_table (f1 int, f2 varchar) with 
cache_name=\"my_cache\"");
+        // DDL with explicit schema.
+        sql("create table my_schema.my_table (f1 int, f2 varchar) with 
cache_name=\"my_cache0\"");
 
-        sql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
+        insertAndCheckSize(cache, "my_schema");
+
+        // Cache without SQL configuration.
+        cache = client.getOrCreateCache("my_cache1");
+
+        // DDL with implicit PUBLIC schema.
+        sql("create table my_table (f1 int, f2 varchar) with 
cache_name=\"my_cache1\"");
+
+        insertAndCheckSize(cache, "public");
+
+        // Cache with defined schema.
+        cache = client.getOrCreateCache(new 
CacheConfiguration<>("my_cache2").setSqlSchema("my_schema2"));
+
+        // DDL with explicit correct schema.
+        sql("create table my_schema2.my_table (f1 int, f2 varchar) with 
cache_name=\"my_cache2\"");
+
+        insertAndCheckSize(cache, "my_schema2");
+
+        // Cache with defined schema.
+        cache = client.getOrCreateCache(new 
CacheConfiguration<>("my_cache3").setSqlSchema("my_schema3"));
+
+        // DDL with explicit wrong schema.
+        assertThrows("create table my_schema.my_table2 (f1 int, f2 varchar) 
with cache_name=\"my_cache3\"",
+            IgniteSQLException.class, "Invalid schema: MY_SCHEMA");
+
+        // DDL with explicit wrong schema.
+        assertThrows("create table public.my_table2 (f1 int, f2 varchar) with 
cache_name=\"my_cache3\"",
+            IgniteSQLException.class, "Invalid schema: PUBLIC");
+
+        // DDL with implicit wrong schema.
+        assertThrows("create table my_table2 (f1 int, f2 varchar) with 
cache_name=\"my_cache3\"",
+            IgniteSQLException.class, "Invalid schema: PUBLIC");
+
+        // DDL with implicit cache schema.
+        cache.query(new SqlFieldsQuery("create table my_table (f1 int, f2 
varchar) with cache_name=\"my_cache3\""));
+
+        insertAndCheckSize(cache, "my_schema3");
+
+        // Cache with defined SQL functions, schema is defined by cache name.
+        cache = client.getOrCreateCache(new 
CacheConfiguration<>("my_cache4").setSqlFunctionClasses(getClass()));
+
+        // DDL with explicit wrong schema.
+        assertThrows("create table public.my_table2 (f1 int, f2 varchar) with 
cache_name=\"my_cache4\"",
+            IgniteSQLException.class, "Invalid schema: PUBLIC");
+
+        // DDL with explicit correct schema.
+        sql("create table \"my_cache4\".my_table (f1 int, f2 varchar) with 
cache_name=\"my_cache4\"");
+
+        insertAndCheckSize(cache, "\"my_cache4\"");
+
+        // Cache with defined query entities.
+        client.getOrCreateCache(new CacheConfiguration<>("my_cache5")
+            .setQueryEntities(Collections.singleton(new 
QueryEntity(Integer.class, Integer.class))));
+
+        assertThrows("create table \"my_cache5\".my_table (f1 int, f2 varchar) 
with cache_name=\"my_cache5\"",
+            IgniteSQLException.class, "Cache is already indexed");
+
+        // Cache with indexed types.
+        client.getOrCreateCache(new CacheConfiguration<>("my_cache6")
+            .setIndexedTypes(Integer.class, Integer.class));
+
+        assertThrows("create table \"my_cache6\".my_table (f1 int, f2 varchar) 
with cache_name=\"my_cache6\"",
+            IgniteSQLException.class, "Cache is already indexed");
+    }
+
+    /** */
+    private void insertAndCheckSize(IgniteCache<?, ?> cache, String schema) {
+        sql("insert into " + schema + ".my_table(f1, f2) values (1, '1'),(2, 
'2')");
 
-        assertThat(sql("select * from my_schema.my_table"), hasSize(2));
+        assertThat(sql("select * from " + schema + ".my_table"), hasSize(2));
 
-        assertEquals(2, cache.size(CachePeekMode.PRIMARY));
+        assertEquals(2, cache.size());
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
index 9558b733b81..45ded69d7a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
@@ -95,6 +95,9 @@ public final class IgniteQueryErrorCode {
     /** View does not exist. */
     public static final int VIEW_NOT_FOUND = 3018;
 
+    /** Schema is invalid. */
+    public static final int INVALID_SCHEMA = 3019;
+
     /* 4xxx - cache related runtime errors */
 
     /** Attempt to INSERT a key that is already in cache. */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 467016f464c..894382807e1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -250,8 +250,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     /** Coordinator node (initialized lazily). */
     private ClusterNode crd;
 
-    /** Registered cache names. */
-    private final Collection<String> cacheNames = 
ConcurrentHashMap.newKeySet();
+    /** Registered cache names to schema mapping. */
+    private final Map<String, String> cacheNamesToSchema = new 
ConcurrentHashMap();
 
     /** ID history for index create/drop discovery messages. */
     private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> dscoMsgIdHist 
=
@@ -976,7 +976,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
         boolean cacheExists = cacheDesc != null && 
Objects.equals(msg.deploymentId(), cacheDesc.deploymentId());
 
-        boolean cacheRegistered = cacheExists && 
cacheNames.contains(cacheName);
+        boolean cacheRegistered = cacheExists && 
cacheNamesToSchema.containsKey(cacheName);
 
         // Validate schema state and decide whether we should proceed or not.
         SchemaAbstractOperation op = msg.operation();
@@ -1696,8 +1696,21 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             }
         }
         else if (op instanceof SchemaAddQueryEntityOperation) {
-            if (cacheNames.contains(op.cacheName()))
-                err = new 
SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, 
op.cacheName());
+            String cacheSchema = cacheNamesToSchema.get(cacheName);
+
+            if (cacheSchema != null) {
+                if (!Objects.equals(cacheSchema, op.schemaName()))
+                    err = new 
SchemaOperationException(SchemaOperationException.CODE_INVALID_SCHEMA, 
op.schemaName());
+                else {
+                    for (QueryTypeIdKey t : types.keySet()) {
+                        if (Objects.equals(t.cacheName(), cacheName)) {
+                            err = new 
SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, 
cacheName);
+
+                            break;
+                        }
+                    }
+                }
+            }
         }
         else
             err = new SchemaOperationException("Unsupported operation: " + op);
@@ -1745,8 +1758,12 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         SchemaOperationException err = null;
 
         if (op instanceof SchemaAddQueryEntityOperation) {
-            if (cacheSupportSql(desc.cacheConfiguration()))
+            CacheConfiguration<?, ?> ccfg = desc.cacheConfiguration();
+
+            if (!F.isEmpty(ccfg.getQueryEntities()))
                 err = new 
SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, 
desc.cacheName());
+            else if (!F.isEmpty(ccfg.getSqlSchema()) && 
!Objects.equals(ccfg.getSqlSchema(), op.schemaName()))
+                err = new 
SchemaOperationException(SchemaOperationException.CODE_INVALID_SCHEMA, 
op.schemaName());
 
             return new T2<>(nop, err);
         }
@@ -2173,15 +2190,13 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             else if (op instanceof SchemaAddQueryEntityOperation) {
                 SchemaAddQueryEntityOperation op0 = 
(SchemaAddQueryEntityOperation)op;
 
-                if (!cacheNames.contains(op0.cacheName())) {
-                    cacheInfo.onSchemaAddQueryEntity(op0);
+                cacheInfo.onSchemaAddQueryEntity(op0);
 
-                    T3<Collection<QueryTypeCandidate>, Map<String, 
QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>>
-                        candRes = createQueryCandidates(op0.cacheName(), 
op0.schemaName(), cacheInfo, op0.entities(),
-                        op0.isSqlEscape());
+                T3<Collection<QueryTypeCandidate>, Map<String, 
QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>>
+                    candRes = createQueryCandidates(op0.cacheName(), 
op0.schemaName(), cacheInfo, op0.entities(),
+                    op0.isSqlEscape());
 
-                    registerCache0(op0.cacheName(), op.schemaName(), 
cacheInfo, candRes.get1(), false);
-                }
+                registerCache0(op0.cacheName(), op.schemaName(), cacheInfo, 
candRes.get1(), false);
 
                 if 
(idxRebuildFutStorage.prepareRebuildIndexes(singleton(cacheInfo.cacheId()), 
null).isEmpty())
                     rebuildIndexesFromHash0(cacheInfo.cacheContext(), false, 
cancelTok);
@@ -2350,7 +2365,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         boolean isSql
     ) throws IgniteCheckedException {
         synchronized (stateMux) {
-            if (moduleEnabled()) {
+            if (moduleEnabled() && !cacheNamesToSchema.containsKey(cacheName)) 
{
                 
ctx.indexProcessor().idxRowCacheRegistry().onCacheRegistered(cacheInfo);
 
                 schemaMgr.onCacheCreated(cacheName, schemaName, 
cacheInfo.config().getSqlFunctionClasses());
@@ -2391,7 +2406,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                         schemaMgr.onCacheTypeCreated(cacheInfo, desc, isSql);
                 }
 
-                cacheNames.add(CU.mask(cacheName));
+                cacheNamesToSchema.putIfAbsent(CU.mask(cacheName), schemaName);
             }
             catch (IgniteCheckedException | RuntimeException e) {
                 onCacheStop0(cacheInfo, true, true);
@@ -2410,7 +2425,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
      * @param clearIdx Clear flag.
      */
     public void onCacheStop0(GridCacheContextInfo cacheInfo, boolean destroy, 
boolean clearIdx) {
-        if (!moduleEnabled() || !cacheNames.contains(cacheInfo.name()))
+        if (!moduleEnabled() || 
!cacheNamesToSchema.containsKey(cacheInfo.name()))
             return;
 
         String cacheName = cacheInfo.name();
@@ -2462,7 +2477,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                 U.error(log, "Failed to clear schema manager on cache 
unregister (will ignore): " + cacheName, e);
             }
 
-            cacheNames.remove(cacheName);
+            cacheNamesToSchema.remove(cacheName);
 
             Iterator<Long> missedCacheTypeIter = missedCacheTypes.iterator();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
index 2c6c0769949..a815c4ce450 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
@@ -1700,6 +1700,11 @@ public class QueryUtils {
 
                 break;
 
+            case SchemaOperationException.CODE_INVALID_SCHEMA:
+                sqlCode = IgniteQueryErrorCode.INVALID_SCHEMA;
+
+                break;
+
             default:
                 sqlCode = IgniteQueryErrorCode.UNKNOWN;
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
index 02b4282afed..a6f0efc33fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
@@ -63,6 +63,9 @@ public class SchemaOperationException extends 
IgniteCheckedException {
     /** Code: schema not found. */
     public static final int CODE_SCHEMA_NOT_FOUND = 11;
 
+    /** Code: schema not found. */
+    public static final int CODE_INVALID_SCHEMA = 12;
+
     /** Error code. */
     private final int code;
 
@@ -165,6 +168,9 @@ public class SchemaOperationException extends 
IgniteCheckedException {
             case CODE_SCHEMA_NOT_FOUND:
                 return "Schema doesn't exist: " + objName;
 
+            case CODE_INVALID_SCHEMA:
+                return "Invalid schema: " + objName;
+
             default:
                 assert false;
 

Reply via email to