This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 95d87679e1 IGNITE-19363 Split start of indexes and start of partition
raft group nodes (#2177)
95d87679e1 is described below
commit 95d87679e12dc9d8d1ad25f33d25a3029a45613a
Author: Semyon Danilov <[email protected]>
AuthorDate: Fri Jun 16 14:10:22 2023 +0400
IGNITE-19363 Split start of indexes and start of partition raft group nodes
(#2177)
---
.../apache/ignite/internal/index/IndexManager.java | 16 +-
.../ignite/internal/index/IndexManagerTest.java | 15 +-
.../storage/snapshot/SnapshotExecutorImpl.java | 8 +-
.../ignite/internal/rebalance/ItRebalanceTest.java | 2 +
.../apache/ignite/internal/table/IndexWrapper.java | 127 +++++
.../apache/ignite/internal/table/TableImpl.java | 92 +---
.../table/distributed/BitSetPartitionSet.java | 86 +++
.../internal/table/distributed/PartitionSet.java | 138 +++++
.../internal/table/distributed/TableManager.java | 592 ++++++++++++---------
.../distributed/AbstractPartitionSetTest.java | 117 ++++
.../table/distributed/BitSetPartitionSetTest.java | 26 +
.../table/distributed/TableManagerTest.java | 42 +-
12 files changed, 908 insertions(+), 353 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 390432bb44..8cbe2eca88 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -66,6 +66,7 @@ import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -465,11 +466,16 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
CompletableFuture<?> fireEventFuture =
fireEvent(IndexEvent.CREATE, new
IndexEventParameters(causalityToken, tableId, indexId, eventIndexDescriptor));
- CompletableFuture<TableImpl> tableFuture =
tableManager.tableAsync(causalityToken, tableId);
+ TableImpl table = tableManager.getTable(tableId);
+
+ assert table != null : tableId;
CompletableFuture<SchemaRegistry> schemaRegistryFuture =
schemaManager.schemaRegistry(causalityToken, tableId);
- CompletableFuture<?> createIndexFuture =
tableFuture.thenAcceptBoth(schemaRegistryFuture, (table, schemaRegistry) -> {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19712 Listen to
assignment changes and start new index storages.
+ CompletableFuture<PartitionSet> tablePartitionFuture =
tableManager.localPartitionSetAsync(causalityToken, tableId);
+
+ CompletableFuture<?> createIndexFuture =
tablePartitionFuture.thenAcceptBoth(schemaRegistryFuture, (partitions,
schemaRegistry) -> {
TableRowToIndexKeyConverter tableRowConverter = new
TableRowToIndexKeyConverter(
schemaRegistry,
eventIndexDescriptor.columns().toArray(STRING_EMPTY_ARRAY)
@@ -478,7 +484,8 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
if (eventIndexDescriptor instanceof SortedIndexDescriptor) {
table.registerSortedIndex(
(StorageSortedIndexDescriptor) storageIndexDescriptor,
- tableRowConverter::convert
+ tableRowConverter::convert,
+ partitions
);
} else {
boolean unique = indexDescriptor.unique();
@@ -486,7 +493,8 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
table.registerHashIndex(
(StorageHashIndexDescriptor) storageIndexDescriptor,
unique,
- tableRowConverter::convert
+ tableRowConverter::convert,
+ partitions
);
if (unique) {
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 4c75272ba3..8230656614 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -50,6 +51,7 @@ import
org.apache.ignite.internal.schema.configuration.index.SortedIndexChange;
import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.lang.IgniteInternalException;
@@ -58,7 +60,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mockito;
/**
* Test class to verify {@link IndexManager}.
@@ -84,11 +85,21 @@ public class IndexManagerTest {
when(tableManagerMock.tableAsync(anyLong(), anyInt())).thenAnswer(inv
-> {
InternalTable tbl = mock(InternalTable.class);
- Mockito.doReturn(inv.getArgument(1)).when(tbl).tableId();
+ doReturn(inv.getArgument(1)).when(tbl).tableId();
return completedFuture(new TableImpl(tbl, new HeapLockManager()));
});
+ when(tableManagerMock.getTable(anyInt())).thenAnswer(inv -> {
+ InternalTable tbl = mock(InternalTable.class);
+
+ doReturn(inv.getArgument(0)).when(tbl).tableId();
+
+ return new TableImpl(tbl, new HeapLockManager());
+ });
+
+ when(tableManagerMock.localPartitionSetAsync(anyLong(),
anyInt())).thenReturn(completedFuture(PartitionSet.EMPTY_SET));
+
SchemaManager schManager = mock(SchemaManager.class);
when(schManager.schemaRegistry(anyLong(),
anyInt())).thenReturn(completedFuture(null));
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index f005b3199a..30d966add3 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -250,13 +250,7 @@ public class SnapshotExecutorImpl implements
SnapshotExecutor {
final FirstSnapshotLoadDone done = new FirstSnapshotLoadDone(reader);
Requires.requireTrue(this.fsmCaller.onSnapshotLoad(done));
try {
- // TODO: IGNITE-19363 We want to avoid deadlock for now, but this
is an ad-hoc decision.
- // We don't wait for the partition's snapshot load closure to
finish here.
- if (!node.getNodeId().getGroupId().contains("part")) {
- done.waitForRun();
- } else {
- done.status = Status.OK();
- }
+ done.waitForRun();
}
catch (final InterruptedException e) {
LOG.warn("Wait for FirstSnapshotLoadDone run is interrupted.");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 5ae16bd940..963a800946 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -83,6 +84,7 @@ public class ItRebalanceTest extends IgniteIntegrationTest {
*
* @throws Exception If failed.
*/
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19712")
@Test
void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
cluster.startAndInit(4);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/IndexWrapper.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexWrapper.java
new file mode 100644
index 0000000000..23d1d8ea5a
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexWrapper.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.table.distributed.HashIndexLocker;
+import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.tx.LockManager;
+
+/**
+ * Class that creates index storage and locker decorators for given partition.
+ *
+ * <p>An instance is bound to a single index of a single table and keeps the state shared by both kinds of
+ * decorators: the table ({@code tbl}), the lock manager ({@code lockManager}), the index id ({@code indexId})
+ * and the function that maps a {@link BinaryRow} to the {@link BinaryTuple} of that index
+ * ({@code indexRowResolver}).
+ *
+ * <p>The constructor is private, so the only subclasses are the nested {@link SortedIndexWrapper} and
+ * {@link HashIndexWrapper}.
+ *
+ * <p>Implementations that need the partition's index storage look it up with
+ * {@code tbl.storage().getIndex(partitionId, indexId)} on every call and assert that the result is not
+ * {@code null}; they do not create the storage themselves.
+ */
+abstract class IndexWrapper {
+ final InternalTable tbl;
+ final LockManager lockManager;
+ final int indexId;
+ final Function<BinaryRow, BinaryTuple> indexRowResolver;
+
+ private IndexWrapper(InternalTable tbl, LockManager lockManager, int
indexId, Function<BinaryRow, BinaryTuple> indexRowResolver) {
+ this.tbl = tbl;
+ this.lockManager = lockManager;
+ this.indexId = indexId;
+ this.indexRowResolver = indexRowResolver;
+ }
+
+ /**
+ * Creates schema aware index storage wrapper.
+ *
+ * <p>Both implementations in this file return a new {@link TableSchemaAwareIndexStorage} built from the
+ * index id, the index storage found for the partition and the row resolver.
+ *
+ * @param partitionId Partition id.
+ * @return Newly created storage wrapper for the partition.
+ */
+ abstract TableSchemaAwareIndexStorage getStorage(int partitionId);
+
+ /**
+ * Creates schema aware index locker.
+ *
+ * <p>A new locker object is created on every call.
+ *
+ * @param partitionId Partition id.
+ * @return Newly created index locker.
+ */
+ abstract IndexLocker getLocker(int partitionId);
+
+ /**
+ * {@link IndexWrapper} for sorted indexes.
+ *
+ * <p>Both the storage wrapper and the locker are built on top of the index storage looked up for the
+ * partition; for the locker that storage is cast to {@link SortedIndexStorage}.
+ */
+ static class SortedIndexWrapper extends IndexWrapper {
+ SortedIndexWrapper(InternalTable tbl, LockManager lockManager, int
indexId, Function<BinaryRow, BinaryTuple> indexRowResolver) {
+ super(tbl, lockManager, indexId, indexRowResolver);
+ }
+
+ @Override
+ TableSchemaAwareIndexStorage getStorage(int partitionId) {
+ IndexStorage index = tbl.storage().getIndex(partitionId, indexId);
+
+ assert index != null : tbl.name() + " part " + partitionId;
+
+ return new TableSchemaAwareIndexStorage(
+ indexId,
+ index,
+ indexRowResolver
+ );
+ }
+
+ @Override
+ IndexLocker getLocker(int partitionId) {
+ IndexStorage index = tbl.storage().getIndex(partitionId, indexId);
+
+ assert index != null : tbl.name() + " part " + partitionId;
+
+ return new SortedIndexLocker(
+ indexId,
+ partitionId,
+ lockManager,
+ (SortedIndexStorage) index,
+ indexRowResolver
+ );
+ }
+ }
+
+ /**
+ * {@link IndexWrapper} for hash indexes.
+ *
+ * <p>Additionally remembers whether the index is unique. The locker is built only from the index id, the
+ * uniqueness flag, the lock manager and the row resolver, so {@code getLocker} neither uses its
+ * {@code partitionId} argument nor looks up the index storage.
+ */
+ static class HashIndexWrapper extends IndexWrapper {
+ private final boolean unique;
+
+ HashIndexWrapper(InternalTable tbl, LockManager lockManager, int
indexId, Function<BinaryRow, BinaryTuple> indexRowResolver,
+ boolean unique) {
+ super(tbl, lockManager, indexId, indexRowResolver);
+ this.unique = unique;
+ }
+
+ @Override
+ TableSchemaAwareIndexStorage getStorage(int partitionId) {
+ IndexStorage index = tbl.storage().getIndex(partitionId, indexId);
+
+ assert index != null : tbl.name() + " part " + partitionId;
+
+ return new TableSchemaAwareIndexStorage(
+ indexId,
+ index,
+ indexRowResolver
+ );
+ }
+
+ @Override
+ IndexLocker getLocker(int partitionId) {
+ return new HashIndexLocker(
+ indexId,
+ unique,
+ lockManager,
+ indexRowResolver
+ );
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index f89d9a0d25..09a6137ad5 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -39,9 +39,10 @@ import
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
-import org.apache.ignite.internal.table.distributed.HashIndexLocker;
+import org.apache.ignite.internal.table.IndexWrapper.HashIndexWrapper;
+import org.apache.ignite.internal.table.IndexWrapper.SortedIndexWrapper;
import org.apache.ignite.internal.table.distributed.IndexLocker;
-import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.tx.LockManager;
@@ -71,8 +72,7 @@ public class TableImpl implements Table {
private final Map<Integer, CompletableFuture<?>> indexesToWait = new
ConcurrentHashMap<>();
- private final Map<Integer, IndexStorageAdapterFactory>
indexStorageAdapterFactories = new ConcurrentHashMap<>();
- private final Map<Integer, IndexLockerFactory> indexLockerFactories = new
ConcurrentHashMap<>();
+ private final Map<Integer, IndexWrapper> indexWrapperById = new
ConcurrentHashMap<>();
/**
* Constructor.
@@ -229,12 +229,12 @@ public class TableImpl implements Table {
public Map<Integer, TableSchemaAwareIndexStorage> get() {
awaitIndexes();
- List<IndexStorageAdapterFactory> factories = new
ArrayList<>(indexStorageAdapterFactories.values());
+ List<IndexWrapper> factories = new
ArrayList<>(indexWrapperById.values());
Map<Integer, TableSchemaAwareIndexStorage> adapters = new
HashMap<>();
- for (IndexStorageAdapterFactory factory : factories) {
- TableSchemaAwareIndexStorage storage =
factory.create(partId);
+ for (IndexWrapper factory : factories) {
+ TableSchemaAwareIndexStorage storage =
factory.getStorage(partId);
adapters.put(storage.id(), storage);
}
@@ -253,12 +253,12 @@ public class TableImpl implements Table {
return () -> {
awaitIndexes();
- List<IndexLockerFactory> factories = new
ArrayList<>(indexLockerFactories.values());
+ List<IndexWrapper> factories = new
ArrayList<>(indexWrapperById.values());
Map<Integer, IndexLocker> lockers = new
HashMap<>(factories.size());
- for (IndexLockerFactory factory : factories) {
- IndexLocker locker = factory.create(partId);
+ for (IndexWrapper factory : factories) {
+ IndexLocker locker = factory.getLocker(partId);
lockers.put(locker.id(), locker);
}
@@ -269,7 +269,7 @@ public class TableImpl implements Table {
/**
* The future completes when the primary key index is ready to use.
*
- * @return Future whcih complete when a primary index for the table is .
+ * @return Future which completes when the primary key index for the table is ready to use.
*/
public CompletableFuture<Void> pkIndexesReadyFuture() {
var fut = new CompletableFuture<Void>();
@@ -289,27 +289,17 @@ public class TableImpl implements Table {
public void registerHashIndex(
StorageHashIndexDescriptor indexDescriptor,
boolean unique,
- Function<BinaryRow, BinaryTuple> searchRowResolver
+ Function<BinaryRow, BinaryTuple> searchRowResolver,
+ PartitionSet partitions
) {
int indexId = indexDescriptor.id();
- indexLockerFactories.put(
- indexId,
- partitionId -> new HashIndexLocker(
- indexId,
- unique,
- lockManager,
- searchRowResolver
- )
- );
- indexStorageAdapterFactories.put(
- indexId,
- partitionId -> new TableSchemaAwareIndexStorage(
- indexId,
- tbl.storage().getOrCreateHashIndex(partitionId,
indexDescriptor),
- searchRowResolver
- )
- );
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19112 Create
storages once.
+ partitions.stream().forEach(partitionId -> {
+ tbl.storage().getOrCreateHashIndex(partitionId, indexDescriptor);
+ });
+
+ indexWrapperById.put(indexId, new HashIndexWrapper(tbl, lockManager,
indexId, searchRowResolver, unique));
completeWaitIndex(indexId);
}
@@ -322,28 +312,17 @@ public class TableImpl implements Table {
*/
public void registerSortedIndex(
StorageSortedIndexDescriptor indexDescriptor,
- Function<BinaryRow, BinaryTuple> searchRowResolver
+ Function<BinaryRow, BinaryTuple> searchRowResolver,
+ PartitionSet partitions
) {
int indexId = indexDescriptor.id();
- indexLockerFactories.put(
- indexId,
- partitionId -> new SortedIndexLocker(
- indexId,
- partitionId,
- lockManager,
- tbl.storage().getOrCreateSortedIndex(partitionId,
indexDescriptor),
- searchRowResolver
- )
- );
- indexStorageAdapterFactories.put(
- indexId,
- partitionId -> new TableSchemaAwareIndexStorage(
- indexId,
- tbl.storage().getOrCreateSortedIndex(partitionId,
indexDescriptor),
- searchRowResolver
- )
- );
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19112 Create
storages once.
+ partitions.stream().forEach(partitionId -> {
+ tbl.storage().getOrCreateSortedIndex(partitionId, indexDescriptor);
+ });
+
+ indexWrapperById.put(indexId, new SortedIndexWrapper(tbl, lockManager,
indexId, searchRowResolver));
completeWaitIndex(indexId);
}
@@ -354,8 +333,7 @@ public class TableImpl implements Table {
* @param indexId An index id to unregister.
*/
public void unregisterIndex(int indexId) {
- indexLockerFactories.remove(indexId);
- indexStorageAdapterFactories.remove(indexId);
+ indexWrapperById.remove(indexId);
completeWaitIndex(indexId);
@@ -386,18 +364,6 @@ public class TableImpl implements Table {
indexesToWait.values().forEach(future ->
future.completeExceptionally(closeTableException));
}
- @FunctionalInterface
- private interface IndexLockerFactory {
- /** Creates the index decorator for given partition. */
- IndexLocker create(int partitionId);
- }
-
- @FunctionalInterface
- private interface IndexStorageAdapterFactory {
- /** Creates the index decorator for given partition. */
- TableSchemaAwareIndexStorage create(int partitionId);
- }
-
/**
* Adds indexes to wait, if not already created, before inserting data
into the table.
*
@@ -411,7 +377,7 @@ public class TableImpl implements Table {
return awaitIndexFuture;
}
- if (indexStorageAdapterFactories.containsKey(indexId) &&
indexLockerFactories.containsKey(indexId)) {
+ if (indexWrapperById.containsKey(indexId)) {
// Index is already registered and created.
return null;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSet.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSet.java
new file mode 100644
index 0000000000..a3a0854d2c
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSet.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.BitSet;
+import java.util.stream.IntStream;
+
+/**
+ * {@link BitSet} implementation of the {@link PartitionSet}.
+ *
+ * <p>Every operation is delegated to a backing {@link BitSet} in which a set bit at index {@code i} means
+ * that partition {@code i} belongs to the set:
+ * <ul>
+ * <li>{@code set} and {@code get} map to {@link BitSet#set(int)} and {@link BitSet#get(int)};</li>
+ * <li>{@code stream} returns {@link BitSet#stream()};</li>
+ * <li>{@code size} returns {@link BitSet#cardinality()}, i.e. the number of partitions in the set;</li>
+ * <li>{@code copy} returns a new instance backed by a clone of the bit set, so later changes to either
+ * object do not affect the other.</li>
+ * </ul>
+ *
+ * <p>{@code equals} compares the backing bit sets directly when the argument is also a
+ * {@code BitSetPartitionSet} and otherwise falls back to {@link PartitionSet#isEqual(Object)};
+ * {@code hashCode} always uses {@link PartitionSet#getHashCode()}.
+ *
+ * <p>NOTE: no synchronization is added around the backing set; {@link BitSet} itself is documented as not
+ * safe for multithreaded use without external synchronization.
+ */
+public class BitSetPartitionSet implements PartitionSet {
+ /** Backing BitSet: bit {@code i} is set if and only if partition {@code i} is in this set. */
+ private final BitSet backingSet;
+
+ public BitSetPartitionSet() {
+ this.backingSet = new BitSet();
+ }
+
+ private BitSetPartitionSet(BitSet backingSet) {
+ this.backingSet = backingSet;
+ }
+
+ @Override
+ public void set(int partitionId) {
+ backingSet.set(partitionId);
+ }
+
+ @Override
+ public boolean get(int partitionId) {
+ return backingSet.get(partitionId);
+ }
+
+ @Override
+ public IntStream stream() {
+ return backingSet.stream();
+ }
+
+ @Override
+ public int size() {
+ return backingSet.cardinality();
+ }
+
+ @Override
+ public PartitionSet copy() {
+ return new BitSetPartitionSet((BitSet) backingSet.clone());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null) {
+ return false;
+ }
+
+ if (!(o instanceof BitSetPartitionSet)) {
+ return isEqual(o); // Generic comparison; isEqual itself rejects objects that are not a PartitionSet.
+ }
+
+ BitSetPartitionSet that = (BitSetPartitionSet) o;
+
+ return backingSet.equals(that.backingSet);
+ }
+
+ @Override
+ public int hashCode() {
+ return getHashCode();
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
new file mode 100644
index 0000000000..7d8244bb03
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionSet.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.PrimitiveIterator.OfInt;
+import java.util.stream.IntStream;
+
+/**
+ * Represents a collection of partition IDs.
+ *
+ * <p>Besides the abstract set operations the interface provides two default helpers,
+ * {@link #isEqual(Object)} and {@link #getHashCode()}, which implementations can call from their
+ * {@code equals} and {@code hashCode} so that different implementations holding the same IDs compare as
+ * equal and hash to the same value.
+ *
+ * <p>{@link #EMPTY_SET} is a constant empty set: its {@code set} throws
+ * {@link UnsupportedOperationException}, {@code get} always returns {@code false}, {@code stream} is empty,
+ * {@code size} is {@code 0} and {@code copy} returns the very same instance.
+ */
+public interface PartitionSet {
+ PartitionSet EMPTY_SET = new PartitionSet() {
+ @Override
+ public void set(int partitionId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean get(int partitionId) {
+ return false;
+ }
+
+ @Override
+ public IntStream stream() {
+ return IntStream.empty();
+ }
+
+ @Override
+ public PartitionSet copy() {
+ return this;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return getHashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return isEqual(obj);
+ }
+ };
+
+ /**
+ * Adds the partition to the partition set.
+ *
+ * @param partitionId Partition ID.
+ */
+ void set(int partitionId);
+
+ /**
+ * Returns {@code true} if partition with {@code partitionId} is present in this set,
+ * {@code false} otherwise.
+ *
+ * @param partitionId Partition ID.
+ */
+ boolean get(int partitionId);
+
+ /**
+ * Returns a stream of partition IDs in this set.
+ * The IDs are returned in order, from lowest to highest.
+ * {@link #isEqual(Object)} relies on this ordering when it compares two sets element by element.
+ */
+ IntStream stream();
+
+ /**
+ * Returns count of partitions.
+ */
+ int size();
+
+ /**
+ * Returns a copy of this {@link PartitionSet}.
+ */
+ PartitionSet copy();
+
+ /**
+ * Returns {@code true} if this partition set is equal to the parameter, {@code false} otherwise.
+ *
+ * <p>The parameter is equal to this set when it is a {@link PartitionSet} with the same {@link #size()}
+ * whose {@link #stream()} yields the same IDs in the same positions as this set's stream. Any object
+ * that is not a {@link PartitionSet} (including {@code null}) is not equal.
+ *
+ * @param another Object to be compared for equality with this partition set.
+ */
+ default boolean isEqual(Object another) {
+ if (!(another instanceof PartitionSet)) {
+ return false;
+ }
+
+ PartitionSet anotherSet = (PartitionSet) another;
+
+ if (size() != anotherSet.size()) {
+ return false;
+ }
+
+ OfInt iterator1 = stream().iterator();
+ OfInt iterator2 = anotherSet.stream().iterator();
+
+ while (iterator1.hasNext() && iterator2.hasNext()) {
+ if (iterator1.nextInt() != iterator2.nextInt()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Returns the hash code value for this partition set.
+ *
+ * <p>The value is the sum of all partition IDs returned by {@link #stream()}, so it depends only on the
+ * IDs contained in the set; an empty set hashes to {@code 0}.
+ */
+ default int getHashCode() {
+ int h = 0;
+
+ OfInt iter = stream().iterator();
+
+ while (iter.hasNext()) {
+ int idx = iter.nextInt();
+
+ h += idx;
+ }
+
+ return h;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 57eafeb922..c392d7a019 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -60,7 +60,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -275,6 +274,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
*/
private final IncrementalVersionedValue<Map<Integer, TableImpl>>
tablesByIdVv;
+ /** Versioned store for local partition set by table id. */
+ private final IncrementalVersionedValue<Map<Integer, PartitionSet>>
localPartsByTableIdVv;
+
/**
* Versioned store for tracking RAFT groups initialization and starting
completion.
* Only explicitly updated in {@link #createTablePartitionsLocally(long,
List, int, TableImpl)}.
@@ -287,6 +289,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
*/
private final Map<Integer, TableImpl> pendingTables = new
ConcurrentHashMap<>();
+ /** Started tables. */
+ private final Map<Integer, TableImpl> startedTables = new
ConcurrentHashMap<>();
+
/** Resolver that resolves a node consistent ID to cluster node. */
private final Function<String, ClusterNode> clusterNodeResolver;
@@ -427,7 +432,10 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
placementDriver = new PlacementDriver(replicaSvc, clusterNodeResolver);
tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new);
- assignmentsUpdatedVv = new
IncrementalVersionedValue<>(dependingOn(tablesByIdVv));
+
+ localPartsByTableIdVv = new IncrementalVersionedValue<>(registry,
HashMap::new);
+
+ assignmentsUpdatedVv = new
IncrementalVersionedValue<>(dependingOn(localPartsByTableIdVv));
txStateStorageScheduledPool =
Executors.newSingleThreadScheduledExecutor(
NamedThreadFactory.create(nodeName,
"tx-state-storage-scheduled-pool", LOG));
@@ -691,8 +699,14 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
int partitions = newAssignments.size();
CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process
assignments and set partitions only for assigned partitions.
+ PartitionSet parts = new BitSetPartitionSet();
+
for (int i = 0; i < futures.length; i++) {
futures[i] = new CompletableFuture<>();
+
+ parts.set(i);
}
String localMemberName = localNode().name();
@@ -715,121 +729,110 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
TablePartitionId replicaGrpId = new TablePartitionId(tableId,
partId);
- placementDriver.updateAssignment(replicaGrpId,
newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
+ placementDriver.updateAssignment(replicaGrpId,
newConfiguration.peers().stream().map(Peer::consistentId)
+ .collect(toList()));
- PendingComparableValuesTracker<HybridTimestamp, Void>
safeTimeTracker =
- new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0));
- PendingComparableValuesTracker<Long, Void> storageIndexTracker
=
- new PendingComparableValuesTracker<>(0L);
+ var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp, Void>(
+ new HybridTimestamp(1, 0)
+ );
+ var storageIndexTracker = new
PendingComparableValuesTracker<Long, Void>(0L);
((InternalTableImpl)
internalTbl).updatePartitionTrackers(partId, safeTimeTracker,
storageIndexTracker);
- CompletableFuture<PartitionStorages> partitionStoragesFut =
getOrCreatePartitionStorages(table, partId);
+ PartitionStorages partitionStorages =
getPartitionStorages(table, partId);
- CompletableFuture<PartitionDataStorage>
partitionDataStorageFut = partitionStoragesFut
- .thenApply(partitionStorages ->
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
- internalTbl, partId));
-
- CompletableFuture<PartitionUpdateHandlers>
partitionUpdateHandlersFut = partitionDataStorageFut
- .thenApply(storage -> {
- PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
- partId,
- storage,
- table,
- getZoneById(zonesConfig,
zoneId).dataStorage(),
- safeTimeTracker
- );
+ PartitionDataStorage partitionDataStorage =
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+ internalTbl, partId);
- mvGc.addStorage(replicaGrpId,
partitionUpdateHandlers.gcUpdateHandler);
+ PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
+ partId,
+ partitionDataStorage,
+ table,
+ getZoneById(zonesConfig, zoneId).dataStorage(),
+ safeTimeTracker
+ );
- return partitionUpdateHandlers;
- });
+ mvGc.addStorage(replicaGrpId,
partitionUpdateHandlers.gcUpdateHandler);
CompletableFuture<Void> startGroupFut;
// start new nodes, only if it is table creation, other cases
will be covered by rebalance logic
if (localMemberAssignment != null) {
- startGroupFut =
partitionStoragesFut.thenComposeAsync(partitionStorages -> {
- CompletableFuture<Boolean> fut;
-
- // If Raft is running in in-memory mode or the PDS has
been cleared, we need to remove the current node
- // from the Raft group in order to avoid the double
vote problem.
- // <MUTED> See
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
- if (internalTbl.storage().isVolatile()) {
- fut = queryDataNodesCount(tableId, partId,
newConfiguration.peers()).thenApply(dataNodesCount -> {
- boolean fullPartitionRestart = dataNodesCount
== 0;
-
- if (fullPartitionRestart) {
- return true;
- }
+ CompletableFuture<Boolean> shouldStartGroupFut;
+
+ // If Raft is running in in-memory mode or the PDS has
been cleared, we need to remove the current node
+ // from the Raft group in order to avoid the double vote
problem.
+ // <MUTED> See
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
+ if (internalTbl.storage().isVolatile()) {
+ shouldStartGroupFut = queryDataNodesCount(tableId,
partId, newConfiguration.peers()).thenApply(dataNodesCount -> {
+ boolean fullPartitionRestart = dataNodesCount == 0;
+
+ if (fullPartitionRestart) {
+ return true;
+ }
- boolean majorityAvailable = dataNodesCount >=
(newConfiguration.peers().size() / 2) + 1;
+ boolean majorityAvailable = dataNodesCount >=
(newConfiguration.peers().size() / 2) + 1;
- if (majorityAvailable) {
-
RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment,
metaStorageMgr);
+ if (majorityAvailable) {
+ RebalanceUtil.startPeerRemoval(replicaGrpId,
localMemberAssignment, metaStorageMgr);
- return false;
- } else {
- // No majority and not a full partition
restart - need to restart nodes
- // with current partition.
- String msg = "Unable to start partition "
+ partId + ". Majority not available.";
+ return false;
+ } else {
+ // No majority and not a full partition
restart - need to restart nodes
+ // with current partition.
+ String msg = "Unable to start partition " +
partId + ". Majority not available.";
- throw new IgniteInternalException(msg);
- }
- });
- } else {
- fut = completedFuture(true);
+ throw new IgniteInternalException(msg);
+ }
+ });
+ } else {
+ shouldStartGroupFut = completedFuture(true);
+ }
+
+ startGroupFut =
shouldStartGroupFut.thenAcceptAsync(startGroup -> inBusyLock(busyLock, () -> {
+ if (!startGroup) {
+ return;
}
+ TxStateStorage txStatePartitionStorage =
partitionStorages.getTxStateStorage();
- return fut.thenCompose(startGroup ->
inBusyLock(busyLock, () -> {
- if (!startGroup) {
- return completedFuture(null);
- }
+ RaftGroupOptions groupOptions =
groupOptionsForPartition(
+ internalTbl.storage(),
+ internalTbl.txStateStorage(),
+ partitionKey(internalTbl, partId),
+ partitionUpdateHandlers
+ );
- return
partitionDataStorageFut.thenAcceptBothAsync(partitionUpdateHandlersFut,
- (partitionDataStorage,
partitionUpdateHandlers) -> inBusyLock(busyLock, () -> {
- TxStateStorage txStatePartitionStorage
= partitionStorages.getTxStateStorage();
-
- RaftGroupOptions groupOptions =
groupOptionsForPartition(
- internalTbl.storage(),
- internalTbl.txStateStorage(),
- partitionKey(internalTbl,
partId),
- partitionUpdateHandlers
- );
-
- Peer serverPeer =
newConfiguration.peer(localMemberName);
-
- var raftNodeId = new
RaftNodeId(replicaGrpId, serverPeer);
-
- try {
- // TODO: use RaftManager
interface, see https://issues.apache.org/jira/browse/IGNITE-18273
- ((Loza)
raftMgr).startRaftGroupNode(
- raftNodeId,
- newConfiguration,
- new PartitionListener(
-
partitionDataStorage,
-
partitionUpdateHandlers.storageUpdateHandler,
-
txStatePartitionStorage,
- safeTimeTracker,
- storageIndexTracker
- ),
- new
RebalanceRaftGroupEventsListener(
- metaStorageMgr,
- replicaGrpId,
- busyLock,
-
createPartitionMover(internalTbl, partId),
-
this::calculateAssignments,
- rebalanceScheduler
- ),
- groupOptions
- );
- } catch (NodeStoppingException ex) {
- throw new CompletionException(ex);
- }
- }), ioExecutor);
- }));
- }, ioExecutor);
+ Peer serverPeer =
newConfiguration.peer(localMemberName);
+
+ var raftNodeId = new RaftNodeId(replicaGrpId,
serverPeer);
+
+ try {
+ // TODO: use RaftManager interface, see
https://issues.apache.org/jira/browse/IGNITE-18273
+ ((Loza) raftMgr).startRaftGroupNode(
+ raftNodeId,
+ newConfiguration,
+ new PartitionListener(
+ partitionDataStorage,
+
partitionUpdateHandlers.storageUpdateHandler,
+ txStatePartitionStorage,
+ safeTimeTracker,
+ storageIndexTracker
+ ),
+ new RebalanceRaftGroupEventsListener(
+ metaStorageMgr,
+ replicaGrpId,
+ busyLock,
+ createPartitionMover(internalTbl,
partId),
+ this::calculateAssignments,
+ rebalanceScheduler
+ ),
+ groupOptions
+ );
+ } catch (NodeStoppingException ex) {
+ throw new CompletionException(ex);
+ }
+ }), ioExecutor);
} else {
startGroupFut = completedFuture(null);
}
@@ -843,55 +846,54 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return failedFuture(ex);
}
}), ioExecutor)
- .thenComposeAsync(updatedRaftGroupService ->
inBusyLock(busyLock, () -> {
+ .thenAcceptAsync(updatedRaftGroupService ->
inBusyLock(busyLock, () -> {
((InternalTableImpl)
internalTbl).updateInternalTableRaftGroupService(partId,
updatedRaftGroupService);
if (localMemberAssignment == null) {
- return completedFuture(null);
+ return;
}
- return
partitionStoragesFut.thenAcceptBoth(partitionUpdateHandlersFut,
- (partitionStorages,
partitionUpdateHandlers) -> {
- MvPartitionStorage partitionStorage =
partitionStorages.getMvPartitionStorage();
- TxStateStorage txStateStorage =
partitionStorages.getTxStateStorage();
-
- try {
- replicaMgr.startReplica(
- replicaGrpId,
- allOf(
- ((Loza)
raftMgr).raftNodeReadyFuture(replicaGrpId),
-
table.pkIndexesReadyFuture()
- ),
- new
PartitionReplicaListener(
- partitionStorage,
-
updatedRaftGroupService,
- txManager,
- lockMgr,
-
scanRequestExecutor,
- partId,
- tableId,
-
table.indexesLockers(partId),
- new Lazy<>(() ->
table.indexStorageAdapters(partId).get().get(table.pkId())),
- () ->
table.indexStorageAdapters(partId).get(),
- clock,
- safeTimeTracker,
- txStateStorage,
- placementDriver,
-
partitionUpdateHandlers.storageUpdateHandler,
- new
NonHistoricSchemas(schemaManager),
-
schemaManager.schemaRegistry(causalityToken, tableId),
- localNode(),
-
table.internalTable().storage(),
- indexBuilder,
- tablesCfg
- ),
- updatedRaftGroupService,
- storageIndexTracker
- );
- } catch (NodeStoppingException ex) {
- throw new AssertionError("Loza was
stopped before Table manager", ex);
- }
- });
+ MvPartitionStorage partitionStorage =
partitionStorages.getMvPartitionStorage();
+ TxStateStorage txStateStorage =
partitionStorages.getTxStateStorage();
+
+ try {
+ replicaMgr.startReplica(
+ replicaGrpId,
+ allOf(
+ ((Loza)
raftMgr).raftNodeReadyFuture(replicaGrpId),
+ table.pkIndexesReadyFuture()
+ ),
+ new PartitionReplicaListener(
+ partitionStorage,
+ updatedRaftGroupService,
+ txManager,
+ lockMgr,
+ scanRequestExecutor,
+ partId,
+ tableId,
+ table.indexesLockers(partId),
+ new Lazy<>(
+ () ->
table.indexStorageAdapters(partId).get().get(table.pkId())
+ ),
+ () ->
table.indexStorageAdapters(partId).get(),
+ clock,
+ safeTimeTracker,
+ txStateStorage,
+ placementDriver,
+
partitionUpdateHandlers.storageUpdateHandler,
+ new
NonHistoricSchemas(schemaManager),
+
schemaManager.schemaRegistry(causalityToken, tableId),
+ localNode(),
+
table.internalTable().storage(),
+ indexBuilder,
+ tablesCfg
+ ),
+ updatedRaftGroupService,
+ storageIndexTracker
+ );
+ } catch (NodeStoppingException ex) {
+ throw new AssertionError("Loza was stopped
before Table manager", ex);
+ }
}), ioExecutor)
.whenComplete((res, ex) -> {
if (ex != null) {
@@ -905,17 +907,27 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return allOf(futures);
};
- CompletableFuture<Void> updateAssignmentsFuture =
tablesByIdVv.get(causalityToken).thenComposeAsync(
- tablesById -> inBusyLock(busyLock, updateAssignmentsClosure),
- ioExecutor
- );
+ return localPartsByTableIdVv.update(causalityToken, (previous,
throwable) -> inBusyLock(busyLock, () -> {
+ return getOrCreatePartitionStorages(table, parts).thenApply(u -> {
+ var newValue = new HashMap<>(previous);
- return assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
- if (e != null) {
- return failedFuture(e);
- }
+ newValue.put(tableId, parts);
- return updateAssignmentsFuture;
+ return newValue;
+ });
+ })).thenCompose(unused -> {
+ CompletableFuture<Void> updateAssignmentsFuture =
tablesByIdVv.get(causalityToken).thenComposeAsync(
+ tablesById -> inBusyLock(busyLock,
updateAssignmentsClosure),
+ ioExecutor
+ );
+
+ return assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ return updateAssignmentsFuture;
+ });
});
}
@@ -1195,6 +1207,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
createTablePartitionsLocally(causalityToken, assignments,
zoneDescriptor.id(), table);
pendingTables.put(tableId, table);
+ startedTables.put(tableId, table);
tablesById(causalityToken).thenAccept(ignored -> inBusyLock(busyLock,
() -> {
pendingTables.remove(tableId);
@@ -1303,6 +1316,17 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
.thenCompose((notUsed) ->
mvGc.removeStorage(replicationGroupId));
}
+ localPartsByTableIdVv.update(causalityToken, (previousVal, e) ->
inBusyLock(busyLock, () -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ var newMap = new HashMap<>(previousVal);
+ newMap.remove(tableId);
+
+ return completedFuture(newMap);
+ }));
+
tablesByIdVv.update(causalityToken, (previousVal, e) ->
inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(e);
@@ -1334,6 +1358,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
.thenApply(v -> map);
}));
+ startedTables.remove(tableId);
+
fireEvent(TableEvent.DROP, new
TableEventParameters(causalityToken, tableId))
.whenComplete((v, e) -> {
if (e != null) {
@@ -1737,15 +1763,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
.collect(toList());
}
- /**
- * Collects a list of direct index ids.
- *
- * @return A list of direct index ids.
- */
- private List<UUID> directIndexIds() {
- return directProxy(tablesCfg.indexes()).internalIds();
- }
-
/**
* Gets direct id of table with {@code tblName}.
*
@@ -1858,6 +1875,24 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
}
+ /**
+ * Asynchronously gets the local partition set of a table using causality token.
+ *
+ * @param causalityToken Causality token.
+ * @param tableId Table id.
+ * @return Future that completes with the partition set registered for {@code tableId} at the given causality token.
+ * @throws IgniteException Wrapping a {@link NodeStoppingException}, if the busy lock cannot be entered.
+ */
+ public CompletableFuture<PartitionSet> localPartitionSetAsync(long
causalityToken, int tableId) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+
+ try {
+ // The busy lock is held only while the future is requested, not until the future completes.
+ return
localPartsByTableIdVv.get(causalityToken).thenApply(partitionSetById ->
partitionSetById.get(tableId));
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
/** {@inheritDoc} */
@Override
public TableImpl tableImpl(String name) {
@@ -2061,6 +2096,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
try {
return handleChangePendingAssignmentEvent(
+ evt.revision(),
replicaGrpId,
tables.get(tblId),
pendingAssignmentsWatchEntry,
@@ -2074,6 +2110,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
private CompletableFuture<Void> handleChangePendingAssignmentEvent(
+ long causalityToken,
TablePartitionId replicaGrpId,
TableImpl tbl,
Entry pendingAssignmentsEntry,
@@ -2086,6 +2123,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
CatalogTableDescriptor tableDescriptor =
getTableDescriptor(tbl.tableId());
+ int tableId = tbl.tableId();
int partId = replicaGrpId.partitionId();
byte[] stableAssignmentsBytes = stableAssignmentsEntry.value();
@@ -2119,97 +2157,108 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
CompletableFuture<Void> localServicesStartFuture;
if (shouldStartLocalServices) {
- ((InternalTableImpl)
internalTable).updatePartitionTrackers(partId, safeTimeTracker,
storageIndexTracker);
+ localServicesStartFuture =
localPartsByTableIdVv.get(causalityToken).thenComposeAsync(oldMap -> {
+ PartitionSet partitionSet = oldMap.get(tableId).copy();
- localServicesStartFuture = getOrCreatePartitionStorages(tbl,
partId)
- .thenAcceptAsync(partitionStorages -> {
- MvPartitionStorage mvPartitionStorage =
partitionStorages.getMvPartitionStorage();
- TxStateStorage txStatePartitionStorage =
partitionStorages.getTxStateStorage();
+ return getOrCreatePartitionStorages(tbl,
partitionSet).thenApply(u -> {
+ var newMap = new HashMap<>(oldMap);
- PartitionDataStorage partitionDataStorage =
partitionDataStorage(mvPartitionStorage, internalTable, partId);
+ newMap.put(tableId, partitionSet);
- PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
- partId,
- partitionDataStorage,
- tbl,
- getZoneById(zonesConfig,
tableDescriptor.zoneId()).dataStorage(),
- safeTimeTracker
- );
+ return newMap;
+ });
+ }).thenComposeAsync(unused -> {
+ PartitionStorages partitionStorages =
getPartitionStorages(tbl, partId);
- RaftGroupOptions groupOptions =
groupOptionsForPartition(
- internalTable.storage(),
- internalTable.txStateStorage(),
- partitionKey(internalTable, partId),
- partitionUpdateHandlers
- );
+ MvPartitionStorage mvPartitionStorage =
partitionStorages.getMvPartitionStorage();
+ TxStateStorage txStatePartitionStorage =
partitionStorages.getTxStateStorage();
- RaftGroupListener raftGrpLsnr = new PartitionListener(
- partitionDataStorage,
- partitionUpdateHandlers.storageUpdateHandler,
- txStatePartitionStorage,
- safeTimeTracker,
- storageIndexTracker
- );
+ PartitionDataStorage partitionDataStorage =
partitionDataStorage(mvPartitionStorage, internalTable, partId);
- RaftGroupEventsListener raftGrpEvtsLsnr = new
RebalanceRaftGroupEventsListener(
- metaStorageMgr,
- replicaGrpId,
- busyLock,
- createPartitionMover(internalTable, partId),
- this::calculateAssignments,
- rebalanceScheduler
- );
+ PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
+ partId,
+ partitionDataStorage,
+ tbl,
+ getZoneById(zonesConfig,
tableDescriptor.zoneId()).dataStorage(),
+ safeTimeTracker
+ );
- Peer serverPeer =
pendingConfiguration.peer(localMember.name());
+ RaftGroupOptions groupOptions = groupOptionsForPartition(
+ internalTable.storage(),
+ internalTable.txStateStorage(),
+ partitionKey(internalTable, partId),
+ partitionUpdateHandlers
+ );
- var raftNodeId = new RaftNodeId(replicaGrpId,
serverPeer);
+ RaftGroupListener raftGrpLsnr = new PartitionListener(
+ partitionDataStorage,
+ partitionUpdateHandlers.storageUpdateHandler,
+ txStatePartitionStorage,
+ safeTimeTracker,
+ storageIndexTracker
+ );
- try {
- // TODO: use RaftManager interface, see
https://issues.apache.org/jira/browse/IGNITE-18273
- ((Loza) raftMgr).startRaftGroupNode(
- raftNodeId,
- stableConfiguration,
- raftGrpLsnr,
- raftGrpEvtsLsnr,
- groupOptions
- );
+ RaftGroupEventsListener raftGrpEvtsLsnr = new
RebalanceRaftGroupEventsListener(
+ metaStorageMgr,
+ replicaGrpId,
+ busyLock,
+ createPartitionMover(internalTable, partId),
+ this::calculateAssignments,
+ rebalanceScheduler
+ );
- replicaMgr.startReplica(
- replicaGrpId,
- allOf(
- ((Loza)
raftMgr).raftNodeReadyFuture(replicaGrpId),
- tbl.pkIndexesReadyFuture()
- ),
- new PartitionReplicaListener(
- mvPartitionStorage,
-
internalTable.partitionRaftGroupService(partId),
- txManager,
- lockMgr,
- scanRequestExecutor,
- partId,
- tbl.tableId(),
- tbl.indexesLockers(partId),
- new Lazy<>(() ->
tbl.indexStorageAdapters(partId).get().get(tbl.pkId())),
- () ->
tbl.indexStorageAdapters(partId).get(),
- clock,
- safeTimeTracker,
- txStatePartitionStorage,
- placementDriver,
-
partitionUpdateHandlers.storageUpdateHandler,
- new
NonHistoricSchemas(schemaManager),
-
completedFuture(schemaManager.schemaRegistry(tbl.tableId())),
- localNode(),
- internalTable.storage(),
- indexBuilder,
- tablesCfg
- ),
- (TopologyAwareRaftGroupService)
internalTable.partitionRaftGroupService(partId),
- storageIndexTracker
- );
- } catch (NodeStoppingException ignored) {
- // no-op
- }
- }, ioExecutor);
+ Peer serverPeer =
pendingConfiguration.peer(localMember.name());
+
+ var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer);
+
+ return runAsync(() -> inBusyLock(busyLock, () -> {
+ try {
+ // TODO: use RaftManager interface, see
https://issues.apache.org/jira/browse/IGNITE-18273
+ ((Loza) raftMgr).startRaftGroupNode(
+ raftNodeId,
+ stableConfiguration,
+ raftGrpLsnr,
+ raftGrpEvtsLsnr,
+ groupOptions
+ );
+
+ replicaMgr.startReplica(
+ replicaGrpId,
+ allOf(
+ ((Loza)
raftMgr).raftNodeReadyFuture(replicaGrpId),
+ tbl.pkIndexesReadyFuture()
+ ),
+ new PartitionReplicaListener(
+ mvPartitionStorage,
+
internalTable.partitionRaftGroupService(partId),
+ txManager,
+ lockMgr,
+ scanRequestExecutor,
+ partId,
+ tableId,
+ tbl.indexesLockers(partId),
+ new Lazy<>(() ->
tbl.indexStorageAdapters(partId).get().get(tbl.pkId())),
+ () ->
tbl.indexStorageAdapters(partId).get(),
+ clock,
+ safeTimeTracker,
+ txStatePartitionStorage,
+ placementDriver,
+
partitionUpdateHandlers.storageUpdateHandler,
+ new NonHistoricSchemas(schemaManager),
+
completedFuture(schemaManager.schemaRegistry(tableId)),
+ localNode(),
+ internalTable.storage(),
+ indexBuilder,
+ tablesCfg
+ ),
+ (TopologyAwareRaftGroupService)
internalTable.partitionRaftGroupService(partId),
+ storageIndexTracker
+ );
+ } catch (NodeStoppingException ignored) {
+ // No-op.
+ }
+ }), ioExecutor);
+ });
} else {
localServicesStartFuture = completedFuture(null);
}
@@ -2356,32 +2405,50 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
/**
- * Creates partition stores. If one of the storages has not completed the
rebalance, then the storages are cleared.
+ * Gets the MV partition storage and the TX state storage of a partition. Both storages are expected to exist already.
*
* @param table Table.
* @param partitionId Partition ID.
- * @return Future of creating or getting partition stores.
+ * @return Partition storages.
*/
- // TODO: IGNITE-18939 Create storages only once, then only get them
- private CompletableFuture<PartitionStorages>
getOrCreatePartitionStorages(TableImpl table, int partitionId) {
+ private PartitionStorages getPartitionStorages(TableImpl table, int
partitionId) {
InternalTable internalTable = table.internalTable();
MvPartitionStorage mvPartition =
internalTable.storage().getMvPartition(partitionId);
- return (mvPartition != null ? completedFuture(mvPartition) :
internalTable.storage().createMvPartition(partitionId))
- .thenComposeAsync(mvPartitionStorage -> {
- TxStateStorage txStateStorage =
internalTable.txStateStorage().getOrCreateTxStateStorage(partitionId);
+ // Storages are only looked up here, never created; they are asserted to be present.
+ assert mvPartition != null;
- if (mvPartitionStorage.persistedIndex() ==
MvPartitionStorage.REBALANCE_IN_PROGRESS
- || txStateStorage.persistedIndex() ==
TxStateStorage.REBALANCE_IN_PROGRESS) {
- return allOf(
-
internalTable.storage().clearPartition(partitionId),
- txStateStorage.clear()
- ).thenApply(unused -> new
PartitionStorages(mvPartitionStorage, txStateStorage));
- } else {
- return completedFuture(new
PartitionStorages(mvPartitionStorage, txStateStorage));
- }
- }, ioExecutor);
+ TxStateStorage txStateStorage =
internalTable.txStateStorage().getTxStateStorage(partitionId);
+
+ assert txStateStorage != null;
+
+ return new PartitionStorages(mvPartition, txStateStorage);
+ }
+
+ /**
+ * Gets or creates the MV partition storage and the TX state storage for every partition of the given set. If either storage of a
+ * partition reports {@code REBALANCE_IN_PROGRESS} as its persisted index, both storages of that partition are cleared.
+ *
+ * @param table Table.
+ * @param partitions Partitions to get or create the storages for.
+ * @return Future that completes when the storages of all the given partitions have been processed.
+ */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19739 Create
storages only once.
+ private CompletableFuture<Void> getOrCreatePartitionStorages(TableImpl
table, PartitionSet partitions) {
+ InternalTable internalTable = table.internalTable();
+
+ CompletableFuture<?>[] storageFuts =
partitions.stream().mapToObj(partitionId -> {
+ MvPartitionStorage mvPartition =
internalTable.storage().getMvPartition(partitionId);
+
+ return (mvPartition != null ? completedFuture(mvPartition) :
internalTable.storage().createMvPartition(partitionId))
+ .thenComposeAsync(mvPartitionStorage -> {
+ TxStateStorage txStateStorage =
internalTable.txStateStorage().getOrCreateTxStateStorage(partitionId);
+
+ // Storages left in the REBALANCE_IN_PROGRESS state are cleared before they are used.
+ if (mvPartitionStorage.persistedIndex() ==
MvPartitionStorage.REBALANCE_IN_PROGRESS
+ || txStateStorage.persistedIndex() ==
TxStateStorage.REBALANCE_IN_PROGRESS) {
+ return allOf(
+
internalTable.storage().clearPartition(partitionId),
+ txStateStorage.clear()
+ ).thenApply(unused -> new
PartitionStorages(mvPartitionStorage, txStateStorage));
+ } else {
+ return completedFuture(new
PartitionStorages(mvPartitionStorage, txStateStorage));
+ }
+ }, ioExecutor);
+ }).toArray(CompletableFuture[]::new);
+
+ // Only completion is propagated: the per-partition PartitionStorages results are not exposed by the returned future.
+ return allOf(storageFuts);
}
/**
@@ -2529,6 +2596,15 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return new PartitionUpdateHandlers(storageUpdateHandler,
indexUpdateHandler, gcUpdateHandler);
}
+ /**
+ * Returns a table instance if it exists, {@code null} otherwise. The lookup is done in {@code startedTables}.
+ *
+ * @param tableId Table id.
+ * @return Table registered under {@code tableId} in {@code startedTables}, or {@code null} if there is no such entry.
+ */
+ public @Nullable TableImpl getTable(int tableId) {
+ return startedTables.get(tableId);
+ }
+
private @Nullable CatalogTableDescriptor getTableDescriptor(int id) {
TableView tableView = findTableView(tablesCfg.value(), id);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/AbstractPartitionSetTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/AbstractPartitionSetTest.java
new file mode 100644
index 0000000000..1c5c534d37
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/AbstractPartitionSetTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Base tests for {@link PartitionSet}. */
+abstract class AbstractPartitionSetTest {
+    private PartitionSet partitionSet;
+
+    @BeforeEach
+    void setUp() {
+        partitionSet = createSet();
+    }
+
+    @Test
+    public void testSetPartition() {
+        int partCount = 65_000;
+
+        // Set every even partition.
+        for (int i = 0; i < partCount; i++) {
+            if ((i % 2) == 0) {
+                partitionSet.set(i);
+            }
+        }
+
+        // Check that only partitions that were set are actually set.
+        for (int i = 0; i < partCount; i++) {
+            boolean isSet = partitionSet.get(i);
+
+            assertEquals((i % 2) == 0, isSet);
+        }
+    }
+
+    @Test
+    public void testStream() {
+        int partCount = 65_000;
+
+        int setPartitionsCount = 0;
+
+        // Set every even partition.
+        for (int i = 0; i < partCount; i++) {
+            if ((i % 2) == 0) {
+                partitionSet.set(i);
+                setPartitionsCount++;
+            }
+        }
+
+        var realCount = new AtomicInteger();
+
+        // Check that the stream contains only the partitions that were set, and all of them.
+        partitionSet.stream().forEach(value -> {
+            assertTrue((value % 2) == 0);
+            realCount.incrementAndGet();
+        });
+
+        assertEquals(setPartitionsCount, realCount.get());
+    }
+
+    /** Tests that a copy of a non-empty set is equal to, but is not the same instance as, the original set. */
+    @Test
+    public void testCopy() {
+        // Populate the set first: copying an empty set would not show whether the elements are preserved.
+        partitionSet.set(0);
+        partitionSet.set(64);
+
+        PartitionSet copy = partitionSet.copy();
+
+        assertEquals(partitionSet, copy);
+        assertEquals(partitionSet.hashCode(), copy.hashCode());
+        assertNotSame(partitionSet, copy);
+    }
+
+    /** Tests that empty sets of different classes are equal. */
+    @Test
+    public void testEmptyEqual() {
+        assertEquals(partitionSet, PartitionSet.EMPTY_SET);
+        assertEquals(PartitionSet.EMPTY_SET, partitionSet);
+
+        assertEquals(partitionSet.hashCode(), PartitionSet.EMPTY_SET.hashCode());
+        assertEquals(PartitionSet.EMPTY_SET.hashCode(), partitionSet.hashCode());
+    }
+
+    @Test
+    public void testDifferentNotEqual() {
+        partitionSet.set(0);
+
+        PartitionSet anotherSet = createSet();
+        anotherSet.set(1);
+
+        assertNotEquals(partitionSet, anotherSet);
+    }
+
+    abstract PartitionSet createSet();
+}
\ No newline at end of file
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSetTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSetTest.java
new file mode 100644
index 0000000000..54888676ca
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/BitSetPartitionSetTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+/** Runs the {@link AbstractPartitionSetTest} suite against the {@link BitSetPartitionSet} implementation of {@link PartitionSet}. */
+class BitSetPartitionSetTest extends AbstractPartitionSetTest {
+ @Override
+ PartitionSet createSet() {
+ return new BitSetPartitionSet();
+ }
+}
\ No newline at end of file
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index a281cb67b4..d9ac117a58 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZoneManag
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -318,7 +319,6 @@ public class TableManagerTest extends IgniteAbstractTest {
createDistributionZone();
tblsCfg.tables().change(tablesChange -> {
-
tablesChange.create(scmTbl.name(), tableChange -> {
(SchemaConfigurationConverter.convert(scmTbl, tableChange))
.changeZoneId(ZONE_ID);
@@ -654,19 +654,13 @@ public class TableManagerTest extends IgniteAbstractTest {
when(rm.raftNodeReadyFuture(any())).thenReturn(completedFuture(1L));
when(bm.nodes()).thenReturn(Set.of(node));
- distributionZonesConfiguration.distributionZones().change(zones -> {
+
assertThat(distributionZonesConfiguration.distributionZones().change(zones -> {
zones.create(ZONE_NAME, ch -> {
ch.changeZoneId(ZONE_ID);
ch.changePartitions(1);
ch.changeReplicas(1);
});
- }).join();
-
- mockMetastore();
-
- TableManager tableManager = createTableManager(tblManagerFut);
-
- tblManagerFut.complete(tableManager);
+ }), willSucceedFast());
var txStateStorage = mock(TxStateStorage.class);
var mvPartitionStorage = mock(MvPartitionStorage.class);
@@ -679,21 +673,19 @@ public class TableManagerTest extends IgniteAbstractTest {
when(mvPartitionStorage.persistedIndex()).thenReturn(MvPartitionStorage.REBALANCE_IN_PROGRESS);
}
+
doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
when(txStateStorage.clear()).thenReturn(completedFuture(null));
- // We need to mock storages inside a configuration listener because of
how mocks are created in the Table Manager,
- // see "createTableManager".
- tblsCfg.tables().any().listen(ctx -> {
- // For some reason, "when(something).thenReturn" does not work on
spies, but this notation works.
-
doReturn(txStateStorage).when(txStateTableStorage).getOrCreateTxStateStorage(anyInt());
-
doReturn(txStateStorage).when(txStateTableStorage).getTxStateStorage(anyInt());
+ mockMetastore();
+ // "when(spy.method()).thenReturn(...)" calls the real method on a spy while stubbing, so "doReturn(...).when(spy)" is used.
+ TableManager tableManager = createTableManager(tblManagerFut,
(mvTableStorage) -> {
doReturn(completedFuture(mvPartitionStorage)).when(mvTableStorage).createMvPartition(anyInt());
doReturn(mvPartitionStorage).when(mvTableStorage).getMvPartition(anyInt());
-
doReturn(mock(PartitionTimestampCursor.class)).when(mvPartitionStorage).scan(any());
doReturn(completedFuture(null)).when(mvTableStorage).clearPartition(anyInt());
-
- return completedFuture(null);
+ }, (txStateTableStorage) -> {
+
doReturn(txStateStorage).when(txStateTableStorage).getOrCreateTxStateStorage(anyInt());
+
doReturn(txStateStorage).when(txStateTableStorage).getTxStateStorage(anyInt());
});
TableDefinition scmTbl = SchemaBuilders.tableBuilder("PUBLIC",
PRECONFIGURED_TABLE_NAME).columns(
@@ -839,13 +831,21 @@ public class TableManagerTest extends IgniteAbstractTest {
return tbl2;
}
+ /**
+ * Creates Table manager, passing no-op decorators for the table storage and the tx state table storage spies.
+ *
+ * @param tblManagerFut Future to wrap Table manager.
+ * @return Table manager.
+ */
+ private TableManager createTableManager(CompletableFuture<TableManager>
tblManagerFut) {
+ return createTableManager(tblManagerFut, unused -> {}, unused -> {});
+ }
+
/**
* Creates Table manager.
*
* @param tblManagerFut Future to wrap Table manager.
+ * @param tableStorageDecorator Table storage spy decorator.
+ * @param txStateTableStorageDecorator Tx state table storage spy
decorator.
+ *
* @return Table manager.
*/
- private TableManager createTableManager(CompletableFuture<TableManager>
tblManagerFut) {
+ private TableManager createTableManager(CompletableFuture<TableManager>
tblManagerFut, Consumer<MvTableStorage> tableStorageDecorator,
+ Consumer<TxStateTableStorage> txStateTableStorageDecorator) {
VaultManager vaultManager = mock(VaultManager.class);
when(vaultManager.get(any(ByteArray.class))).thenReturn(completedFuture(null));
@@ -881,6 +881,8 @@ public class TableManagerTest extends IgniteAbstractTest {
protected MvTableStorage createTableStorage(CatalogTableDescriptor
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
mvTableStorage = spy(super.createTableStorage(tableDescriptor,
zoneDescriptor));
+ tableStorageDecorator.accept(mvTableStorage);
+
return mvTableStorage;
}
@@ -891,6 +893,8 @@ public class TableManagerTest extends IgniteAbstractTest {
) {
txStateTableStorage =
spy(super.createTxStateTableStorage(tableDescriptor, zoneDescriptor));
+ txStateTableStorageDecorator.accept(txStateTableStorage);
+
return txStateTableStorage;
}
};