This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-17964 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 05db73defe67187370a036ac4e445130bcfefab4 Author: amashenkov <[email protected]> AuthorDate: Mon Oct 24 19:48:18 2022 +0300 Fix the issue. --- .../stat/IgniteStatisticsConfigurationManager.java | 116 ++++++++++++++++----- 1 file changed, 89 insertions(+), 27 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java index b4cdbc38e07..0db8c9e7d36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java @@ -32,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; @@ -57,8 +58,12 @@ import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnC import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration; import org.apache.ignite.internal.processors.query.stat.view.ColumnConfigurationViewSupplier; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.NotNull; @@ -132,7 +137,15 @@ public class IgniteStatisticsConfigurationManager { for (Consumer<StatisticsObjectConfiguration> subscriber : subscribers) subscriber.accept(newStatCfg); - mgmtBusyExecutor.execute(() -> updateLocalStatistics(newStatCfg)); + mgmtBusyExecutor.execute(() -> { + try { + while (!updateLocalStatisticsAsync((StatisticsObjectConfiguration)newV).get()) + ; // No-op + } + catch (IgniteCheckedException e) { + log.warning("Unexpected error during statistics collection: " + e.getMessage(), e); + } + }); } ); } @@ -266,7 +279,7 @@ public class IgniteStatisticsConfigurationManager { * * @param cfg Statistics object configuration to update statistics by. */ - private void updateLocalStatistics(StatisticsObjectConfiguration cfg) { + private IgniteInternalFuture<Boolean> updateLocalStatisticsAsync(StatisticsObjectConfiguration cfg) { TableDescriptor tbl = schemaMgr.table(cfg.key().schema(), cfg.key().obj()); GridQueryTypeDescriptor typeDesc = tbl != null ? tbl.type() : null; GridCacheContextInfo<?, ?> cacheInfo = tbl != null ? tbl.cacheInfo() : null; @@ -291,17 +304,17 @@ public class IgniteStatisticsConfigurationManager { if (log.isDebugEnabled()) log.debug("Removing config for non existing object " + cfg.key()); - dropStatistics(Collections.singletonList(new StatisticsTarget(cfg.key())), false); + return dropStatisticsAsync(Collections.singletonList(new StatisticsTarget(cfg.key())), false); } - return; + return new GridFinishedFuture<>(true); } if (cctx == null || !cctx.gate().enterIfNotStopped()) { if (log.isDebugEnabled()) log.debug("Unable to lock table by key " + cfg.key() + ". Skipping statistics collection."); - return; + return new GridFinishedFuture<>(true); } try { @@ -320,6 +333,8 @@ public class IgniteStatisticsConfigurationManager { finally { cctx.gate().leave(); } + + return new GridFinishedFuture<>(true); } /** @@ -358,10 +373,19 @@ public class IgniteStatisticsConfigurationManager { */ public void updateAllLocalStatistics() { try { + GridCompoundFuture<Boolean, Boolean> compoundFuture = new GridCompoundFuture<>(CU.boolReducer()); + distrMetaStorage.iterate(STAT_OBJ_PREFIX, (k, v) -> { StatisticsObjectConfiguration cfg = (StatisticsObjectConfiguration)v; - updateLocalStatistics(cfg); + compoundFuture.add(updateLocalStatisticsAsync(cfg)); + }); + + compoundFuture.markInitialized(); + + compoundFuture.listen(future -> { + if (future.error() == null && !future.result()) + mgmtBusyExecutor.execute(this::updateAllLocalStatistics); }); } catch (IgniteCheckedException e) { @@ -438,38 +462,76 @@ public class IgniteStatisticsConfigurationManager { * @param validate if {@code true} - validate statistics existence, otherwise - just try to remove. */ public void dropStatistics(List<StatisticsTarget> targets, boolean validate) { + try { + while (!dropStatisticsAsync(targets, validate).get()) + ; // No-op + } + catch (IgniteCheckedException ex) { + if (ex.getCause() instanceof IgniteSQLException) + throw (IgniteSQLException)ex.getCause(); + + throw new IgniteSQLException("Error occurs while updating statistics schema", + IgniteQueryErrorCode.UNKNOWN, ex); + } + } + + /** + * Drop local statistic for specified database objects on the cluster. + * Remove local aggregated and partitioned statistics that are stored at the local metastorage. + * + * @param targets DB objects to update statistics by. + * @param validate if {@code true} - validate statistics existence, otherwise - just try to remove. + */ + public IgniteInternalFuture<Boolean> dropStatisticsAsync(List<StatisticsTarget> targets, boolean validate) { if (log.isDebugEnabled()) log.debug("Drop statistics [targets=" + targets + ']'); + GridFutureAdapter<Boolean> resultFuture = new GridFutureAdapter<>(); + IgniteInternalFuture<Boolean> chainFuture = new GridFinishedFuture<>(true); + for (StatisticsTarget target : targets) { - String key = key2String(target.key()); + chainFuture = chainFuture.chainCompose(f -> { + if (f.error() == null && f.result() == Boolean.TRUE) + return removeFromMetastore(target, validate); - try { - while (true) { - StatisticsObjectConfiguration oldCfg = distrMetaStorage.read(key); + return f; + }); + } - if (validate) - validateDropRefresh(target, oldCfg); + chainFuture.listen(f -> { + if (f.error() != null) + resultFuture.onDone(f.error()); + else + resultFuture.onDone(f.result() == null || f.result().booleanValue()); + }); - if (oldCfg == null) - return; + return resultFuture; + } - Set<String> dropColNames = (target.columns() == null) ? Collections.emptySet() : - Arrays.stream(target.columns()).collect(Collectors.toSet()); + private IgniteInternalFuture<Boolean> removeFromMetastore(StatisticsTarget target, boolean validate) { + String key = key2String(target.key()); - StatisticsObjectConfiguration newCfg = oldCfg.dropColumns(dropColNames); + try { + StatisticsObjectConfiguration oldCfg = distrMetaStorage.read(key); - if (oldCfg.equals(newCfg)) - break; + if (validate) + validateDropRefresh(target, oldCfg); - if (distrMetaStorage.compareAndSet(key, oldCfg, newCfg)) - break; - } - } - catch (IgniteCheckedException ex) { - throw new IgniteSQLException( - "Error on get or update statistic schema", IgniteQueryErrorCode.UNKNOWN, ex); - } + if (oldCfg == null) + return new GridFinishedFuture<>(null); //Stop future chaining. Other thread\node makes the progress. + + Set<String> dropColNames = (target.columns() == null) ? Collections.emptySet() : + Arrays.stream(target.columns()).collect(Collectors.toSet()); + + StatisticsObjectConfiguration newCfg = oldCfg.dropColumns(dropColNames); + + if (oldCfg.equals(newCfg)) + return new GridFinishedFuture<>(true); //Skip. Nothing to do. + + return distrMetaStorage.compareAndSetAsync(key, oldCfg, newCfg); + } + catch (Throwable ex) { + return new GridFinishedFuture<>(ex); } }
