This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e2c8a55b60 IGNITE-18732 Fix race in VersionedValues in SchemaManager 
(#1665)
e2c8a55b60 is described below

commit e2c8a55b60e3c0a21f702911e31b281f317480d0
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Feb 16 12:19:26 2023 +0200

    IGNITE-18732 Fix race in VersionedValues in SchemaManager (#1665)
---
 .../ignite/internal/schema/SchemaManager.java      | 226 +++++++++------------
 .../schema/registry/SchemaRegistryImpl.java        |  15 +-
 .../schema/registry/SchemaRegistryImplTest.java    |  17 +-
 .../schema/registry/UpgradingRowAdapterTest.java   |  18 +-
 .../internal/sql/engine/StopCalciteModuleTest.java |   2 +-
 .../sql/engine/exec/MockedStructuresTest.java      |   8 +-
 .../TupleMarshallerFixlenOnlyBenchmark.java        |   3 +-
 .../TupleMarshallerVarlenOnlyBenchmark.java        |   3 +-
 .../table/distributed/TableManagerTest.java        |   3 +
 9 files changed, 139 insertions(+), 156 deletions(-)

diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 893cb9d305..d565ee5c70 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.schema;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
@@ -30,20 +33,18 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.configuration.NamedListView;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.internal.causality.VersionedValue;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.schema.configuration.ColumnView;
 import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
 import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
@@ -65,11 +66,13 @@ import org.jetbrains.annotations.Nullable;
  * The class services a management of table schemas.
  */
 public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters> implements IgniteComponent {
+    private static final IgniteLogger LOGGER = 
Loggers.forClass(SchemaManager.class);
+
     /** Initial version for schemas. */
     public static final int INITIAL_SCHEMA_VERSION = 1;
 
     /** Schema history key predicate part. */
-    public static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
+    private static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -117,119 +120,109 @@ public class SchemaManager extends 
Producer<SchemaEvent, SchemaEventParameters>
         try {
             ExtendedTableView tblCfg = (ExtendedTableView) 
ctx.config(ExtendedTableConfiguration.class).value();
 
-            int verFromUpdate = tblCfg.schemaId();
+            int newSchemaVersion = tblCfg.schemaId();
 
             UUID tblId = tblCfg.id();
 
-            String tableName = tblCfg.name();
-
-            SchemaDescriptor schemaDescFromUpdate = 
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
-
-            if (searchSchemaByVersion(tblId, schemaDescFromUpdate.version()) 
!= null) {
+            if (searchSchemaByVersion(tblId, newSchemaVersion) != null) {
                 return completedFuture(null);
             }
 
-            if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
-                SchemaDescriptor oldSchema = searchSchemaByVersion(tblId, 
verFromUpdate - 1);
+            SchemaDescriptor newSchema = 
SchemaUtils.prepareSchemaDescriptor(newSchemaVersion, tblCfg);
 
-                if (oldSchema == null) {
-                    byte[] serPrevSchema = schemaByVersion(tblId, 
verFromUpdate - 1);
-
-                    assert serPrevSchema != null;
-
-                    oldSchema = 
SchemaSerializerImpl.INSTANCE.deserialize(serPrevSchema);
-                }
+            // This is intentionally a blocking call to enforce configuration 
listener execution order. Unfortunately it is not possible
+            // to execute this method asynchronously, because the schema 
descriptor is needed to fire the CREATE event as a synchronous part
+            // of the configuration listener.
+            try {
+                setColumnMapping(newSchema, tblId);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
 
-                
schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(oldSchema, 
schemaDescFromUpdate));
+                return failedFuture(e);
+            } catch (ExecutionException e) {
+                return failedFuture(e);
             }
 
             long causalityToken = ctx.storageRevision();
 
-            CompletableFuture<?> createSchemaFut = 
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
-
-            try {
-                final ByteArray key = schemaWithVerHistKey(tblId, 
verFromUpdate);
-
-                createSchemaFut.thenCompose(t -> metastorageMgr.invoke(
-                        Conditions.notExists(key),
-                        Operations.put(key, 
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate)),
-                        Operations.noop()));
-            } catch (Throwable th) {
-                createSchemaFut.completeExceptionally(th);
-            }
-
-            createSchemaFut.whenComplete((ignore, th) -> {
-                if (th == null) {
-                    registriesVv.get(causalityToken).thenRun(() -> 
inBusyLock(busyLock,
-                            () -> fireEvent(SchemaEvent.CREATE, new 
SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+            // Fire event early, because dependent listeners have to register 
VersionedValues' update futures
+            var eventParams = new SchemaEventParameters(causalityToken, tblId, 
newSchema);
+
+            fireEvent(SchemaEvent.CREATE, eventParams)
+                    .whenComplete((v, e) -> {
+                        if (e != null) {
+                            LOGGER.warn("Error when processing CREATE event", 
e);
+                        }
+                    });
+
+            return registriesVv.update(causalityToken, (registries, e) -> 
inBusyLock(busyLock, () -> {
+                if (e != null) {
+                    return failedFuture(new 
IgniteInternalException(IgniteStringFormatter.format(
+                            "Cannot create a schema for the table [tblId={}, 
ver={}]", tblId, newSchemaVersion), e)
+                    );
                 }
-            });
 
-            return createSchemaFut;
+                return registerSchema(registries, tblId, tblCfg.name(), 
newSchema);
+            }));
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    /**
-     * Create new schema locally.
-     *
-     * @param causalityToken Causality token.
-     * @param tableId Table id.
-     * @param tableName Table name.
-     * @param schemaDescriptor Schema descriptor.
-     * @return Create schema future.
-     */
-    private CompletableFuture<?> createSchema(long causalityToken, UUID 
tableId, String tableName, SchemaDescriptor schemaDescriptor) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+    private void setColumnMapping(SchemaDescriptor schema, UUID tableId) 
throws ExecutionException, InterruptedException {
+        if (schema.version() == INITIAL_SCHEMA_VERSION) {
+            return;
         }
 
-        try {
-            return createSchemaInternal(causalityToken, tableId, tableName, 
schemaDescriptor);
-        } finally {
-            busyLock.leaveBusy();
+        int prevVersion = schema.version() - 1;
+
+        SchemaDescriptor prevSchema = searchSchemaByVersion(tableId, 
prevVersion);
+
+        if (prevSchema == null) {
+            // This is intentionally a blocking call, because this method is 
used in a synchronous part of the configuration listener.
+            // See the call site for more details.
+            prevSchema = schemaByVersion(tableId, prevVersion).get();
         }
+
+        schema.columnMapping(SchemaUtils.columnMapper(prevSchema, schema));
     }
 
     /**
-     * Internal method for creating schema locally.
+     * Registers the new schema in a Schema Registry.
      *
-     * @param causalityToken Causality token.
+     * @param registries Map of schema registries.
      * @param tableId Table id.
      * @param tableName Table name.
-     * @param schemaDescriptor Schema descriptor.
-     * @return Create schema future.
+     * @param schema Schema descriptor.
+     * @return Future that, when complete, will resolve into an updated map of 
schema registries
+     *     (to be used in {@link VersionedValue#update}).
      */
-    private CompletableFuture<?> createSchemaInternal(
-            long causalityToken,
+    private CompletableFuture<Map<UUID, SchemaRegistryImpl>> registerSchema(
+            Map<UUID, SchemaRegistryImpl> registries,
             UUID tableId,
             String tableName,
-            SchemaDescriptor schemaDescriptor
+            SchemaDescriptor schema
     ) {
-        return registriesVv.update(causalityToken, (registries, e) -> 
inBusyLock(busyLock, () -> {
-            if (e != null) {
-                return failedFuture(new 
IgniteInternalException(IgniteStringFormatter.format(
-                        "Cannot create a schema for the table [tblId={}, 
ver={}]", tableId, schemaDescriptor.version()), e)
-                );
-            }
+        ByteArray key = schemaWithVerHistKey(tableId, schema.version());
 
-            Map<UUID, SchemaRegistryImpl> regs = registries;
+        byte[] serializedSchema = 
SchemaSerializerImpl.INSTANCE.serialize(schema);
 
-            SchemaRegistryImpl reg = regs.get(tableId);
+        return metastorageMgr.invoke(notExists(key), put(key, 
serializedSchema), noop())
+                .thenApply(t -> {
+                    SchemaRegistryImpl reg = registries.get(tableId);
 
-            if (reg == null) {
-                regs = new HashMap<>(registries);
+                    if (reg == null) {
+                        Map<UUID, SchemaRegistryImpl> copy = new 
HashMap<>(registries);
 
-                SchemaRegistryImpl registry = createSchemaRegistry(tableId, 
tableName, schemaDescriptor);
+                        copy.put(tableId, createSchemaRegistry(tableId, 
tableName, schema));
 
-                regs.put(tableId, registry);
-            } else {
-                reg.onSchemaRegistered(schemaDescriptor);
-            }
+                        return copy;
+                    } else {
+                        reg.onSchemaRegistered(schema);
 
-            return completedFuture(regs);
-        }));
+                        return registries;
+                    }
+                });
     }
 
     /**
@@ -241,35 +234,17 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
      * @return Schema registry.
      */
     private SchemaRegistryImpl createSchemaRegistry(UUID tableId, String 
tableName, SchemaDescriptor initialSchema) {
-        return new SchemaRegistryImpl(ver -> {
-            if (!busyLock.enterBusy()) {
-                throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
-            }
-
-            try {
-                return tableSchema(tableId, tableName, ver);
-            } finally {
-                busyLock.leaveBusy();
-            }
-        }, () -> {
-            if (!busyLock.enterBusy()) {
-                throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
-            }
-
-            try {
-                return latestSchemaVersion(tableId);
-            } finally {
-                busyLock.leaveBusy();
-            }
-        },
-            initialSchema
+        return new SchemaRegistryImpl(
+                ver -> inBusyLock(busyLock, () -> tableSchema(tableId, 
tableName, ver)),
+                () -> inBusyLock(busyLock, () -> latestSchemaVersion(tableId)),
+                initialSchema
         );
     }
 
     /**
      * Return table schema of certain version from history.
      *
-     * @param tblId     Table id.
+     * @param tblId Table id.
      * @param schemaVer Schema version.
      * @return Schema descriptor.
      */
@@ -340,7 +315,7 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
      * @param schemaVer Schema version.
      * @return Descriptor if required schema found, or {@code null} otherwise.
      */
-    private SchemaDescriptor searchSchemaByVersion(UUID tblId, int schemaVer) {
+    private @Nullable SchemaDescriptor searchSchemaByVersion(UUID tblId, int 
schemaVer) {
         SchemaRegistry registry = registriesVv.latest().get(tblId);
 
         if (registry != null && schemaVer <= registry.lastSchemaVersion()) {
@@ -354,7 +329,7 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
      * Gets a schema descriptor from the configuration storage.
      *
      * @param schemaVer Schema version.
-     * @param tblCfg    Table configuration.
+     * @param tblCfg Table configuration.
      * @return Schema descriptor.
      */
     private CompletableFuture<SchemaDescriptor> getSchemaDescriptor(int 
schemaVer, ExtendedTableConfiguration tblCfg) {
@@ -368,9 +343,9 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
      * Get the schema registry for the given causality token and table id.
      *
      * @param causalityToken Causality token.
-     * @param tableId Id of a table which the required registry belongs to. If 
{@code null}, then this method will return
-     *                a future which will be completed with {@code null} 
result, but only when the schema manager will have
-     *                consistent state regarding given causality token.
+     * @param tableId Id of a table which the required registry belongs to. If 
{@code null}, then this method will return a future which
+     *     will be completed with {@code null} result, but only when the 
schema manager will have consistent state regarding given causality
+     *     token.
      * @return A future which will be completed when schema registries for 
given causality token are ready.
      */
     public CompletableFuture<SchemaRegistry> schemaRegistry(long 
causalityToken, @Nullable UUID tableId) {
@@ -433,8 +408,7 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
      * @param tblId Table id.
      * @return The latest schema version.
      */
-    // TODO: Make this method async, see 
https://issues.apache.org/jira/browse/IGNITE-18732
-    private int latestSchemaVersion(UUID tblId) {
+    private CompletableFuture<Integer> latestSchemaVersion(UUID tblId) {
         var latestVersionFuture = new CompletableFuture<Integer>();
 
         metastorageMgr.prefix(schemaHistPrefix(tblId)).subscribe(new 
Subscriber<>() {
@@ -467,15 +441,7 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
             }
         });
 
-        try {
-            return latestVersionFuture.get(10, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInternalException("Interrupted when getting schema 
version from metastorage", e);
-        } catch (TimeoutException | ExecutionException e) {
-            throw new IgniteInternalException("Exception when getting schema 
version from metastorage", e);
-        }
+        return latestVersionFuture;
     }
 
     /**
@@ -484,19 +450,15 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
      * @param tblId Table id.
      * @return Schema representation if schema found, {@code null} otherwise.
      */
-    // TODO: Make this method async, see 
https://issues.apache.org/jira/browse/IGNITE-18732
-    private byte[] schemaByVersion(UUID tblId, int ver) {
-        try {
-            return metastorageMgr.get(schemaWithVerHistKey(tblId, ver))
-                    .thenApply(Entry::value)
-                    .get(10, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInternalException("Interrupted when getting schema 
from metastorage", e);
-        } catch (TimeoutException | ExecutionException e) {
-            throw new IgniteInternalException("Exception when getting schema 
from metastorage", e);
-        }
+    private CompletableFuture<SchemaDescriptor> schemaByVersion(UUID tblId, 
int ver) {
+        return metastorageMgr.get(schemaWithVerHistKey(tblId, ver))
+                .thenApply(entry -> {
+                    byte[] value = entry.value();
+
+                    assert value != null;
+
+                    return SchemaSerializerImpl.INSTANCE.deserialize(value);
+                });
     }
 
     private int extractVerFromSchemaKey(String key) {
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index 32d7bac3ea..75e879cb86 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -25,8 +25,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.Function;
-import java.util.function.IntSupplier;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -46,10 +46,10 @@ public class SchemaRegistryImpl implements SchemaRegistry {
     private final Map<Long, ColumnMapper> mappingCache = new 
ConcurrentHashMap<>();
 
     /** Schema store. */
-    private final Function<Integer, CompletableFuture<SchemaDescriptor>> 
history;
+    private final IntFunction<CompletableFuture<SchemaDescriptor>> history;
 
     /** The method to provide the latest schema version on cluster. */
-    private final IntSupplier latestVersionStore;
+    private final Supplier<CompletableFuture<Integer>> latestVersionStore;
 
     /**
      * Constructor.
@@ -59,8 +59,8 @@ public class SchemaRegistryImpl implements SchemaRegistry {
      * @param initialSchema      Initial schema.
      */
     public SchemaRegistryImpl(
-            Function<Integer, CompletableFuture<SchemaDescriptor>> history,
-            IntSupplier latestVersionStore,
+            IntFunction<CompletableFuture<SchemaDescriptor>> history,
+            Supplier<CompletableFuture<Integer>> latestVersionStore,
             SchemaDescriptor initialSchema
     ) {
         this.history = history;
@@ -118,7 +118,8 @@ public class SchemaRegistryImpl implements SchemaRegistry {
     /** {@inheritDoc} */
     @Override
     public SchemaDescriptor waitLatestSchema() {
-        int lastVer0 = latestVersionStore.getAsInt();
+        // TODO: remove blocking code 
https://issues.apache.org/jira/browse/IGNITE-17931
+        int lastVer0 = latestVersionStore.get().join();
         Integer lastLocalVer = schemaCache.lastKey();
 
         assert lastLocalVer <= lastVer0 : "Cached schema is earlier than 
consensus [lastVer=" + lastLocalVer
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index f6aacd6761..dfd5e33c8b 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.schema.registry;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
 import static org.apache.ignite.internal.schema.NativeTypes.INT64;
 import static org.apache.ignite.internal.schema.NativeTypes.STRING;
@@ -58,7 +59,7 @@ public class SchemaRegistryImplTest {
                 new Column[]{new Column("keyLongCol", INT64, false)},
                 new Column[]{new Column("valBytesCol", BYTES, true)});
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
INITIAL_SCHEMA_VERSION, schemaV0);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV0);
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
         assertNotNull(reg.schema());
@@ -108,7 +109,7 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
INITIAL_SCHEMA_VERSION, schemaV1);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
         assertNotNull(reg.schema());
@@ -162,7 +163,7 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
INITIAL_SCHEMA_VERSION, schemaV1);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
 
@@ -230,7 +231,7 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
INITIAL_SCHEMA_VERSION, schemaV1);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
 
@@ -346,7 +347,7 @@ public class SchemaRegistryImplTest {
 
         Map<Integer, CompletableFuture<SchemaDescriptor>> history = 
schemaHistory(schemaV1, schemaV2);
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () 
-> INITIAL_SCHEMA_VERSION, schemaV2);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () 
-> completedFuture(INITIAL_SCHEMA_VERSION), schemaV2);
 
         assertEquals(2, reg.lastSchemaVersion());
         assertSameSchema(schemaV2, reg.schema());
@@ -412,7 +413,7 @@ public class SchemaRegistryImplTest {
 
         Map<Integer, CompletableFuture<SchemaDescriptor>> history = 
schemaHistory(schemaV2, schemaV3);
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () 
-> INITIAL_SCHEMA_VERSION, schemaV3);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () 
-> completedFuture(INITIAL_SCHEMA_VERSION), schemaV3);
 
         assertEquals(3, reg.lastSchemaVersion());
         assertSameSchema(schemaV3, reg.schema());
@@ -478,7 +479,7 @@ public class SchemaRegistryImplTest {
 
         Map<Integer, CompletableFuture<SchemaDescriptor>> history = 
schemaHistory(schemaV2, schemaV3, schemaV4);
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () 
-> INITIAL_SCHEMA_VERSION, schemaV4);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () 
-> completedFuture(INITIAL_SCHEMA_VERSION), schemaV4);
 
         assertEquals(4, reg.lastSchemaVersion());
         assertSameSchema(schemaV4, reg.schema());
@@ -544,7 +545,7 @@ public class SchemaRegistryImplTest {
 
         
schemaV4.columnMapping(createMapper(schemaV4).add(schemaV4.column("valBytesCol")));
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
INITIAL_SCHEMA_VERSION, schemaV1);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
 
         final Map<Long, ColumnMapper> cache = reg.mappingCache();
 
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
index a584a38163..c7ef390e0f 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
@@ -151,15 +151,25 @@ public class UpgradingRowAdapterTest {
 
         BinaryRow row = serializeValuesToRow(schema, values);
 
+        var schemaRegistry = new SchemaRegistryImpl(
+                v -> v == 1 ? completedFuture(schema) : 
completedFuture(schema2),
+                () -> completedFuture(INITIAL_SCHEMA_VERSION),
+                schema
+        );
+
         // Validate row.
-        validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? 
completedFuture(schema) : completedFuture(schema2),
-                () -> INITIAL_SCHEMA_VERSION, schema), row);
+        validateRow(values, schemaRegistry, row);
 
         // Validate upgraded row.
         values.add(addedColumnIndex, null);
 
-        validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? 
completedFuture(schema) : completedFuture(schema2),
-                () -> schema2.version(), schema2), row);
+        var schema2Registry = new SchemaRegistryImpl(
+                v -> v == 1 ? completedFuture(schema) : 
completedFuture(schema2),
+                () -> completedFuture(schema2.version()),
+                schema2
+        );
+
+        validateRow(values, schema2Registry, row);
     }
 
     private void validateRow(List<Object> values, SchemaRegistryImpl 
schemaRegistry, BinaryRow binaryRow) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 89d29d4c44..babf997a4c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -154,7 +154,7 @@ public class StopCalciteModuleTest {
                 new Column[]{new Column(1, "VAL", NativeTypes.INT32, false)}
         );
 
-        schemaReg = new SchemaRegistryImpl((v) -> completedFuture(schemaDesc), 
() -> INITIAL_SCHEMA_VERSION, schemaDesc);
+        schemaReg = new SchemaRegistryImpl((v) -> completedFuture(schemaDesc), 
() -> completedFuture(INITIAL_SCHEMA_VERSION), schemaDesc);
 
         when(tbl.name()).thenReturn("TEST");
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 7ab14fe4be..0f50ecadea 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -55,9 +55,11 @@ import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNo
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaUtils;
@@ -269,12 +271,14 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     }
 
     /** Dummy metastore activity mock. */
-    private void mockMetastore() throws Exception {
+    private void mockMetastore() {
         when(msm.prefix(any())).thenReturn(subscriber -> {
             subscriber.onSubscribe(mock(Subscription.class));
 
             subscriber.onComplete();
         });
+
+        when(msm.invoke(any(), any(Operation.class), 
any(Operation.class))).thenReturn(completedFuture(null));
     }
 
     /**
@@ -550,7 +554,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
                 tblsCfg,
                 cs,
                 rm,
-                null,
+                mock(ReplicaManager.class),
                 null,
                 null,
                 bm,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
index b61f11ddb6..cbae76eda9 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.benchmarks;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
 
 import java.util.Random;
@@ -107,7 +108,7 @@ public class TupleMarshallerFixlenOnlyBenchmark {
                         .toArray(Column[]::new)
         );
 
-        marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null, 
() -> INITIAL_SCHEMA_VERSION, schema) {
+        marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null, 
() -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
             @Override
             public SchemaDescriptor schema() {
                 return schema;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
index d12ea24a29..896353ccb8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.benchmarks;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
 import static org.apache.ignite.internal.schema.NativeTypes.INT64;
 import static org.apache.ignite.internal.schema.NativeTypes.STRING;
@@ -120,7 +121,7 @@ public class TupleMarshallerVarlenOnlyBenchmark {
                         .toArray(Column[]::new)
         );
 
-        marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null, 
() -> INITIAL_SCHEMA_VERSION, schema) {
+        marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null, 
() -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
             @Override
             public SchemaDescriptor schema() {
                 return schema;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index c114dd7eda..e8d4e374ee 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -601,6 +602,8 @@ public class TableManagerTest extends IgniteAbstractTest {
 
             subscriber.onComplete();
         });
+
+        when(msm.invoke(any(), any(Operation.class), 
any(Operation.class))).thenReturn(completedFuture(null));
     }
 
     /**

Reply via email to