This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17964
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 05db73defe67187370a036ac4e445130bcfefab4
Author: amashenkov <[email protected]>
AuthorDate: Mon Oct 24 19:48:18 2022 +0300

    Fix the issue.
---
 .../stat/IgniteStatisticsConfigurationManager.java | 116 ++++++++++++++++-----
 1 file changed, 89 insertions(+), 27 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
index b4cdbc38e07..0db8c9e7d36 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
@@ -57,8 +58,12 @@ import 
org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnC
 import 
org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
 import 
org.apache.ignite.internal.processors.query.stat.view.ColumnConfigurationViewSupplier;
 import 
org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 
@@ -132,7 +137,15 @@ public class IgniteStatisticsConfigurationManager {
                         for (Consumer<StatisticsObjectConfiguration> 
subscriber : subscribers)
                             subscriber.accept(newStatCfg);
 
-                        mgmtBusyExecutor.execute(() -> 
updateLocalStatistics(newStatCfg));
+                        mgmtBusyExecutor.execute(() -> {
+                            try {
+                                while 
(!updateLocalStatisticsAsync((StatisticsObjectConfiguration)newV).get())
+                                    ; // No-op
+                            }
+                            catch (IgniteCheckedException e) {
+                                log.warning("Unexpected error during 
statistics collection: " + e.getMessage(), e);
+                            }
+                        });
                     }
                 );
             }
@@ -266,7 +279,7 @@ public class IgniteStatisticsConfigurationManager {
      *
      * @param cfg Statistics object configuration to update statistics by.
      */
-    private void updateLocalStatistics(StatisticsObjectConfiguration cfg) {
+    private IgniteInternalFuture<Boolean> 
updateLocalStatisticsAsync(StatisticsObjectConfiguration cfg) {
         TableDescriptor tbl = schemaMgr.table(cfg.key().schema(), 
cfg.key().obj());
         GridQueryTypeDescriptor typeDesc = tbl != null ? tbl.type() : null;
         GridCacheContextInfo<?, ?> cacheInfo = tbl != null ? tbl.cacheInfo() : 
null;
@@ -291,17 +304,17 @@ public class IgniteStatisticsConfigurationManager {
                 if (log.isDebugEnabled())
                     log.debug("Removing config for non existing object " + 
cfg.key());
 
-                dropStatistics(Collections.singletonList(new 
StatisticsTarget(cfg.key())), false);
+                return dropStatisticsAsync(Collections.singletonList(new 
StatisticsTarget(cfg.key())), false);
             }
 
-            return;
+            return new GridFinishedFuture<>(true);
         }
 
         if (cctx == null || !cctx.gate().enterIfNotStopped()) {
             if (log.isDebugEnabled())
                 log.debug("Unable to lock table by key " + cfg.key() + ". 
Skipping statistics collection.");
 
-            return;
+            return new GridFinishedFuture<>(true);
         }
 
         try {
@@ -320,6 +333,8 @@ public class IgniteStatisticsConfigurationManager {
         finally {
             cctx.gate().leave();
         }
+
+        return new GridFinishedFuture<>(true);
     }
 
     /**
@@ -358,10 +373,19 @@ public class IgniteStatisticsConfigurationManager {
      */
     public void updateAllLocalStatistics() {
         try {
+            GridCompoundFuture<Boolean, Boolean> compoundFuture = new 
GridCompoundFuture<>(CU.boolReducer());
+
             distrMetaStorage.iterate(STAT_OBJ_PREFIX, (k, v) -> {
                 StatisticsObjectConfiguration cfg = 
(StatisticsObjectConfiguration)v;
 
-                updateLocalStatistics(cfg);
+                compoundFuture.add(updateLocalStatisticsAsync(cfg));
+            });
+
+            compoundFuture.markInitialized();
+
+            compoundFuture.listen(future -> {
+                if (future.error() == null && !future.result())
+                    mgmtBusyExecutor.execute(this::updateAllLocalStatistics);
             });
         }
         catch (IgniteCheckedException e) {
@@ -438,38 +462,76 @@ public class IgniteStatisticsConfigurationManager {
      * @param validate if {@code true} - validate statistics existence, 
otherwise - just try to remove.
      */
     public void dropStatistics(List<StatisticsTarget> targets, boolean 
validate) {
+        try {
+            while (!dropStatisticsAsync(targets, validate).get())
+                ; // No-op
+        }
+        catch (IgniteCheckedException ex) {
+            if (ex.getCause() instanceof IgniteSQLException)
+                throw (IgniteSQLException)ex.getCause();
+
+            throw new IgniteSQLException("Error occurs while updating 
statistics schema",
+                    IgniteQueryErrorCode.UNKNOWN, ex);
+        }
+    }
+
+    /**
+     * Drop local statistic for specified database objects on the cluster.
+     * Remove local aggregated and partitioned statistics that are stored at 
the local metastorage.
+     *
+     * @param targets  DB objects to update statistics by.
+     * @param validate if {@code true} - validate statistics existence, 
otherwise - just try to remove.
+     */
+    public IgniteInternalFuture<Boolean> 
dropStatisticsAsync(List<StatisticsTarget> targets, boolean validate) {
         if (log.isDebugEnabled())
             log.debug("Drop statistics [targets=" + targets + ']');
 
+        GridFutureAdapter<Boolean> resultFuture = new GridFutureAdapter<>();
+        IgniteInternalFuture<Boolean> chainFuture = new 
GridFinishedFuture<>(true);
+
         for (StatisticsTarget target : targets) {
-            String key = key2String(target.key());
+            chainFuture = chainFuture.chainCompose(f -> {
+                if (f.error() == null && f.result() == Boolean.TRUE)
+                    return removeFromMetastore(target, validate);
 
-            try {
-                while (true) {
-                    StatisticsObjectConfiguration oldCfg = 
distrMetaStorage.read(key);
+                return f;
+            });
+        }
 
-                    if (validate)
-                        validateDropRefresh(target, oldCfg);
+        chainFuture.listen(f -> {
+            if (f.error() != null)
+                resultFuture.onDone(f.error());
+            else
+                resultFuture.onDone(f.result() == null || 
f.result().booleanValue());
+        });
 
-                    if (oldCfg == null)
-                        return;
+        return resultFuture;
+    }
 
-                    Set<String> dropColNames = (target.columns() == null) ? 
Collections.emptySet() :
-                        
Arrays.stream(target.columns()).collect(Collectors.toSet());
+    private IgniteInternalFuture<Boolean> removeFromMetastore(StatisticsTarget 
target, boolean validate) {
+        String key = key2String(target.key());
 
-                    StatisticsObjectConfiguration newCfg = 
oldCfg.dropColumns(dropColNames);
+        try {
+            StatisticsObjectConfiguration oldCfg = distrMetaStorage.read(key);
 
-                    if (oldCfg.equals(newCfg))
-                        break;
+            if (validate)
+                validateDropRefresh(target, oldCfg);
 
-                    if (distrMetaStorage.compareAndSet(key, oldCfg, newCfg))
-                        break;
-                }
-            }
-            catch (IgniteCheckedException ex) {
-                throw new IgniteSQLException(
-                    "Error on get or update statistic schema", 
IgniteQueryErrorCode.UNKNOWN, ex);
-            }
+            if (oldCfg == null)
+                return new GridFinishedFuture<>(null); //Stop future chaining. 
Other thread\node makes the progress.
+
+            Set<String> dropColNames = (target.columns() == null) ? 
Collections.emptySet() :
+                                               
Arrays.stream(target.columns()).collect(Collectors.toSet());
+
+            StatisticsObjectConfiguration newCfg = 
oldCfg.dropColumns(dropColNames);
+
+            if (oldCfg.equals(newCfg))
+                return new GridFinishedFuture<>(true); //Skip. Nothing to do.
+
+            return distrMetaStorage.compareAndSetAsync(key, oldCfg, newCfg);
+        }
+        catch (Throwable ex) {
+            return new GridFinishedFuture<>(ex);
         }
     }
 

Reply via email to