This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7ef48f0fa87 IGNITE-27192 Optimize catalog access in CatalogManager
(#7089)
7ef48f0fa87 is described below
commit 7ef48f0fa875c25a72d7327f97935a2ecf6329a8
Author: Ivan Bessonov <[email protected]>
AuthorDate: Tue Dec 2 17:59:04 2025 +0300
IGNITE-27192 Optimize catalog access in CatalogManager (#7089)
---
.../ignite/internal/catalog/CatalogByIndexMap.java | 158 +++++++++++++++++++++
.../internal/catalog/CatalogManagerImpl.java | 66 ++++++---
.../internal/catalog/CatalogByIndexMapTest.java | 109 ++++++++++++++
3 files changed, 312 insertions(+), 21 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogByIndexMap.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogByIndexMap.java
new file mode 100644
index 00000000000..9b148993a8d
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogByIndexMap.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An immutable map-like structure that maps long keys to {@link Catalog}
values.
+ *
+ * <p>
+ * The keys are stored in a sorted array to allow efficient binary search
lookups.
+ */
+class CatalogByIndexMap {
+ private final long[] keys;
+
+ private final Catalog[] values;
+
+ CatalogByIndexMap() {
+ this(new long[0], new Catalog[0]);
+ }
+
+ private CatalogByIndexMap(long[] keys, Catalog[] values) {
+ this.keys = keys;
+ this.values = values;
+ }
+
+ /**
+ * Returns the first (lowest) key in the map.
+ */
+ long firstKey() {
+ checkNotEmpty();
+
+ return keys[0];
+ }
+
+ /**
+ * Returns the last (highest) key in the map.
+ */
+ long lastKey() {
+ checkNotEmpty();
+
+ return keys[keys.length - 1];
+ }
+
+ /**
+ * Returns the first (earliest) value in the map.
+ */
+ Catalog firstValue() {
+ checkNotEmpty();
+
+ return values[0];
+ }
+
+ /**
+ * Returns the last (latest) value in the map.
+ */
+ Catalog lastValue() {
+ checkNotEmpty();
+
+ return values[values.length - 1];
+ }
+
+ /**
+ * Returns the value associated with the given key, or {@code null} if the
key is not present.
+ */
+ @Nullable Catalog get(long key) {
+ int idx = Arrays.binarySearch(keys, key);
+
+ return idx >= 0 ? values[idx] : null;
+ }
+
+ /**
+ * Returns the greatest value associated with a key less than or equal to
the given key.
+ */
+ @Nullable Catalog floorValue(long key) {
+ int idx = Arrays.binarySearch(keys, key);
+
+ if (idx < 0) {
+ // Get an index before the insertion point.
+ idx = ~idx - 1;
+ }
+
+ return idx >= 0 ? values[idx] : null;
+ }
+
+ /**
+ * Returns a new map with the given key and catalog added as the latest.
If the key already exists, its value is updated.
+ */
+ CatalogByIndexMap appendOrUpdate(long key, Catalog catalog) {
+ int length = this.keys.length;
+
+ if (length == 0) {
+ return new CatalogByIndexMap(new long[] {key}, new Catalog[]
{catalog});
+ } else {
+ int idx = Arrays.binarySearch(keys, key);
+ assert idx >= 0 || key > lastKey()
+ : "Keys must be inserted in ascending order [keys=" +
Arrays.toString(keys) + ", newKey=" + key + ']';
+
+ if (idx < 0) {
+ long[] keys = Arrays.copyOf(this.keys, length + 1);
+ keys[length] = key;
+
+ Catalog[] values = Arrays.copyOf(this.values, length + 1);
+ values[length] = catalog;
+
+ return new CatalogByIndexMap(keys, values);
+ } else {
+ Catalog[] values = this.values.clone();
+ values[idx] = catalog;
+
+ return new CatalogByIndexMap(keys, values);
+ }
+ }
+ }
+
+ /**
+ * Returns a new map with all entries strictly before the given key
removed.
+ */
+ CatalogByIndexMap clearHead(long keyToLeave) {
+ int idx = Arrays.binarySearch(keys, keyToLeave);
+
+ assert idx >= 0;
+
+ if (idx == 0) {
+ return this;
+ } else {
+ int length = this.keys.length;
+
+ long[] keys = Arrays.copyOfRange(this.keys, idx, length);
+ Catalog[] values = Arrays.copyOfRange(this.values, idx, length);
+
+ return new CatalogByIndexMap(keys, values);
+ }
+ }
+
+ private void checkNotEmpty() {
+ if (keys.length == 0) {
+ throw new NoSuchElementException();
+ }
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 26be9cd28dd..ffbb4771e78 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -33,10 +33,8 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
@@ -79,10 +77,10 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
private static final IgniteLogger LOG =
Loggers.forClass(CatalogManagerImpl.class);
/** Versioned catalog descriptors. */
- private final NavigableMap<Integer, Catalog> catalogByVer = new
ConcurrentSkipListMap<>();
+ private final AtomicReference<CatalogByIndexMap> catalogByVer = new
AtomicReference<>(new CatalogByIndexMap());
/** Versioned catalog descriptors sorted in chronological order. */
- private final NavigableMap<Long, Catalog> catalogByTs = new
ConcurrentSkipListMap<>();
+ private final AtomicReference<CatalogByIndexMap> catalogByTs = new
AtomicReference<>(new CatalogByIndexMap());
/** A future that completes when an empty catalog is initialised. If
catalog is not empty this future when this completes starts. */
private final CompletableFuture<Void> catalogInitializationFuture = new
CompletableFuture<>();
@@ -173,17 +171,17 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
@Override
public int earliestCatalogVersion() {
- return catalogByVer.firstEntry().getKey();
+ return (int) catalogByVer.get().firstKey();
}
@Override
public Catalog earliestCatalog() {
- return catalogByVer.firstEntry().getValue();
+ return catalogByVer.get().firstValue();
}
@Override
public Catalog latestCatalog() {
- return catalogByVer.lastEntry().getValue();
+ return catalogByVer.get().lastValue();
}
@Override
@@ -198,7 +196,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
@Override
public Catalog catalog(int catalogVersion) {
- Catalog catalog = catalogByVer.get(catalogVersion);
+ Catalog catalog = catalogByVer.get().get(catalogVersion);
if (catalog == null) {
throw new CatalogNotFoundException("Catalog version not found: " +
catalogVersion);
@@ -213,13 +211,13 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
}
private Catalog catalogAt(long timestamp) {
- Entry<Long, Catalog> entry = catalogByTs.floorEntry(timestamp);
+ Catalog catalog = catalogByTs.get().floorValue(timestamp);
- if (entry == null) {
+ if (catalog == null) {
throw new CatalogNotFoundException("Catalog not found for given
timestamp: " + timestamp);
}
- return entry.getValue();
+ return catalog;
}
@Override
@@ -273,13 +271,39 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
}
private void registerCatalog(Catalog newCatalog) {
- catalogByVer.put(newCatalog.version(), newCatalog);
- catalogByTs.put(newCatalog.time(), newCatalog);
+ while (true) {
+ CatalogByIndexMap catalogMap = catalogByVer.get();
+
+ if (catalogByVer.compareAndSet(catalogMap,
catalogMap.appendOrUpdate(newCatalog.version(), newCatalog))) {
+ break;
+ }
+ }
+
+ while (true) {
+ CatalogByIndexMap catalogMap = catalogByTs.get();
+
+ if (catalogByTs.compareAndSet(catalogMap,
catalogMap.appendOrUpdate(newCatalog.time(), newCatalog))) {
+ break;
+ }
+ }
}
private void truncateUpTo(Catalog catalog) {
- catalogByVer.headMap(catalog.version(), false).clear();
- catalogByTs.headMap(catalog.time(), false).clear();
+ while (true) {
+ CatalogByIndexMap catalogMap = catalogByVer.get();
+
+ if (catalogByVer.compareAndSet(catalogMap,
catalogMap.clearHead(catalog.version()))) {
+ break;
+ }
+ }
+
+ while (true) {
+ CatalogByIndexMap catalogMap = catalogByTs.get();
+
+ if (catalogByTs.compareAndSet(catalogMap,
catalogMap.clearHead(catalog.time()))) {
+ break;
+ }
+ }
LOG.info("Catalog history was truncated up to version=" +
catalog.version());
}
@@ -295,7 +319,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
if (errUnwrapped instanceof
CatalogVersionAwareValidationException) {
CatalogVersionAwareValidationException err0 =
(CatalogVersionAwareValidationException) errUnwrapped;
- Catalog catalog = catalogByVer.get(err0.version());
+ Catalog catalog =
catalogByVer.get().get(err0.version());
Throwable error = err0.initial();
if (catalog.version() == 0) {
@@ -342,7 +366,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
}
private CompletableFuture<Integer> awaitVersionActivation(int version) {
- Catalog catalog = catalogByVer.get(version);
+ Catalog catalog = catalogByVer.get().get(version);
HybridTimestamp tsSafeForRoReadingInPastOptimization =
calcClusterWideEnsureActivationTime(catalog);
@@ -372,7 +396,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
return failedFuture(new IgniteInternalException(INTERNAL_ERR,
"Max retry limit exceeded: " + attemptNo));
}
- Catalog catalog = catalogByVer.lastEntry().getValue();
+ Catalog catalog = catalogByVer.get().lastValue();
BitSet applyResults = new BitSet(updateProducers.size());
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();
@@ -417,7 +441,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
})
.thenCompose(result -> {
if (result) {
- long newCatalogTime =
catalogByVer.get(newVersion).time();
+ long newCatalogTime =
catalogByVer.get().get(newVersion).time();
return completedFuture(new
CatalogApplyResult(applyResults, newVersion, newCatalogTime));
}
@@ -456,7 +480,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
private CompletableFuture<Void> handle(VersionedUpdate update,
HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
int version = update.version();
- Catalog catalog = catalogByVer.get(version - 1);
+ Catalog catalog = catalogByVer.get().get(version - 1);
assert catalog != null : version - 1;
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogByIndexMapTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogByIndexMapTest.java
new file mode 100644
index 00000000000..690602b3466
--- /dev/null
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogByIndexMapTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.Mockito.mock;
+
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link CatalogByIndexMap}.
+ */
+class CatalogByIndexMapTest extends BaseIgniteAbstractTest {
+ private CatalogByIndexMap map;
+ private final Catalog catalog1 = mock(Catalog.class);
+ private final Catalog catalog2 = mock(Catalog.class);
+
+ @BeforeEach
+ void setUp() {
+ map = new CatalogByIndexMap();
+ }
+
+ @Test
+ void testPutAndGet() {
+ map = map.appendOrUpdate(1, catalog1);
+ map = map.appendOrUpdate(2, catalog2);
+
+ assertNull(map.get(0));
+ assertSame(catalog1, map.get(1));
+ assertSame(catalog2, map.get(2));
+ assertNull(map.get(3));
+ }
+
+ @Test
+ void testFirstLast() {
+ map = map.appendOrUpdate(1, catalog1);
+ map = map.appendOrUpdate(2, catalog2);
+
+ assertEquals(1, map.firstKey());
+ assertEquals(2, map.lastKey());
+
+ assertSame(catalog1, map.firstValue());
+ assertSame(catalog2, map.lastValue());
+ }
+
+ @Test
+ void testFloorValue() {
+ map = map.appendOrUpdate(1, catalog1);
+ map = map.appendOrUpdate(3, catalog2);
+
+ assertNull(map.floorValue(0));
+ assertSame(catalog1, map.floorValue(1));
+ assertSame(catalog1, map.floorValue(2));
+ assertSame(catalog2, map.floorValue(3));
+ assertSame(catalog2, map.floorValue(4));
+ }
+
+ @Test
+ void testOverwrite() {
+ map = map.appendOrUpdate(1, catalog1);
+ map = map.appendOrUpdate(1, catalog2);
+
+ assertSame(catalog2, map.get(1));
+ }
+
+ @Test
+ void testGetNonExistent() {
+ assertNull(map.get(42));
+ }
+
+ @Test
+ void testClearEmptyHead() {
+ map = map.appendOrUpdate(1, catalog1);
+
+ map = map.clearHead(1);
+
+ assertSame(catalog1, map.get(1));
+ }
+
+ @Test
+ void testClearNonEmptyHead() {
+ map = map.appendOrUpdate(1, catalog1);
+ map = map.appendOrUpdate(2, catalog2);
+
+ map = map.clearHead(2);
+
+ assertNull(map.get(1));
+ assertSame(catalog2, map.get(2));
+ }
+}