This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 81e5476 IGNITE-14077: Distributed schema manager. (#91)
81e5476 is described below
commit 81e54764023d60cd87896a5d550c542584c0fbe5
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Wed May 19 12:07:34 2021 +0300
IGNITE-14077: Distributed schema manager. (#91)
---
.../schemas/table/TableConfigurationSchema.java | 2 +-
.../main/java/org/apache/ignite/table/Tuple.java | 18 +
.../configuration/internal/rest/JsonConverter.java | 2 +-
.../org/apache/ignite/internal/manager/Event.java | 2 +-
.../ignite/internal/manager/EventParameters.java | 2 +-
.../apache/ignite/internal/manager/Producer.java | 12 +-
.../runner/app/DynamicTableCreationTest.java | 162 +++++++
.../internal/runner/app/TableCreationTest.java | 190 ++++++++
.../apache/ignite/internal/app/IgnitionImpl.java | 4 +-
modules/schema/pom.xml | 7 +-
.../ignite/internal/schema/SchemaDescriptor.java | 15 +
.../ignite/internal/schema/SchemaManager.java | 303 +++++++++++-
.../ignite/internal/schema/SchemaRegistry.java} | 8 +-
.../ignite/internal/schema/event/SchemaEvent.java} | 16 +-
.../schema/event/SchemaEventParameters.java} | 42 +-
.../SchemaRegistrationConflictException.java} | 23 +-
.../schema/registry/SchemaRegistryException.java} | 25 +-
.../schema/registry/SchemaRegistryImpl.java | 147 ++++++
.../internal/schema/registry/package-info.java} | 8 +-
.../internal/schema/SchemaRegistryImplTest.java | 528 +++++++++++++++++++++
.../ignite/distributed/ITDistributedTableTest.java | 4 +-
.../schema/marshaller/TupleMarshaller.java | 2 +-
.../ignite/internal/table/AbstractTableView.java | 9 +-
.../ignite/internal/table/KVBinaryViewImpl.java | 11 +-
.../apache/ignite/internal/table/KVViewImpl.java | 9 +-
.../ignite/internal/table/RecordViewImpl.java | 9 +-
.../ignite/internal/table/RowChunkAdapter.java | 16 +
.../apache/ignite/internal/table/TableImpl.java | 21 +-
.../ignite/internal/table/TupleBuilderImpl.java | 12 +
.../ignite/internal/table/TupleMarshallerImpl.java | 13 +-
.../internal/table/distributed/TableManager.java | 84 +++-
.../internal/table/event/TableEventParameters.java | 14 +-
.../table/impl/DummySchemaManagerImpl.java | 4 +-
33 files changed, 1565 insertions(+), 159 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
index 6e5ae3e..0f76eb3 100644
---
a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
+++
b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
@@ -43,7 +43,7 @@ public class TableConfigurationSchema {
/** Count of table partition replicas. */
@Min(1)
@Value(hasDefault = true)
- public int replicas = 0;
+ public int replicas = 1;
/** Columns configuration. */
@NamedConfigValue
diff --git a/modules/api/src/main/java/org/apache/ignite/table/Tuple.java
b/modules/api/src/main/java/org/apache/ignite/table/Tuple.java
index 40fda9d..7b75ebe 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/Tuple.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/Tuple.java
@@ -17,6 +17,8 @@
package org.apache.ignite.table;
+import java.util.BitSet;
+import java.util.UUID;
import org.apache.ignite.binary.BinaryObject;
/**
@@ -97,4 +99,20 @@ public interface Tuple {
* @return Column value.
*/
String stringValue(String colName);
+
+ /**
+ * Gets {@code UUID} column value.
+ *
+ * @param colName Column name.
+ * @return Column value.
+ */
+ UUID uuidValue(String colName);
+
+ /**
+ * Gets {@code BitSet} column value.
+ *
+ * @param colName Column name.
+ * @return Column value.
+ */
+ BitSet bitmaskValue(String colName);
}
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/rest/JsonConverter.java
b/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/rest/JsonConverter.java
index 93eb39a..a62d21e 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/rest/JsonConverter.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/rest/JsonConverter.java
@@ -204,7 +204,7 @@ public class JsonConverter {
}
else {
throw new IllegalArgumentException(
- "'" + join(path) + "' configuration doesn't have
'" + key + "' subconfiguration"
+ "'" + join(path) + "' configuration doesn't
expects '" + key + "' subconfiguration"
);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
index c245b5b..39d0bbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.manager;
/**
* The event cas whcih is produced by event producer component.
- * @see Producer#onEvent(Event, EventParameters, Exception)
+ * @see Producer#onEvent(Event, EventParameters, Throwable)
*/
public interface Event {
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
index 140047d..9a76509 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.manager;
/**
* Event parameters.
* This type passed to the event listener.
- * @see Producer#onEvent(Event, EventParameters, Exception)
+ * @see Producer#onEvent(Event, EventParameters, Throwable)
*/
public interface EventParameters {
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
index c198bba..f9639af 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
@@ -27,7 +27,7 @@ import java.util.function.BiPredicate;
*/
public abstract class Producer<T extends Event, P extends EventParameters> {
/** All listeners. */
- private ConcurrentHashMap<T, ConcurrentLinkedQueue<BiPredicate<P,
Exception>>> listeners = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<T, ConcurrentLinkedQueue<BiPredicate<P,
Throwable>>> listeners = new ConcurrentHashMap<>();
/**
* Registers an event listener.
@@ -37,7 +37,7 @@ public abstract class Producer<T extends Event, P extends
EventParameters> {
* @param evt Event.
* @param closure Closure.
*/
- public void listen(T evt, BiPredicate<P, Exception> closure) {
+ public void listen(T evt, BiPredicate<P, Throwable> closure) {
listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>())
.offer(closure);
}
@@ -49,15 +49,15 @@ public abstract class Producer<T extends Event, P extends
EventParameters> {
* @param params Event parameters.
* @param err Exception when it was happened, or {@code null} otherwise.
*/
- protected void onEvent(T evt, P params, Exception err) {
- ConcurrentLinkedQueue<BiPredicate<P, Exception>> queue =
listeners.get(evt);
+ protected void onEvent(T evt, P params, Throwable err) {
+ ConcurrentLinkedQueue<BiPredicate<P, Throwable>> queue =
listeners.get(evt);
if (queue == null)
return;
- BiPredicate<P, Exception> closure;
+ BiPredicate<P, Throwable> closure;
- Iterator<BiPredicate<P, Exception>> iter = queue.iterator();
+ Iterator<BiPredicate<P, Throwable>> iter = queue.iterator();
while (iter.hasNext()) {
closure = iter.next();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
new file mode 100644
index 0000000..13b8217
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Ignition interface tests.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-14389")
+class DynamicTableCreationTest {
+ /** Nodes bootstrap configuration. */
+ private final String[] nodesBootstrapCfg =
+ {
+ "{\n" +
+ " \"node\": {\n" +
+ " \"name\":node0,\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+ " },\n" +
+ " \"network\": {\n" +
+ " \"port\":3344,\n" +
+ " \"netClusterNodes\":[ \"localhost:3344\",
\"localhost:3345\", \"localhost:3346\" ]\n" +
+ " }\n" +
+ "}",
+
+ "{\n" +
+ " \"node\": {\n" +
+ " \"name\":node1,\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+ " },\n" +
+ " \"network\": {\n" +
+ " \"port\":3345,\n" +
+ " \"netClusterNodes\":[ \"localhost:3344\",
\"localhost:3345\", \"localhost:3346\" ]\n" +
+ " }\n" +
+ "}",
+
+ "{\n" +
+ " \"node\": {\n" +
+ " \"name\":node2,\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+ " },\n" +
+ " \"network\": {\n" +
+ " \"port\":3346,\n" +
+ " \"netClusterNodes\":[ \"localhost:3344\",
\"localhost:3345\", \"localhost:3346\" ]\n" +
+ " }\n" +
+ "}",
+ };
+
+ /**
+ * Check dynamic table creation.
+ */
+ @Test
+ void testDynamicSimpleTableCreation() {
+ List<Ignite> clusterNodex = new ArrayList<>();
+
+ for (String nodeBootstrapCfg : nodesBootstrapCfg)
+ clusterNodex.add(IgnitionManager.start(nodeBootstrapCfg));
+
+ assertEquals(3, clusterNodex.size());
+
+ // Create table on node 0.
+ clusterNodex.get(0).tables().createTable("tbl1", tbl -> tbl
+ .changeReplicas(1)
+ .changePartitions(10)
+ .changeColumns(cols -> cols
+ .create("key", c -> c.changeType(t -> t.changeType("INT64")))
+ .create("val", c -> c.changeType(t -> t.changeType("INT64")))
+ )
+ .changeIndices(idxs -> idxs
+ .create("PK", idx -> idx
+ .changeType("PRIMARY")
+ .changeColumns(c -> c
+ .create("key", t -> {
+ }))
+ .changeAffinityColumns(new String[] {"key"}))
+ ));
+
+ // Put data on node 1.
+ Table tbl1 = clusterNodex.get(1).tables().table("tbl1");
+ tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val",
111L).build());
+
+ // Get data on node 2.
+ Table tbl2 = clusterNodex.get(2).tables().table("tbl1");
+ assertEquals(111L, tbl2.get(tbl2.tupleBuilder().set("key",
1L).build()));
+ }
+
+ /**
+ * Check dynamic table creation.
+ */
+ @Test
+ void testDynamicTableCreation() {
+ List<Ignite> clusterNodes = new ArrayList<>();
+
+ for (String nodeBootstrapCfg : nodesBootstrapCfg)
+ clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg));
+
+ assertEquals(3, clusterNodes.size());
+
+ // Create table on node 0.
+ clusterNodes.get(0).tables().createTable("tbl1", tbl -> tbl
+ .changeReplicas(1)
+ .changePartitions(10)
+ .changeColumns(cols -> cols
+ .create("key", c -> c.changeType(t -> t.changeType("UUID")))
+ .create("affKey", c -> c.changeType(t ->
t.changeType("INT64")))
+ .create("valStr", c -> c.changeType(t ->
t.changeType("STRING")))
+ .create("valInt", c -> c.changeType(t ->
t.changeType("INT32")))
+ .create("valNullable", c -> c.changeType(t ->
t.changeType("INT8")).changeNullable(true))
+ )
+ .changeIndices(idxs -> idxs
+ .create("PK", idx -> idx
+ .changeType("PRIMARY")
+ .changeColumns(c -> c
+ .create("key", t -> {
+ })
+ .create("affKey", t -> {
+ }))
+ .changeAffinityColumns(new String[] {"affKey"}))
+ ));
+
+ final UUID uuid = UUID.randomUUID();
+
+ // Put data on node 1.
+ Table tbl1 = clusterNodes.get(1).tables().table("tbl1");
+ tbl1.insert(tbl1.tupleBuilder().set("key", uuid).set("affKey", 42L)
+ .set("valStr", "String value").set("valInt",
73L).set("valNullable", null).build());
+
+ // Get data on node 2.
+ Table tbl2 = clusterNodes.get(2).tables().table("tbl1");
+ final Tuple val = tbl2.get(tbl1.tupleBuilder().set("key",
uuid).set("affKey", 42L).build());
+
+ assertEquals("String value", val.value("valStr"));
+ assertEquals(73L, (Long)val.value("valInt"));
+ assertNull(val.value("valNullable"));
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
new file mode 100644
index 0000000..035caad
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Ignition interface tests.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-14578")
+class TableCreationTest {
+ /** Nodes bootstrap configuration with preconfigured tables. */
+ private final String[] nodesBootstrapCfg =
+ {
+ "{\n" +
+ " \"node\": {\n" +
+ " \"name\":node0,\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+ " },\n" +
+ " \"network\": {\n" +
+ " \"port\":3344,\n" +
+ " \"netClusterNodes\":[ \"localhost:3344\",
\"localhost:3345\", \"localhost:3346\" ]\n" +
+ " },\n" +
+ " \"table\": {\n" +
+ " \"tables\": {\n" +
+ " \"tbl1\": {\n" +
+ " \"partitions\":10,\n" +
+ " \"replicas\":2,\n" +
+ " \"columns\": { \n" +
+ " \"key\": {\n" +
+ " \"type\": {" +
+ " \"type\":UUID\n" +
+ " },\n" +
+ " \"nullable\":false\n" +
+ " },\n" +
+ " \"affKey\": {\n" +
+ " \"type\": {" +
+ " \"type\":INT64\n" +
+ " },\n" +
+ " \"nullable\":false\n" +
+ " },\n" +
+ " \"valString\": {\n" +
+ " \"type\": {" +
+ " \"type\":String\n" +
+ " },\n" +
+ " \"nullable\":false\n" +
+ " },\n" +
+ " \"valInt\": {\n" +
+ " \"type\": {" +
+ " \"type\":INT32\n" +
+ " },\n" +
+ " \"nullable\":false\n" +
+ " },\n" +
+ " \"valNullable\": {\n" +
+ " \"type\": {" +
+ " \"type\":String\n" +
+ " },\n" +
+ " \"nullable\":true\n" +
+ " }\n" +
+ " },\n" + /* Columns. */
+ " \"indices\": {\n" +
+ " \"PK\": {\n" +
+ " \"type\":PRIMARY,\n" +
+ " \"columns\": {\n" +
+ " \"key\": {\n" +
+ " \"asc\":true\n" +
+ " },\n" +
+ " \"affKey\": {}\n" +
+ " },\n" + /* Columns. */
+ " \"affinityColumns\":[ \"affKey\" ]\n" +
+ " }\n" +
+ " }\n" + /* Indices. */
+ " },\n" + /* Table. */
+ "\n" +
+ " \"tbl2\": {\n" + // Table minimal configuration.
+ " \"columns\": { \n" +
+ " \"key\": {\n" +
+ " \"type\": {" +
+ " \"type\":INT64\n" +
+ " },\n" +
+ " },\n" +
+ " \"val\": {\n" +
+ " \"type\": {" +
+ " \"type\":INT64\n" +
+ " },\n" +
+ " }\n" +
+ " },\n" + /* Columns. */
+ " \"indices\": {\n" +
+ " \"PK\": {\n" +
+ " \"type\":PRIMARY,\n" +
+ " \"columns\": {\n" +
+ " \"key\": {}\n" +
+ " },\n" + /* Columns. */
+ " }\n" +
+ " }\n" + /* Indices. */
+ " }\n" + /* Table. */
+ " }\n" + /* Tables. */
+ " }\n" + /* Root. */
+ "}",
+
+ "{\n" +
+ " \"node\": {\n" +
+ " \"name\":node1,\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+ " },\n" +
+ " \"network\": {\n" +
+ " \"port\":3345,\n" +
+ " \"netClusterNodes\":[ \"localhost:3344\",
\"localhost:3345\", \"localhost:3346\" ]\n" +
+ " }\n" +
+ "}",
+
+ "{\n" +
+ " \"node\": {\n" +
+ " \"name\":node2,\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+ " },\n" +
+ " \"network\": {\n" +
+ " \"port\":3346,\n" +
+ " \"netClusterNodes\":[ \"localhost:3344\",
\"localhost:3345\", \"localhost:3346\" ]\n" +
+ " }\n" +
+ "}",
+ };
+
+ /**
+ * Check table creation via bootstrap configuration with pre-configured
table.
+ */
+ @Test
+ void testInitialSimpleTableConfiguration() {
+ List<Ignite> clusterNodes = new ArrayList<>();
+
+ for (String nodeBootstrapCfg : nodesBootstrapCfg)
+ clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg));
+
+ assertEquals(3, clusterNodes.size());
+
+ clusterNodes.forEach(Assertions::assertNotNull);
+
+ { /* Table 1.*/
+ Table tbl1 = clusterNodes.get(1).tables().table("tbl1");
+ tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val",
111L).build());
+
+ Table tbl2 = clusterNodes.get(2).tables().table("tbl1");
+ assertEquals(111L, tbl2.get(tbl2.tupleBuilder().set("key",
1L).build()));
+ }
+
+ { /* Table 2. */
+ final UUID uuid = UUID.randomUUID();
+
+ // Put data on node 1.
+ Table tbl1 = clusterNodes.get(1).tables().table("tbl1");
+ tbl1.insert(tbl1.tupleBuilder().set("key", uuid).set("affKey", 42L)
+ .set("valStr", "String value").set("valInt",
73L).set("valNullable", null).build());
+
+ // Get data on node 2.
+ Table tbl2 = clusterNodes.get(2).tables().table("tbl1");
+ final Tuple val = tbl2.get(tbl1.tupleBuilder().set("key",
uuid).set("affKey", 42L).build());
+
+ assertEquals("String value", val.value("valStr"));
+ assertEquals(73L, (Long)val.value("valInt"));
+ assertNull(val.value("valNullable"));
+ }
+ }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index d35524b..01db2d3 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -108,7 +108,7 @@ public class IgnitionImpl implements Ignition {
locConfigurationMgr.bootstrap(jsonStrBootstrapCfg);
}
catch (Exception e) {
- LOG.warn("Unable to parse user specific configuration, default
configuration will be used", e);
+ LOG.warn("Unable to parse user specific configuration, default
configuration will be used: " + e.getMessage());
}
else if (jsonStrBootstrapCfg != null)
LOG.warn("User specific configuration will be ignored, cause vault
was bootstrapped with pds configuration");
@@ -161,7 +161,7 @@ public class IgnitionImpl implements Ignition {
// Affinity manager startup.
AffinityManager affinityMgr = new AffinityManager(configurationMgr,
metaStorageMgr, baselineMgr, vaultMgr);
- SchemaManager schemaMgr = new SchemaManager(configurationMgr);
+ SchemaManager schemaMgr = new SchemaManager(configurationMgr,
metaStorageMgr, vaultMgr);
// Distributed table manager startup.
IgniteTables distributedTblMgr = new TableManager(
diff --git a/modules/schema/pom.xml b/modules/schema/pom.xml
index 03acb79..e2043f8 100644
--- a/modules/schema/pom.xml
+++ b/modules/schema/pom.xml
@@ -40,12 +40,17 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-bytecode</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-bytecode</artifactId>
+ <artifactId>ignite-metastorage</artifactId>
</dependency>
<!-- 3rd party dependencies -->
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
index e0e0dfb..8e1e91d 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
@@ -46,6 +46,19 @@ public class SchemaDescriptor {
* @param valCols Value columns.
*/
public SchemaDescriptor(int ver, Column[] keyCols, Column[] valCols) {
+ this(ver, keyCols, null, valCols);
+ }
+
+ /**
+ * @param ver Schema version.
+ * @param keyCols Key columns.
+ * @param affCols Affinity column names.
+ * @param valCols Value columns.
+ */
+ public SchemaDescriptor(int ver, Column[] keyCols, @Nullable String[]
affCols, Column[] valCols) {
+ assert keyCols.length > 0 : "No key columns are conigured.";
+ assert valCols.length > 0 : "No value columns are conigured.";
+
this.ver = ver;
this.keyCols = new Columns(0, keyCols);
this.valCols = new Columns(keyCols.length, valCols);
@@ -54,6 +67,8 @@ public class SchemaDescriptor {
Arrays.stream(this.keyCols.columns()).forEach(c ->
colMap.put(c.name(), c));
Arrays.stream(this.valCols.columns()).forEach(c ->
colMap.put(c.name(), c));
+
+ //TODO: https://issues.apache.org/jira/browse/IGNITE-14388 Add
affinity columns support.
}
/**
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index da099e4..fbb6c4d 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -17,60 +17,315 @@
package org.apache.ignite.internal.schema;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.event.SchemaEvent;
+import org.apache.ignite.internal.schema.event.SchemaEventParameters;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
/**
* Schema Manager.
+ *
+ * Schemas MUST be registered in ascending version order, incrementing by
{@code 1} with NO gaps,
+ * otherwise an exception will be thrown. Version numbering starts from
{@code 1}.
+ * <p>
+ * After some table maintenance process some first versions may become
outdated and can be safely cleaned up
+ * if the process guarantees the table no longer has data of these versions.
+ *
+ * @implSpec The changes in between two arbitrary actual versions MUST NOT be
lost.
+ * Thus, schema versions can only be removed from the beginning.
+ * @implSpec Initial schema history MAY be registered without the first
outdated versions
+ * that could be cleaned up earlier.
*/
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
- /** Configuration manager in order to handle and listen schema specific
configuration.*/
+public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters> {
+ /** The logger. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(SchemaManager.class);
+
+ /** Internal prefix for the metastorage. */
+ private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+ /** Schema history item key suffix. */
+ private static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+ /** Configuration manager used to handle and listen to schema-specific
configuration. */
private final ConfigurationManager configurationMgr;
- /** Schema. */
- private final SchemaDescriptor schema;
+ /** Metastorage manager. */
+ private final MetaStorageManager metaStorageMgr;
+
+ /** Vault manager. */
+ private final VaultManager vaultMgr;
+
+ /** Schema registries. */
+ private final Map<UUID, SchemaRegistryImpl> schemaRegs = new
ConcurrentHashMap<>();
/**
* The constructor.
*
* @param configurationMgr Configuration manager.
+ * @param metaStorageMgr Metastorage manager.
+ * @param vaultMgr Vault manager.
*/
- public SchemaManager(ConfigurationManager configurationMgr) {
+ public SchemaManager(
+ ConfigurationManager configurationMgr,
+ MetaStorageManager metaStorageMgr,
+ VaultManager vaultMgr
+ ) {
this.configurationMgr = configurationMgr;
+ this.metaStorageMgr = metaStorageMgr;
+ this.vaultMgr = vaultMgr;
+
+ metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new
WatchListener() {
+ @Override public boolean onUpdate(@NotNull Iterable<WatchEvent>
events) {
+ for (WatchEvent evt : events) {
+ String keyTail =
evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+ int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
+
+ if (verPos == -1) {
+ final UUID tblId = UUID.fromString(keyTail);
+
+ SchemaRegistry reg = schemaRegistryForTable(tblId);
+
+ assert reg != null : "Table schema was not initialized
or table has been dropped: " + tblId;
+
+ if (evt.oldEntry() == null)
+ onEvent(SchemaEvent.INITIALIZED, new
SchemaEventParameters(tblId, reg), null);
+ else if (evt.newEntry() == null) {
+ schemaRegs.remove(tblId);
+
+ onEvent(SchemaEvent.DROPPED, new
SchemaEventParameters(tblId, null), null);
+ }
+
+ return true; // Ignore last table schema version.
+ }
+ else {
+ UUID tblId = UUID.fromString(keyTail.substring(0,
verPos));
+
+ SchemaRegistryImpl reg = schemaRegs.get(tblId);
- this.schema = new SchemaDescriptor(1,
- new Column[] {
- new Column("key", NativeType.LONG, false)
- },
- new Column[] {
- new Column("value", NativeType.LONG, false)
+ if (reg == null)
+ schemaRegs.put(tblId, (reg = new
SchemaRegistryImpl(v -> tableSchema(tblId, v))));
+
+ if (evt.oldEntry() == null)
+
reg.onSchemaRegistered((SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()));
+ else if (evt.newEntry() == null)
+
reg.onSchemaDropped(Integer.parseInt(keyTail.substring(verPos +
INTERNAL_VER_SUFFIX.length())));
+ else
+ throw new SchemaRegistryException("Schema of
concrete version can't be changed.");
+ }
+ }
+
+ return true;
+ }
+
+ @Override public void onError(@NotNull Throwable e) {
+ LOG.error("Metastorage listener issue", e);
}
+ });
+ }
+
+ /**
+ * Creates schema registry for the table with an existing schema or
+ * registers initial schema from configuration.
+ *
+ * @param tblId Table id.
+ * @param tblName Table name.
+ * @return Operation future.
+ */
+ public CompletableFuture<Boolean> initSchemaForTable(final UUID tblId,
String tblName) {
+ return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
+ thenCompose(entry -> {
+ TableConfiguration tblConfig =
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
+
+ assert entry.empty();
+
+ final int schemaVer = 1;
+
+ final Key lastVerKey = new Key(INTERNAL_PREFIX + tblId);
+ final Key schemaKey = new Key(INTERNAL_PREFIX + tblId +
INTERNAL_VER_SUFFIX + schemaVer);
+
+ final SchemaDescriptor desc =
createSchemaDescriptor(schemaVer, tblConfig);
+
+ return metaStorageMgr.invoke(
+ Conditions.key(lastVerKey).value().eq(entry.value()), //
Don't rewrite if the version has gone ahead.
+ List.of(
+ //TODO: IGNITE-14679 Serialize schema.
+ Operations.put(schemaKey, ByteUtils.toBytes(desc)),
+ Operations.put(lastVerKey,
ByteUtils.longToBytes(schemaVer))
+ ),
+ List.of(Operations.noop()));
+ });
+ }
+
+ /**
+ * Returns the table schema of a certain version from history.
+ *
+ * @param tblId Table id.
+ * @param schemaVer Schema version.
+ * @return Schema descriptor.
+ */
+ private SchemaDescriptor tableSchema(UUID tblId, int schemaVer) {
+ try {
+ return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId +
INTERNAL_VER_SUFFIX + schemaVer))
+ .thenApply(e -> e.empty() ? null :
(SchemaDescriptor)ByteUtils.fromBytes(e.value())).get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new SchemaRegistryException("Can't read schema from vault:
ver=" + schemaVer, e);
+ }
+ }
+
+ /**
+ * Creates schema descriptor from configuration.
+ *
+ * @param ver Schema version.
+ * @param tblConfig Table config.
+ * @return Schema descriptor.
+ */
+ private SchemaDescriptor createSchemaDescriptor(int ver,
TableConfiguration tblConfig) {
+ final TableIndexConfiguration pkCfg =
tblConfig.indices().get(PrimaryIndex.PRIMARY_KEY_INDEX_NAME);
+
+ assert pkCfg != null;
+
+ final Set<String> keyColNames =
Stream.of(pkCfg.colNames().value()).collect(Collectors.toSet());
+ final NamedListView<ColumnView> cols = tblConfig.columns().value();
+
+ final ArrayList<Column> keyCols = new ArrayList<>(keyColNames.size());
+ final ArrayList<Column> valCols = new ArrayList<>(cols.size() -
keyColNames.size());
+
+ cols.namedListKeys().stream()
+ .map(cols::get)
+ //TODO: IGNITE-14290 replace with helper class call.
+ .map(col -> new Column(col.name(), createType(col.type()),
col.nullable()))
+ .forEach(c -> (keyColNames.contains(c.name()) ? keyCols :
valCols).add(c));
+
+ return new SchemaDescriptor(
+ ver,
+ keyCols.toArray(Column[]::new),
+ pkCfg.affinityColumns().value(),
+ valCols.toArray(Column[]::new)
);
}
/**
- * Gets a current schema for the table specified.
+ * Create type from config.
*
- * @param tableId Table id.
- * @return Schema.
+ * TODO: IGNITE-14290 replace with helper class call.
+ *
+ * @param type Type view.
+ * @return Native type.
*/
- public SchemaDescriptor schema(UUID tableId) {
- return schema;
+ private NativeType createType(ColumnTypeView type) {
+ switch (type.type().toLowerCase()) {
+ case "byte":
+ return NativeType.BYTE;
+ case "short":
+ return NativeType.SHORT;
+ case "int":
+ return NativeType.INTEGER;
+ case "long":
+ return NativeType.LONG;
+ case "float":
+ return NativeType.FLOAT;
+ case "double":
+ return NativeType.DOUBLE;
+ case "uuid":
+ return NativeType.UUID;
+ case "bitmask":
+ return Bitmask.of(type.length());
+ case "string":
+ return NativeType.STRING;
+ case "bytes":
+ return NativeType.BYTES;
+
+ default:
+ throw new IllegalStateException("Unsupported column type: " +
type.type());
+ }
}
/**
- * Gets a schema for specific version.
+ * Compares schemas.
*
+ * @param expected Expected schema.
+ * @param actual Actual schema.
+ * @return {@code True} if schemas are equal, {@code false} otherwise.
+ */
+ public static boolean equalSchemas(SchemaDescriptor expected,
SchemaDescriptor actual) {
+ if (expected.keyColumns().length() != actual.keyColumns().length() ||
+ expected.valueColumns().length() != actual.valueColumns().length())
+ return false;
+
+ for (int i = 0; i < expected.length(); i++) {
+ if (!expected.column(i).equals(actual.column(i)))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* @param tableId Table id.
- * @param ver Schema version.
- * @return Schema.
+ * @return Schema registry for the table.
*/
- public SchemaDescriptor schema(UUID tableId, long ver) {
- assert ver >= 0;
+ private SchemaRegistry schemaRegistryForTable(UUID tableId) {
+ final SchemaRegistry reg = schemaRegs.get(tableId);
+
+ if (reg == null)
+ throw new SchemaRegistryException("No schema was ever registered
for the table: " + tableId);
+
+ return reg;
+ }
+
+ /**
+ * Unregisters all schemas associated with a table identifier.
+ *
+ * @param tableId Table identifier.
+ * @return Future which will complete when all versions of the schema are
unregistered.
+ */
+ public CompletableFuture<Boolean> unregisterSchemas(UUID tableId) {
+ CompletableFuture<Void> fut = metaStorageMgr.remove(new
Key(INTERNAL_PREFIX + tableId));
+
+ String schemaPrefix = INTERNAL_PREFIX + tableId + INTERNAL_VER_SUFFIX;
- assert schema.version() == ver;
+ List<Key> keys = new ArrayList<>();
+ try (Cursor<Entry> cursor = metaStorageMgr.range(new
Key(schemaPrefix), null)) {
+ cursor.forEach(entry -> keys.add(entry.key()));
+ }
+ catch (Exception e) {
+ LOG.error("Can't remove schemas for the table [tblId=" + tableId +
']');
+ }
- return schema;
+ return fut.thenCompose(r ->
metaStorageMgr.removeAll(keys)).thenApply(v -> true);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
similarity index 85%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
copy to
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
index d63d71c..8da7278 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table;
-
-import org.apache.ignite.internal.schema.SchemaDescriptor;
+package org.apache.ignite.internal.schema;
/**
- * Table schema manager interface.
+ * Table schema registry interface.
*/
-public interface TableSchemaView {
+public interface SchemaRegistry {
/**
* @return Current schema.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
similarity index 71%
copy from
modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
copy to
modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
index 140047d..d81e0d2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
@@ -15,12 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.schema.event;
+
+import org.apache.ignite.internal.manager.Event;
/**
- * Event parameters.
- * This type passed to the event listener.
- * @see Producer#onEvent(Event, EventParameters, Exception)
+ * Schema management events.
*/
-public interface EventParameters {
+public enum SchemaEvent implements Event {
+ /** This event is fired when a schema was initialized. */
+ INITIALIZED,
+
+ /** This event is fired when a schema was dropped. */
+ DROPPED
}
+
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java
similarity index 55%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
rename to
modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java
index dd74868..2982afc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java
@@ -15,38 +15,46 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.schema.event;
import java.util.UUID;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.manager.EventParameters;
+import org.apache.ignite.internal.schema.SchemaRegistry;
/**
- * Schema view implementation.
+ * Schema event parameters: the properties associated with a concrete schema.
*/
-public class TableSchemaViewImpl implements TableSchemaView {
+public class SchemaEventParameters implements EventParameters {
/** Table identifier. */
private final UUID tableId;
- /** Schema manager. */
- private final SchemaManager schemaManager;
+ /** Schema registry. */
+ private final SchemaRegistry reg;
/**
* @param tableId Table identifier.
- * @param schemaManager Schema manager.
+ * @param reg Schema registry for the table.
*/
- public TableSchemaViewImpl(UUID tableId, SchemaManager schemaManager) {
+ public SchemaEventParameters(UUID tableId, SchemaRegistry reg) {
this.tableId = tableId;
- this.schemaManager = schemaManager;
+ this.reg = reg;
}
- /** {@inheritDoc} */
- @Override public SchemaDescriptor schema() {
- return schemaManager.schema(tableId);
+ /**
+ * Get the table identifier.
+ *
+ * @return Table id.
+ */
+ public UUID tableId() {
+ return tableId;
}
- /** {@inheritDoc} */
- @Override public SchemaDescriptor schema(int ver) {
- return schemaManager.schema(tableId, ver);
+ /**
+ * Get schema registry for the table.
+ *
+ * @return Schema registry.
+ */
+ public SchemaRegistry schemaRegistry() {
+ return reg;
}
-}
+}
\ No newline at end of file
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistrationConflictException.java
similarity index 65%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
copy to
modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistrationConflictException.java
index d63d71c..d9298ed 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistrationConflictException.java
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.schema.registry;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.lang.IgniteInternalException;
/**
- * Table schema manager interface.
+ * Schema registration conflict exception is thrown if
+ * a schema with the same version number was already registered earlier.
*/
-public interface TableSchemaView {
+public class SchemaRegistrationConflictException extends
IgniteInternalException {
/**
- * @return Current schema.
+ * Constructor.
+ *
+ * @param msg Message.
*/
- SchemaDescriptor schema();
-
- /**
- * @param ver Schema version.
- * @return Schema of given version.
- */
- SchemaDescriptor schema(int ver);
+ public SchemaRegistrationConflictException(String msg) {
+ super(msg);
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryException.java
similarity index 61%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
rename to
modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryException.java
index d63d71c..efd657a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryException.java
@@ -15,22 +15,29 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.schema.registry;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.lang.IgniteInternalException;
/**
- * Table schema manager interface.
+ * Schema registration exception.
*/
-public interface TableSchemaView {
+public class SchemaRegistryException extends IgniteInternalException {
/**
- * @return Current schema.
+ * Constructor with error message.
+ *
+ * @param msg Message.
*/
- SchemaDescriptor schema();
+ public SchemaRegistryException(String msg) {
+ super(msg);
+ }
/**
- * @param ver Schema version.
- * @return Schema of given version.
+ * Constructor with error message and cause.
+ *
+ * @param msg Message.
+ * @param cause Cause.
*/
- SchemaDescriptor schema(int ver);
+ public SchemaRegistryException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
new file mode 100644
index 0000000..41f2340
--- /dev/null
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.registry;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Caching registry of actual schema descriptors for a table.
+ */
+public class SchemaRegistryImpl implements SchemaRegistry {
+ /** Initial schema version. */
+ public static final int INITIAL_SCHEMA_VERSION = -1;
+
+ /** Cached schemas. */
+ private final ConcurrentSkipListMap<Integer, SchemaDescriptor> schemaCache
= new ConcurrentSkipListMap<>();
+
+ /** Last registered version. */
+ private volatile int lastVer;
+
+ /** Schema store. */
+ private final Function<Integer, SchemaDescriptor> history;
+
+ /**
+ * Default constructor.
+ *
+ * @param history Schema history.
+ */
+ public SchemaRegistryImpl(Function<Integer, SchemaDescriptor> history) {
+ lastVer = INITIAL_SCHEMA_VERSION;
+ this.history = history;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param initialVer Initial value of the last registered schema version.
+ * @param history Schema history.
+ public SchemaRegistryImpl(int initialVer, Function<Integer,
SchemaDescriptor> history) {
+ lastVer = initialVer;
+ this.history = history;
+ }
+
+ /**
+ * Gets schema descriptor for given version.
+ *
+ * @param ver Schema version to get descriptor for.
+ * @return Schema descriptor.
+ * @throws SchemaRegistryException If no schema found for given version.
+ */
+ @Override public SchemaDescriptor schema(int ver) {
+ SchemaDescriptor desc = schemaCache.get(ver);
+
+ if (desc != null)
+ return desc;
+
+ desc = history.apply(ver);
+
+ if (desc != null) {
+ schemaCache.putIfAbsent(ver, desc);
+
+ return desc;
+ }
+
+ if (lastVer < ver || ver <= 0)
+ throw new SchemaRegistryException("Incorrect schema version
requested: ver=" + ver);
+ else
+ throw new SchemaRegistryException("Failed to find schema: ver=" +
ver);
+ }
+
+ /**
+ * Gets schema descriptor for the latest version if initialized.
+ *
+ * @return Schema descriptor if initialized, {@code null} otherwise.
+ * @throws SchemaRegistryException If failed.
+ */
+ @Override public @Nullable SchemaDescriptor schema() {
+ final int lastVer0 = lastVer;
+
+ if (lastVer0 == INITIAL_SCHEMA_VERSION)
+ return null;
+
+ return schema(lastVer0);
+ }
+
+ /**
+ * @return Last known schema version.
+ */
+ public int lastSchemaVersion() {
+ return lastVer;
+ }
+
+ /**
+ * Registers new schema.
+ *
+ * @param desc Schema descriptor.
+ * @throws SchemaRegistrationConflictException If schema of provided
version was already registered.
+ * @throws SchemaRegistryException If schema of incorrect version provided.
+ */
+ public void onSchemaRegistered(SchemaDescriptor desc) {
+ if (lastVer == INITIAL_SCHEMA_VERSION) {
+ if (desc.version() != 1)
+ throw new SchemaRegistryException("Try to register schema of
wrong version: ver=" + desc.version() + ", lastVer=" + lastVer);
+ }
+ else if (desc.version() != lastVer + 1) {
+ if (desc.version() > 0 && desc.version() <= lastVer)
+ throw new SchemaRegistrationConflictException("Schema with
given version has been already registered: " + desc.version());
+
+ throw new SchemaRegistryException("Try to register schema of wrong
version: ver=" + desc.version() + ", lastVer=" + lastVer);
+ }
+
+ schemaCache.put(desc.version(), desc);
+
+ lastVer = desc.version();
+ }
+
+ /**
+ * Cleanup given schema version from history.
+ * <p>
+ * The latest version can't be dropped, and a version can't be dropped
+ * while an older version is still cached (versions are dropped oldest first).
+ *
+ * @param ver Schema version to remove.
+ * @throws SchemaRegistryException If incorrect schema version provided.
+ */
+ public void onSchemaDropped(int ver) {
+ // headMap(ver) holds the cached versions strictly older than 'ver'. Unlike keySet().first(),
+ // it doesn't throw NoSuchElementException when nothing has been cached yet.
+ if (ver >= lastVer || ver <= 0 || !schemaCache.headMap(ver).isEmpty())
+ throw new SchemaRegistryException("Incorrect schema version to clean up to: " + ver);
+
+ schemaCache.remove(ver);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/package-info.java
similarity index 80%
copy from
modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
copy to
modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/package-info.java
index c245b5b..b6a1bfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/package-info.java
@@ -15,11 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.manager;
-
/**
- * The event cas whcih is produced by event producer component.
- * @see Producer#onEvent(Event, EventParameters, Exception)
+ * Schema registry responsible for maintaining versioned schema history.
*/
-public interface Event {
-}
+package org.apache.ignite.internal.schema.registry;
\ No newline at end of file
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java
new file mode 100644
index 0000000..fb20183
--- /dev/null
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import
org.apache.ignite.internal.schema.registry.SchemaRegistrationConflictException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.ignite.internal.schema.NativeType.BYTES;
+import static org.apache.ignite.internal.schema.NativeType.LONG;
+import static org.apache.ignite.internal.schema.NativeType.STRING;
+import static
org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Schema registry test.
+ */
+public class SchemaRegistryImplTest {
+ /**
+ * Check registration of schema with wrong versions.
+ */
+ @Test
+ public void testWrongSchemaVersionRegistration() {
+ final SchemaDescriptor schemaV0 = new
SchemaDescriptor(INITIAL_SCHEMA_VERSION,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valBytesCol", BYTES, true)});
+
+ final SchemaDescriptor schemaV1 = new SchemaDescriptor(0,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valBytesCol", BYTES, true)});
+
+ final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valBytesCol", BYTES, true)});
+
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+
+ assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+ assertNull(reg.schema());
+
+ // Try to register schema with initial version.
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaRegistered(schemaV0));
+ assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+ assertNull(reg.schema());
+ assertThrows(SchemaRegistryException.class, () ->
reg.schema(INITIAL_SCHEMA_VERSION));
+
+ // Try to register schema with version of zero.
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaRegistered(schemaV1));
+ assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+ assertThrows(SchemaRegistryException.class, () ->
reg.schema(INITIAL_SCHEMA_VERSION));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(0));
+
+ // Try to register schema with version of 2.
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaRegistered(schemaV2));
+ assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+ assertThrows(SchemaRegistryException.class, () ->
reg.schema(INITIAL_SCHEMA_VERSION));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(0));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+ }
+
+ /**
+ * Check initial schema registration.
+ */
+ @Test
+ public void testSchemaRegistration() {
+ final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valBytesCol", BYTES, true)});
+
+ final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+
+ assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+ assertNull(reg.schema());
+
+ // Register schema with very first version.
+ reg.onSchemaRegistered(schemaV1);
+
+ assertEquals(1, reg.lastSchemaVersion());
+ assertSameSchema(schemaV1, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+
+ // Register schema with next version.
+ reg.onSchemaRegistered(schemaV2);
+
+ assertEquals(2, reg.lastSchemaVersion());
+ assertSameSchema(schemaV2, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+ assertSameSchema(schemaV2, reg.schema(2));
+
+ // Try to register schema with version of 4.
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaRegistered(schemaV4));
+
+ assertEquals(2, reg.lastSchemaVersion());
+ assertSameSchema(schemaV2, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+ assertSameSchema(schemaV2, reg.schema(2));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(4));
+ }
+
+ /**
+ * Check duplicate schema registration.
+ */
+ @Test
+ public void testDuplucateSchemaRegistration() {
+ final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valBytesCol", BYTES, true)});
+
+ final SchemaDescriptor wrongSchema = new SchemaDescriptor(1,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+
+ assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+ // Register schema with very first version.
+ reg.onSchemaRegistered(schemaV1);
+
+ assertEquals(1, reg.lastSchemaVersion());
+ assertSameSchema(schemaV1, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+
+ // Try to register same schema once again.
+ assertThrows(SchemaRegistrationConflictException.class, () ->
reg.onSchemaRegistered(schemaV1));
+
+ assertEquals(1, reg.lastSchemaVersion());
+ assertSameSchema(schemaV1, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+
+ // Try to register another schema with same version and check nothing
was registered.
+ assertThrows(SchemaRegistrationConflictException.class, () ->
reg.onSchemaRegistered(wrongSchema));
+
+ assertEquals(1, reg.lastSchemaVersion());
+ assertEquals(1, reg.schema().version());
+
+ assertSameSchema(schemaV1, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+
+ // Register schema with next version.
+ reg.onSchemaRegistered(schemaV2);
+
+ assertEquals(2, reg.lastSchemaVersion());
+
+ assertSameSchema(schemaV2, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+ assertSameSchema(schemaV2, reg.schema(2));
+ }
+
+ /**
+ * Check schema cleanup.
+ */
+ @Test
+ public void testSchemaCleanup() {
+ final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valBytesCol", BYTES, true)});
+
+ final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+
+ assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+ // Fail to cleanup initial schema
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(INITIAL_SCHEMA_VERSION));
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(0));
+
+ // Register schema with very first version.
+ reg.onSchemaRegistered(schemaV1);
+
+ assertEquals(1, reg.lastSchemaVersion());
+ assertNotNull(reg.schema());
+ assertNotNull(reg.schema(1));
+
+ // Try to remove latest schema.
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(1));
+
+ assertEquals(1, reg.lastSchemaVersion());
+ assertNotNull(reg.schema());
+ assertNotNull(reg.schema(1));
+
+ // Register new schema with next version.
+ reg.onSchemaRegistered(schemaV2);
+ reg.onSchemaRegistered(schemaV3);
+
+ assertEquals(3, reg.lastSchemaVersion());
+ assertNotNull(reg.schema(1));
+ assertNotNull(reg.schema(2));
+ assertNotNull(reg.schema(3));
+
+ // Remove outdated schema 1.
+ reg.onSchemaDropped(1);
+
+ assertEquals(3, reg.lastSchemaVersion());
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+ assertNotNull(reg.schema(2));
+ assertNotNull(reg.schema(3));
+
+ // Remove non-existed schemas.
+ reg.onSchemaDropped(1);
+
+ assertEquals(3, reg.lastSchemaVersion());
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+ assertNotNull(reg.schema(2));
+ assertNotNull(reg.schema(3));
+
+ // Register new schema with next version.
+ reg.onSchemaRegistered(schemaV4);
+
+ assertEquals(4, reg.lastSchemaVersion());
+ assertNotNull(reg.schema(2));
+ assertNotNull(reg.schema(3));
+ assertNotNull(reg.schema(4));
+
+ // Remove non-existed schemas.
+ reg.onSchemaDropped(1);
+
+ assertEquals(4, reg.lastSchemaVersion());
+ assertSameSchema(schemaV4, reg.schema());
+ assertSameSchema(schemaV2, reg.schema(2));
+ assertSameSchema(schemaV3, reg.schema(3));
+ assertSameSchema(schemaV4, reg.schema(4));
+
+ // Out of order remove.
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(3));
+
+ // Correct removal order.
+ reg.onSchemaDropped(2);
+ reg.onSchemaDropped(3);
+
+ assertEquals(4, reg.lastSchemaVersion());
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+ assertSameSchema(schemaV4, reg.schema());
+ assertSameSchema(schemaV4, reg.schema(4));
+
+ // Try to remove latest schema.
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(4));
+
+ assertEquals(4, reg.lastSchemaVersion());
+ assertSameSchema(schemaV4, reg.schema(4));
+ }
+
+ /**
+ * Check schema registration with full history.
+ */
+ @Test
+ public void testInitialSchemaWithFullHistory() {
+ final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valBytesCol", BYTES, true)});
+
+ final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1,
schemaV2);
+
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(2, history::get);
+
+ assertEquals(2, reg.lastSchemaVersion());
+ assertSameSchema(schemaV2, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+ assertSameSchema(schemaV2, reg.schema(2));
+
+ // Register schema with duplicate version.
+ assertThrows(SchemaRegistrationConflictException.class, () ->
reg.onSchemaRegistered(schemaV1));
+
+ assertEquals(2, reg.lastSchemaVersion());
+ assertSameSchema(schemaV2, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+ assertSameSchema(schemaV2, reg.schema(2));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+
+ // Register schema with out-of-order version.
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaRegistered(schemaV4));
+
+ assertEquals(2, reg.lastSchemaVersion());
+ assertSameSchema(schemaV2, reg.schema());
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+
+ // Register schema with next version.
+ reg.onSchemaRegistered(schemaV3);
+
+ assertEquals(3, reg.lastSchemaVersion());
+ assertSameSchema(schemaV3, reg.schema());
+ assertSameSchema(schemaV1, reg.schema(1));
+ assertSameSchema(schemaV2, reg.schema(2));
+ assertSameSchema(schemaV3, reg.schema(3));
+ }
+
+ /**
+ * Check schema registration with history tail.
+ */
+ @Test
+ public void testInitialSchemaWithTailHistory() {
+ final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valBytesCol", BYTES, true)});
+
+ final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valStringCol", STRING, true)});
+
+ final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV5 = new SchemaDescriptor(5,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {new Column("valStringCol", STRING, true)});
+
+ Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2,
schemaV3);
+
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(3, history::get);
+
+ assertEquals(3, reg.lastSchemaVersion());
+ assertSameSchema(schemaV3, reg.schema());
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+ assertSameSchema(schemaV2, reg.schema(2));
+ assertSameSchema(schemaV3, reg.schema(3));
+
+ // Register schema with duplicate version.
+ assertThrows(SchemaRegistrationConflictException.class, () ->
reg.onSchemaRegistered(schemaV2));
+
+ assertEquals(3, reg.lastSchemaVersion());
+ assertSameSchema(schemaV3, reg.schema());
+
+ // Register schema with out-of-order version.
+ assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaRegistered(schemaV5));
+
+ assertEquals(3, reg.lastSchemaVersion());
+ assertSameSchema(schemaV3, reg.schema());
+
+ // Register schema with outdated version.
+ assertThrows(SchemaRegistrationConflictException.class, () ->
reg.onSchemaRegistered(schemaV1));
+
+ assertEquals(3, reg.lastSchemaVersion());
+ assertSameSchema(schemaV3, reg.schema());
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+ assertSameSchema(schemaV2, reg.schema(2));
+ assertSameSchema(schemaV3, reg.schema(3));
+
+ // Register schema with next version.
+ reg.onSchemaRegistered(schemaV4);
+
+ assertEquals(4, reg.lastSchemaVersion());
+ assertSameSchema(schemaV4, reg.schema());
+ assertSameSchema(schemaV2, reg.schema(2));
+ assertSameSchema(schemaV3, reg.schema(3));
+ assertSameSchema(schemaV4, reg.schema(4));
+ }
+
+ /**
+ * Check schema cleanup.
+ */
+ @Test
+ public void testSchemaWithHistoryCleanup() {
+ final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valStringCol", STRING, true)
+ });
+
+ final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+ new Column[] {new Column("keyLongCol", LONG, true)},
+ new Column[] {
+ new Column("valBytesCol", BYTES, true),
+ new Column("valStringCol", STRING, true)
+ });
+
+ Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2,
schemaV3, schemaV4);
+
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(4, history::get);
+
+ assertEquals(4, reg.lastSchemaVersion());
+ assertSameSchema(schemaV4, reg.schema());
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+ assertSameSchema(schemaV2, reg.schema(2));
+ assertSameSchema(schemaV3, reg.schema(3));
+ assertSameSchema(schemaV4, reg.schema(4));
+
+ history.remove(1);
+ reg.onSchemaDropped(1);
+
+ assertEquals(4, reg.lastSchemaVersion());
+ assertNotNull(reg.schema(2));
+ assertNotNull(reg.schema(3));
+ assertNotNull(reg.schema(4));
+
+ history.remove(2);
+ history.remove(3);
+ reg.onSchemaDropped(2);
+ reg.onSchemaDropped(3);
+
+ assertEquals(4, reg.lastSchemaVersion());
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+ assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+ assertNotNull(reg.schema(4));
+ }
+
+ /**
+ * @param history Table schema history.
+ * @return Schema history map.
+ */
+ private Map<Integer, SchemaDescriptor> schemaHistory(SchemaDescriptor...
history) {
+ return
Arrays.stream(history).collect(Collectors.toMap(SchemaDescriptor::version, e ->
e));
+ }
+
+ /**
+ * Validates that schemas are equal.
+ *
+ * @param schemaDesc1 Schema descriptor to compare with.
+ * @param schemaDesc2 Schema descriptor to compare.
+ */
+ private void assertSameSchema(SchemaDescriptor schemaDesc1,
SchemaDescriptor schemaDesc2) {
+ assertEquals(schemaDesc1.version(), schemaDesc2.version(),
"Descriptors of different versions.");
+
+ assertTrue(SchemaManager.equalSchemas(schemaDesc1, schemaDesc2),
"Schemas are not equals.");
+ }
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index f2e298c..7881e1b 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.RowAssembler;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
import
org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
@@ -232,7 +232,7 @@ public class ITDistributedTableTest {
UUID.randomUUID(),
partMap,
PARTS
- ), new TableSchemaView() {
+ ), new SchemaRegistry() {
@Override public SchemaDescriptor schema() {
return SCHEMA;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
index 8eff2ca..e9728b4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
@@ -23,7 +23,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Marshaller interface.
+ * Tuple marshaller interface.
*
* @apiNote For key tuple marshalling, a {@code marshal(key, null)} method
call must be used.
* A {@code marshal(key} may return the same result, but it validates value
columns even if not specified.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index cceb908..cf23e08 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.lang.IgniteInternalException;
/**
@@ -29,16 +30,16 @@ abstract class AbstractTableView {
protected final InternalTable tbl;
/** Schema manager. */
- protected final TableSchemaView schemaMgr;
+ protected final SchemaRegistry schemaReg;
/**
* Constructor
* @param tbl Internal table.
- * @param schemaMgr Schema manager.
+ * @param schemaReg Schema registry.
*/
- protected AbstractTableView(InternalTable tbl, TableSchemaView schemaMgr) {
+ protected AbstractTableView(InternalTable tbl, SchemaRegistry schemaReg) {
this.tbl = tbl;
- this.schemaMgr = schemaMgr;
+ this.schemaReg = schemaReg;
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
index c113dc7..474f049 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Tuple;
@@ -47,12 +48,12 @@ public class KVBinaryViewImpl extends AbstractTableView
implements KeyValueBinar
* Constructor.
*
* @param tbl Table storage.
- * @param schemaMgr Schema manager.
+ * @param schemaReg Schema registry.
*/
- public KVBinaryViewImpl(InternalTable tbl, TableSchemaView schemaMgr) {
- super(tbl, schemaMgr);
+ public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
+ super(tbl, schemaReg);
- marsh = new TupleMarshallerImpl(schemaMgr);
+ marsh = new TupleMarshallerImpl(schemaReg);
}
/** {@inheritDoc} */
@@ -285,7 +286,7 @@ public class KVBinaryViewImpl extends AbstractTableView
implements KeyValueBinar
if (row == null)
return null;
- final SchemaDescriptor schema = schemaMgr.schema(row.schemaVersion());
+ final SchemaDescriptor schema = schemaReg.schema(row.schemaVersion());
return new TableRow(schema, new Row(schema, row));
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
index dfb6538..6d3d10b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.marshaller.KVSerializer;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.mapper.KeyMapper;
@@ -40,13 +41,13 @@ public class KVViewImpl<K, V> extends AbstractTableView
implements KeyValueView<
* Constructor.
*
* @param tbl Table storage.
- * @param schemaMgr Schema manager.
+ * @param schemaReg Schema registry.
* @param keyMapper Key class mapper.
* @param valueMapper Value class mapper.
*/
- public KVViewImpl(InternalTable tbl, TableSchemaView schemaMgr,
KeyMapper<K> keyMapper,
+ public KVViewImpl(InternalTable tbl, SchemaRegistry schemaReg,
KeyMapper<K> keyMapper,
ValueMapper<V> valueMapper) {
- super(tbl, schemaMgr);
+ super(tbl, schemaReg);
}
/** {@inheritDoc} */
@@ -238,7 +239,7 @@ public class KVViewImpl<K, V> extends AbstractTableView
implements KeyValueView<
if (row == null)
return null;
- final SchemaDescriptor rowSchema =
schemaMgr.schema(row.schemaVersion()); // Get a schema for row.
+ final SchemaDescriptor rowSchema =
schemaReg.schema(row.schemaVersion()); // Get a schema for row.
return new Row(rowSchema, row);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 0d9ac93..f442bbb 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.marshaller.RecordSerializer;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.mapper.RecordMapper;
@@ -39,11 +40,11 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
* Constructor.
*
* @param tbl Table.
- * @param schemaMgr Schema manager.
+ * @param schemaReg Schema registry.
* @param mapper Record class mapper.
*/
- public RecordViewImpl(InternalTable tbl, TableSchemaView schemaMgr,
RecordMapper<R> mapper) {
- super(tbl, schemaMgr);
+ public RecordViewImpl(InternalTable tbl, SchemaRegistry schemaReg,
RecordMapper<R> mapper) {
+ super(tbl, schemaReg);
}
/** {@inheritDoc} */
@@ -264,7 +265,7 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
if (row == null)
return null;
- final SchemaDescriptor rowSchema =
schemaMgr.schema(row.schemaVersion()); // Get a schema for row.
+ final SchemaDescriptor rowSchema =
schemaReg.schema(row.schemaVersion()); // Get a schema for row.
return new Row(rowSchema, row);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
index ecc312f..9b0483a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import java.util.BitSet;
+import java.util.UUID;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjects;
import org.apache.ignite.internal.schema.Column;
@@ -101,4 +103,18 @@ public abstract class RowChunkAdapter implements Tuple {
return row().stringValue(col.schemaIndex());
}
+
+ /** {@inheritDoc} */
+ @Override public UUID uuidValue(String colName) {
+ Column col = columnByName(colName);
+
+ return row().uuidValue(col.schemaIndex());
+ }
+
+ /** {@inheritDoc} */
+ @Override public BitSet bitmaskValue(String colName) {
+ Column col = columnByName(colName);
+
+ return row().bitmaskValue(col.schemaIndex());
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 3fd11de..284be44 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.KeyValueView;
@@ -49,12 +50,12 @@ public class TableImpl extends AbstractTableView implements
Table {
* Constructor.
*
* @param tbl Table.
- * @param schemaMgr Table schema manager.
+ * @param schemaReg Table schema registry.
*/
- public TableImpl(InternalTable tbl, TableSchemaView schemaMgr) {
- super(tbl, schemaMgr);
+ public TableImpl(InternalTable tbl, SchemaRegistry schemaReg) {
+ super(tbl, schemaReg);
- marsh = new TupleMarshallerImpl(schemaMgr);
+ marsh = new TupleMarshallerImpl(schemaReg);
}
/**
@@ -71,23 +72,23 @@ public class TableImpl extends AbstractTableView implements
Table {
*
* @return Schema view.
*/
- public TableSchemaView schemaView() {
- return schemaMgr;
+ public SchemaRegistry schemaView() {
+ return schemaReg;
}
/** {@inheritDoc} */
@Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) {
- return new RecordViewImpl<>(tbl, schemaMgr, recMapper);
+ return new RecordViewImpl<>(tbl, schemaReg, recMapper);
}
/** {@inheritDoc} */
@Override public <K, V> KeyValueView<K, V> kvView(KeyMapper<K> keyMapper,
ValueMapper<V> valMapper) {
- return new KVViewImpl<>(tbl, schemaMgr, keyMapper, valMapper);
+ return new KVViewImpl<>(tbl, schemaReg, keyMapper, valMapper);
}
/** {@inheritDoc} */
@Override public KeyValueBinaryView kvView() {
- return new KVBinaryViewImpl(tbl, schemaMgr);
+ return new KVBinaryViewImpl(tbl, schemaReg);
}
/** {@inheritDoc} */
@@ -327,7 +328,7 @@ public class TableImpl extends AbstractTableView implements
Table {
if (row == null)
return null;
- final SchemaDescriptor schema = schemaMgr.schema(row.schemaVersion());
+ final SchemaDescriptor schema = schemaReg.schema(row.schemaVersion());
return new TableRow(schema, new Row(schema, row));
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
index 4e0e850..d457077 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.table;
+import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjects;
import org.apache.ignite.table.Tuple;
@@ -95,4 +97,14 @@ public class TupleBuilderImpl implements TupleBuilder, Tuple
{
@Override public String stringValue(String colName) {
return value(colName);
}
+
+ /** {@inheritDoc} */
+ @Override public UUID uuidValue(String colName) {
+ return value(colName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public BitSet bitmaskValue(String colName) {
+ return value(colName);
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
index 05c3ed7..f418f3c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
@@ -24,25 +24,26 @@ import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.RowAssembler;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.NotNull;
import static
org.apache.ignite.internal.schema.marshaller.MarshallerUtil.getValueSize;
/**
- * Marshaller implementation.
+ * Tuple marshaller implementation.
*/
public class TupleMarshallerImpl implements TupleMarshaller {
/** Schema manager. */
- private final TableSchemaView schemaMgr;
+ private final SchemaRegistry schemaReg;
/**
* Constructor.
*
- * @param schemaMgr Schema manager.
+ * @param schemaReg Schema registry.
*/
- public TupleMarshallerImpl(TableSchemaView schemaMgr) {
- this.schemaMgr = schemaMgr;
+ public TupleMarshallerImpl(SchemaRegistry schemaReg) {
+ this.schemaReg = schemaReg;
}
/** {@inheritDoc} */
@@ -52,7 +53,7 @@ public class TupleMarshallerImpl implements TupleMarshaller {
/** {@inheritDoc} */
@Override public Row marshal(Tuple keyTuple, Tuple valTuple) {
- final SchemaDescriptor schema = schemaMgr.schema();
+ final SchemaDescriptor schema = schemaReg.schema();
assert keyTuple instanceof TupleBuilderImpl;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a5c1cc0..f2a24e8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -36,12 +36,15 @@ import
org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.affinity.AffinityManager;
import org.apache.ignite.internal.affinity.event.AffinityEvent;
+import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.event.SchemaEvent;
+import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TableSchemaViewImpl;
import
org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.event.TableEvent;
@@ -83,7 +86,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
private final AffinityManager affMgr;
/** Tables. */
- private Map<String, TableImpl> tables = new ConcurrentHashMap<>();
+ private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
/**
* Creates a new table manager.
@@ -118,8 +121,14 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* @param name Table name.
* @param tblId Table id.
* @param assignment Affinity assignment.
+ * @param schemaReg Schema registry for the table.
*/
- private void createTableLocally(String name, UUID tblId,
List<List<ClusterNode>> assignment) {
+ private void createTableLocally(
+ String name,
+ UUID tblId,
+ List<List<ClusterNode>> assignment,
+ SchemaRegistry schemaReg
+ ) {
int partitions = assignment.size();
HashMap<Integer, RaftGroupService> partitionMap = new
HashMap<>(partitions);
@@ -135,7 +144,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
onEvent(TableEvent.CREATE, new TableEventParameters(
tblId,
name,
- new TableSchemaViewImpl(tblId, schemaMgr),
+ schemaReg,
new InternalTableImpl(tblId, partitionMap, partitions)
), null);
}
@@ -199,22 +208,20 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
UUID tblId = new UUID(revision, 0L);
if (hasMetastorageLocally) {
- var key = new Key(INTERNAL_PREFIX + tblId.toString());
-
+ var key = new Key(INTERNAL_PREFIX + tblId);
futs.add(metaStorageMgr.invoke(
Conditions.key(key).value().eq(null),
Operations.put(key,
tableView.name().getBytes(StandardCharsets.UTF_8)),
- Operations.noop()).thenCompose(res ->
- affMgr.calculateAssignments(tblId)));
+ Operations.noop())
+ .thenCompose(res ->
schemaMgr.initSchemaForTable(tblId, tableView.name()))
+ .thenCompose(res ->
affMgr.calculateAssignments(tblId)));
}
- affMgr.listen(AffinityEvent.CALCULATED, (parameters, e) -> {
- if (!tblId.equals(parameters.tableId()))
- return false;
+ final CompletableFuture<AffinityEventParameters>
affinityReadyFut = new CompletableFuture<>();
+ final CompletableFuture<SchemaEventParameters> schemaReadyFut
= new CompletableFuture<>();
- if (e == null)
- createTableLocally(tblName, tblId,
parameters.assignment());
- else {
+ CompletableFuture.allOf(affinityReadyFut, schemaReadyFut)
+ .exceptionally(e -> {
LOG.error("Failed to create a new table [name=" +
tblName + ", id=" + tblId + ']', e);
onEvent(TableEvent.CREATE, new TableEventParameters(
@@ -223,7 +230,36 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
null,
null
), e);
- }
+
+ return null;
+ })
+ .thenRun(() -> createTableLocally(
+ tblName,
+ tblId,
+ affinityReadyFut.join().assignment(),
+ schemaReadyFut.join().schemaRegistry()
+ ));
+
+ affMgr.listen(AffinityEvent.CALCULATED, (parameters, e) -> {
+ if (!tblId.equals(parameters.tableId()))
+ return false;
+
+ if (e == null)
+ affinityReadyFut.complete(parameters);
+ else
+ affinityReadyFut.completeExceptionally(e);
+
+ return true;
+ });
+
+ schemaMgr.listen(SchemaEvent.INITIALIZED, (parameters, e) -> {
+ if (!tblId.equals(parameters.tableId()))
+ return false;
+
+ if (e == null)
+ schemaReadyFut.complete(parameters);
+ else
+ schemaReadyFut.completeExceptionally(e);
return true;
});
@@ -240,12 +276,14 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
UUID tblId = t.internalTable().tableId();
if (hasMetastorageLocally) {
- var key = new Key(INTERNAL_PREFIX + tblId.toString());
-
- futs.add(affMgr.removeAssignment(tblId).thenCompose(res ->
-
metaStorageMgr.invoke(Conditions.key(key).value().ne(null),
- Operations.remove(key),
- Operations.noop())));
+ var key = new Key(INTERNAL_PREFIX + tblId);
+
+ futs.add(affMgr.removeAssignment(tblId)
+ .thenCompose(res -> schemaMgr.unregisterSchemas(tblId))
+ .thenCompose(res ->
+
metaStorageMgr.invoke(Conditions.key(key).value().ne(null),
+ Operations.remove(key),
+ Operations.noop())));
}
affMgr.listen(AffinityEvent.REMOVED, (parameters, e) -> {
@@ -304,8 +342,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
@Override public void dropTable(String name) {
CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
- listen(TableEvent.DROP, new BiPredicate<TableEventParameters,
Exception>() {
- @Override public boolean test(TableEventParameters params,
Exception e) {
+ listen(TableEvent.DROP, new BiPredicate<>() {
+ @Override public boolean test(TableEventParameters params,
Throwable e) {
String tableName = params.tableName();
if (!name.equals(tableName))
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
index 853617d..4e0a750 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.table.event;
import java.util.UUID;
import org.apache.ignite.internal.manager.EventParameters;
import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.schema.SchemaRegistry;
/**
* Table event parameters.
@@ -34,7 +34,7 @@ public class TableEventParameters implements EventParameters {
private final String tableName;
/** Table schema view. */
- private final TableSchemaView tableSchemaView;
+ private final SchemaRegistry schemaRegistry;
/** Internal table. */
private final InternalTable internalTable;
@@ -42,18 +42,18 @@ public class TableEventParameters implements
EventParameters {
/**
* @param tableId Table identifier.
* @param tableName Table name.
- * @param tableSchemaView Table schema view.
+ * @param schemaRegistry Table schema registry.
* @param internalTable Internal table.
*/
public TableEventParameters(
UUID tableId,
String tableName,
- TableSchemaView tableSchemaView,
+ SchemaRegistry schemaRegistry,
InternalTable internalTable
) {
this.tableId = tableId;
this.tableName = tableName;
- this.tableSchemaView = tableSchemaView;
+ this.schemaRegistry = schemaRegistry;
this.internalTable = internalTable;
}
@@ -80,8 +80,8 @@ public class TableEventParameters implements EventParameters {
*
* @return Schema descriptor.
*/
- public TableSchemaView tableSchemaView() {
- return tableSchemaView;
+ public SchemaRegistry tableSchemaView() {
+ return schemaRegistry;
}
/**
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index 3b015a3..9ecaec4 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -18,13 +18,13 @@
package org.apache.ignite.internal.table.impl;
import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.jetbrains.annotations.NotNull;
/**
* Dummy schema manager for tests.
*/
-public class DummySchemaManagerImpl implements TableSchemaView {
+public class DummySchemaManagerImpl implements SchemaRegistry {
/** Schema. */
private final SchemaDescriptor schema;