This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8742839b1e IGNITE-20071 Simplify SchemaManager and SchemaRegistryImpl
interaction (#2369)
8742839b1e is described below
commit 8742839b1e991fe95e045d05aa5a0386e42e4dfe
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Jul 27 20:13:02 2023 +0400
IGNITE-20071 Simplify SchemaManager and SchemaRegistryImpl interaction
(#2369)
---
.../ignite/client/fakes/FakeSchemaRegistry.java | 5 +
.../ignite/internal/schema/SchemaManager.java | 152 +++++----------------
.../ignite/internal/schema/SchemaRegistry.java | 9 +-
.../schema/registry/SchemaRegistryImpl.java | 31 ++++-
.../schema/registry/SchemaRegistryImplTest.java | 46 +++++--
.../TupleMarshallerFixlenOnlyBenchmark.java | 6 +-
.../TupleMarshallerVarlenOnlyBenchmark.java | 6 +-
.../table/impl/DummySchemaManagerImpl.java | 5 +
8 files changed, 124 insertions(+), 136 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
index 21b8e74a26..3c76b7e1e1 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
@@ -132,4 +132,9 @@ public class FakeSchemaRegistry implements SchemaRegistry {
public List<Row> resolve(Collection<BinaryRow> rows) {
return rows.stream().map(binaryRow -> binaryRow == null ? null :
resolve(binaryRow)).collect(toList());
}
+
+ @Override
+ public void close() {
+ // No-op.
+ }
}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 0abadc888d..51f0d71c8a 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -37,7 +37,6 @@ import java.util.function.Consumer;
import java.util.function.LongFunction;
import org.apache.ignite.configuration.NamedListView;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
-import org.apache.ignite.internal.causality.CompletionListener;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -47,13 +46,13 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.schema.configuration.ColumnView;
import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
-import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
@@ -161,13 +160,31 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
);
}
- return registerSchema(registries, tblId, tblCfg.name(),
newSchema);
+ return saveSchemaDescriptor(tblId, newSchema)
+ .thenApply(t -> registerSchema(tblId, newSchema,
registries));
}));
} finally {
busyLock.leaveBusy();
}
}
+ private Map<Integer, SchemaRegistryImpl> registerSchema(int tblId,
SchemaDescriptor newSchema,
+ Map<Integer, SchemaRegistryImpl> registries) {
+ SchemaRegistryImpl reg = registries.get(tblId);
+
+ if (reg == null) {
+ Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries);
+
+ copy.put(tblId, createSchemaRegistry(tblId, newSchema));
+
+ return copy;
+ } else {
+ reg.onSchemaRegistered(newSchema);
+
+ return registries;
+ }
+ }
+
private void setColumnMapping(SchemaDescriptor schema, int tableId) throws
ExecutionException, InterruptedException {
if (schema.version() == INITIAL_SCHEMA_VERSION) {
return;
@@ -187,126 +204,42 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
}
/**
- * Registers the new schema in a Schema Registry.
+ * Gets a schema descriptor from the configuration storage.
*
- * @param registries Map of schema registries.
- * @param tableId Table id.
- * @param tableName Table name.
- * @param schema Schema descriptor.
- * @return Future that, when complete, will resolve into an updated map of
schema registries
- * (to be used in {@link IncrementalVersionedValue#update}).
+ * @param tableId Table ID.
+ * @param schemaVer Schema version.
+ * @return Schema descriptor.
*/
- private CompletableFuture<Map<Integer, SchemaRegistryImpl>> registerSchema(
- Map<Integer, SchemaRegistryImpl> registries,
- int tableId,
- String tableName,
- SchemaDescriptor schema
- ) {
- ByteArray key = schemaWithVerHistKey(tableId, schema.version());
-
- byte[] serializedSchema =
SchemaSerializerImpl.INSTANCE.serialize(schema);
-
- return metastorageMgr.invoke(notExists(key), put(key,
serializedSchema), noop())
- .thenApply(t -> {
- SchemaRegistryImpl reg = registries.get(tableId);
+ private CompletableFuture<SchemaDescriptor> loadSchemaDescriptor(int
tableId, int schemaVer) {
+ CompletableFuture<Entry> ent =
metastorageMgr.get(schemaWithVerHistKey(tableId, schemaVer));
- if (reg == null) {
- Map<Integer, SchemaRegistryImpl> copy = new
HashMap<>(registries);
+ return ent.thenApply(e ->
SchemaSerializerImpl.INSTANCE.deserialize(e.value()));
+ }
- copy.put(tableId, createSchemaRegistry(tableId,
tableName, schema));
+ /** Saves a schema descriptor to the configuration storage. */
+ private CompletableFuture<Boolean> saveSchemaDescriptor(int tableId,
SchemaDescriptor schema) {
+ ByteArray key = schemaWithVerHistKey(tableId, schema.version());
- return copy;
- } else {
- reg.onSchemaRegistered(schema);
+ byte[] serializedSchema =
SchemaSerializerImpl.INSTANCE.serialize(schema);
- return registries;
- }
- });
+ return metastorageMgr.invoke(notExists(key), put(key,
serializedSchema), noop());
}
/**
* Create schema registry for the table.
*
* @param tableId Table id.
- * @param tableName Table name.
* @param initialSchema Initial schema for the registry.
* @return Schema registry.
*/
- private SchemaRegistryImpl createSchemaRegistry(int tableId, String
tableName, SchemaDescriptor initialSchema) {
+ private SchemaRegistryImpl createSchemaRegistry(int tableId,
SchemaDescriptor initialSchema) {
return new SchemaRegistryImpl(
- ver -> inBusyLock(busyLock, () -> tableSchema(tableId,
tableName, ver)),
+ ver -> inBusyLock(busyLock, () ->
loadSchemaDescriptor(tableId, ver)),
() -> inBusyLock(busyLock, () -> latestSchemaVersion(tableId)),
initialSchema
);
}
- /**
- * Return table schema of certain version from history.
- *
- * @param tblId Table id.
- * @param schemaVer Schema version.
- * @return Schema descriptor.
- */
- private CompletableFuture<SchemaDescriptor> tableSchema(int tblId, String
tableName, int schemaVer) {
- TableConfiguration tblCfg = tablesCfg.tables().get(tableName);
-
- CompletableFuture<SchemaDescriptor> fut = new CompletableFuture<>();
-
- SchemaRegistry registry = registriesVv.latest().get(tblId);
-
- if (registry.lastSchemaVersion() > schemaVer) {
- return getSchemaDescriptor(schemaVer, tblCfg);
- }
-
- CompletionListener<Map<Integer, SchemaRegistryImpl>> schemaListener =
(token, regs, e) -> {
- if (schemaVer <= regs.get(tblId).lastSchemaVersion()) {
- SchemaRegistry registry0 = registriesVv.latest().get(tblId);
-
- SchemaDescriptor desc = registry0.schemaCached(schemaVer);
-
- assert desc != null : "Unexpected empty schema description.";
-
- fut.complete(desc);
- }
- };
-
- registriesVv.whenComplete(schemaListener);
-
- // This check is needed for the case when we have registered
schemaListener,
- // but registriesVv has already been completed, so listener would be
triggered only for the next versioned value update.
- if (checkSchemaVersion(tblId, schemaVer)) {
- registriesVv.removeWhenComplete(schemaListener);
-
- registry = registriesVv.latest().get(tblId);
-
- SchemaDescriptor desc = registry.schemaCached(schemaVer);
-
- assert desc != null : "Unexpected empty schema description.";
-
- fut.complete(desc);
- }
-
- return fut.thenApply(res -> {
- registriesVv.removeWhenComplete(schemaListener);
- return res;
- });
- }
-
- /**
- * Checks that the provided schema version is less or equal than the
latest version from the schema registry.
- *
- * @param tblId Unique table id.
- * @param schemaVer Schema version for the table.
- * @return True, if the schema version is less or equal than the latest
version from the schema registry, false otherwise.
- */
- private boolean checkSchemaVersion(int tblId, int schemaVer) {
- SchemaRegistry registry = registriesVv.latest().get(tblId);
-
- assert registry != null : IgniteStringFormatter.format("Registry for
the table not found [tblId={}]", tblId);
-
- return schemaVer <= registry.lastSchemaVersion();
- }
-
/**
* Try to find schema in cache.
*
@@ -324,19 +257,6 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
}
}
- /**
- * Gets a schema descriptor from the configuration storage.
- *
- * @param schemaVer Schema version.
- * @param tblCfg Table configuration.
- * @return Schema descriptor.
- */
- private CompletableFuture<SchemaDescriptor> getSchemaDescriptor(int
schemaVer, TableConfiguration tblCfg) {
- CompletableFuture<Entry> ent =
metastorageMgr.get(schemaWithVerHistKey(tblCfg.id().value(), schemaVer));
-
- return ent.thenApply(e ->
SchemaSerializerImpl.INSTANCE.deserialize(e.value()));
- }
-
/**
* Get the schema registry for the given causality token and table id.
*
@@ -398,6 +318,8 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
}
busyLock.block();
+
+ IgniteUtils.closeAllManually(registriesVv.latest().values().stream());
}
/**
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
index 2e8ecd8918..94022748f1 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
import org.apache.ignite.internal.schema.row.Row;
import org.jetbrains.annotations.Nullable;
@@ -36,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
* beginning.
* @implSpec Initial schema history MAY be registered without the first
outdated versions that could be cleaned up earlier.
*/
-public interface SchemaRegistry {
+public interface SchemaRegistry extends ManuallyCloseable {
/**
* Gets schema descriptor for the latest version if initialized.
*
@@ -97,4 +98,10 @@ public interface SchemaRegistry {
* @return Schema-aware rows. Contains {@code null} at the same positions
as in {@code rows}.
*/
List<Row> resolve(Collection<BinaryRow> rows);
+
+ /**
+ * Closes the registry freeing any resources it holds.
+ */
+ @Override
+ void close();
}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index 5368222c72..ade5ef054a 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.mapping.ColumnMapper;
import org.apache.ignite.internal.schema.mapping.ColumnMapping;
import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.jetbrains.annotations.Nullable;
/**
@@ -46,27 +47,31 @@ public class SchemaRegistryImpl implements SchemaRegistry {
private final Map<Long, ColumnMapper> mappingCache = new
ConcurrentHashMap<>();
/** Schema store. */
- private final IntFunction<CompletableFuture<SchemaDescriptor>> history;
+ private final IntFunction<CompletableFuture<SchemaDescriptor>>
loadSchemaByVersion;
/** The method to provide the latest schema version on cluster. */
private final Supplier<CompletableFuture<Integer>> latestVersionStore;
+ private final PendingComparableValuesTracker<Integer, Void> versionTracker
= new PendingComparableValuesTracker<>(0);
+
/**
* Constructor.
*
- * @param history Schema history.
+ * @param loadSchemaByVersion Schema history.
* @param latestVersionStore The method to provide the latest version of
the schema.
* @param initialSchema Initial schema.
*/
public SchemaRegistryImpl(
- IntFunction<CompletableFuture<SchemaDescriptor>> history,
+ IntFunction<CompletableFuture<SchemaDescriptor>>
loadSchemaByVersion,
Supplier<CompletableFuture<Integer>> latestVersionStore,
SchemaDescriptor initialSchema
) {
- this.history = history;
+ this.loadSchemaByVersion = loadSchemaByVersion;
this.latestVersionStore = latestVersionStore;
schemaCache.put(initialSchema.version(), initialSchema);
+
+ versionTracker.update(initialSchema.version(), null);
}
/** {@inheritDoc} */
@@ -83,7 +88,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
return desc;
}
- CompletableFuture<SchemaDescriptor> descFut = history.apply(ver);
+ CompletableFuture<SchemaDescriptor> descFut = tableSchema(ver);
if (descFut != null) {
// TODO: remove blocking code
https://issues.apache.org/jira/browse/IGNITE-17931
@@ -92,6 +97,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
if (desc != null) {
schemaCache.putIfAbsent(ver, desc);
+ versionTracker.update(ver, null);
return desc;
}
@@ -161,6 +167,11 @@ public class SchemaRegistryImpl implements SchemaRegistry {
return rows;
}
+ @Override
+ public void close() {
+ versionTracker.close();
+ }
+
/**
* Resolves a schema for row. The method is optimal when the latest schema
is already got.
*
@@ -235,6 +246,8 @@ public class SchemaRegistryImpl implements SchemaRegistry {
}
schemaCache.put(desc.version(), desc);
+
+ versionTracker.update(desc.version(), null);
}
/**
@@ -263,4 +276,12 @@ public class SchemaRegistryImpl implements SchemaRegistry {
Map<Long, ColumnMapper> mappingCache() {
return mappingCache;
}
+
+ private CompletableFuture<SchemaDescriptor> tableSchema(int schemaVer) {
+ if (schemaVer < lastSchemaVersion()) {
+ return loadSchemaByVersion.apply(schemaVer);
+ }
+
+ return versionTracker.waitFor(schemaVer).thenApply(unused ->
schemaCached(schemaVer));
+ }
}
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index dfd5e33c8b..75f7841c2d 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -18,11 +18,15 @@
package org.apache.ignite.internal.schema.registry;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
import static org.apache.ignite.internal.schema.NativeTypes.INT64;
import static org.apache.ignite.internal.schema.NativeTypes.STRING;
import static
org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
import static
org.apache.ignite.internal.schema.mapping.ColumnMapping.createMapper;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -59,7 +63,11 @@ public class SchemaRegistryImplTest {
new Column[]{new Column("keyLongCol", INT64, false)},
new Column[]{new Column("valBytesCol", BYTES, true)});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV0);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+ v -> completedFuture(null),
+ () -> completedFuture(INITIAL_SCHEMA_VERSION),
+ schemaV0
+ );
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
assertNotNull(reg.schema());
@@ -109,7 +117,11 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+ v -> completedFuture(null),
+ () -> completedFuture(INITIAL_SCHEMA_VERSION),
+ schemaV1
+ );
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
assertNotNull(reg.schema());
@@ -136,8 +148,12 @@ public class SchemaRegistryImplTest {
assertSameSchema(schemaV2, reg.schema());
assertSameSchema(schemaV1, reg.schema(1));
assertSameSchema(schemaV2, reg.schema(2));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(4));
+ try {
+ assertThat(supplyAsync(() -> reg.schema(3)),
not(completedFuture()));
+ assertThat(supplyAsync(() -> reg.schema(4)),
not(completedFuture()));
+ } finally {
+ reg.close();
+ }
}
/**
@@ -163,7 +179,11 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+ v -> completedFuture(null),
+ () -> completedFuture(INITIAL_SCHEMA_VERSION),
+ schemaV1
+ );
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
@@ -180,7 +200,6 @@ public class SchemaRegistryImplTest {
assertEquals(1, reg.lastSchemaVersion());
assertSameSchema(schemaV1, reg.schema());
assertSameSchema(schemaV1, reg.schema(1));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
// Try to register another schema with same version and check nothing
was registered.
assertThrows(SchemaRegistrationConflictException.class, () ->
reg.onSchemaRegistered(wrongSchema));
@@ -190,7 +209,6 @@ public class SchemaRegistryImplTest {
assertSameSchema(schemaV1, reg.schema());
assertSameSchema(schemaV1, reg.schema(1));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
// Register schema with next version.
reg.onSchemaRegistered(schemaV2);
@@ -231,7 +249,11 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+ v -> completedFuture(null),
+ () -> completedFuture(INITIAL_SCHEMA_VERSION),
+ schemaV1
+ );
assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
@@ -361,14 +383,12 @@ public class SchemaRegistryImplTest {
assertSameSchema(schemaV2, reg.schema());
assertSameSchema(schemaV1, reg.schema(1));
assertSameSchema(schemaV2, reg.schema(2));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
// Register schema with out-of-order version.
assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaRegistered(schemaV4));
assertEquals(2, reg.lastSchemaVersion());
assertSameSchema(schemaV2, reg.schema());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
// Register schema with next version.
reg.onSchemaRegistered(schemaV3);
@@ -545,7 +565,11 @@ public class SchemaRegistryImplTest {
schemaV4.columnMapping(createMapper(schemaV4).add(schemaV4.column("valBytesCol")));
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null, () ->
completedFuture(INITIAL_SCHEMA_VERSION), schemaV1);
+ final SchemaRegistryImpl reg = new SchemaRegistryImpl(
+ v -> completedFuture(null),
+ () -> completedFuture(INITIAL_SCHEMA_VERSION),
+ schemaV1
+ );
final Map<Long, ColumnMapper> cache = reg.mappingCache();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
index cbae76eda9..785f94ad06 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerFixlenOnlyBenchmark.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.Columns;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -108,7 +109,7 @@ public class TupleMarshallerFixlenOnlyBenchmark {
.toArray(Column[]::new)
);
- marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null,
() -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
+ SchemaRegistry reg = new SchemaRegistryImpl(v ->
completedFuture(null), () -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
@Override
public SchemaDescriptor schema() {
return schema;
@@ -123,7 +124,8 @@ public class TupleMarshallerFixlenOnlyBenchmark {
public int lastSchemaVersion() {
return schema.version();
}
- });
+ };
+ marshaller = new TupleMarshallerImpl(reg);
vals = new Object[schema.valueColumns().length()];
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
index 896353ccb8..4c75ad44a2 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/benchmarks/TupleMarshallerVarlenOnlyBenchmark.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.Columns;
import org.apache.ignite.internal.schema.DefaultValueProvider;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -121,7 +122,7 @@ public class TupleMarshallerVarlenOnlyBenchmark {
.toArray(Column[]::new)
);
- marshaller = new TupleMarshallerImpl(new SchemaRegistryImpl(v -> null,
() -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
+ SchemaRegistry reg = new SchemaRegistryImpl(v ->
completedFuture(null), () -> completedFuture(INITIAL_SCHEMA_VERSION), schema) {
@Override
public SchemaDescriptor schema() {
return schema;
@@ -136,7 +137,8 @@ public class TupleMarshallerVarlenOnlyBenchmark {
public int lastSchemaVersion() {
return schema.version();
}
- });
+ };
+ marshaller = new TupleMarshallerImpl(reg);
if (useString) {
final byte[] data = new byte[dataSize / fieldsCount];
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index b9e919d901..314777fa89 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -98,4 +98,9 @@ public class DummySchemaManagerImpl implements SchemaRegistry
{
public List<Row> resolve(Collection<BinaryRow> rows) {
return rows.stream().map(binaryRow -> binaryRow == null ? null :
resolve(binaryRow)).collect(toList());
}
+
+ @Override
+ public void close() {
+ // No-op.
+ }
}