This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8742839b1e IGNITE-20071 Simplify SchemaManager and SchemaRegistryImpl interaction (#2369)
8742839b1e is described below

commit 8742839b1e991fe95e045d05aa5a0386e42e4dfe
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Jul 27 20:13:02 2023 +0400

    IGNITE-20071 Simplify SchemaManager and SchemaRegistryImpl interaction (#2369)
---
 .../ignite/client/fakes/FakeSchemaRegistry.java    |   5 +
 .../ignite/internal/schema/SchemaManager.java      | 152 +++++----------------
 .../ignite/internal/schema/SchemaRegistry.java     |   9 +-
 .../schema/registry/SchemaRegistryImpl.java        |  31 ++++-
 .../schema/registry/SchemaRegistryImplTest.java    |  46 +++++--
 .../TupleMarshallerFixlenOnlyBenchmark.java        |   6 +-
 .../TupleMarshallerVarlenOnlyBenchmark.java        |   6 +-
 .../table/impl/DummySchemaManagerImpl.java         |   5 +
 8 files changed, 124 insertions(+), 136 deletions(-)

diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
index 21b8e74a26..3c76b7e1e1 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
@@ -132,4 +132,9 @@ public class FakeSchemaRegistry implements SchemaRegistry {
     public List<Row> resolve(Collection<BinaryRow> rows) {
         return rows.stream().map(binaryRow -> binaryRow == null ? null : 
resolve(binaryRow)).collect(toList());
     }
+
+    @Override
+    public void close() {
+        // No-op.
+    }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 0abadc888d..51f0d71c8a 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -37,7 +37,6 @@ import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import org.apache.ignite.configuration.NamedListView;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
-import org.apache.ignite.internal.causality.CompletionListener;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -47,13 +46,13 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.schema.configuration.ColumnView;
 import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
-import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.event.SchemaEvent;
 import org.apache.ignite.internal.schema.event.SchemaEventParameters;
 import 
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -161,13 +160,31 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
                     );
                 }
 
-                return registerSchema(registries, tblId, tblCfg.name(), 
newSchema);
+                return saveSchemaDescriptor(tblId, newSchema)
+                        .thenApply(t -> registerSchema(tblId, newSchema, 
registries));
             }));
         } finally {
             busyLock.leaveBusy();
         }
     }
 
+    private Map<Integer, SchemaRegistryImpl> registerSchema(int tblId, 
SchemaDescriptor newSchema,
+            Map<Integer, SchemaRegistryImpl> registries) {
+        SchemaRegistryImpl reg = registries.get(tblId);
+
+        if (reg == null) {
+            Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries);
+
+            copy.put(tblId, createSchemaRegistry(tblId, newSchema));
+
+            return copy;
+        } else {
+            reg.onSchemaRegistered(newSchema);
+
+            return registries;
+        }
+    }
+
     private void setColumnMapping(SchemaDescriptor schema, int tableId) throws 
ExecutionException, InterruptedException {
         if (schema.version() == INITIAL_SCHEMA_VERSION) {
             return;
@@ -187,126 +204,42 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     }
 
     /**
-     * Registers the new schema in a Schema Registry.
+     * Gets a schema descriptor from the configuration storage.
      *
-     * @param registries Map of schema registries.
-     * @param tableId Table id.
-     * @param tableName Table name.
-     * @param schema Schema descriptor.
-     * @return Future that, when complete, will resolve into an updated map of 
schema registries
-     *     (to be used in {@link IncrementalVersionedValue#update}).
+     * @param tableId Table ID.
+     * @param schemaVer Schema version.
+     * @return Schema descriptor.
      */
-    private CompletableFuture<Map<Integer, SchemaRegistryImpl>> registerSchema(
-            Map<Integer, SchemaRegistryImpl> registries,
-            int tableId,
-            String tableName,
-            SchemaDescriptor schema
-    ) {
-        ByteArray key = schemaWithVerHistKey(tableId, schema.version());
-
-        byte[] serializedSchema = 
SchemaSerializerImpl.INSTANCE.serialize(schema);
-
-        return metastorageMgr.invoke(notExists(key), put(key, 
serializedSchema), noop())
-                .thenApply(t -> {
-                    SchemaRegistryImpl reg = registries.get(tableId);
+    private CompletableFuture<SchemaDescriptor> loadSchemaDescriptor(int 
tableId, int schemaVer) {
+        CompletableFuture<Entry> ent = 
metastorageMgr.get(schemaWithVerHistKey(tableId, schemaVer));
 
-                    if (reg == null) {
-                        Map<Integer, SchemaRegistryImpl> copy = new 
HashMap<>(registries);
+        return ent.thenApply(e -> 
SchemaSerializerImpl.INSTANCE.deserialize(e.value()));
+    }
 
-                        copy.put(tableId, createSchemaRegistry(tableId, 
tableName, schema));
+    /** Saves a schema descriptor to the configuration storage. */
+    private CompletableFuture<Boolean> saveSchemaDescriptor(int tableId, 
SchemaDescriptor schema) {
+        ByteArray key = schemaWithVerHistKey(tableId, schema.version());
 
-                        return copy;
-                    } else {
-                        reg.onSchemaRegistered(schema);
+        byte[] serializedSchema = 
SchemaSerializerImpl.INSTANCE.serialize(schema);
 
-                        return registries;
-                    }
-                });
+        return metastorageMgr.invoke(notExists(key), put(key, 
serializedSchema), noop());
     }
 
     /**
      * Create schema registry for the table.
      *
      * @param tableId Table id.
-     * @param tableName Table name.
      * @param initialSchema Initial schema for the registry.
      * @return Schema registry.
      */
-    private SchemaRegistryImpl createSchemaRegistry(int tableId, String 
tableName, SchemaDescriptor initialSchema) {
+    private SchemaRegistryImpl createSchemaRegistry(int tableId, 
SchemaDescriptor initialSchema) {
         return new SchemaRegistryImpl(
-                ver -> inBusyLock(busyLock, () -> tableSchema(tableId, 
tableName, ver)),
+                ver -> inBusyLock(busyLock, () -> 
loadSchemaDescriptor(tableId, ver)),
                 () -> inBusyLock(busyLock, () -> latestSchemaVersion(tableId)),
                 initialSchema
         );
     }
 
-    /**
-     * Return table schema of certain version from history.
-     *
-     * @param tblId Table id.
-     * @param schemaVer Schema version.
-     * @return Schema descriptor.
-     */
-    private CompletableFuture<SchemaDescriptor> tableSchema(int tblId, String 
tableName, int schemaVer) {
-        TableConfiguration tblCfg = tablesCfg.tables().get(tableName);
-
-        CompletableFuture<SchemaDescriptor> fut = new CompletableFuture<>();
-
-        SchemaRegistry registry = registriesVv.latest().get(tblId);
-
-        if (registry.lastSchemaVersion() > schemaVer) {
-            return getSchemaDescriptor(schemaVer, tblCfg);
-        }
-
-        CompletionListener<Map<Integer, SchemaRegistryImpl>> schemaListener = 
(token, regs, e) -> {
-            if (schemaVer <= regs.get(tblId).lastSchemaVersion()) {
-                SchemaRegistry registry0 = registriesVv.latest().get(tblId);
-
-                SchemaDescriptor desc = registry0.schemaCached(schemaVer);
-
-                assert desc != null : "Unexpected empty schema description.";
-
-                fut.complete(desc);
-            }
-        };
-
-        registriesVv.whenComplete(schemaListener);
-
-        // This check is needed for the case when we have registered 
schemaListener,
-        // but registriesVv has already been completed, so listener would be 
triggered only for the next versioned value update.
-        if (checkSchemaVersion(tblId, schemaVer)) {
-            registriesVv.removeWhenComplete(schemaListener);
-
-            registry = registriesVv.latest().get(tblId);
-
-            SchemaDescriptor desc = registry.schemaCached(schemaVer);
-
-            assert desc != null : "Unexpected empty schema description.";
-
-            fut.complete(desc);
-        }
-
-        return fut.thenApply(res -> {
-            registriesVv.removeWhenComplete(schemaListener);
-            return res;
-        });
-    }
-
-    /**
-     * Checks that the provided schema version is less or equal than the 
latest version from the schema registry.
-     *
-     * @param tblId Unique table id.
-     * @param schemaVer Schema version for the table.
-     * @return True, if the schema version is less or equal than the latest 
version from the schema registry, false otherwise.
-     */
-    private boolean checkSchemaVersion(int tblId, int schemaVer) {
-        SchemaRegistry registry = registriesVv.latest().get(tblId);
-
-        assert registry != null : IgniteStringFormatter.format("Registry for 
the table not found [tblId={}]", tblId);
-
-        return schemaVer <= registry.lastSchemaVersion();
-    }
-
     /**
      * Try to find schema in cache.
      *
@@ -324,19 +257,6 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
         }
     }
 
-    /**
-     * Gets a schema descriptor from the configuration storage.
-     *
-     * @param schemaVer Schema version.
-     * @param tblCfg Table configuration.
-     * @return Schema descriptor.
-     */
-    private CompletableFuture<SchemaDescriptor> getSchemaDescriptor(int 
schemaVer, TableConfiguration tblCfg) {
-        CompletableFuture<Entry> ent = 
metastorageMgr.get(schemaWithVerHistKey(tblCfg.id().value(), schemaVer));
-
-        return ent.thenApply(e -> 
SchemaSerializerImpl.INSTANCE.deserialize(e.value()));
-    }
-
     /**
      * Get the schema registry for the given causality token and table id.
      *
@@ -398,6 +318,8 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
         }
 
         busyLock.block();
+
+        IgniteUtils.closeAllManually(registriesVv.latest().values().stream());
     }
 
     /**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
index 2e8ecd8918..94022748f1 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema;
 
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
 import org.apache.ignite.internal.schema.row.Row;
 import org.jetbrains.annotations.Nullable;
@@ -36,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
  *      beginning.
  * @implSpec Initial schema history MAY be registered without the first 
outdated versions that could be cleaned up earlier.
  */
-public interface SchemaRegistry {
+public interface SchemaRegistry extends ManuallyCloseable {
     /**
      * Gets schema descriptor for the latest version if initialized.
      *
@@ -97,4 +98,10 @@ public interface SchemaRegistry {
      * @return Schema-aware rows. Contains {@code null} at the same positions 
as in {@code rows}.
      */
     List<Row> resolve(Collection<BinaryRow> rows);
+
+    /**
+     * Closes the registry freeing any resources it holds.
+     */
+    @Override
+    void close();
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index 5368222c72..ade5ef054a 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.mapping.ColumnMapper;
 import org.apache.ignite.internal.schema.mapping.ColumnMapping;
 import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -46,27 +47,31 @@ public class SchemaRegistryImpl implements SchemaRegistry {
     private final Map<Long, ColumnMapper> mappingCache = new 
ConcurrentHashMap<>();
 
     /** Schema store. */
-    private final IntFunction<CompletableFuture<SchemaDescriptor>> history;
+    private final IntFunction<CompletableFuture<SchemaDescriptor>> 
loadSchemaByVersion;
 
     /** The method to provide the latest schema version on cluster. */
     private final Supplier<CompletableFuture<Integer>> latestVersionStore;
 
+    private final PendingComparableValuesTracker<Integer, Void> versionTracker 
= new PendingComparableValuesTracker<>(0);
+
     /**
      * Constructor.
      *
-     * @param history            Schema history.
+     * @param loadSchemaByVersion            Schema history.
      * @param latestVersionStore The method to provide the latest version of 
the schema.
      * @param initialSchema      Initial schema.
      */
     public SchemaRegistryImpl(
-            IntFunction<CompletableFuture<SchemaDescriptor>> history,
+            IntFunction<CompletableFuture<SchemaDescriptor>> 
loadSchemaByVersion,
             Supplier<CompletableFuture<Integer>> latestVersionStore,
             SchemaDescriptor initialSchema
     ) {
-        this.history = history;
+        this.loadSchemaByVersion = loadSchemaByVersion;
         this.latestVersionStore = latestVersionStore;
 
         schemaCache.put(initialSchema.version(), initialSchema);
+
+        versionTracker.update(initialSchema.version(), null);
     }
 
     /** {@inheritDoc} */
@@ -83,7 +88,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
             return desc;
         }
 
-        CompletableFuture<SchemaDescriptor> descFut = history.apply(ver);
+        CompletableFuture<SchemaDescriptor> descFut = tableSchema(ver);
 
         if (descFut != null) {
             // TODO: remove blocking code 
https://issues.apache.org/jira/browse/IGNITE-17931
@@ -92,6 +97,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
 
         if (desc != null) {
             schemaCache.putIfAbsent(ver, desc);
+            versionTracker.update(ver, null);
 
             return desc;
         }
@@ -161,6 +167,11 @@ public class SchemaRegistryImpl implements SchemaRegistry {
         return rows;
     }
 
+    @Override
+    public void close() {
+        versionTracker.close();
+    }
+
     /**
      * Resolves a schema for row. The method is optimal when the latest schema 
is already got.
      *
@@ -235,6 +246,8 @@ public class SchemaRegistryImpl implements SchemaRegistry {
         }
 
         schemaCache.put(desc.version(), desc);
+
+        versionTracker.update(desc.version(), null);
     }
 
     /**
@@ -263,4 +276,12 @@ public class SchemaRegistryImpl implements SchemaRegistry {
     Map<Long, ColumnMapper> mappingCache() {
         return mappingCache;
     }
+
+    private CompletableFuture<SchemaDescriptor> tableSchema(int schemaVer) {
+        if (schemaVer < lastSchemaVersion()) {
+            return loadSchemaByVersion.apply(schemaVer);
+        }
+
+        return versionTracker.waitFor(schemaVer).thenApply(unused -> 
schemaCached(schemaVer));
+    }
 }
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index dfd5e33c8b..75f7841c2d 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -18,11 +18,15 @@
 package org.apache.ignite.internal.schema.registry;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
 import static org.apache.ignite.internal.schema.NativeTypes.INT64;
 import static org.apache.ignite.internal.schema.NativeTypes.STRING;
 import static 
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
 import static 
org.apache.ignite.internal.schema.mapping.ColumnMapping.createMapper;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -59,7 +63,11 @@ public class SchemaRegistryImplTest {
                 new Column[]{new Column("keyLongCol", INT64, false)},
                 new Column[]{new Column("valBytesCol", BYTES, true)});
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV0);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+                v -> completedFuture(null),
+                () -> completedFuture(INITIAL_SCHEMA_VERSION),
+                schemaV0
+        );
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
         assertNotNull(reg.schema());
@@ -109,7 +117,11 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+                v -> completedFuture(null),
+                () -> completedFuture(INITIAL_SCHEMA_VERSION),
+                schemaV1
+        );
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
         assertNotNull(reg.schema());
@@ -136,8 +148,12 @@ public class SchemaRegistryImplTest {
         assertSameSchema(schemaV2, reg.schema());
         assertSameSchema(schemaV1, reg.schema(1));
         assertSameSchema(schemaV2, reg.schema(2));
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(4));
+        try {
+            assertThat(supplyAsync(() -> reg.schema(3)), 
not(completedFuture()));
+            assertThat(supplyAsync(() -> reg.schema(4)), 
not(completedFuture()));
+        } finally {
+            reg.close();
+        }
     }
 
     /**
@@ -163,7 +179,11 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+                v -> completedFuture(null),
+                () -> completedFuture(INITIAL_SCHEMA_VERSION),
+                schemaV1
+        );
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
 
@@ -180,7 +200,6 @@ public class SchemaRegistryImplTest {
         assertEquals(1, reg.lastSchemaVersion());
         assertSameSchema(schemaV1, reg.schema());
         assertSameSchema(schemaV1, reg.schema(1));
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
 
         // Try to register another schema with same version and check nothing 
was registered.
         assertThrows(SchemaRegistrationConflictException.class, () -> 
reg.onSchemaRegistered(wrongSchema));
@@ -190,7 +209,6 @@ public class SchemaRegistryImplTest {
 
         assertSameSchema(schemaV1, reg.schema());
         assertSameSchema(schemaV1, reg.schema(1));
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
 
         // Register schema with next version.
         reg.onSchemaRegistered(schemaV2);
@@ -231,7 +249,11 @@ public class SchemaRegistryImplTest {
                         new Column("valStringCol", STRING, true)
                 });
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+                v -> completedFuture(null),
+                () -> completedFuture(INITIAL_SCHEMA_VERSION),
+                schemaV1
+        );
 
         assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
 
@@ -361,14 +383,12 @@ public class SchemaRegistryImplTest {
         assertSameSchema(schemaV2, reg.schema());
         assertSameSchema(schemaV1, reg.schema(1));
         assertSameSchema(schemaV2, reg.schema(2));
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
 
         // Register schema with out-of-order version.
         assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaRegistered(schemaV4));
 
         assertEquals(2, reg.lastSchemaVersion());
         assertSameSchema(schemaV2, reg.schema());
-        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
 
         // Register schema with next version.
         reg.onSchemaRegistered(schemaV3);
@@ -545,7 +565,11 @@ public class SchemaRegistryImplTest {
 
         
schemaV4.columnMapping(createMapper(schemaV4).add(schemaV4.column("valBytesCol")));
 
-        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () -> 
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+                v -> completedFuture(null),
+                () -> completedFuture(INITIAL_SCHEMA_VERSION),
+                schemaV1
+        );
 
         final Map<Long, ColumnMapper> cache = reg.mappingCache();
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
index cbae76eda9..785f94ad06 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.Columns;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -108,7 +109,7 @@ public class TupleMarshallerFixlenOnlyBenchmark {
                         .toArray(Column[]::new)
         );
 
-        marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null, 
() -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
+        SchemaRegistry reg = new SchemaRegistryImpl(v -> 
completedFuture(null), () -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
             @Override
             public SchemaDescriptor schema() {
                 return schema;
@@ -123,7 +124,8 @@ public class TupleMarshallerFixlenOnlyBenchmark {
             public int lastSchemaVersion() {
                 return schema.version();
             }
-        });
+        };
+        marshaller = new TupleMarshallerImpl(reg);
 
         vals = new Object[schema.valueColumns().length()];
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
index 896353ccb8..4c75ad44a2 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.Columns;
 import org.apache.ignite.internal.schema.DefaultValueProvider;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -121,7 +122,7 @@ public class TupleMarshallerVarlenOnlyBenchmark {
                         .toArray(Column[]::new)
         );
 
-        marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null, 
() -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
+        SchemaRegistry reg = new SchemaRegistryImpl(v -> 
completedFuture(null), () -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
             @Override
             public SchemaDescriptor schema() {
                 return schema;
@@ -136,7 +137,8 @@ public class TupleMarshallerVarlenOnlyBenchmark {
             public int lastSchemaVersion() {
                 return schema.version();
             }
-        });
+        };
+        marshaller = new TupleMarshallerImpl(reg);
 
         if (useString) {
             final byte[] data = new byte[dataSize / fieldsCount];
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index b9e919d901..314777fa89 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -98,4 +98,9 @@ public class DummySchemaManagerImpl implements SchemaRegistry {
     public List<Row> resolve(Collection<BinaryRow> rows) {
         return rows.stream().map(binaryRow -> binaryRow == null ? null : 
resolve(binaryRow)).collect(toList());
     }
+
+    @Override
+    public void close() {
+        // No-op.
+    }
 }

Reply via email to