This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e2c8a55b60 IGNITE-18732 Fix race in VersionedValues in SchemaManager
(#1665)
e2c8a55b60 is described below
commit e2c8a55b60e3c0a21f702911e31b281f317480d0
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Feb 16 12:19:26 2023 +0200
IGNITE-18732 Fix race in VersionedValues in SchemaManager (#1665)
---
.../ignite/internal/schema/SchemaManager.java | 226 +++++++++------------
.../schema/registry/SchemaRegistryImpl.java | 15 +-
.../schema/registry/SchemaRegistryImplTest.java | 17 +-
.../schema/registry/UpgradingRowAdapterTest.java | 18 +-
.../internal/sql/engine/StopCalciteModuleTest.java | 2 +-
.../sql/engine/exec/MockedStructuresTest.java | 8 +-
.../TupleMarshallerFixlenOnlyBenchmark.java | 3 +-
.../TupleMarshallerVarlenOnlyBenchmark.java | 3 +-
.../table/distributed/TableManagerTest.java | 3 +
9 files changed, 139 insertions(+), 156 deletions(-)
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 893cb9d305..d565ee5c70 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.schema;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
@@ -30,20 +33,18 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.configuration.NamedListView;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.causality.VersionedValue;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.schema.configuration.ColumnView;
import
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
@@ -65,11 +66,13 @@ import org.jetbrains.annotations.Nullable;
* The class services a management of table schemas.
*/
public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters> implements IgniteComponent {
+ private static final IgniteLogger LOGGER =
Loggers.forClass(SchemaManager.class);
+
/** Initial version for schemas. */
public static final int INITIAL_SCHEMA_VERSION = 1;
/** Schema history key predicate part. */
- public static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
+ private static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -117,119 +120,109 @@ public class SchemaManager extends
Producer<SchemaEvent, SchemaEventParameters>
try {
ExtendedTableView tblCfg = (ExtendedTableView)
ctx.config(ExtendedTableConfiguration.class).value();
- int verFromUpdate = tblCfg.schemaId();
+ int newSchemaVersion = tblCfg.schemaId();
UUID tblId = tblCfg.id();
- String tableName = tblCfg.name();
-
- SchemaDescriptor schemaDescFromUpdate =
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
-
- if (searchSchemaByVersion(tblId, schemaDescFromUpdate.version())
!= null) {
+ if (searchSchemaByVersion(tblId, newSchemaVersion) != null) {
return completedFuture(null);
}
- if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
- SchemaDescriptor oldSchema = searchSchemaByVersion(tblId,
verFromUpdate - 1);
+ SchemaDescriptor newSchema =
SchemaUtils.prepareSchemaDescriptor(newSchemaVersion, tblCfg);
- if (oldSchema == null) {
- byte[] serPrevSchema = schemaByVersion(tblId,
verFromUpdate - 1);
-
- assert serPrevSchema != null;
-
- oldSchema =
SchemaSerializerImpl.INSTANCE.deserialize(serPrevSchema);
- }
+ // This is intentionally a blocking call to enforce configuration
listener execution order. Unfortunately it is not possible
+ // to execute this method asynchronously, because the schema
descriptor is needed to fire the CREATE event as a synchronous part
+ // of the configuration listener.
+ try {
+ setColumnMapping(newSchema, tblId);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
-
schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(oldSchema,
schemaDescFromUpdate));
+ return failedFuture(e);
+ } catch (ExecutionException e) {
+ return failedFuture(e);
}
long causalityToken = ctx.storageRevision();
- CompletableFuture<?> createSchemaFut =
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
-
- try {
- final ByteArray key = schemaWithVerHistKey(tblId,
verFromUpdate);
-
- createSchemaFut.thenCompose(t -> metastorageMgr.invoke(
- Conditions.notExists(key),
- Operations.put(key,
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate)),
- Operations.noop()));
- } catch (Throwable th) {
- createSchemaFut.completeExceptionally(th);
- }
-
- createSchemaFut.whenComplete((ignore, th) -> {
- if (th == null) {
- registriesVv.get(causalityToken).thenRun(() ->
inBusyLock(busyLock,
- () -> fireEvent(SchemaEvent.CREATE, new
SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+ // Fire event early, because dependent listeners have to register
VersionedValues' update futures
+ var eventParams = new SchemaEventParameters(causalityToken, tblId,
newSchema);
+
+ fireEvent(SchemaEvent.CREATE, eventParams)
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ LOGGER.warn("Error when processing CREATE event",
e);
+ }
+ });
+
+ return registriesVv.update(causalityToken, (registries, e) ->
inBusyLock(busyLock, () -> {
+ if (e != null) {
+ return failedFuture(new
IgniteInternalException(IgniteStringFormatter.format(
+ "Cannot create a schema for the table [tblId={},
ver={}]", tblId, newSchemaVersion), e)
+ );
}
- });
- return createSchemaFut;
+ return registerSchema(registries, tblId, tblCfg.name(),
newSchema);
+ }));
} finally {
busyLock.leaveBusy();
}
}
- /**
- * Create new schema locally.
- *
- * @param causalityToken Causality token.
- * @param tableId Table id.
- * @param tableName Table name.
- * @param schemaDescriptor Schema descriptor.
- * @return Create schema future.
- */
- private CompletableFuture<?> createSchema(long causalityToken, UUID
tableId, String tableName, SchemaDescriptor schemaDescriptor) {
- if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ private void setColumnMapping(SchemaDescriptor schema, UUID tableId)
throws ExecutionException, InterruptedException {
+ if (schema.version() == INITIAL_SCHEMA_VERSION) {
+ return;
}
- try {
- return createSchemaInternal(causalityToken, tableId, tableName,
schemaDescriptor);
- } finally {
- busyLock.leaveBusy();
+ int prevVersion = schema.version() - 1;
+
+ SchemaDescriptor prevSchema = searchSchemaByVersion(tableId,
prevVersion);
+
+ if (prevSchema == null) {
+ // This is intentionally a blocking call, because this method is
used in a synchronous part of the configuration listener.
+ // See the call site for more details.
+ prevSchema = schemaByVersion(tableId, prevVersion).get();
}
+
+ schema.columnMapping(SchemaUtils.columnMapper(prevSchema, schema));
}
/**
- * Internal method for creating schema locally.
+ * Registers the new schema in a Schema Registry.
*
- * @param causalityToken Causality token.
+ * @param registries Map of schema registries.
* @param tableId Table id.
* @param tableName Table name.
- * @param schemaDescriptor Schema descriptor.
- * @return Create schema future.
+ * @param schema Schema descriptor.
+ * @return Future that, when complete, will resolve into an updated map of
schema registries
+ * (to be used in {@link VersionedValue#update}).
*/
- private CompletableFuture<?> createSchemaInternal(
- long causalityToken,
+ private CompletableFuture<Map<UUID, SchemaRegistryImpl>> registerSchema(
+ Map<UUID, SchemaRegistryImpl> registries,
UUID tableId,
String tableName,
- SchemaDescriptor schemaDescriptor
+ SchemaDescriptor schema
) {
- return registriesVv.update(causalityToken, (registries, e) ->
inBusyLock(busyLock, () -> {
- if (e != null) {
- return failedFuture(new
IgniteInternalException(IgniteStringFormatter.format(
- "Cannot create a schema for the table [tblId={},
ver={}]", tableId, schemaDescriptor.version()), e)
- );
- }
+ ByteArray key = schemaWithVerHistKey(tableId, schema.version());
- Map<UUID, SchemaRegistryImpl> regs = registries;
+ byte[] serializedSchema =
SchemaSerializerImpl.INSTANCE.serialize(schema);
- SchemaRegistryImpl reg = regs.get(tableId);
+ return metastorageMgr.invoke(notExists(key), put(key,
serializedSchema), noop())
+ .thenApply(t -> {
+ SchemaRegistryImpl reg = registries.get(tableId);
- if (reg == null) {
- regs = new HashMap<>(registries);
+ if (reg == null) {
+ Map<UUID, SchemaRegistryImpl> copy = new
HashMap<>(registries);
- SchemaRegistryImpl registry = createSchemaRegistry(tableId,
tableName, schemaDescriptor);
+ copy.put(tableId, createSchemaRegistry(tableId,
tableName, schema));
- regs.put(tableId, registry);
- } else {
- reg.onSchemaRegistered(schemaDescriptor);
- }
+ return copy;
+ } else {
+ reg.onSchemaRegistered(schema);
- return completedFuture(regs);
- }));
+ return registries;
+ }
+ });
}
/**
@@ -241,35 +234,17 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
* @return Schema registry.
*/
private SchemaRegistryImpl createSchemaRegistry(UUID tableId, String
tableName, SchemaDescriptor initialSchema) {
- return new SchemaRegistryImpl(ver -> {
- if (!busyLock.enterBusy()) {
- throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
- }
-
- try {
- return tableSchema(tableId, tableName, ver);
- } finally {
- busyLock.leaveBusy();
- }
- }, () -> {
- if (!busyLock.enterBusy()) {
- throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
- }
-
- try {
- return latestSchemaVersion(tableId);
- } finally {
- busyLock.leaveBusy();
- }
- },
- initialSchema
+ return new SchemaRegistryImpl(
+ ver -> inBusyLock(busyLock, () -> tableSchema(tableId,
tableName, ver)),
+ () -> inBusyLock(busyLock, () -> latestSchemaVersion(tableId)),
+ initialSchema
);
}
/**
* Return table schema of certain version from history.
*
- * @param tblId Table id.
+ * @param tblId Table id.
* @param schemaVer Schema version.
* @return Schema descriptor.
*/
@@ -340,7 +315,7 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
* @param schemaVer Schema version.
* @return Descriptor if required schema found, or {@code null} otherwise.
*/
- private SchemaDescriptor searchSchemaByVersion(UUID tblId, int schemaVer) {
+ private @Nullable SchemaDescriptor searchSchemaByVersion(UUID tblId, int
schemaVer) {
SchemaRegistry registry = registriesVv.latest().get(tblId);
if (registry != null && schemaVer <= registry.lastSchemaVersion()) {
@@ -354,7 +329,7 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
* Gets a schema descriptor from the configuration storage.
*
* @param schemaVer Schema version.
- * @param tblCfg Table configuration.
+ * @param tblCfg Table configuration.
* @return Schema descriptor.
*/
private CompletableFuture<SchemaDescriptor> getSchemaDescriptor(int
schemaVer, ExtendedTableConfiguration tblCfg) {
@@ -368,9 +343,9 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
* Get the schema registry for the given causality token and table id.
*
* @param causalityToken Causality token.
- * @param tableId Id of a table which the required registry belongs to. If
{@code null}, then this method will return
- * a future which will be completed with {@code null}
result, but only when the schema manager will have
- * consistent state regarding given causality token.
+ * @param tableId Id of a table which the required registry belongs to. If
{@code null}, then this method will return a future which
+ * will be completed with {@code null} result, but only when the
schema manager will have consistent state regarding given causality
+ * token.
* @return A future which will be completed when schema registries for
given causality token are ready.
*/
public CompletableFuture<SchemaRegistry> schemaRegistry(long
causalityToken, @Nullable UUID tableId) {
@@ -433,8 +408,7 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
* @param tblId Table id.
* @return The latest schema version.
*/
- // TODO: Make this method async, see
https://issues.apache.org/jira/browse/IGNITE-18732
- private int latestSchemaVersion(UUID tblId) {
+ private CompletableFuture<Integer> latestSchemaVersion(UUID tblId) {
var latestVersionFuture = new CompletableFuture<Integer>();
metastorageMgr.prefix(schemaHistPrefix(tblId)).subscribe(new
Subscriber<>() {
@@ -467,15 +441,7 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
}
});
- try {
- return latestVersionFuture.get(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInternalException("Interrupted when getting schema
version from metastorage", e);
- } catch (TimeoutException | ExecutionException e) {
- throw new IgniteInternalException("Exception when getting schema
version from metastorage", e);
- }
+ return latestVersionFuture;
}
/**
@@ -484,19 +450,15 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
* @param tblId Table id.
* @return Schema representation if schema found, {@code null} otherwise.
*/
- // TODO: Make this method async, see
https://issues.apache.org/jira/browse/IGNITE-18732
- private byte[] schemaByVersion(UUID tblId, int ver) {
- try {
- return metastorageMgr.get(schemaWithVerHistKey(tblId, ver))
- .thenApply(Entry::value)
- .get(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInternalException("Interrupted when getting schema
from metastorage", e);
- } catch (TimeoutException | ExecutionException e) {
- throw new IgniteInternalException("Exception when getting schema
from metastorage", e);
- }
+ private CompletableFuture<SchemaDescriptor> schemaByVersion(UUID tblId,
int ver) {
+ return metastorageMgr.get(schemaWithVerHistKey(tblId, ver))
+ .thenApply(entry -> {
+ byte[] value = entry.value();
+
+ assert value != null;
+
+ return SchemaSerializerImpl.INSTANCE.deserialize(value);
+ });
}
private int extractVerFromSchemaKey(String key) {
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index 32d7bac3ea..75e879cb86 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -25,8 +25,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.Function;
-import java.util.function.IntSupplier;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -46,10 +46,10 @@ public class SchemaRegistryImpl implements SchemaRegistry {
private final Map<Long, ColumnMapper> mappingCache = new
ConcurrentHashMap<>();
/** Schema store. */
- private final Function<Integer, CompletableFuture<SchemaDescriptor>>
history;
+ private final IntFunction<CompletableFuture<SchemaDescriptor>> history;
/** The method to provide the latest schema version on cluster. */
- private final IntSupplier latestVersionStore;
+ private final Supplier<CompletableFuture<Integer>> latestVersionStore;
/**
* Constructor.
@@ -59,8 +59,8 @@ public class SchemaRegistryImpl implements SchemaRegistry {
* @param initialSchema Initial schema.
*/
public SchemaRegistryImpl(
- Function<Integer, CompletableFuture<SchemaDescriptor>> history,
- IntSupplier latestVersionStore,
+ IntFunction<CompletableFuture<SchemaDescriptor>> history,
+ Supplier<CompletableFuture<Integer>> latestVersionStore,
SchemaDescriptor initialSchema
) {
this.history = history;
@@ -118,7 +118,8 @@ public class SchemaRegistryImpl implements SchemaRegistry {
/** {@inheritDoc} */
@Override
public SchemaDescriptor waitLatestSchema() {
- int lastVer0 = latestVersionStore.getAsInt();
+ // TODO: remove blocking code
https://issues.apache.org/jira/browse/IGNITE-17931
+ int lastVer0 = latestVersionStore.get().join();
Integer lastLocalVer = schemaCache.lastKey();
assert lastLocalVer <= lastVer0 : "Cached schema is earlier than
consensus [lastVer=" + lastLocalVer
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index f6aacd6761..dfd5e33c8b 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.schema.registry;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
import static org.apache.ignite.internal.schema.NativeTypes.INT64;
import static org.apache.ignite.internal.schema.NativeTypes.STRING;
@@ -58,7 +59,7 @@ public class SchemaRegistryImplTest {
new Column[]{new Column("keyLongCol", INT64, false)},
new Column[]{new Column("valBytesCol", BYTES, true)});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
INITIAL_SCHEMA_VERSION, schemaV0);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV0);
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
assertNotNull(reg.schema());
@@ -108,7 +109,7 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
INITIAL_SCHEMA_VERSION, schemaV1);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
assertNotNull(reg.schema());
@@ -162,7 +163,7 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
INITIAL_SCHEMA_VERSION, schemaV1);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
@@ -230,7 +231,7 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
INITIAL_SCHEMA_VERSION, schemaV1);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
@@ -346,7 +347,7 @@ public class SchemaRegistryImplTest {
Map<Integer, CompletableFuture<SchemaDescriptor>> history =
schemaHistory(schemaV1, schemaV2);
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, ()
-> INITIAL_SCHEMA_VERSION, schemaV2);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, ()
-> completedFuture(INITIAL_SCHEMA_VERSION), schemaV2);
assertEquals(2, reg.lastSchemaVersion());
assertSameSchema(schemaV2, reg.schema());
@@ -412,7 +413,7 @@ public class SchemaRegistryImplTest {
Map<Integer, CompletableFuture<SchemaDescriptor>> history =
schemaHistory(schemaV2, schemaV3);
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, ()
-> INITIAL_SCHEMA_VERSION, schemaV3);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, ()
-> completedFuture(INITIAL_SCHEMA_VERSION), schemaV3);
assertEquals(3, reg.lastSchemaVersion());
assertSameSchema(schemaV3, reg.schema());
@@ -478,7 +479,7 @@ public class SchemaRegistryImplTest {
Map<Integer, CompletableFuture<SchemaDescriptor>> history =
schemaHistory(schemaV2, schemaV3, schemaV4);
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, ()
-> INITIAL_SCHEMA_VERSION, schemaV4);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, ()
-> completedFuture(INITIAL_SCHEMA_VERSION), schemaV4);
assertEquals(4, reg.lastSchemaVersion());
assertSameSchema(schemaV4, reg.schema());
@@ -544,7 +545,7 @@ public class SchemaRegistryImplTest {
schemaV4.columnMapping(createMapper(schemaV4).add(schemaV4.column("valBytesCol")));
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
INITIAL_SCHEMA_VERSION, schemaV1);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
final Map<Long, ColumnMapper> cache = reg.mappingCache();
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
index a584a38163..c7ef390e0f 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
@@ -151,15 +151,25 @@ public class UpgradingRowAdapterTest {
BinaryRow row = serializeValuesToRow(schema, values);
+ var schemaRegistry = new SchemaRegistryImpl(
+ v -> v == 1 ? completedFuture(schema) :
completedFuture(schema2),
+ () -> completedFuture(INITIAL_SCHEMA_VERSION),
+ schema
+ );
+
// Validate row.
- validateRow(values, new SchemaRegistryImpl(v -> v == 1 ?
completedFuture(schema) : completedFuture(schema2),
- () -> INITIAL_SCHEMA_VERSION, schema), row);
+ validateRow(values, schemaRegistry, row);
// Validate upgraded row.
values.add(addedColumnIndex, null);
- validateRow(values, new SchemaRegistryImpl(v -> v == 1 ?
completedFuture(schema) : completedFuture(schema2),
- () -> schema2.version(), schema2), row);
+ var schema2Registry = new SchemaRegistryImpl(
+ v -> v == 1 ? completedFuture(schema) :
completedFuture(schema2),
+ () -> completedFuture(schema2.version()),
+ schema2
+ );
+
+ validateRow(values, schema2Registry, row);
}
private void validateRow(List<Object> values, SchemaRegistryImpl
schemaRegistry, BinaryRow binaryRow) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 89d29d4c44..babf997a4c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -154,7 +154,7 @@ public class StopCalciteModuleTest {
new Column[]{new Column(1, "VAL", NativeTypes.INT32, false)}
);
- schemaReg = new SchemaRegistryImpl((v) -> completedFuture(schemaDesc),
() -> INITIAL_SCHEMA_VERSION, schemaDesc);
+ schemaReg = new SchemaRegistryImpl((v) -> completedFuture(schemaDesc),
() -> completedFuture(INITIAL_SCHEMA_VERSION), schemaDesc);
when(tbl.name()).thenReturn("TEST");
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 7ab14fe4be..0f50ecadea 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -55,9 +55,11 @@ import
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNo
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaUtils;
@@ -269,12 +271,14 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
}
/** Dummy metastore activity mock. */
- private void mockMetastore() throws Exception {
+ private void mockMetastore() {
when(msm.prefix(any())).thenReturn(subscriber -> {
subscriber.onSubscribe(mock(Subscription.class));
subscriber.onComplete();
});
+
+ when(msm.invoke(any(), any(Operation.class),
any(Operation.class))).thenReturn(completedFuture(null));
}
/**
@@ -550,7 +554,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
tblsCfg,
cs,
rm,
- null,
+ mock(ReplicaManager.class),
null,
null,
bm,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
index b61f11ddb6..cbae76eda9 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.benchmarks;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
import java.util.Random;
@@ -107,7 +108,7 @@ public class TupleMarshallerFixlenOnlyBenchmark {
.toArray(Column[]::new)
);
- marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null,
() -> INITIAL_SCHEMA_VERSION, schema) {
+ marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null,
() -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
@Override
public SchemaDescriptor schema() {
return schema;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
index d12ea24a29..896353ccb8 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.benchmarks;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
import static org.apache.ignite.internal.schema.NativeTypes.INT64;
import static org.apache.ignite.internal.schema.NativeTypes.STRING;
@@ -120,7 +121,7 @@ public class TupleMarshallerVarlenOnlyBenchmark {
.toArray(Column[]::new)
);
- marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null,
() -> INITIAL_SCHEMA_VERSION, schema) {
+ marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null,
() -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
@Override
public SchemaDescriptor schema() {
return schema;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index c114dd7eda..e8d4e374ee 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -601,6 +602,8 @@ public class TableManagerTest extends IgniteAbstractTest {
subscriber.onComplete();
});
+
+ when(msm.invoke(any(), any(Operation.class),
any(Operation.class))).thenReturn(completedFuture(null));
}
/**