This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-20680
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit d4232bd068ca65fc28afcb4e1bfa9c03746932b6
Author: amashenkov <[email protected]>
AuthorDate: Wed Jan 31 17:29:22 2024 +0300

    Implement TABLE_DESTROY and INDEX_DESTROY catalog events, and postpone 
table and index data removal for the purposes of historical queries.
---
 .../apache/ignite/internal/catalog/Catalog.java    |  9 +++
 .../internal/catalog/CatalogManagerImpl.java       | 55 ++++++++++++++++---
 .../internal/catalog/events/CatalogEvent.java      |  6 ++
 .../internal/catalog/events/DestroyIndexEvent.java | 64 ++++++++++++++++++++++
 ...eters.java => DestroyIndexEventParameters.java} | 37 ++++++++-----
 .../internal/catalog/events/DestroyTableEvent.java | 61 +++++++++++++++++++++
 .../events/DestroyTableEventParameters.java        | 51 +++++++++++++++++
 .../catalog/events/DropIndexEventParameters.java   | 14 +----
 ...ntParameters.java => IndexEventParameters.java} | 38 +++++++------
 .../events/MakeIndexAvailableEventParameters.java  | 13 +----
 .../events/StartBuildingIndexEventParameters.java  | 13 +----
 .../catalog/events/TableEventParameters.java       |  2 +-
 .../ignite/internal/catalog/storage/UpdateLog.java |  2 +-
 .../internal/catalog/CatalogManagerSelfTest.java   | 26 ++++++++-
 .../handler/ClientPrimaryReplicaTracker.java       | 17 +++---
 .../apache/ignite/internal/index/IndexManager.java | 26 ++++++---
 .../apache/ignite/internal/table/TableImpl.java    |  2 -
 .../internal/table/distributed/TableManager.java   | 29 ++++------
 18 files changed, 353 insertions(+), 112 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
index c7668e7eaa..50e98a9083 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -170,6 +171,14 @@ public class Catalog {
         return zonesByName.values();
     }
 
+    IntSet managedTables() {
+        return tablesById.keySet();
+    }
+
+    IntSet managedIndexes() {
+        return indexesById.keySet();
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index aa99fcdfff..54177e1f3c 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -25,6 +25,9 @@ import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParam
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import it.unimi.dsi.fastutil.ints.IntCollection;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,6 +48,8 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.DestroyIndexEvent;
+import org.apache.ignite.internal.catalog.events.DestroyTableEvent;
 import org.apache.ignite.internal.catalog.storage.Fireable;
 import org.apache.ignite.internal.catalog.storage.SnapshotEntry;
 import org.apache.ignite.internal.catalog.storage.UpdateEntry;
@@ -373,8 +378,8 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
     }
 
     /**
-     * Attempts to save a versioned update using a CAS-like logic. If the 
attempt fails, makes more attempts
-     * until the max retry count is reached.
+     * Attempts to save a versioned update using a CAS-like logic. If the 
attempt fails, makes more attempts until the max retry count is
+     * reached.
      *
      * @param updateProducer Supplies simple updates to include into a 
versioned update to install.
      * @param attemptNo Ordinal number of an attempt.
@@ -437,21 +442,56 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
         @Override
         public CompletableFuture<Void> handle(UpdateLogEvent event, 
HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
             if (event instanceof SnapshotEntry) {
-                return handle((SnapshotEntry) event);
+                return handle((SnapshotEntry) event, causalityToken);
             }
 
             return handle((VersionedUpdate) event, metaStorageUpdateTimestamp, 
causalityToken);
         }
 
-        private CompletableFuture<Void> handle(SnapshotEntry event) {
+        private CompletableFuture<Void> handle(SnapshotEntry event, long 
causalityToken) {
             Catalog catalog = event.snapshot();
 
+            // Collect destroy events for dropped tables/indexes.
+            List<Fireable> events = new ArrayList<>();
+            IntSet managedTables = catalog.managedTables();
+            IntSet managedIndexes = catalog.managedIndexes();
+            Collection<Catalog> droppedCatalogVersions = 
catalogByVer.subMap(earliestCatalogVersion(), catalog.version()).values();
+
+            // Dropped indexes
+            droppedCatalogVersions.forEach(old -> {
+                old.managedIndexes().intStream()
+                        .filter(id -> !managedIndexes.contains(id))
+                        .filter(new IntOpenHashSet()::add) // NOTE(review): set is created per catalog version, so this is not unique across versions - confirm
+                        .forEach(id -> events.add(new DestroyIndexEvent(id, 
old.index(id).tableId())));
+            });
+            // Dropped tables
+            droppedCatalogVersions.stream().map(Catalog::managedTables)
+                    .flatMapToInt(IntCollection::intStream)
+                    .filter(id -> !managedTables.contains(id))
+                    .filter(new IntOpenHashSet()::add) // unique
+                    .forEach(id -> events.add(new DestroyTableEvent(id)));
+
             // On recovery phase, we must register catalog from the snapshot.
             // In other cases, it is ok to rewrite an existed version, because 
it's exactly the same.
             registerCatalog(catalog);
             truncateUpTo(catalog);
 
-            return nullCompletedFuture();
+            List<CompletableFuture<?>> eventFutures = new 
ArrayList<>(events.size());
+
+            for (Fireable fireEvent : events) {
+                eventFutures.add(fireEvent(
+                        fireEvent.eventType(),
+                        fireEvent.createEventParameters(causalityToken, 
catalog.version())
+                ));
+            }
+
+            return allOf(eventFutures.toArray(CompletableFuture[]::new))
+                    .whenComplete((ignore, err) -> {
+                        if (err != null) {
+                            LOG.warn("Failed to compact catalog.", err);
+                            //TODO: IGNITE-14611 Pass exception to an error 
handler?
+                        }
+                    });
         }
 
         private CompletableFuture<Void> handle(VersionedUpdate update, 
HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
@@ -503,7 +543,7 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
 
         assert activationTimestamp > catalog.time()
                 : "Activation timestamp " + activationTimestamp + " must be 
greater than previous catalog version activation timestamp "
-                        + catalog.time();
+                + catalog.time();
 
         return new Catalog(
                 update.version(),
@@ -615,8 +655,7 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
     }
 
     /**
-     * A container that keeps given descriptor along with name of the schema 
this
-     * descriptor belongs to.
+     * A container that keeps given descriptor along with name of the schema 
this descriptor belongs to.
      */
     private static class SchemaAwareDescriptor<T> {
         private final T descriptor;
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
index 77558e0365..85b88e3d25 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
@@ -32,6 +32,9 @@ public enum CatalogEvent implements Event {
     /** This event is fired when a table has been renamed or a column has been 
modified, added to, or removed from a table. */
     TABLE_ALTER,
 
+    /** This event is fired, when all Catalog versions with a table were 
purged from the history. */
+    TABLE_DESTROY,
+
     /** This event is fired, when an index was created in Catalog. */
     INDEX_CREATE,
 
@@ -44,6 +47,9 @@ public enum CatalogEvent implements Event {
     /** This event is fired when the index becomes available, i.e. the index 
has been built. */
     INDEX_AVAILABLE,
 
+    /** This event is fired, when all Catalog versions with an index were 
purged from the history. */
+    INDEX_DESTROY,
+
     /** This event is fired, when a distribution zone was created in Catalog. 
*/
     ZONE_CREATE,
 
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEvent.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEvent.java
new file mode 100644
index 0000000000..d807b2f581
--- /dev/null
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.events;
+
+import org.apache.ignite.internal.catalog.storage.Fireable;
+
+/** {@link CatalogEvent#INDEX_DESTROY} event. */
+public class DestroyIndexEvent implements Fireable {
+    private final int indexId;
+    private final int tableId;
+
+    /**
+     * Constructor.
+     *
+     * @param indexId ID of the index being destroyed.
+     * @param tableId Table ID for which the index was removed.
+     */
+    public DestroyIndexEvent(int indexId, int tableId) {
+        this.indexId = indexId;
+        this.tableId = tableId;
+    }
+
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.INDEX_DESTROY;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new DestroyIndexEventParameters(causalityToken, catalogVersion, 
indexId, tableId);
+    }
+}
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEventParameters.java
similarity index 52%
copy from 
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
copy to 
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEventParameters.java
index 96b0d16942..e33fd9df73 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyIndexEventParameters.java
@@ -15,15 +15,29 @@
  * limitations under the License.
  */
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.catalog.events;
 
 /**
- * Drop index event parameters that contains an id of dropped index.
+ * Destroy index event parameters that contain the ID of the index being destroyed.
  */
-public class DropIndexEventParameters extends CatalogEventParameters {
-
-    private final int indexId;
-
+public class DestroyIndexEventParameters extends IndexEventParameters {
     private final int tableId;
 
     /**
@@ -31,21 +45,14 @@ public class DropIndexEventParameters extends 
CatalogEventParameters {
      *
      * @param causalityToken Causality token.
      * @param catalogVersion Catalog version.
-     * @param indexId An id of dropped index.
+     * @param indexId ID of the index being destroyed.
      * @param tableId Table ID for which the index was removed.
      */
-    public DropIndexEventParameters(long causalityToken, int catalogVersion, 
int indexId, int tableId) {
-        super(causalityToken, catalogVersion);
-
-        this.indexId = indexId;
+    DestroyIndexEventParameters(long causalityToken, int catalogVersion, int 
indexId, int tableId) {
+        super(causalityToken, catalogVersion, indexId);
         this.tableId = tableId;
     }
 
-    /** Returns an id of dropped index. */
-    public int indexId() {
-        return indexId;
-    }
-
     /** Returns table ID for which the index was removed. */
     public int tableId() {
         return tableId;
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEvent.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEvent.java
new file mode 100644
index 0000000000..b47375c271
--- /dev/null
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEvent.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.events;
+
+import org.apache.ignite.internal.catalog.storage.Fireable;
+
+/** {@link CatalogEvent#TABLE_DESTROY} event. */
+public class DestroyTableEvent implements Fireable {
+    private final int tableId;
+
+    /**
+     * Constructor.
+     *
+     * @param tableId ID of the table being destroyed.
+     */
+    public DestroyTableEvent(int tableId) {
+        this.tableId = tableId;
+    }
+
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.TABLE_DESTROY;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new DestroyTableEventParameters(causalityToken, catalogVersion, 
tableId);
+    }
+}
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEventParameters.java
new file mode 100644
index 0000000000..2d61da1878
--- /dev/null
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DestroyTableEventParameters.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.events;
+
+/**
+ * Destroy table event parameters that contain the ID of the table being destroyed.
+ */
+public class DestroyTableEventParameters extends TableEventParameters {
+    /**
+     * Constructor.
+     *
+     * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
+     * @param tableId ID of the table being destroyed.
+     */
+    DestroyTableEventParameters(long causalityToken, int catalogVersion, int 
tableId) {
+        super(causalityToken, catalogVersion, tableId);
+    }
+}
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
index 96b0d16942..caabc569ba 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
@@ -20,10 +20,7 @@ package org.apache.ignite.internal.catalog.events;
 /**
  * Drop index event parameters that contains an id of dropped index.
  */
-public class DropIndexEventParameters extends CatalogEventParameters {
-
-    private final int indexId;
-
+public class DropIndexEventParameters extends IndexEventParameters {
     private final int tableId;
 
     /**
@@ -35,17 +32,10 @@ public class DropIndexEventParameters extends 
CatalogEventParameters {
      * @param tableId Table ID for which the index was removed.
      */
     public DropIndexEventParameters(long causalityToken, int catalogVersion, 
int indexId, int tableId) {
-        super(causalityToken, catalogVersion);
-
-        this.indexId = indexId;
+        super(causalityToken, catalogVersion, indexId);
         this.tableId = tableId;
     }
 
-    /** Returns an id of dropped index. */
-    public int indexId() {
-        return indexId;
-    }
-
     /** Returns table ID for which the index was removed. */
     public int tableId() {
         return tableId;
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/IndexEventParameters.java
similarity index 51%
copy from 
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
copy to 
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/IndexEventParameters.java
index 96b0d16942..0e3a3cc093 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/IndexEventParameters.java
@@ -15,39 +15,45 @@
  * limitations under the License.
  */
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.catalog.events;
 
 /**
- * Drop index event parameters that contains an id of dropped index.
+ * Event that is related to an index.
  */
-public class DropIndexEventParameters extends CatalogEventParameters {
-
+public abstract class IndexEventParameters extends CatalogEventParameters {
     private final int indexId;
 
-    private final int tableId;
-
     /**
      * Constructor.
      *
      * @param causalityToken Causality token.
      * @param catalogVersion Catalog version.
-     * @param indexId An id of dropped index.
-     * @param tableId Table ID for which the index was removed.
+     * @param indexId ID of the index to which the event relates.
      */
-    public DropIndexEventParameters(long causalityToken, int catalogVersion, 
int indexId, int tableId) {
+    IndexEventParameters(long causalityToken, int catalogVersion, int indexId) 
{
         super(causalityToken, catalogVersion);
-
         this.indexId = indexId;
-        this.tableId = tableId;
     }
 
-    /** Returns an id of dropped index. */
+    /** Returns an id of a modified index. */
     public int indexId() {
         return indexId;
     }
-
-    /** Returns table ID for which the index was removed. */
-    public int tableId() {
-        return tableId;
-    }
 }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/MakeIndexAvailableEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/MakeIndexAvailableEventParameters.java
index c055ec619e..ab53b12f7f 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/MakeIndexAvailableEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/MakeIndexAvailableEventParameters.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal.catalog.events;
 
 /** {@link CatalogEvent#INDEX_AVAILABLE} event parameters. */
-public class MakeIndexAvailableEventParameters extends CatalogEventParameters {
-    private final int indexId;
-
+public class MakeIndexAvailableEventParameters extends IndexEventParameters {
     /**
      * Constructor.
      *
@@ -29,13 +27,6 @@ public class MakeIndexAvailableEventParameters extends 
CatalogEventParameters {
      * @param indexId Index ID.
      */
     public MakeIndexAvailableEventParameters(long causalityToken, int 
catalogVersion, int indexId) {
-        super(causalityToken, catalogVersion);
-
-        this.indexId = indexId;
-    }
-
-    /** Returns index ID. */
-    public int indexId() {
-        return indexId;
+        super(causalityToken, catalogVersion, indexId);
     }
 }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StartBuildingIndexEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StartBuildingIndexEventParameters.java
index 797b75de53..d41e3aeff6 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StartBuildingIndexEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StartBuildingIndexEventParameters.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal.catalog.events;
 
 /** {@link CatalogEvent#INDEX_BUILDING} event parameters. */
-public class StartBuildingIndexEventParameters extends CatalogEventParameters {
-    private final int indexId;
-
+public class StartBuildingIndexEventParameters extends IndexEventParameters {
     /**
      * Constructor.
      *
@@ -29,13 +27,6 @@ public class StartBuildingIndexEventParameters extends 
CatalogEventParameters {
      * @param indexId Index ID.
      */
     public StartBuildingIndexEventParameters(long causalityToken, int 
catalogVersion, int indexId) {
-        super(causalityToken, catalogVersion);
-
-        this.indexId = indexId;
-    }
-
-    /** Returns index ID. */
-    public int indexId() {
-        return indexId;
+        super(causalityToken, catalogVersion, indexId);
     }
 }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
index 089a1be64e..5383a9835d 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
@@ -30,7 +30,7 @@ public abstract class TableEventParameters extends 
CatalogEventParameters {
      * @param catalogVersion Catalog version.
      * @param tableId ID of the table to which the event relates.
      */
-    public TableEventParameters(long causalityToken, int catalogVersion, int 
tableId) {
+    TableEventParameters(long causalityToken, int catalogVersion, int tableId) 
{
         super(causalityToken, catalogVersion);
         this.tableId = tableId;
     }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
index ba1c7b495c..c8cd790836 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
@@ -46,7 +46,7 @@ public interface UpdateLog extends IgniteComponent {
      *
      * @param snapshotEntry An entry, which represents a result of merging 
updates of previous versions.
      * @return A {@code true} if snapshot has been successfully appended, 
{@code false} otherwise
-     *      if a snapshot with the same or greater version already exists, or 
snapshots are not supported.
+     *      if a snapshot with the same or greater version already exists.
      */
     CompletableFuture<Boolean> saveSnapshot(SnapshotEntry snapshotEntry);
 
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index c3428141e0..ea4949bb94 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -48,6 +48,7 @@ import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.
 import static 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -127,6 +128,8 @@ import 
org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
+import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
 import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
@@ -1132,12 +1135,13 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
     }
 
     @Test
-    public void testTableEvents() {
+    public void testTableEvents() throws InterruptedException {
         EventListener<CatalogEventParameters> eventListener = 
mock(EventListener.class);
         when(eventListener.notify(any(), 
any())).thenReturn(falseCompletedFuture());
 
         manager.listen(CatalogEvent.TABLE_CREATE, eventListener);
         manager.listen(CatalogEvent.TABLE_DROP, eventListener);
+        manager.listen(CatalogEvent.TABLE_DESTROY, eventListener);
 
         assertThat(manager.execute(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
         verify(eventListener).notify(any(CreateTableEventParameters.class), 
isNull());
@@ -1145,11 +1149,19 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         assertThat(manager.execute(dropTableCommand(TABLE_NAME)), 
willBe(nullValue()));
         verify(eventListener).notify(any(DropTableEventParameters.class), 
isNull());
 
+        verifyNoMoreInteractions(eventListener);
+        clearInvocations(eventListener);
+
+        // The table is destroyed after catalog compaction.
+        assertThat(manager.compactCatalog(clock.nowLong()), 
willBe(nullValue()));
+        waitForCondition(() -> manager.earliestCatalogVersion() == 
manager.latestCatalogVersion(), 2_000);
+        verify(eventListener).notify(any(DestroyTableEventParameters.class), 
isNull());
+
         verifyNoMoreInteractions(eventListener);
     }
 
     @Test
-    public void testCreateIndexEvents() {
+    public void testIndexEvents() throws InterruptedException {
         CatalogCommand createIndexCmd = createHashIndexCommand(INDEX_NAME, 
List.of("ID"));
 
         CatalogCommand dropIndexCmd = 
DropIndexCommand.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build();
@@ -1159,6 +1171,7 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
 
         manager.listen(CatalogEvent.INDEX_CREATE, eventListener);
         manager.listen(CatalogEvent.INDEX_DROP, eventListener);
+        manager.listen(CatalogEvent.INDEX_DESTROY, eventListener);
 
         // Try to create index without table.
         assertThat(manager.execute(createIndexCmd), 
willThrow(TableNotFoundValidationException.class));
@@ -1180,6 +1193,15 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         assertThat(manager.execute(dropIndexCmd), willBe(nullValue()));
         verify(eventListener).notify(any(DropIndexEventParameters.class), 
isNull());
 
+        verifyNoMoreInteractions(eventListener);
+        clearInvocations(eventListener);
+
+        // The index is destroyed after catalog compaction.
+        assertThat(manager.compactCatalog(clock.nowLong()), 
willBe(nullValue()));
+        waitForCondition(() -> manager.earliestCatalogVersion() == 
manager.latestCatalogVersion(), 2_000);
+        verify(eventListener).notify(any(DestroyIndexEventParameters.class), 
isNull());
+
+        verifyNoMoreInteractions(eventListener);
         clearInvocations(eventListener);
 
         // Drop table with pk index.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
index 9095615e98..945ea7634c 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
-import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.event.EventParameters;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -284,8 +284,8 @@ public class ClientPrimaryReplicaTracker implements 
EventListener<EventParameter
     }
 
     private CompletableFuture<Boolean> notifyInternal(EventParameters 
parameters) {
-        if (parameters instanceof DropTableEventParameters) {
-            removeTable((DropTableEventParameters) parameters);
+        if (parameters instanceof DestroyTableEventParameters) {
+            removeTable((DestroyTableEventParameters) parameters);
 
             return falseCompletedFuture();
         }
@@ -308,18 +308,19 @@ public class ClientPrimaryReplicaTracker implements 
EventListener<EventParameter
         return falseCompletedFuture(); // false: don't remove listener.
     }
 
-    private void removeTable(DropTableEventParameters dropTableEvent) {
+    private void removeTable(DestroyTableEventParameters destroyTableEvent) {
         // Use previous version of the catalog to get the dropped table.
-        int prevCatalogVersion = dropTableEvent.catalogVersion() - 1;
+        int prevCatalogVersion = destroyTableEvent.catalogVersion() - 1;
 
-        CatalogTableDescriptor table = 
catalogService.table(dropTableEvent.tableId(), prevCatalogVersion);
-        assert table != null : "Table from DropTableEventParameters not found: 
" + dropTableEvent.tableId();
+        // TODO: avoid getting a descriptor.
+        CatalogTableDescriptor table = 
catalogService.table(destroyTableEvent.tableId(), prevCatalogVersion);
+        assert table != null : "Table from DropTableEventParameters not found: 
" + destroyTableEvent.tableId();
 
         CatalogZoneDescriptor zone = catalogService.zone(table.zoneId(), 
prevCatalogVersion);
         assert zone != null : "Zone from DropTableEventParameters not found: " 
+ table.zoneId();
 
         for (int partition = 0; partition < zone.partitions(); partition++) {
-            TablePartitionId tablePartitionId = new 
TablePartitionId(dropTableEvent.tableId(), partition);
+            TablePartitionId tablePartitionId = new 
TablePartitionId(destroyTableEvent.tableId(), partition);
             primaryReplicas.remove(tablePartitionId);
         }
     }
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index fd2c1082c9..3cf85765e5 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.index;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toMap;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
+import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DESTROY;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -44,7 +46,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DestroyIndexEventParameters;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -139,6 +141,14 @@ public class IndexManager implements IgniteComponent {
             return onIndexCreate((CreateIndexEventParameters) parameters);
         });
 
+        catalogService.listen(INDEX_DESTROY, (parameters, exception) -> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onIndexDestroy((DestroyIndexEventParameters) parameters);
+        });
+
         LOG.info("Index manager started");
 
         return nullCompletedFuture();
@@ -177,27 +187,27 @@ public class IndexManager implements IgniteComponent {
         return 
mvTableStoragesByIdVv.get(causalityToken).thenApply(mvTableStoragesById -> 
mvTableStoragesById.get(tableId));
     }
 
-    // TODO: IGNITE-20121 Unregister index only before we physically start 
deleting the index before truncate catalog
-    private CompletableFuture<Boolean> onIndexDrop(DropIndexEventParameters 
parameters) {
+    private CompletableFuture<Boolean> 
onIndexDestroy(DestroyIndexEventParameters parameters) {
         int indexId = parameters.indexId();
         int tableId = parameters.tableId();
 
         long causalityToken = parameters.causalityToken();
 
-        CompletableFuture<TableViewInternal> tableFuture = 
tableManager.tableAsync(causalityToken, tableId);
+        // TODO: decide how the table should be resolved here; it was tableManager.tableAsync(causalityToken, tableId) before.
+        TableViewInternal table = tableManager.getTable(tableId);
 
         return inBusyLockAsync(busyLock, () -> mvTableStoragesByIdVv.update(
                 causalityToken,
-                updater(mvTableStorageById -> tableFuture.thenApply(table -> 
inBusyLock(busyLock, () -> {
+                updater(mvTableStorageById -> inBusyLockAsync(busyLock, () -> {
                     if (table != null) {
                         // In case of DROP TABLE the table will be removed 
first.
                         table.unregisterIndex(indexId);
 
-                        return mvTableStorageById;
+                        return completedFuture(mvTableStorageById);
                     } else {
-                        return 
removeMvTableStorageIfPresent(mvTableStorageById, tableId);
+                        return 
completedFuture(removeMvTableStorageIfPresent(mvTableStorageById, tableId));
                     }
-                })))
+                }))
         )).thenApply(unused -> false);
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index f12ddf0b31..508b912970 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -307,8 +307,6 @@ public class TableImpl implements TableViewInternal {
         indexWrapperById.remove(indexId);
 
         completeWaitIndex(indexId);
-
-        // TODO: IGNITE-19150 Also need to destroy the index storages
     }
 
     private void awaitIndexes() {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index d1190184b1..d7a213a443 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -87,7 +87,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
-import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.catalog.events.DestroyTableEventParameters;
 import org.apache.ignite.internal.catalog.events.RenameTableEventParameters;
 import org.apache.ignite.internal.causality.CompletionListener;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
@@ -538,10 +538,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 return onTableCreate((CreateTableEventParameters) 
parameters).thenApply(unused -> false);
             });
 
-            catalogService.listen(CatalogEvent.TABLE_DROP, (parameters, 
exception) -> {
+            catalogService.listen(CatalogEvent.TABLE_DESTROY, (parameters, 
exception) -> {
                 assert exception == null : parameters;
 
-                return onTableDelete(((DropTableEventParameters) 
parameters)).thenApply(unused -> false);
+                return onTableDestroy(((DestroyTableEventParameters) 
parameters)).thenApply(unused -> false);
             });
 
             catalogService.listen(CatalogEvent.TABLE_ALTER, (parameters, 
exception) -> {
@@ -688,17 +688,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         });
     }
 
-    private CompletableFuture<Void> onTableDelete(DropTableEventParameters 
parameters) {
+    private CompletableFuture<Void> onTableDestroy(DestroyTableEventParameters 
parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            long causalityToken = parameters.causalityToken();
-            int catalogVersion = parameters.catalogVersion();
+            long causalityToken = localPartsByTableIdVv.latestCausalityToken();
 
-            int tableId = parameters.tableId();
-
-            CatalogTableDescriptor tableDescriptor = 
getTableDescriptor(tableId, catalogVersion - 1);
-            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, catalogVersion - 1);
-
-            dropTableLocally(causalityToken, tableDescriptor, zoneDescriptor);
+            tablesByIdVv.get(causalityToken).thenAccept(map -> 
dropTableLocally(causalityToken, parameters));
 
             return nullCompletedFuture();
         });
@@ -1338,12 +1332,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * Drops local structures for a table.
      *
      * @param causalityToken Causality token.
-     * @param tableDescriptor Catalog table descriptor.
-     * @param zoneDescriptor Catalog distributed zone descriptor.
+     * @param parameters Destroy table event parameters.
      */
-    private void dropTableLocally(long causalityToken, CatalogTableDescriptor 
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
-        int tableId = tableDescriptor.id();
-        int partitions = zoneDescriptor.partitions();
+    private void dropTableLocally(long causalityToken, 
DestroyTableEventParameters parameters) {
+        int tableId = parameters.tableId();
 
         localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
             if (e != null) {
@@ -1356,6 +1348,9 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             return completedFuture(newMap);
         }));
 
+        // TODO: assignments must be dropped on recovery as well. Is there a 
guarantee?
+        int partitions = 
tablesByIdVv.latest().get(tableId).internalTable().partitions();
+
         tablesByIdVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
             if (e != null) {
                 return failedFuture(e);


Reply via email to