This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-20680 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit d4232bd068ca65fc28afcb4e1bfa9c03746932b6 Author: amashenkov <[email protected]> AuthorDate: Wed Jan 31 17:29:22 2024 +0300 Implement TABLE_DESTROY and INDEX_DESTROY catalog events, and postpone table and index data removal for historical queries purposes. --- .../apache/ignite/internal/catalog/Catalog.java | 9 +++ .../internal/catalog/CatalogManagerImpl.java | 55 ++++++++++++++++--- .../internal/catalog/events/CatalogEvent.java | 6 ++ .../internal/catalog/events/DestroyIndexEvent.java | 64 ++++++++++++++++++++++ ...eters.java => DestroyIndexEventParameters.java} | 37 ++++++++----- .../internal/catalog/events/DestroyTableEvent.java | 61 +++++++++++++++++++++ .../events/DestroyTableEventParameters.java | 51 +++++++++++++++++ .../catalog/events/DropIndexEventParameters.java | 14 +---- ...ntParameters.java => IndexEventParameters.java} | 38 +++++++------ .../events/MakeIndexAvailableEventParameters.java | 13 +---- .../events/StartBuildingIndexEventParameters.java | 13 +---- .../catalog/events/TableEventParameters.java | 2 +- .../ignite/internal/catalog/storage/UpdateLog.java | 2 +- .../internal/catalog/CatalogManagerSelfTest.java | 26 ++++++++- .../handler/ClientPrimaryReplicaTracker.java | 17 +++--- .../apache/ignite/internal/index/IndexManager.java | 26 ++++++--- .../apache/ignite/internal/table/TableImpl.java | 2 - .../internal/table/distributed/TableManager.java | 29 ++++------ 18 files changed, 353 insertions(+), 112 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java index c7668e7eaa..50e98a9083 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java @@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry; import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.ints.IntSet; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -170,6 +171,14 @@ public class Catalog { return zonesByName.values(); } + IntSet managedTables() { + return tablesById.keySet(); + } + + IntSet managedIndexes() { + return indexesById.keySet(); + } + @Override public String toString() { return S.toString(this); diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index aa99fcdfff..54177e1f3c 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -25,6 +25,9 @@ import static org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParam import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import it.unimi.dsi.fastutil.ints.IntCollection; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -45,6 +48,8 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CatalogEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyIndexEvent; +import org.apache.ignite.internal.catalog.events.DestroyTableEvent; import org.apache.ignite.internal.catalog.storage.Fireable; import org.apache.ignite.internal.catalog.storage.SnapshotEntry; import org.apache.ignite.internal.catalog.storage.UpdateEntry; @@ -373,8 +378,8 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata } /** - * Attempts to save a versioned update using a CAS-like logic. If the attempt fails, makes more attempts - * until the max retry count is reached. + * Attempts to save a versioned update using a CAS-like logic. If the attempt fails, makes more attempts until the max retry count is + * reached. * * @param updateProducer Supplies simple updates to include into a versioned update to install. * @param attemptNo Ordinal number of an attempt. @@ -437,21 +442,56 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata @Override public CompletableFuture<Void> handle(UpdateLogEvent event, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) { if (event instanceof SnapshotEntry) { - return handle((SnapshotEntry) event); + return handle((SnapshotEntry) event, causalityToken); } return handle((VersionedUpdate) event, metaStorageUpdateTimestamp, causalityToken); } - private CompletableFuture<Void> handle(SnapshotEntry event) { + private CompletableFuture<Void> handle(SnapshotEntry event, long causalityToken) { Catalog catalog = event.snapshot(); + // Collect destroy events for dropped tables/indexes. + List<Fireable> events = new ArrayList<>(); + IntSet managedTables = catalog.managedTables(); + IntSet managedIndexes = catalog.managedIndexes(); + Collection<Catalog> droppedCatalogVersions = catalogByVer.subMap(earliestCatalogVersion(), catalog.version()).values(); + + // Dropped indexes + droppedCatalogVersions.forEach(old -> { + old.managedIndexes().intStream() + .filter(id -> !managedIndexes.contains(id)) + .filter(new IntOpenHashSet()::add) // unique + .forEach(id -> events.add(new DestroyIndexEvent(id, old.index(id).tableId()))); + }); + // Dropped tables + droppedCatalogVersions.stream().map(Catalog::managedTables) + .flatMapToInt(IntCollection::intStream) + .filter(id -> !managedTables.contains(id)) + .filter(new IntOpenHashSet()::add) // unique + .forEach(id -> events.add(new DestroyTableEvent(id))); + // On recovery phase, we must register catalog from the snapshot. // In other cases, it is ok to rewrite an existed version, because it's exactly the same. registerCatalog(catalog); truncateUpTo(catalog); - return nullCompletedFuture(); + List<CompletableFuture<?>> eventFutures = new ArrayList<>(events.size()); + + for (Fireable fireEvent : events) { + eventFutures.add(fireEvent( + fireEvent.eventType(), + fireEvent.createEventParameters(causalityToken, catalog.version()) + )); + } + + return allOf(eventFutures.toArray(CompletableFuture[]::new)) + .whenComplete((ignore, err) -> { + if (err != null) { + LOG.warn("Failed to compact catalog.", err); + //TODO: IGNITE-14611 Pass exception to an error handler? + } + }); } private CompletableFuture<Void> handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) { @@ -503,7 +543,7 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata assert activationTimestamp > catalog.time() : "Activation timestamp " + activationTimestamp + " must be greater than previous catalog version activation timestamp " - + catalog.time(); + + catalog.time(); return new Catalog( update.version(), @@ -615,8 +655,7 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata } /** - * A container that keeps given descriptor along with name of the schema this - * descriptor belongs to. + * A container that keeps given descriptor along with name of the schema this descriptor belongs to. */ private static class SchemaAwareDescriptor<T> { private final T descriptor; diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java index 77558e0365..85b88e3d25 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java @@ -32,6 +32,9 @@ public enum CatalogEvent implements Event { /** This event is fired when a table has been renamed or a column has been modified, added to, or removed from a table. */ TABLE_ALTER, + /** This event is fired, when all Catalog versions with a table were purged from the history. */ + TABLE_DESTROY, + /** This event is fired, when an index was created in Catalog. */ INDEX_CREATE, @@ -44,6 +47,9 @@ public enum CatalogEvent implements Event { /** This event is fired when the index becomes available, i.e. the index has been built. */ INDEX_AVAILABLE, + /** This event is fired, when all Catalog versions with an index were purged from the history. */ + INDEX_DESTROY, + /** This event is fired, when a distribution zone was created in Catalog. */ ZONE_CREATE, diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEvent.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEvent.java new file mode 100644 index 0000000000..d807b2f581 --- /dev/null +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEvent.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.catalog.events; + +import org.apache.ignite.internal.catalog.storage.Fireable; + +/** {@link CatalogEvent#INDEX_DESTROY} event. */ +public class DestroyIndexEvent implements Fireable { + private final int indexId; + private final int tableId; + + /** + * Constructor. + * + * @param indexId An id of dropping index. + * @param tableId Table ID for which the index was removed. + */ + public DestroyIndexEvent(int indexId, int tableId) { + this.indexId = indexId; + this.tableId = tableId; + } + + @Override + public CatalogEvent eventType() { + return CatalogEvent.INDEX_DESTROY; + } + + @Override + public CatalogEventParameters createEventParameters(long causalityToken, int catalogVersion) { + return new DestroyIndexEventParameters(causalityToken, catalogVersion, indexId, tableId); + } +} diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEventParameters.java similarity index 52% copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEventParameters.java index 96b0d16942..e33fd9df73 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEventParameters.java @@ -15,15 +15,29 @@ * limitations under the License. */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.catalog.events; /** - * Drop index event parameters that contains an id of dropped index. + * Destroy index event parameters contains an id of destroying index. */ -public class DropIndexEventParameters extends CatalogEventParameters { - - private final int indexId; - +public class DestroyIndexEventParameters extends IndexEventParameters { private final int tableId; /** @@ -31,21 +45,14 @@ public class DropIndexEventParameters extends CatalogEventParameters { * * @param causalityToken Causality token. * @param catalogVersion Catalog version. - * @param indexId An id of dropped index. + * @param indexId An Id of destroying index. * @param tableId Table ID for which the index was removed. */ - public DropIndexEventParameters(long causalityToken, int catalogVersion, int indexId, int tableId) { - super(causalityToken, catalogVersion); - - this.indexId = indexId; + DestroyIndexEventParameters(long causalityToken, int catalogVersion, int indexId, int tableId) { + super(causalityToken, catalogVersion, indexId); this.tableId = tableId; } - /** Returns an id of dropped index. */ - public int indexId() { - return indexId; - } - /** Returns table ID for which the index was removed. */ public int tableId() { return tableId; diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEvent.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEvent.java new file mode 100644 index 0000000000..b47375c271 --- /dev/null +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEvent.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.catalog.events; + +import org.apache.ignite.internal.catalog.storage.Fireable; + +/** {@link CatalogEvent#TABLE_DESTROY} event. */ +public class DestroyTableEvent implements Fireable { + private final int tableId; + + /** + * Constructor. + * + * @param tableId An id of dropped table. + */ + public DestroyTableEvent(int tableId) { + this.tableId = tableId; + } + + @Override + public CatalogEvent eventType() { + return CatalogEvent.TABLE_DESTROY; + } + + @Override + public CatalogEventParameters createEventParameters(long causalityToken, int catalogVersion) { + return new DestroyTableEventParameters(causalityToken, catalogVersion, tableId); + } +} diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEventParameters.java new file mode 100644 index 0000000000..2d61da1878 --- /dev/null +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEventParameters.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.catalog.events; + +/** + * Destroy table event parameters contains an id of destroying table. + */ +public class DestroyTableEventParameters extends TableEventParameters { + /** + * Constructor. + * + * @param causalityToken Causality token. + * @param catalogVersion Catalog version. + * @param tableId An Id of destroying table. + */ + DestroyTableEventParameters(long causalityToken, int catalogVersion, int tableId) { + super(causalityToken, catalogVersion, tableId); + } +} diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java index 96b0d16942..caabc569ba 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java @@ -20,10 +20,7 @@ package org.apache.ignite.internal.catalog.events; /** * Drop index event parameters that contains an id of dropped index. */ -public class DropIndexEventParameters extends CatalogEventParameters { - - private final int indexId; - +public class DropIndexEventParameters extends IndexEventParameters { private final int tableId; /** @@ -35,17 +32,10 @@ public class DropIndexEventParameters extends CatalogEventParameters { * @param tableId Table ID for which the index was removed. */ public DropIndexEventParameters(long causalityToken, int catalogVersion, int indexId, int tableId) { - super(causalityToken, catalogVersion); - - this.indexId = indexId; + super(causalityToken, catalogVersion, indexId); this.tableId = tableId; } - /** Returns an id of dropped index. */ - public int indexId() { - return indexId; - } - /** Returns table ID for which the index was removed. */ public int tableId() { return tableId; diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/IndexEventParameters.java similarity index 51% copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/IndexEventParameters.java index 96b0d16942..0e3a3cc093 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/IndexEventParameters.java @@ -15,39 +15,45 @@ * limitations under the License. */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.catalog.events; /** - * Drop index event parameters that contains an id of dropped index. + * Event that is related to an index. */ -public class DropIndexEventParameters extends CatalogEventParameters { - +public abstract class IndexEventParameters extends CatalogEventParameters { private final int indexId; - private final int tableId; - /** * Constructor. * * @param causalityToken Causality token. * @param catalogVersion Catalog version. - * @param indexId An id of dropped index. - * @param tableId Table ID for which the index was removed. + * @param indexId ID of the index to which the event relates. */ - public DropIndexEventParameters(long causalityToken, int catalogVersion, int indexId, int tableId) { + IndexEventParameters(long causalityToken, int catalogVersion, int indexId) { super(causalityToken, catalogVersion); - this.indexId = indexId; - this.tableId = tableId; } - /** Returns an id of dropped index. */ + /** Returns an id of a modified index. */ public int indexId() { return indexId; } - - /** Returns table ID for which the index was removed. */ - public int tableId() { - return tableId; - } } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/MakeIndexAvailableEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/MakeIndexAvailableEventParameters.java index c055ec619e..ab53b12f7f 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/MakeIndexAvailableEventParameters.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/MakeIndexAvailableEventParameters.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.catalog.events; /** {@link CatalogEvent#INDEX_AVAILABLE} event parameters. */ -public class MakeIndexAvailableEventParameters extends CatalogEventParameters { - private final int indexId; - +public class MakeIndexAvailableEventParameters extends IndexEventParameters { /** * Constructor. * @@ -29,13 +27,6 @@ public class MakeIndexAvailableEventParameters extends CatalogEventParameters { * @param indexId Index ID. */ public MakeIndexAvailableEventParameters(long causalityToken, int catalogVersion, int indexId) { - super(causalityToken, catalogVersion); - - this.indexId = indexId; - } - - /** Returns index ID. */ - public int indexId() { - return indexId; + super(causalityToken, catalogVersion, indexId); } } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StartBuildingIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StartBuildingIndexEventParameters.java index 797b75de53..d41e3aeff6 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StartBuildingIndexEventParameters.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StartBuildingIndexEventParameters.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.catalog.events; /** {@link CatalogEvent#INDEX_BUILDING} event parameters. */ -public class StartBuildingIndexEventParameters extends CatalogEventParameters { - private final int indexId; - +public class StartBuildingIndexEventParameters extends IndexEventParameters { /** * Constructor. * @@ -29,13 +27,6 @@ public class StartBuildingIndexEventParameters extends CatalogEventParameters { * @param indexId Index ID. */ public StartBuildingIndexEventParameters(long causalityToken, int catalogVersion, int indexId) { - super(causalityToken, catalogVersion); - - this.indexId = indexId; - } - - /** Returns index ID. */ - public int indexId() { - return indexId; + super(causalityToken, catalogVersion, indexId); } } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java index 089a1be64e..5383a9835d 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java @@ -30,7 +30,7 @@ public abstract class TableEventParameters extends CatalogEventParameters { * @param catalogVersion Catalog version. * @param tableId ID of the table to which the event relates. */ - public TableEventParameters(long causalityToken, int catalogVersion, int tableId) { + TableEventParameters(long causalityToken, int catalogVersion, int tableId) { super(causalityToken, catalogVersion); this.tableId = tableId; } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java index ba1c7b495c..c8cd790836 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java @@ -46,7 +46,7 @@ public interface UpdateLog extends IgniteComponent { * * @param snapshotEntry An entry, which represents a result of merging updates of previous versions. * @return A {@code true} if snapshot has been successfully appended, {@code false} otherwise - * if a snapshot with the same or greater version already exists, or snapshots are not supported. + * if a snapshot with the same or greater version already exists. */ CompletableFuture<Boolean> saveSnapshot(SnapshotEntry snapshotEntry); diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java index c3428141e0..ea4949bb94 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java @@ -48,6 +48,7 @@ import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus. import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; @@ -127,6 +128,8 @@ import org.apache.ignite.internal.catalog.events.CatalogEventParameters; import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; import org.apache.ignite.internal.catalog.events.DropColumnEventParameters; import org.apache.ignite.internal.catalog.events.DropIndexEventParameters; import org.apache.ignite.internal.catalog.events.DropTableEventParameters; @@ -1132,12 +1135,13 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { } @Test - public void testTableEvents() { + public void testTableEvents() throws InterruptedException { EventListener<CatalogEventParameters> eventListener = mock(EventListener.class); when(eventListener.notify(any(), any())).thenReturn(falseCompletedFuture()); manager.listen(CatalogEvent.TABLE_CREATE, eventListener); manager.listen(CatalogEvent.TABLE_DROP, eventListener); + manager.listen(CatalogEvent.TABLE_DESTROY, eventListener); assertThat(manager.execute(simpleTable(TABLE_NAME)), willBe(nullValue())); verify(eventListener).notify(any(CreateTableEventParameters.class), isNull()); @@ -1145,11 +1149,19 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertThat(manager.execute(dropTableCommand(TABLE_NAME)), willBe(nullValue())); verify(eventListener).notify(any(DropTableEventParameters.class), isNull()); + verifyNoMoreInteractions(eventListener); + clearInvocations(eventListener); + + // Table destroyed after Catalog compaction. + assertThat(manager.compactCatalog(clock.nowLong()), willBe(nullValue())); + waitForCondition(() -> manager.earliestCatalogVersion() == manager.latestCatalogVersion(), 2_000); + verify(eventListener).notify(any(DestroyTableEventParameters.class), isNull()); + verifyNoMoreInteractions(eventListener); } @Test - public void testCreateIndexEvents() { + public void testIndexEvents() throws InterruptedException { CatalogCommand createIndexCmd = createHashIndexCommand(INDEX_NAME, List.of("ID")); CatalogCommand dropIndexCmd = DropIndexCommand.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build(); @@ -1159,6 +1171,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { manager.listen(CatalogEvent.INDEX_CREATE, eventListener); manager.listen(CatalogEvent.INDEX_DROP, eventListener); + manager.listen(CatalogEvent.INDEX_DESTROY, eventListener); // Try to create index without table. assertThat(manager.execute(createIndexCmd), willThrow(TableNotFoundValidationException.class)); @@ -1180,6 +1193,15 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertThat(manager.execute(dropIndexCmd), willBe(nullValue())); verify(eventListener).notify(any(DropIndexEventParameters.class), isNull()); + verifyNoMoreInteractions(eventListener); + clearInvocations(eventListener); + + // Index destroyed after Catalog compaction. + assertThat(manager.compactCatalog(clock.nowLong()), willBe(nullValue())); + waitForCondition(() -> manager.earliestCatalogVersion() == manager.latestCatalogVersion(), 2_000); + verify(eventListener).notify(any(DestroyIndexEventParameters.class), isNull()); + + verifyNoMoreInteractions(eventListener); clearInvocations(eventListener); // Drop table with pk index. diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java index 9095615e98..945ea7634c 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java @@ -32,7 +32,7 @@ import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; -import org.apache.ignite.internal.catalog.events.DropTableEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; import org.apache.ignite.internal.event.EventListener; import org.apache.ignite.internal.event.EventParameters; import org.apache.ignite.internal.hlc.HybridClock; @@ -284,8 +284,8 @@ public class ClientPrimaryReplicaTracker implements EventListener<EventParameter } private CompletableFuture<Boolean> notifyInternal(EventParameters parameters) { - if (parameters instanceof DropTableEventParameters) { - removeTable((DropTableEventParameters) parameters); + if (parameters instanceof DestroyTableEventParameters) { + removeTable((DestroyTableEventParameters) parameters); return falseCompletedFuture(); } @@ -308,18 +308,19 @@ public class ClientPrimaryReplicaTracker implements EventListener<EventParameter return falseCompletedFuture(); // false: don't remove listener. } - private void removeTable(DropTableEventParameters dropTableEvent) { + private void removeTable(DestroyTableEventParameters destroyTableEvent) { // Use previous version of the catalog to get the dropped table. - int prevCatalogVersion = dropTableEvent.catalogVersion() - 1; + int prevCatalogVersion = destroyTableEvent.catalogVersion() - 1; - CatalogTableDescriptor table = catalogService.table(dropTableEvent.tableId(), prevCatalogVersion); - assert table != null : "Table from DropTableEventParameters not found: " + dropTableEvent.tableId(); + //TODO: avoid getting a descriptor. + CatalogTableDescriptor table = catalogService.table(destroyTableEvent.tableId(), prevCatalogVersion); + assert table != null : "Table from DropTableEventParameters not found: " + destroyTableEvent.tableId(); CatalogZoneDescriptor zone = catalogService.zone(table.zoneId(), prevCatalogVersion); assert zone != null : "Zone from DropTableEventParameters not found: " + table.zoneId(); for (int partition = 0; partition < zone.partitions(); partition++) { - TablePartitionId tablePartitionId = new TablePartitionId(dropTableEvent.tableId(), partition); + TablePartitionId tablePartitionId = new TablePartitionId(destroyTableEvent.tableId(), partition); primaryReplicas.remove(tablePartitionId); } } diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index fd2c1082c9..3cf85765e5 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.index; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toMap; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE; +import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DESTROY; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; @@ -44,7 +46,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; -import org.apache.ignite.internal.catalog.events.DropIndexEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters; import org.apache.ignite.internal.causality.IncrementalVersionedValue; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -139,6 +141,14 @@ public class IndexManager implements IgniteComponent { return onIndexCreate((CreateIndexEventParameters) parameters); }); + catalogService.listen(INDEX_DESTROY, (parameters, exception) -> { + if (exception != null) { + return failedFuture(exception); + } + + return onIndexDestroy((DestroyIndexEventParameters) parameters); + }); + LOG.info("Index manager started"); return nullCompletedFuture(); @@ -177,27 +187,27 @@ public class IndexManager implements IgniteComponent { return mvTableStoragesByIdVv.get(causalityToken).thenApply(mvTableStoragesById -> mvTableStoragesById.get(tableId)); } - // TODO: IGNITE-20121 Unregister index only before we physically start deleting the index before truncate catalog - private CompletableFuture<Boolean> onIndexDrop(DropIndexEventParameters parameters) { + private CompletableFuture<Boolean> onIndexDestroy(DestroyIndexEventParameters parameters) { int indexId = parameters.indexId(); int tableId = parameters.tableId(); long causalityToken = parameters.causalityToken(); - CompletableFuture<TableViewInternal> tableFuture = tableManager.tableAsync(causalityToken, tableId); + // TODO: fix getting table ??? + TableViewInternal table = tableManager.getTable(tableId); return inBusyLockAsync(busyLock, () -> mvTableStoragesByIdVv.update( causalityToken, - updater(mvTableStorageById -> tableFuture.thenApply(table -> inBusyLock(busyLock, () -> { + updater(mvTableStorageById -> inBusyLockAsync(busyLock, () -> { if (table != null) { // In case of DROP TABLE the table will be removed first. table.unregisterIndex(indexId); - return mvTableStorageById; + return completedFuture(mvTableStorageById); } else { - return removeMvTableStorageIfPresent(mvTableStorageById, tableId); + return completedFuture(removeMvTableStorageIfPresent(mvTableStorageById, tableId)); } - }))) + })) )).thenApply(unused -> false); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java index f12ddf0b31..508b912970 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java @@ -307,8 +307,6 @@ public class TableImpl implements TableViewInternal { indexWrapperById.remove(indexId); completeWaitIndex(indexId); - - // TODO: IGNITE-19150 Also need to destroy the index storages } private void awaitIndexes() { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index d1190184b1..d7a213a443 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -87,7 +87,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; -import org.apache.ignite.internal.catalog.events.DropTableEventParameters; +import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters; import org.apache.ignite.internal.catalog.events.RenameTableEventParameters; import org.apache.ignite.internal.causality.CompletionListener; import org.apache.ignite.internal.causality.IncrementalVersionedValue; @@ -538,10 +538,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return onTableCreate((CreateTableEventParameters) parameters).thenApply(unused -> false); }); - catalogService.listen(CatalogEvent.TABLE_DROP, (parameters, exception) -> { + catalogService.listen(CatalogEvent.TABLE_DESTROY, (parameters, exception) -> { assert exception == null : parameters; - return onTableDelete(((DropTableEventParameters) parameters)).thenApply(unused -> false); + return onTableDestroy(((DestroyTableEventParameters) parameters)).thenApply(unused -> false); }); catalogService.listen(CatalogEvent.TABLE_ALTER, (parameters, exception) -> { @@ -688,17 +688,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }); } - private CompletableFuture<Void> onTableDelete(DropTableEventParameters parameters) { + private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { - long causalityToken = parameters.causalityToken(); - int catalogVersion = parameters.catalogVersion(); + long causalityToken = localPartsByTableIdVv.latestCausalityToken(); - int tableId = parameters.tableId(); - - CatalogTableDescriptor tableDescriptor = getTableDescriptor(tableId, catalogVersion - 1); - CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion - 1); - - dropTableLocally(causalityToken, tableDescriptor, zoneDescriptor); + tablesByIdVv.get(causalityToken).thenAccept(map -> dropTableLocally(causalityToken, parameters)); return nullCompletedFuture(); }); @@ -1338,12 +1332,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * Drops local structures for a table. * * @param causalityToken Causality token. - * @param tableDescriptor Catalog table descriptor. - * @param zoneDescriptor Catalog distributed zone descriptor. + * @param parameters Destroy table event parameters. */ - private void dropTableLocally(long causalityToken, CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) { - int tableId = tableDescriptor.id(); - int partitions = zoneDescriptor.partitions(); + private void dropTableLocally(long causalityToken, DestroyTableEventParameters parameters) { + int tableId = parameters.tableId(); localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> { if (e != null) { @@ -1356,6 +1348,9 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return completedFuture(newMap); })); + // TODO: assignments must be dropped on recovery as well. Is there a guarantee? + int partitions = tablesByIdVv.latest().get(tableId).internalTable().partitions(); + tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> { if (e != null) { return failedFuture(e);
