This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 970489eb928 IGNITE-26755 Allow to create table for already existing
cache with defined schema - Fixes #12453.
970489eb928 is described below
commit 970489eb928fd59c52d69e0743a2af6d69f8cfb3
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Sat Nov 1 16:47:50 2025 +0300
IGNITE-26755 Allow to create table for already existing cache with defined
schema - Fixes #12453.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../integration/AbstractDdlIntegrationTest.java | 2 +
.../integration/TableDdlIntegrationTest.java | 82 ++++++++++++++++++++--
.../cache/query/IgniteQueryErrorCode.java | 3 +
.../processors/query/GridQueryProcessor.java | 49 ++++++++-----
.../internal/processors/query/QueryUtils.java | 5 ++
.../query/schema/SchemaOperationException.java | 6 ++
6 files changed, 124 insertions(+), 23 deletions(-)
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
index 5496b9c448e..2a4a6c5b280 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.internal.processors.query.calcite.integration;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -57,6 +58,7 @@ public class AbstractDdlIntegrationTest extends
AbstractBasicIntegrationTest {
return super.getConfiguration(igniteInstanceName)
.setSqlConfiguration(
new SqlConfiguration().setSqlSchemas("MY_SCHEMA")
+ .setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration())
)
.setDataStorageConfiguration(
new DataStorageConfiguration()
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
index 052d7fe2d2b..cba85ba7e9a 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
@@ -20,6 +20,7 @@ import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -31,9 +32,9 @@ import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -392,15 +393,84 @@ public class TableDdlIntegrationTest extends
AbstractDdlIntegrationTest {
*/
@Test
public void createTableOnExistingCache() {
- IgniteCache<Object, Object> cache =
client.getOrCreateCache("my_cache");
+ // Cache without SQL configuration.
+ IgniteCache<Object, Object> cache =
client.getOrCreateCache("my_cache0");
- sql("create table my_schema.my_table (f1 int, f2 varchar) with
cache_name=\"my_cache\"");
+ // DDL with explicit schema.
+ sql("create table my_schema.my_table (f1 int, f2 varchar) with
cache_name=\"my_cache0\"");
- sql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
+ insertAndCheckSize(cache, "my_schema");
+
+ // Cache without SQL configuration.
+ cache = client.getOrCreateCache("my_cache1");
+
+ // DDL with implicit PUBLIC schema.
+ sql("create table my_table (f1 int, f2 varchar) with
cache_name=\"my_cache1\"");
+
+ insertAndCheckSize(cache, "public");
+
+ // Cache with defined schema.
+ cache = client.getOrCreateCache(new
CacheConfiguration<>("my_cache2").setSqlSchema("my_schema2"));
+
+ // DDL with explicit correct schema.
+ sql("create table my_schema2.my_table (f1 int, f2 varchar) with
cache_name=\"my_cache2\"");
+
+ insertAndCheckSize(cache, "my_schema2");
+
+ // Cache with defined schema.
+ cache = client.getOrCreateCache(new
CacheConfiguration<>("my_cache3").setSqlSchema("my_schema3"));
+
+ // DDL with explicit wrong schema.
+ assertThrows("create table my_schema.my_table2 (f1 int, f2 varchar)
with cache_name=\"my_cache3\"",
+ IgniteSQLException.class, "Invalid schema: MY_SCHEMA");
+
+ // DDL with explicit wrong schema.
+ assertThrows("create table public.my_table2 (f1 int, f2 varchar) with
cache_name=\"my_cache3\"",
+ IgniteSQLException.class, "Invalid schema: PUBLIC");
+
+ // DDL with implicit wrong schema.
+ assertThrows("create table my_table2 (f1 int, f2 varchar) with
cache_name=\"my_cache3\"",
+ IgniteSQLException.class, "Invalid schema: PUBLIC");
+
+ // DDL with implicit cache schema.
+ cache.query(new SqlFieldsQuery("create table my_table (f1 int, f2
varchar) with cache_name=\"my_cache3\""));
+
+ insertAndCheckSize(cache, "my_schema3");
+
+ // Cache with defined SQL functions, schema is defined by cache name.
+ cache = client.getOrCreateCache(new
CacheConfiguration<>("my_cache4").setSqlFunctionClasses(getClass()));
+
+ // DDL with explicit wrong schema.
+ assertThrows("create table public.my_table2 (f1 int, f2 varchar) with
cache_name=\"my_cache4\"",
+ IgniteSQLException.class, "Invalid schema: PUBLIC");
+
+ // DDL with explicit correct schema.
+ sql("create table \"my_cache4\".my_table (f1 int, f2 varchar) with
cache_name=\"my_cache4\"");
+
+ insertAndCheckSize(cache, "\"my_cache4\"");
+
+ // Cache with defined query entities.
+ client.getOrCreateCache(new CacheConfiguration<>("my_cache5")
+ .setQueryEntities(Collections.singleton(new
QueryEntity(Integer.class, Integer.class))));
+
+ assertThrows("create table \"my_cache5\".my_table (f1 int, f2 varchar)
with cache_name=\"my_cache5\"",
+ IgniteSQLException.class, "Cache is already indexed");
+
+ // Cache with indexed types.
+ client.getOrCreateCache(new CacheConfiguration<>("my_cache6")
+ .setIndexedTypes(Integer.class, Integer.class));
+
+ assertThrows("create table \"my_cache6\".my_table (f1 int, f2 varchar)
with cache_name=\"my_cache6\"",
+ IgniteSQLException.class, "Cache is already indexed");
+ }
+
+ /** */
+ private void insertAndCheckSize(IgniteCache<?, ?> cache, String schema) {
+ sql("insert into " + schema + ".my_table(f1, f2) values (1, '1'),(2,
'2')");
- assertThat(sql("select * from my_schema.my_table"), hasSize(2));
+ assertThat(sql("select * from " + schema + ".my_table"), hasSize(2));
- assertEquals(2, cache.size(CachePeekMode.PRIMARY));
+ assertEquals(2, cache.size());
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
index 9558b733b81..45ded69d7a9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
@@ -95,6 +95,9 @@ public final class IgniteQueryErrorCode {
/** View does not exist. */
public static final int VIEW_NOT_FOUND = 3018;
+ /** Schema is invalid. */
+ public static final int INVALID_SCHEMA = 3019;
+
/* 4xxx - cache related runtime errors */
/** Attempt to INSERT a key that is already in cache. */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 467016f464c..894382807e1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -250,8 +250,8 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
/** Coordinator node (initialized lazily). */
private ClusterNode crd;
- /** Registered cache names. */
- private final Collection<String> cacheNames =
ConcurrentHashMap.newKeySet();
+ /** Registered cache names to schema mapping. */
+ private final Map<String, String> cacheNamesToSchema = new
ConcurrentHashMap();
/** ID history for index create/drop discovery messages. */
private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> dscoMsgIdHist
=
@@ -976,7 +976,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
boolean cacheExists = cacheDesc != null &&
Objects.equals(msg.deploymentId(), cacheDesc.deploymentId());
- boolean cacheRegistered = cacheExists &&
cacheNames.contains(cacheName);
+ boolean cacheRegistered = cacheExists &&
cacheNamesToSchema.containsKey(cacheName);
// Validate schema state and decide whether we should proceed or not.
SchemaAbstractOperation op = msg.operation();
@@ -1696,8 +1696,21 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
}
}
else if (op instanceof SchemaAddQueryEntityOperation) {
- if (cacheNames.contains(op.cacheName()))
- err = new
SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED,
op.cacheName());
+ String cacheSchema = cacheNamesToSchema.get(cacheName);
+
+ if (cacheSchema != null) {
+ if (!Objects.equals(cacheSchema, op.schemaName()))
+ err = new
SchemaOperationException(SchemaOperationException.CODE_INVALID_SCHEMA,
op.schemaName());
+ else {
+ for (QueryTypeIdKey t : types.keySet()) {
+ if (Objects.equals(t.cacheName(), cacheName)) {
+ err = new
SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED,
cacheName);
+
+ break;
+ }
+ }
+ }
+ }
}
else
err = new SchemaOperationException("Unsupported operation: " + op);
@@ -1745,8 +1758,12 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
SchemaOperationException err = null;
if (op instanceof SchemaAddQueryEntityOperation) {
- if (cacheSupportSql(desc.cacheConfiguration()))
+ CacheConfiguration<?, ?> ccfg = desc.cacheConfiguration();
+
+ if (!F.isEmpty(ccfg.getQueryEntities()))
err = new
SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED,
desc.cacheName());
+ else if (!F.isEmpty(ccfg.getSqlSchema()) &&
!Objects.equals(ccfg.getSqlSchema(), op.schemaName()))
+ err = new
SchemaOperationException(SchemaOperationException.CODE_INVALID_SCHEMA,
op.schemaName());
return new T2<>(nop, err);
}
@@ -2173,15 +2190,13 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
else if (op instanceof SchemaAddQueryEntityOperation) {
SchemaAddQueryEntityOperation op0 =
(SchemaAddQueryEntityOperation)op;
- if (!cacheNames.contains(op0.cacheName())) {
- cacheInfo.onSchemaAddQueryEntity(op0);
+ cacheInfo.onSchemaAddQueryEntity(op0);
- T3<Collection<QueryTypeCandidate>, Map<String,
QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>>
- candRes = createQueryCandidates(op0.cacheName(),
op0.schemaName(), cacheInfo, op0.entities(),
- op0.isSqlEscape());
+ T3<Collection<QueryTypeCandidate>, Map<String,
QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>>
+ candRes = createQueryCandidates(op0.cacheName(),
op0.schemaName(), cacheInfo, op0.entities(),
+ op0.isSqlEscape());
- registerCache0(op0.cacheName(), op.schemaName(),
cacheInfo, candRes.get1(), false);
- }
+ registerCache0(op0.cacheName(), op.schemaName(), cacheInfo,
candRes.get1(), false);
if
(idxRebuildFutStorage.prepareRebuildIndexes(singleton(cacheInfo.cacheId()),
null).isEmpty())
rebuildIndexesFromHash0(cacheInfo.cacheContext(), false,
cancelTok);
@@ -2350,7 +2365,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
boolean isSql
) throws IgniteCheckedException {
synchronized (stateMux) {
- if (moduleEnabled()) {
+ if (moduleEnabled() && !cacheNamesToSchema.containsKey(cacheName))
{
ctx.indexProcessor().idxRowCacheRegistry().onCacheRegistered(cacheInfo);
schemaMgr.onCacheCreated(cacheName, schemaName,
cacheInfo.config().getSqlFunctionClasses());
@@ -2391,7 +2406,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
schemaMgr.onCacheTypeCreated(cacheInfo, desc, isSql);
}
- cacheNames.add(CU.mask(cacheName));
+ cacheNamesToSchema.putIfAbsent(CU.mask(cacheName), schemaName);
}
catch (IgniteCheckedException | RuntimeException e) {
onCacheStop0(cacheInfo, true, true);
@@ -2410,7 +2425,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
* @param clearIdx Clear flag.
*/
public void onCacheStop0(GridCacheContextInfo cacheInfo, boolean destroy,
boolean clearIdx) {
- if (!moduleEnabled() || !cacheNames.contains(cacheInfo.name()))
+ if (!moduleEnabled() ||
!cacheNamesToSchema.containsKey(cacheInfo.name()))
return;
String cacheName = cacheInfo.name();
@@ -2462,7 +2477,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
U.error(log, "Failed to clear schema manager on cache
unregister (will ignore): " + cacheName, e);
}
- cacheNames.remove(cacheName);
+ cacheNamesToSchema.remove(cacheName);
Iterator<Long> missedCacheTypeIter = missedCacheTypes.iterator();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
index 2c6c0769949..a815c4ce450 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
@@ -1700,6 +1700,11 @@ public class QueryUtils {
break;
+ case SchemaOperationException.CODE_INVALID_SCHEMA:
+ sqlCode = IgniteQueryErrorCode.INVALID_SCHEMA;
+
+ break;
+
default:
sqlCode = IgniteQueryErrorCode.UNKNOWN;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
index 02b4282afed..a6f0efc33fc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
@@ -63,6 +63,9 @@ public class SchemaOperationException extends
IgniteCheckedException {
/** Code: schema not found. */
public static final int CODE_SCHEMA_NOT_FOUND = 11;
+ /** Code: invalid schema. */
+ public static final int CODE_INVALID_SCHEMA = 12;
+
/** Error code. */
private final int code;
@@ -165,6 +168,9 @@ public class SchemaOperationException extends
IgniteCheckedException {
case CODE_SCHEMA_NOT_FOUND:
return "Schema doesn't exist: " + objName;
+ case CODE_INVALID_SCHEMA:
+ return "Invalid schema: " + objName;
+
default:
assert false;