This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 2158e3bfb457dea9931043e35cb5d4361a7aaf8e Merge: 26ec56a3e1 7415a252d8 Author: Christopher Tubbs <[email protected]> AuthorDate: Fri Feb 27 16:23:18 2026 -0500 Merge branch '2.1' .../accumulo/core/logging/ConditionalLogger.java | 167 +++++++++------------ .../spi/balancer/HostRegexTableLoadBalancer.java | 5 +- .../core/logging/DeduplicatingLoggerTest.java | 2 +- .../core/logging/EscalatingLoggerTest.java | 5 +- .../server/compaction/CompactionJobGenerator.java | 15 +- .../server/conf/CheckCompactionConfig.java | 17 +-- .../org/apache/accumulo/manager/EventQueue.java | 3 - .../accumulo/manager/TabletGroupWatcher.java | 8 +- .../coordinator/CompactionCoordinator.java | 12 +- .../manager/upgrade/UpgradeCoordinator.java | 3 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 2 +- 11 files changed, 105 insertions(+), 134 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java index e391817452,49309dc352..5540eeec08 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java @@@ -66,9 -62,10 +66,8 @@@ import org.apache.commons.lang3.builder import org.apache.commons.lang3.builder.ToStringStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.slf4j.event.Level; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java index 6252d4b15d,0000000000..7e812ac872 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java +++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@@ -1,349 -1,0 +1,348 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.compaction; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.PluginEnvironment; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.dataImpl.TabletIdImpl; +import org.apache.accumulo.core.fate.FateId; - import org.apache.accumulo.core.logging.ConditionalLogger; ++import org.apache.accumulo.core.logging.ConditionalLogger.EscalatingLogger; +import 
org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactionPlan; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.spi.compaction.CompactionServices; +import org.apache.accumulo.core.util.cache.Caches; +import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - import org.slf4j.event.Level; + +import com.github.benmanes.caffeine.cache.Cache; + +public class CompactionJobGenerator { + private static final Logger log = LoggerFactory.getLogger(CompactionJobGenerator.class); - private static final Logger UNKNOWN_SERVICE_ERROR_LOG = - new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR); - private static final Logger PLANNING_INIT_ERROR_LOG = - new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR); - private static final Logger PLANNING_ERROR_LOG = - new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR); ++ private static final EscalatingLogger UNKNOWN_SERVICE_ERROR_LOG = ++ new EscalatingLogger(log, Duration.ofMinutes(5), 
3000, Logger::error); ++ private static final EscalatingLogger PLANNING_INIT_ERROR_LOG = ++ new EscalatingLogger(log, Duration.ofMinutes(5), 3000, Logger::error); ++ private static final EscalatingLogger PLANNING_ERROR_LOG = ++ new EscalatingLogger(log, Duration.ofMinutes(5), 3000, Logger::error); + + private final CompactionServicesConfig servicesConfig; + private final Map<CompactionServiceId,CompactionPlanner> planners = new HashMap<>(); + private final Cache<TableId,CompactionDispatcher> dispatchers; + private final Set<CompactionServiceId> serviceIds; + private final PluginEnvironment env; + private final Map<FateId,Map<String,String>> allExecutionHints; + private final SteadyTime steadyTime; + + public CompactionJobGenerator(PluginEnvironment env, + Map<FateId,Map<String,String>> executionHints, SteadyTime steadyTime) { + servicesConfig = new CompactionServicesConfig(env.getConfiguration()); + serviceIds = servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of) + .collect(Collectors.toUnmodifiableSet()); + + dispatchers = Caches.getInstance().createNewBuilder(CacheName.COMPACTION_DISPATCHERS, false) + .maximumSize(10).build(); + this.env = env; + if (executionHints.isEmpty()) { + this.allExecutionHints = executionHints; + } else { + this.allExecutionHints = new HashMap<>(); + // Make the maps that will be passed to plugins unmodifiable. Do this once, so it does not + // need to be done for each tablet. + executionHints.forEach((k, v) -> allExecutionHints.put(k, + v.isEmpty() ? 
Map.of() : Collections.unmodifiableMap(v))); + } + + this.steadyTime = steadyTime; + } + + public Collection<CompactionJob> generateJobs(TabletMetadata tablet, Set<CompactionKind> kinds) { + Collection<CompactionJob> systemJobs = Set.of(); + + log.trace("Planning for {} {} {}", tablet.getExtent(), kinds, this.hashCode()); + + if (kinds.contains(CompactionKind.SYSTEM)) { + CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, Map.of()); + systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, Map.of()); + } + + Collection<CompactionJob> userJobs = Set.of(); + + if (kinds.contains(CompactionKind.USER) && tablet.getSelectedFiles() != null) { + var hints = allExecutionHints.get(tablet.getSelectedFiles().getFateId()); + if (hints != null) { + CompactionServiceId serviceId = dispatch(CompactionKind.USER, tablet, hints); + userJobs = planCompactions(serviceId, CompactionKind.USER, tablet, hints); + } + } + + if (userJobs.isEmpty()) { + return systemJobs; + } else if (systemJobs.isEmpty()) { + return userJobs; + } else { + var all = new ArrayList<CompactionJob>(systemJobs.size() + userJobs.size()); + all.addAll(systemJobs); + all.addAll(userJobs); + return all; + } + } + + private CompactionServiceId dispatch(CompactionKind kind, TabletMetadata tablet, + Map<String,String> executionHints) { + + CompactionDispatcher dispatcher = dispatchers.get(tablet.getTableId(), + tableId -> CompactionPluginUtils.createDispatcher((ServiceEnvironment) env, tableId)); + + CompactionDispatcher.DispatchParameters dispatchParams = + new CompactionDispatcher.DispatchParameters() { + @Override + public CompactionServices getCompactionServices() { + return () -> serviceIds; + } + + @Override + public ServiceEnvironment getServiceEnv() { + return (ServiceEnvironment) env; + } + + @Override + public CompactionKind getCompactionKind() { + return kind; + } + + @Override + public Map<String,String> getExecutionHints() { + return executionHints; + } + }; + + return 
dispatcher.dispatch(dispatchParams).getService(); + } + + private Collection<CompactionJob> planCompactions(CompactionServiceId serviceId, + CompactionKind kind, TabletMetadata tablet, Map<String,String> executionHints) { + + if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) { + UNKNOWN_SERVICE_ERROR_LOG.trace( + "Table {} returned non-existent compaction service {} for compaction type {}. Check" + + " the table compaction dispatcher configuration. No compactions will happen" + + " until the configuration is fixed. This log message is temporarily suppressed.", + tablet.getExtent().tableId(), serviceId, kind); + return Set.of(); + } + + CompactionPlanner planner = + planners.computeIfAbsent(serviceId, sid -> createPlanner(tablet.getTableId(), serviceId)); + + // selecting indicator + // selected files + + String ratioStr = + env.getConfiguration(tablet.getTableId()).get(Property.TABLE_MAJC_RATIO.getKey()); + if (ratioStr == null) { + ratioStr = Property.TABLE_MAJC_RATIO.getDefaultValue(); + } + + double ratio = Double.parseDouble(ratioStr); + + Set<CompactableFile> allFiles = tablet.getFilesMap().entrySet().stream() + .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue())) + .collect(Collectors.toUnmodifiableSet()); + Set<CompactableFile> candidates; + + if (kind == CompactionKind.SYSTEM) { + if (tablet.getExternalCompactions().isEmpty() && tablet.getSelectedFiles() == null) { + candidates = allFiles; + } else { + var tmpFiles = new HashMap<>(tablet.getFilesMap()); + // remove any files that are in active compactions + tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream()) + .forEach(tmpFiles::remove); + // remove any files that are selected and the user compaction has completed + // at least 1 job, otherwise we can keep the files + var selectedFiles = tablet.getSelectedFiles(); + + if (selectedFiles != null) { + long selectedExpirationDuration = + 
ConfigurationTypeHelper.getTimeInMillis(env.getConfiguration(tablet.getTableId()) + .get(Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey())); + + // If jobs are completed, or selected time has not expired, the remove + // from the candidate list otherwise we can cancel the selection + if (selectedFiles.getCompletedJobs() > 0 + || (steadyTime.minus(selectedFiles.getSelectedTime()).toMillis() + < selectedExpirationDuration)) { + tmpFiles.keySet().removeAll(selectedFiles.getFiles()); + } + } + candidates = tmpFiles.entrySet().stream() + .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue())) + .collect(Collectors.toUnmodifiableSet()); + } + } else if (kind == CompactionKind.USER) { + var selectedFiles = new HashSet<>(tablet.getSelectedFiles().getFiles()); + tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream()) + .forEach(selectedFiles::remove); + candidates = selectedFiles.stream() + .map(file -> new CompactableFileImpl(file, tablet.getFilesMap().get(file))) + .collect(Collectors.toUnmodifiableSet()); + } else { + throw new UnsupportedOperationException(); + } + + if (candidates.isEmpty()) { + // there are not candidate files for compaction, so no reason to call the planner + return Set.of(); + } + + CompactionPlanner.PlanningParameters params = new CompactionPlanner.PlanningParameters() { + + @Override + public NamespaceId getNamespaceId() throws TableNotFoundException { + return ((ServiceEnvironmentImpl) env).getContext().getNamespaceId(tablet.getTableId()); + } + + @Override + public TableId getTableId() { + return tablet.getTableId(); + } + + @Override + public TabletId getTabletId() { + return new TabletIdImpl(tablet.getExtent()); + } + + @Override + public ServiceEnvironment getServiceEnvironment() { + return (ServiceEnvironment) env; + } + + @Override + public CompactionKind getKind() { + return kind; + } + + @Override + public double getRatio() { + return ratio; + } + + @Override + public 
Collection<CompactableFile> getAll() { + return allFiles; + } + + @Override + public Collection<CompactableFile> getCandidates() { + return candidates; + } + + @Override + public Collection<CompactionJob> getRunningCompactions() { + var allFiles2 = tablet.getFilesMap(); + return tablet.getExternalCompactions().values().stream().map(ecMeta -> { + Collection<CompactableFile> files = ecMeta.getJobFiles().stream() + .map(f -> new CompactableFileImpl(f, allFiles2.get(f))).collect(Collectors.toList()); + CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(), + ecMeta.getCompactionGroupId(), files, ecMeta.getKind()); + return job; + }).collect(Collectors.toUnmodifiableList()); + } + + @Override + public Map<String,String> getExecutionHints() { + return executionHints; + } + + @Override + public CompactionPlan.Builder createPlanBuilder() { + return new CompactionPlanImpl.BuilderImpl(kind, candidates); + } + }; + return planCompactions(planner, params, serviceId); + } + + private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId serviceId) { + + CompactionPlanner planner; + String plannerClassName = null; + Map<String,String> options = null; + try { + plannerClassName = servicesConfig.getPlanners().get(serviceId.canonical()); + options = servicesConfig.getOptions().get(serviceId.canonical()); + planner = env.instantiate(tableId, plannerClassName, CompactionPlanner.class); + CompactionPlannerInitParams initParameters = new CompactionPlannerInitParams(serviceId, + servicesConfig.getPlannerPrefix(serviceId.canonical()), + servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env); + planner.init(initParameters); + } catch (Exception e) { + PLANNING_INIT_ERROR_LOG.trace( + "Failed to create compaction planner for service:{} tableId:{} using class:{} options:{}. Compaction " + + "service will not start any new compactions until its configuration is fixed. 
This log message is " + + "temporarily suppressed.", + serviceId, tableId, plannerClassName, options, e); + planner = new ProvisionalCompactionPlanner(serviceId); + } + return planner; + } + + private Collection<CompactionJob> planCompactions(CompactionPlanner planner, + CompactionPlanner.PlanningParameters params, CompactionServiceId serviceId) { + try { + return planner.makePlan(params).getJobs(); + } catch (Exception e) { + PLANNING_ERROR_LOG.trace( + "Failed to plan compactions for service:{} kind:{} tableId:{} hints:{}. Compaction service may not start any" + + " new compactions until this issue is resolved. Duplicates of this log message are temporarily" + + " suppressed.", + serviceId, params.getKind(), params.getTableId(), params.getExecutionHints(), e); + return Set.of(); + } + } +} diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java index 4d30b2dba4,c02b36edf7..9a0e982a8a --- a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java @@@ -21,20 -21,13 +21,21 @@@ package org.apache.accumulo.server.conf import static org.apache.accumulo.core.Constants.DEFAULT_COMPACTION_SERVICE_NAME; import java.io.FileNotFoundException; +import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.cli.ClientKeywordExecutable; +import org.apache.accumulo.core.cli.ClientOpts; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.ResourceGroupId; import org.apache.accumulo.core.data.TableId; ++import org.apache.accumulo.core.logging.ConditionalLogger.ConditionalLogAction; import 
org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.compaction.CompactionPlanner; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; @@@ -47,9 -37,7 +48,8 @@@ import org.apache.accumulo.start.spi.Co import org.apache.accumulo.start.spi.KeywordExecutable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.slf4j.event.Level; +import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.google.auto.service.AutoService; @@@ -100,18 -94,17 +100,17 @@@ public class CheckCompactionConfig exte } AccumuloConfiguration config = SiteConfiguration.fromFile(path.toFile()).build(); - validate(config, Level.INFO); - validate(config); ++ validate(config, Logger::info); } - public static void validate(AccumuloConfiguration config, Level level) - public static void validate(AccumuloConfiguration config) ++ public static void validate(AccumuloConfiguration config, ConditionalLogAction logAction) throws ReflectiveOperationException, SecurityException, IllegalArgumentException { - var servicesConfig = new CompactionServicesConfig(config, log::warn); + var servicesConfig = new CompactionServicesConfig(config); ServiceEnvironment senv = createServiceEnvironment(config); - Set<String> defaultServices = Set.of(DEFAULT, META, ROOT); - if (servicesConfig.getPlanners().keySet().equals(defaultServices)) { - log.warn("Only the default compaction services were created - {}", defaultServices); + Set<String> defaultService = Set.of(DEFAULT_COMPACTION_SERVICE_NAME); + if (servicesConfig.getPlanners().keySet().equals(defaultService)) { - log.atLevel(level).log("Only the default compaction service was created - {}", - defaultService); ++ logAction.log(log, "Only the default compaction service was created - {}", defaultService); return; } @@@ -119,7 -111,8 +118,7 @@@ for (var entry : servicesConfig.getPlanners().entrySet()) { String serviceId = entry.getKey(); String plannerClassName = 
entry.getValue(); - log.atLevel(level).log("Service id: {}, planner class:{}", serviceId, plannerClassName); - - log.info("Service id: {}, planner class:{}", serviceId, plannerClassName); ++ logAction.log(log, "Service id: {}, planner class:{}", serviceId, plannerClassName); Class<? extends CompactionPlanner> plannerClass = Class.forName(plannerClassName).asSubclass(CompactionPlanner.class); @@@ -131,29 -123,19 +130,29 @@@ planner.init(initParams); - initParams.getRequestedExecutors() - .forEach((execId, numThreads) -> log.info( - "Compaction service '{}' requested creation of thread pool '{}' with {} threads.", - serviceId, execId, numThreads)); + initParams.getRequestedGroups().forEach(groupId -> { - log.atLevel(level).log("Compaction service '{}' requested with compactor group '{}'", - serviceId, groupId); ++ logAction.log(log, "Compaction service '{}' requested with compactor group '{}'", serviceId, ++ groupId); + groupToServices.computeIfAbsent(groupId, f -> new HashSet<>()).add(serviceId); + }); + } - initParams.getRequestedExternalExecutors() - .forEach(execId -> log.info( - "Compaction service '{}' requested with external execution queue '{}'", serviceId, - execId)); + boolean dupesFound = false; + for (Entry<ResourceGroupId,Set<String>> e : groupToServices.entrySet()) { + if (e.getValue().size() > 1) { + log.warn("Compaction services " + e.getValue().toString() + + " mapped to the same compactor group: " + e.getKey()); + dupesFound = true; + } + } + if (dupesFound) { + throw new IllegalStateException( + "Multiple compaction services configured to use the same group. This could lead" + + " to undesired behavior. 
Please fix the configuration"); } - log.atLevel(level).log("Properties file has passed all checks."); - log.info("Properties file has passed all checks."); ++ logAction.log(log, "Properties file has passed all checks."); } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/EventQueue.java index c9698191e5,0000000000..53d22235c8 mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/EventQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/EventQueue.java @@@ -1,157 -1,0 +1,154 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.util.CountDownTimer; +import org.apache.accumulo.manager.EventCoordinator.Event; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Event queue that collapses events when possible. + */ +public class EventQueue { + - private static final Logger log = LoggerFactory.getLogger(EventQueue.class); + private boolean allLevels = false; + + private static class Table { + final TableId tableId; + boolean allExtents = false; + Map<KeyExtent,Event> extents = new HashMap<>(); + + private Table(TableId tableId) { + this.tableId = tableId; + } + + public void add(Event event) { + if (allExtents) { + return; + } + + if (event.getScope() == EventCoordinator.EventScope.TABLE) { + allExtents = true; + extents.clear(); + } else { + Preconditions.checkArgument(event.getScope() == EventCoordinator.EventScope.TABLE_RANGE); + extents.put(event.getExtent(), event); + if (extents.size() > 10_000) { + allExtents = true; + extents.clear(); + } + } + } + + public void fill(List<Event> events) { + if (allExtents) { + events.add(new Event(tableId)); + } else { + events.addAll(extents.values()); + } + } + } + + private static class Level { + final Ample.DataLevel dataLevel; + boolean allTables = false; + Map<TableId,Table> tables = new HashMap<>(); + + private Level(Ample.DataLevel dataLevel) { + this.dataLevel = dataLevel; + } + + void add(Event event) { + if (allTables) { + return; + } + + if (event.getScope() == EventCoordinator.EventScope.DATA_LEVEL) { + allTables = true; + tables.clear(); + } else { + var table = 
tables.computeIfAbsent(event.getTableId(), Table::new); + table.add(event); + } + } + + public void fill(List<Event> events) { + if (allTables) { + events.add(new Event(dataLevel)); + } else { + tables.values().forEach(table -> table.fill(events)); + } + } + } + + private HashMap<Ample.DataLevel,Level> levels = new HashMap<>(); + + public synchronized void add(Event event) { + if (allLevels) { + return; + } + + if (event.getScope() == EventCoordinator.EventScope.ALL) { + allLevels = true; + levels.clear(); + } else { + var level = levels.computeIfAbsent(event.getLevel(), Level::new); + level.add(event); + } + notify(); + } + + private static final List<Event> ALL_LEVELS = List.of(new Event()); + + public synchronized List<Event> poll(long duration, TimeUnit timeUnit) + throws InterruptedException { + CountDownTimer timer = CountDownTimer.startNew(duration, timeUnit); + while (!allLevels && levels.isEmpty() && !timer.isExpired()) { + wait(Math.max(1, timer.timeLeft(TimeUnit.MILLISECONDS))); + } + + List<Event> events; + if (allLevels) { + events = ALL_LEVELS; + } else { + events = new ArrayList<>(); + levels.values().forEach(l -> l.fill(events)); + } + + // reset back to empty + allLevels = false; + levels.clear(); + + return events; + } + + public List<Event> take() throws InterruptedException { + return poll(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 5fd56b3c2c,45f55169a6..5c6a393126 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@@ -94,17 -97,15 +94,16 @@@ import org.apache.accumulo.server.manag import org.apache.accumulo.server.manager.state.Assignment; import org.apache.accumulo.server.manager.state.ClosableIterator; import org.apache.accumulo.server.manager.state.DistributedStoreException; -import 
org.apache.accumulo.server.manager.state.MergeInfo; -import org.apache.accumulo.server.manager.state.MergeState; +import org.apache.accumulo.server.manager.state.TabletGoalState; +import org.apache.accumulo.server.manager.state.TabletManagementIterator; +import org.apache.accumulo.server.manager.state.TabletManagementParameters; import org.apache.accumulo.server.manager.state.TabletStateStore; import org.apache.accumulo.server.manager.state.UnassignedTablet; -import org.apache.accumulo.server.tablets.TabletTime; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - import org.slf4j.event.Level; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; @@@ -113,11 -114,8 +112,10 @@@ import com.google.common.net.HostAndPor abstract class TabletGroupWatcher extends AccumuloDaemonThread { + private static final Logger LOG = LoggerFactory.getLogger(TabletGroupWatcher.class); + - private static final Logger TABLET_UNLOAD_LOGGER = - new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, Level.INFO); - + private static final EscalatingLogger TABLET_UNLOAD_LOGGER = + new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, Logger::info); private final Manager manager; private final TabletStateStore store; private final TabletGroupWatcher dependentWatcher; @@@ -225,565 -185,208 +223,565 @@@ } } - @Override - public void run() { - int[] oldCounts = new int[TabletState.values().length]; - EventCoordinator.Listener eventListener = this.manager.nextEvent.getListener(); + class EventHandler implements EventCoordinator.Listener { - WalStateManager wals = new WalStateManager(manager.getContext()); + // Setting this to true to start with because its not know what happended before this object was + // created, so just start off with full scan. 
+ private boolean needsFullScan = true; - while (manager.stillManager()) { - // slow things down a little, otherwise we spam the logs when there are many wake-up events - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + private final EventQueue eventQueue; - final long waitTimeBetweenScans = manager.getConfiguration() - .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); + class RangeProcessor implements Runnable { + @Override + public void run() { + try { + while (manager.stillManager()) { + var events = eventQueue.poll(100, TimeUnit.MILLISECONDS); - int totalUnloaded = 0; - int unloaded = 0; - ClosableIterator<TabletLocationState> iter = null; - try { - Map<TableId,MergeStats> mergeStatsCache = new HashMap<>(); - Map<TableId,MergeStats> currentMerges = new HashMap<>(); - for (MergeInfo merge : manager.merges()) { - if (merge.getExtent() != null) { - currentMerges.put(merge.getExtent().tableId(), new MergeStats(merge)); + if (events.isEmpty()) { + // check to see if still the manager + continue; + } + + EnumSet<EventScope> scopesSeen = EnumSet.noneOf(EventScope.class); + List<Range> ranges = new ArrayList<>(events.size()); + for (var event : events) { + scopesSeen.add(event.getScope()); + if (event.getScope() == EventScope.TABLE + || event.getScope() == EventScope.TABLE_RANGE) { + ranges.add(event.getExtent().toMetaRange()); + } + } + + if (scopesSeen.contains(EventScope.ALL) || scopesSeen.contains(EventScope.DATA_LEVEL)) { + // Since this code should only receive events for a single data level, and seeing a + // data level should squish all table and tablet events, then seeing ranges indicates + // assumptions this code is making are incorrect or there is a bug somewhere. 
+ Preconditions.checkState(ranges.isEmpty()); + setNeedsFullScan(); + } else { + if (!processRanges(ranges)) { + setNeedsFullScan(); + } + } } + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } + } + + EventHandler() { + eventQueue = new EventQueue(); + Threads.createCriticalThread("TGW [" + store.name() + "] event range processor", + new RangeProcessor()).start(); + } + + private synchronized void setNeedsFullScan() { + needsFullScan = true; + notifyAll(); + } - // Get the current status for the current list of tservers - SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); - for (TServerInstance entry : manager.tserverSet.getCurrentServers()) { - currentTServers.put(entry, manager.tserverStatus.get(entry)); + public synchronized void clearNeedsFullScan() { + needsFullScan = false; + } + + public synchronized boolean isNeedsFullScan() { + return needsFullScan; + } + + @Override + public void process(Event event) { + eventQueue.add(event); + } + + synchronized void waitForFullScan(long millis) { + if (!needsFullScan) { + try { + wait(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } + } + } - if (currentTServers.isEmpty()) { - eventListener.waitForEvents(waitTimeBetweenScans); - synchronized (this) { - lastScanServers = Collections.emptySortedSet(); + private boolean processRanges(List<Range> ranges) { + if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) { + return false; + } + + TabletManagementParameters tabletMgmtParams = createTabletManagementParameters(false); + + var currentTservers = getCurrentTservers(tabletMgmtParams.getOnlineTsevers()); + if (currentTservers.isEmpty()) { + return false; + } + + try (var iter = store.iterator(ranges, tabletMgmtParams)) { + long t1 = System.currentTimeMillis(); + manageTablets(iter, tabletMgmtParams, currentTservers, false); + long t2 = System.currentTimeMillis(); + Manager.log.debug(String.format("[%s]: partial scan time 
%.2f seconds for %,d ranges", + store.name(), (t2 - t1) / 1000., ranges.size())); + } catch (Exception e) { + Manager.log.error("Error processing {} ranges for store {} ", ranges.size(), store.name(), e); + } + + return true; + } + + private final Set<KeyExtent> hostingRequestInProgress = new ConcurrentSkipListSet<>(); + + public void hostOndemand(Collection<KeyExtent> extents) { + // This is only expected to be called for the user level + Preconditions.checkState(getLevel() == Ample.DataLevel.USER); + + final List<KeyExtent> inProgress = new ArrayList<>(); + extents.forEach(ke -> { + if (hostingRequestInProgress.add(ke)) { + LOG.info("Tablet hosting requested for: {} ", ke); + inProgress.add(ke); + } else { + LOG.trace("Ignoring hosting request because another thread is currently processing it {}", + ke); + } + }); + // Do not add any code here, it may interfere with the finally block removing extents from + // hostingRequestInProgress + try (var mutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + inProgress.forEach(ke -> mutator.mutateTablet(ke).requireAbsentOperation() + .requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation() + .setHostingRequested() + .submit(TabletMetadata::getHostingRequested, () -> "host ondemand")); + + List<Range> ranges = new ArrayList<>(); + + mutator.process().forEach((extent, result) -> { + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + // cache this success for a bit + ranges.add(extent.toMetaRange()); + } else { + if (LOG.isTraceEnabled()) { + // only read the metadata if the logging is enabled + LOG.trace("Failed to set hosting request {}", result.readMetadata()); } - continue; } + }); + + if (!ranges.isEmpty()) { + processRanges(ranges); + } + } finally { + inProgress.forEach(hostingRequestInProgress::remove); + } + } - TabletLists tLists = new TabletLists(manager, currentTServers); + private TabletManagementParameters + createTabletManagementParameters(boolean 
lookForTabletsNeedingVolReplacement) { - RecoveryManager.RecoverySession recoverySession = - manager.recoveryManager.newRecoverySession(); + HashMap<Ample.DataLevel,Boolean> parentLevelUpgrade = new HashMap<>(); + UpgradeCoordinator.UpgradeStatus upgradeStatus = manager.getUpgradeStatus(); + for (var level : Ample.DataLevel.values()) { + parentLevelUpgrade.put(level, upgradeStatus.isParentLevelUpgraded(level)); + } - ManagerState managerState = manager.getManagerState(); - int[] counts = new int[TabletState.values().length]; - stats.begin(); - // Walk through the tablets in our store, and work tablets - // towards their goal - iter = store.iterator(); - while (iter.hasNext()) { - TabletLocationState tls = iter.next(); - if (tls == null) { - continue; - } + Set<TServerInstance> shutdownServers; + if (store.getLevel() == Ample.DataLevel.USER) { + shutdownServers = manager.shutdownServers(); + } else { + // Use the servers to shutdown filtered by the dependent watcher. These are servers to + // shutdown that the dependent watcher has determined it has no tablets hosted on or assigned + // to. + shutdownServers = dependentWatcher.getFilteredServersToShutdown(); + } - // ignore entries for tables that do not exist in zookeeper - if (manager.getTableManager().getTableState(tls.extent.tableId()) == null) { - continue; - } + var tServersSnapshot = manager.tserversSnapshot(); + + var tabletMgmtParams = new TabletManagementParameters(manager.getManagerState(), + parentLevelUpgrade, manager.onlineTables(), tServersSnapshot, shutdownServers, + store.getLevel(), manager.getCompactionHints(store.getLevel()), canSuspendTablets(), + lookForTabletsNeedingVolReplacement ? manager.getContext().getVolumeReplacements() + : Map.of(), + manager.getSteadyTime()); + + if (LOG.isTraceEnabled()) { + // Log the json that will be passed to iterators to make tablet filtering decisions. 
+ LOG.trace("{}:{}", TabletManagementParameters.class.getSimpleName(), + tabletMgmtParams.serialize()); + } + + return tabletMgmtParams; + } + + private Set<TServerInstance> getFilteredServersToShutdown() { + return filteredServersToShutdown; + } + + private static class TableMgmtStats { + final int[] counts = new int[TabletState.values().length]; + private int totalUnloaded; + private long totalVolumeReplacements; + private int tabletsWithErrors; + } + + private TableMgmtStats manageTablets(Iterator<TabletManagement> iter, + TabletManagementParameters tableMgmtParams, + SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean isFullScan) + throws TException, DistributedStoreException, WalMarkerException, IOException { + + // When upgrading the Manager needs the TabletGroupWatcher + // to assign and balance the root and metadata tables, but + // the Manager does not fully start up until the upgrade + // is complete. This means that objects like the Splitter + // are not going to be initialized and the Coordinator + // is not going to be started. + final boolean currentlyUpgrading = manager.isUpgrading(); + if (currentlyUpgrading) { + LOG.debug( + "Currently upgrading, splits and compactions for tables in level {} will occur once upgrade is completed.", + store.getLevel()); + } - // Don't overwhelm the tablet servers with work - if (tLists.unassigned.size() + unloaded - > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { - flushChanges(tLists, wals); - tLists.reset(); - unloaded = 0; - eventListener.waitForEvents(waitTimeBetweenScans); + final TableMgmtStats tableMgmtStats = new TableMgmtStats(); + final boolean shuttingDownAllTabletServers = + tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()); + if (shuttingDownAllTabletServers && !isFullScan) { + // If we are shutting down all of the TabletServers, then don't process any events + // from the EventCoordinator. 
+ LOG.debug("Partial scan requested, but aborted due to shutdown of all TabletServers"); + return tableMgmtStats; + } + + int unloaded = 0; + + TabletLists tLists = new TabletLists(currentTServers, tableMgmtParams.getGroupedTServers(), + tableMgmtParams.getServersToShutdown()); + + CompactionJobGenerator compactionGenerator = + new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext()), + tableMgmtParams.getCompactionHints(), tableMgmtParams.getSteadyTime()); + + try { - CheckCompactionConfig.validate(manager.getConfiguration(), Level.TRACE); ++ CheckCompactionConfig.validate(manager.getConfiguration(), Logger::trace); + this.metrics.clearCompactionServiceConfigurationError(); + } catch (RuntimeException | ReflectiveOperationException e) { + this.metrics.setCompactionServiceConfigurationError(); + LOG.error( + "Error validating compaction configuration, all {} compactions are paused until the configuration is fixed.", + store.getLevel(), e); + compactionGenerator = null; + } + + Set<TServerInstance> filteredServersToShutdown = + new HashSet<>(tableMgmtParams.getServersToShutdown()); + + while (iter.hasNext() && !manager.isShutdownRequested()) { + final TabletManagement mti = iter.next(); + if (mti == null) { + throw new IllegalStateException("State store returned a null ManagerTabletInfo object"); + } + + final String mtiError = mti.getErrorMessage(); + if (mtiError != null) { + LOG.warn( + "Error on TabletServer trying to get Tablet management information for metadata tablet. 
Error message: {}", + mtiError); + this.metrics.incrementTabletGroupWatcherError(this.store.getLevel()); + tableMgmtStats.tabletsWithErrors++; + continue; + } + + RecoveryManager.RecoverySession recoverySession = + manager.recoveryManager.newRecoverySession(); + + final TabletMetadata tm = mti.getTabletMetadata(); + final TableId tableId = tm.getTableId(); + // ignore entries for tables that do not exist in zookeeper + if (manager.getTableManager().getTableState(tableId) == TableState.UNKNOWN) { + continue; + } + + // Don't overwhelm the tablet servers with work + if (tLists.unassigned.size() + unloaded + > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size() + || tLists.volumeReplacements.size() > 1000) { + flushChanges(tLists); + tLists.reset(); + unloaded = 0; + } + + final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); + + TabletState state = TabletState.compute(tm, currentTServers.keySet()); + if (state == TabletState.ASSIGNED_TO_DEAD_SERVER) { + /* + * This code exists to deal with a race condition caused by two threads running in this + * class that compute tablets actions. One thread does full scans and the other reacts to + * events and does partial scans. Below is an example of the race condition this is + * handling. + * + * - TGW Thread 1 : reads the set of tablets servers and its empty + * + * - TGW Thread 2 : reads the set of tablet servers and its [TS1] + * + * - TGW Thread 2 : Sees tabletX without a location and assigns it to TS1 + * + * - TGW Thread 1 : Sees tabletX assigned to TS1 and assumes it's assigned to a dead tablet + * server because its set of live servers is the empty set. + * + * To deal with this race condition, this code recomputes the tablet state using the latest + * tservers when a tablet is seen assigned to a dead tserver. 
+ */ + + TabletState newState = TabletState.compute(tm, manager.tserversSnapshot().getTservers()); + if (newState != state) { + LOG.debug("Tablet state changed when using latest set of tservers {} {} {}", + tm.getExtent(), state, newState); + state = newState; + } + } + tableMgmtStats.counts[state.ordinal()]++; + + // This is final because nothing in this method should change the goal. All computation of the + // goal should be done in TabletGoalState.compute() so that all parts of the Accumulo code + // will compute a consistent goal. + final TabletGoalState goal = TabletGoalState.compute(tm, state, + manager.getBalanceManager().getBalancer(), tableMgmtParams); + + final Set<ManagementAction> actions = mti.getActions(); + + if (actions.contains(ManagementAction.NEEDS_RECOVERY) && goal != TabletGoalState.HOSTED) { + LOG.warn("Tablet has wals, but goal is not hosted. Tablet: {}, goal:{}", tm.getExtent(), + goal); + } + + if (actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) { + tableMgmtStats.totalVolumeReplacements++; + if (state == TabletState.UNASSIGNED || state == TabletState.SUSPENDED) { + var volRep = + VolumeUtil.computeVolumeReplacements(tableMgmtParams.getVolumeReplacements(), tm); + if (volRep.logsToRemove.size() + volRep.filesToRemove.size() > 0) { + if (tm.getLocation() != null) { + // since the totalVolumeReplacements counter was incremented, should try this again + // later after its unassigned + LOG.debug("Volume replacement needed for {} but it has a location {}.", + tm.getExtent(), tm.getLocation()); + } else if (tm.getOperationId() != null) { + LOG.debug("Volume replacement needed for {} but it has an active operation {}.", + tm.getExtent(), tm.getOperationId()); + } else { + LOG.debug("Volume replacement needed for {}.", tm.getExtent()); + // buffer replacements so that multiple mutations can be done at once + tLists.volumeReplacements.add(volRep); + } + } else { + LOG.debug("Volume replacement evaluation for {} returned no changes.", 
tm.getExtent()); } - TableId tableId = tls.extent.tableId(); - TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); - - MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k -> { - var mStats = currentMerges.get(k); - return mStats != null ? mStats : new MergeStats(new MergeInfo()); - }); - TabletGoalState goal = manager.getGoalState(tls, mergeStats.getMergeInfo()); - Location location = tls.getLocation(); - TabletState state = tls.getState(currentTServers.keySet()); - - TabletLogger.missassigned(tls.extent, goal.toString(), state.toString(), - tls.getFutureServer(), tls.getCurrentServer(), tls.walogs.size()); - - stats.update(tableId, state); - mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty()); - sendChopRequest(mergeStats.getMergeInfo(), state, tls); - sendSplitRequest(mergeStats.getMergeInfo(), state, tls); - - // Always follow through with assignments - if (state == TabletState.ASSIGNED) { - goal = TabletGoalState.HOSTED; + } else { + LOG.debug("Volume replacement needed for {} but its tablet state is {}.", tm.getExtent(), + state); + } + } + + if (actions.contains(ManagementAction.BAD_STATE) && tm.isFutureAndCurrentLocationSet()) { + Manager.log.error("{}, saw tablet with multiple locations, which should not happen", + tm.getExtent()); + logIncorrectTabletLocations(tm); + // take no further action for this tablet + continue; + } + + final Location location = tm.getLocation(); + Location current = null; + Location future = null; + if (tm.hasCurrent()) { + current = tm.getLocation(); + } else { + future = tm.getLocation(); + } + TabletLogger.missassigned(tm.getExtent(), goal.toString(), state.toString(), + future != null ? future.getServerInstance() : null, + current != null ? 
current.getServerInstance() : null, tm.getLogs().size()); + + if (isFullScan) { + stats.update(tableId, state); + } + + if (Manager.log.isTraceEnabled()) { + Manager.log.trace( + "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{} #wals:{}", + store.name(), tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()), + dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tm.getExtent(), + state, goal, actions, tm.getLogs().size()); + } + + final boolean needsSplit = actions.contains(ManagementAction.NEEDS_SPLITTING); + if (!currentlyUpgrading && needsSplit) { + LOG.debug("{} may need splitting.", tm.getExtent()); + manager.getSplitter().initiateSplit(tm.getExtent()); + } + + if (!currentlyUpgrading && actions.contains(ManagementAction.NEEDS_COMPACTING) + && compactionGenerator != null) { + // Check if tablet needs splitting, priority should be giving to splits over + // compactions because it's best to compact after a split + if (!needsSplit) { + var jobs = compactionGenerator.generateJobs(tm, + TabletManagementIterator.determineCompactionKinds(actions)); + LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size()); + manager.getCompactionCoordinator().addJobs(tm, jobs); + } else { + LOG.trace("skipping compaction job generation because {} may need splitting.", + tm.getExtent()); + } + } + + if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE) + || actions.contains(ManagementAction.NEEDS_RECOVERY)) { + + if (tm.getLocation() != null) { + filteredServersToShutdown.remove(tm.getLocation().getServerInstance()); + } + + if (goal == TabletGoalState.HOSTED) { + + // RecoveryManager.recoverLogs will return false when all of the logs + // have been sorted so that recovery can occur. Delay the hosting of + // the Tablet until the sorting is finished. 
+ if ((state != TabletState.HOSTED && actions.contains(ManagementAction.NEEDS_RECOVERY)) + && recoverySession.recoverLogs(tm.getLogs())) { + LOG.debug("Not hosting {} as it needs recovery, logs: {}", tm.getExtent(), + tm.getLogs().size()); + continue; } - if (Manager.log.isTraceEnabled()) { - Manager.log.trace( - "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {}", - store.name(), manager.serversToShutdown.equals(currentTServers.keySet()), - dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tls.extent, - state, goal); + switch (state) { + case ASSIGNED_TO_DEAD_SERVER: + hostDeadTablet(tLists, tm, location); + break; + case SUSPENDED: + hostSuspendedTablet(tLists, tm, location, tableConf); + break; + case UNASSIGNED: + hostUnassignedTablet(tLists, tm.getExtent(), + new UnassignedTablet(location, tm.getLast())); + break; + case ASSIGNED: + // Send another reminder + tLists.assigned.add(new Assignment(tm.getExtent(), + future != null ? future.getServerInstance() : null, tm.getLast())); + break; + case HOSTED: + break; } - - // if we are shutting down all the tabletservers, we have to do it in order - if ((goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED) - && manager.serversToShutdown.equals(currentTServers.keySet())) { - if (dependentWatcher != null) { - // If the dependentWatcher is for the user tables, check to see - // that user tables exist. - DataLevel dependentLevel = dependentWatcher.store.getLevel(); - boolean userTablesExist = true; - switch (dependentLevel) { - case USER: - Set<TableId> onlineTables = manager.onlineTables(); - onlineTables.remove(RootTable.ID); - onlineTables.remove(MetadataTable.ID); - userTablesExist = !onlineTables.isEmpty(); - break; - case METADATA: - case ROOT: - default: - break; - } - // If the stats object in the dependentWatcher is empty, then it - // currently does not have data about what is hosted or not. 
In - // that case host these tablets until the dependent watcher can - // gather some data. - final Map<TableId,TableCounts> stats = dependentWatcher.getStats(); - if (dependentLevel == DataLevel.USER) { - if (userTablesExist - && (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0)) { - goal = TabletGoalState.HOSTED; - } - } else if (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0) { - goal = TabletGoalState.HOSTED; + } else { + switch (state) { + case SUSPENDED: + // Request a move to UNASSIGNED, so as to allow balancing to continue. + tLists.suspendedToGoneServers.add(tm); + break; + case ASSIGNED_TO_DEAD_SERVER: + unassignDeadTablet(tLists, tm); + break; + case HOSTED: + TServerConnection client = + manager.tserverSet.getConnection(location.getServerInstance()); + if (client != null) { + TABLET_UNLOAD_LOGGER.trace("[{}] Requesting TabletServer {} unload {} {}", + store.name(), location.getServerInstance(), tm.getExtent(), goal.howUnload()); + client.unloadTablet(manager.managerLock, tm.getExtent(), goal.howUnload(), + manager.getSteadyTime().getMillis()); + tableMgmtStats.totalUnloaded++; + unloaded++; + } else { + Manager.log.warn("Could not connect to server {}", location); } - } + break; + case ASSIGNED: + case UNASSIGNED: + break; } + } + } + } - if (goal == TabletGoalState.HOSTED) { - if ((state != TabletState.HOSTED && !tls.walogs.isEmpty()) - && recoverySession.recoverLogs(tls.walogs)) { - continue; - } - switch (state) { - case HOSTED: - if (location.getServerInstance().equals(manager.migrations.get(tls.extent))) { - manager.migrations.removeExtent(tls.extent); - } - break; - case ASSIGNED_TO_DEAD_SERVER: - hostDeadTablet(tLists, tls, location, wals); - break; - case SUSPENDED: - hostSuspendedTablet(tLists, tls, location, tableConf); - break; - case UNASSIGNED: - hostUnassignedTablet(tLists, tls.extent, new UnassignedTablet(location, tls.last)); - break; - case ASSIGNED: - // Send another reminder - tLists.assigned.add(new 
Assignment(tls.extent, tls.getFutureServer(), tls.last)); - break; - } - } else { - switch (state) { - case SUSPENDED: - // Request a move to UNASSIGNED, so as to allow balancing to continue. - tLists.suspendedToGoneServers.add(tls); - cancelOfflineTableMigrations(tls.extent); - break; - case UNASSIGNED: - cancelOfflineTableMigrations(tls.extent); - break; - case ASSIGNED_TO_DEAD_SERVER: - unassignDeadTablet(tLists, tls, wals); - break; - case HOSTED: - TServerConnection client = - manager.tserverSet.getConnection(location.getServerInstance()); - if (client != null) { - try { - TABLET_UNLOAD_LOGGER.trace("[{}] Requesting TabletServer {} unload {} {}", - store.name(), location.getServerInstance(), tls.extent, goal.howUnload()); - client.unloadTablet(manager.managerLock, tls.extent, goal.howUnload(), - manager.getSteadyTime()); - unloaded++; - totalUnloaded++; - } catch (TException tException) { - Manager.log.warn("[{}] Failed to request tablet unload {} {} {}", store.name(), - location.getServerInstance(), tls.extent, goal.howUnload(), tException); - } - } else { - Manager.log.warn("Could not connect to server {}", location); - } - break; - case ASSIGNED: - break; - } + flushChanges(tLists); + + if (isFullScan) { + this.filteredServersToShutdown = Set.copyOf(filteredServersToShutdown); + } + + return tableMgmtStats; + } + + private SortedMap<TServerInstance,TabletServerStatus> + getCurrentTservers(Set<TServerInstance> onlineTservers) { + // Get the current status for the current list of tservers + final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); + for (TServerInstance entry : onlineTservers) { + currentTServers.put(entry, manager.getTserverStatus().status.get(entry)); + } + return currentTServers; + } + + @Override + public void run() { + int[] oldCounts = new int[TabletState.values().length]; + boolean lookForTabletsNeedingVolReplacement = true; + + while (manager.stillManager() && !manager.isShutdownRequested()) { + if 
(!eventHandler.isNeedsFullScan()) { + // If an event handled by the EventHandler.RangeProcessor indicated + // that we need to do a full scan, then do it. Otherwise wait a bit + // before re-checking the tablets. + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + final long waitTimeBetweenScans = manager.getConfiguration() + .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); + + TabletManagementParameters tableMgmtParams = + createTabletManagementParameters(lookForTabletsNeedingVolReplacement); + var currentTServers = getCurrentTservers(tableMgmtParams.getOnlineTsevers()); + + ClosableIterator<TabletManagement> iter = null; + try { + if (currentTServers.isEmpty()) { + eventHandler.waitForFullScan(waitTimeBetweenScans); + synchronized (this) { + lastScanServers = Collections.emptySortedSet(); } - counts[state.ordinal()]++; + continue; } - flushChanges(tLists, wals); + stats.begin(); + + ManagerState managerState = tableMgmtParams.getManagerState(); + + // Clear the need for a full scan before starting a full scan inorder to detect events that + // happen during the full scan. + eventHandler.clearNeedsFullScan(); + + iter = store.iterator(tableMgmtParams); + manager.getCompactionCoordinator().getJobQueues().beginFullScan(store.getLevel()); + var tabletMgmtStats = manageTablets(iter, tableMgmtParams, currentTServers, true); + manager.getCompactionCoordinator().getJobQueues().endFullScan(store.getLevel()); + + // If currently looking for volume replacements, determine if the next round needs to look. + if (lookForTabletsNeedingVolReplacement) { + // Continue to look for tablets needing volume replacement if there was an error + // processing tablets in the call to manageTablets() or if we are still performing volume + // replacement. We only want to stop looking for tablets that need volume replacement when + // we have successfully processed all tablet metadata and no more volume replacements are + // being performed. 
+ Manager.log.debug("[{}] saw {} tablets needing volume replacement", store.name(), + tabletMgmtStats.totalVolumeReplacements); + lookForTabletsNeedingVolReplacement = tabletMgmtStats.totalVolumeReplacements != 0 + || tabletMgmtStats.tabletsWithErrors != 0; + if (!lookForTabletsNeedingVolReplacement) { + Manager.log.debug("[{}] no longer looking for volume replacements", store.name()); + } + } // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(managerState); diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index d6915b5155,0000000000..57d339485b mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@@ -1,1428 -1,0 +1,1426 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.compaction.coordinator; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import 
org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionMap; +import org.apache.accumulo.core.compaction.thrift.TNextCompactionJob; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.ResourceGroupId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateClient; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import 
org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; ++import org.apache.accumulo.core.logging.ConditionalLogger.ConditionalLogAction; +import org.apache.accumulo.core.logging.TabletLogger; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler; +import org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metadata.schema.filters.HasExternalCompactionsFilter; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.tabletserver.thrift.InputFile; +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; 
+import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.compaction.RunningCompaction; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; +import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData; +import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile; +import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; +import org.apache.accumulo.manager.compaction.queue.ResolvedCompactionJob; +import org.apache.accumulo.manager.tableOps.FateEnv; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; +import org.apache.accumulo.server.compaction.CompactionPluginUtils; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - import org.slf4j.event.Level; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; 
+import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; + +import io.micrometer.core.instrument.MeterRegistry; + +public class CompactionCoordinator + implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer { + + // Object that serves as a TopN view of the RunningCompactions, ordered by + // RunningCompaction start time. The first entry in this Set should be the + // oldest RunningCompaction. + public static class TimeOrderedRunningCompactionSet { + + private static final int UPPER_LIMIT = 50; + + Comparator<RunningCompaction> oldestFirstComparator = + Comparator.comparingLong(RunningCompaction::getStartTime) + .thenComparing(rc -> rc.getJob().getExternalCompactionId()); + private final ConcurrentSkipListSet<RunningCompaction> compactions = + new ConcurrentSkipListSet<>(oldestFirstComparator); + + // Tracking size here as ConcurrentSkipListSet.size() is not constant time + private final AtomicInteger size = new AtomicInteger(0); + + public int size() { + return size.get(); + } + + public boolean add(RunningCompaction e) { + boolean added = compactions.add(e); + if (added) { + if (size.incrementAndGet() > UPPER_LIMIT) { + this.remove(compactions.last()); + } + } + return added; + } + + public boolean remove(Object o) { + boolean removed = compactions.remove(o); + if (removed) { + size.decrementAndGet(); + } + return removed; + } + + public Iterator<RunningCompaction> iterator() { + return compactions.iterator(); + } + + public Stream<RunningCompaction> stream() { + return compactions.stream(); + } + + } + + static class FailureCounts { + long failures; + long successes; + + FailureCounts(long failures, long successes) { + this.failures = failures; + this.successes = successes; + } + + static FailureCounts incrementFailure(Object key, FailureCounts counts) { + if (counts == null) { + return new FailureCounts(1, 0); + } + 
counts.failures++; + return counts; + } + + static FailureCounts incrementSuccess(Object key, FailureCounts counts) { + if (counts == null) { + return new FailureCounts(0, 1); + } + counts.successes++; + return counts; + } + } + + private final ConcurrentHashMap<ResourceGroupId,FailureCounts> failingQueues = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String,FailureCounts> failingCompactors = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<TableId,FailureCounts> failingTables = new ConcurrentHashMap<>(); + + private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); + + public static final String RESTART_UPDATE_MSG = + "Coordinator restarted, compaction found in progress"; + + /* + * Map of compactionId to RunningCompactions. This is an informational cache of what external + * compactions may be running. Its possible it may contain external compactions that are not + * actually running. It may not contain compactions that are actually running. The metadata table + * is the most authoritative source of what external compactions are currently running, but it + * does not have the stats that this map has. 
+ */ + protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE = + new ConcurrentHashMap<>(); + + protected final Map<String,TimeOrderedRunningCompactionSet> LONG_RUNNING_COMPACTIONS_BY_RG = + new ConcurrentHashMap<>(); + + /* Map of group name to last time compactor called to get a compaction job */ + private final Map<ResourceGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); + + private final ServerContext ctx; + private final AuditedSecurityOperation security; + private final CompactionJobQueues jobQueues; + private final Function<FateInstanceType,FateClient<FateEnv>> fateClients; + // Exposed for tests + protected final CountDownLatch shutdown = new CountDownLatch(1); + + private final Cache<ExternalCompactionId,RunningCompaction> completed; + private final LoadingCache<FateId,CompactionConfig> compactionConfigCache; + private final Cache<Path,Integer> tabletDirCache; + private final DeadCompactionDetector deadCompactionDetector; + + private final QueueMetrics queueMetrics; + private final Manager manager; + + private final LoadingCache<ResourceGroupId,Integer> compactorCounts; + + private volatile long coordinatorStartTime; + + private final Map<DataLevel,ThreadPoolExecutor> reservationPools; + private final Set<String> activeCompactorReservationRequest = ConcurrentHashMap.newKeySet(); + + public CompactionCoordinator(Manager manager, + Function<FateInstanceType,FateClient<FateEnv>> fateClients) { + this.ctx = manager.getContext(); + this.security = ctx.getSecurityOperation(); + this.manager = Objects.requireNonNull(manager); + + long jobQueueMaxSize = + ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE); + + this.jobQueues = new CompactionJobQueues(jobQueueMaxSize); + + this.queueMetrics = new QueueMetrics(jobQueues); + + this.fateClients = fateClients; + + completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true) + .maximumSize(200).expireAfterWrite(10, 
TimeUnit.MINUTES).build(); + + CacheLoader<FateId,CompactionConfig> loader = + fateId -> CompactionConfigStorage.getConfig(ctx, fateId); + + // Keep a small short lived cache of compaction config. Compaction config never changes, however + // when a compaction is canceled it is deleted which is why there is a time limit. It does not + // hurt to let a job that was canceled start, it will be canceled later. Caching this immutable + // config will help avoid reading the same data over and over. + compactionConfigCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true) + .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader); + + Weigher<Path,Integer> weigher = (path, count) -> { + return path.toUri().toString().length(); + }; + + tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true) + .maximumWeight(10485760L).weigher(weigher).build(); + + deadCompactionDetector = + new DeadCompactionDetector(this.ctx, this, ctx.getScheduledExecutor(), fateClients); + + var rootReservationPool = ThreadPools.getServerThreadPools().createExecutorService( + ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true); + + var metaReservationPool = ThreadPools.getServerThreadPools().createExecutorService( + ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META, true); + + var userReservationPool = ThreadPools.getServerThreadPools().createExecutorService( + ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER, true); + + reservationPools = Map.of(Ample.DataLevel.ROOT, rootReservationPool, Ample.DataLevel.METADATA, + metaReservationPool, Ample.DataLevel.USER, userReservationPool); + + compactorCounts = ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false) + .expireAfterWrite(2, TimeUnit.MINUTES).build(this::countCompactors); + // At this point the manager does not have its lock so no actions should be taken yet + } + + protected int 
countCompactors(ResourceGroupId groupName) { + return ExternalCompactionUtil.countCompactors(groupName, ctx); + } + + private volatile Thread serviceThread = null; + + public void start() { + serviceThread = Threads.createCriticalThread("CompactionCoordinator Thread", this); + serviceThread.start(); + } + + public void shutdown() { + shutdown.countDown(); + + reservationPools.values().forEach(ExecutorService::shutdownNow); + + var localThread = serviceThread; + if (localThread != null) { + try { + localThread.join(); + } catch (InterruptedException e) { + LOG.error("Exception stopping compaction coordinator thread", e); + } + } + } + + protected void startCompactorZKCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = schedExecutor + .scheduleWithFixedDelay(this::cleanUpEmptyCompactorPathInZK, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + protected void startInternalStateCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::cleanUpInternalState, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + protected void startConfigMonitor(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::checkForConfigChanges, 0, 1, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + private void checkForConfigChanges() { + long jobQueueMaxSize = + ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE); + jobQueues.resetMaxSize(jobQueueMaxSize); + } + + @Override + public void run() { + + this.coordinatorStartTime = System.currentTimeMillis(); + startConfigMonitor(ctx.getScheduledExecutor()); + startCompactorZKCleaner(ctx.getScheduledExecutor()); + + // On a re-start of the coordinator it's possible that external compactions are in-progress. 
+ // Attempt to get the running compactions on the compactors and then resolve which tserver + // the external compaction came from to re-populate the RUNNING collection. + LOG.info("Checking for running external compactions"); + // On re-start contact the running Compactors to try and seed the list of running compactions + List<RunningCompaction> running = getCompactionsRunningOnCompactors(); + if (running.isEmpty()) { + LOG.info("No running external compactions found"); + } else { + LOG.info("Found {} running external compactions", running.size()); + running.forEach(rc -> { + TCompactionStatusUpdate update = new TCompactionStatusUpdate(); + update.setState(TCompactionState.IN_PROGRESS); + update.setMessage(RESTART_UPDATE_MSG); + rc.addUpdate(System.currentTimeMillis(), update); + rc.setStartTime(this.coordinatorStartTime); + RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc); + LONG_RUNNING_COMPACTIONS_BY_RG + .computeIfAbsent(rc.getGroup().canonical(), k -> new TimeOrderedRunningCompactionSet()) + .add(rc); + }); + } + + startDeadCompactionDetector(); + startQueueRunningSummaryLogging(); + startFailureSummaryLogging(); + startInternalStateCleaner(ctx.getScheduledExecutor()); + + try { + shutdown.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted waiting for shutdown latch.", e); + } + + LOG.info("Shutting down"); + } + + private Map<String,Set<HostAndPort>> getIdleCompactors(Set<ServerId> runningCompactors) { + + final Map<String,Set<HostAndPort>> allCompactors = new HashMap<>(); + runningCompactors.forEach((csi) -> allCompactors + .computeIfAbsent(csi.getResourceGroup().canonical(), (k) -> new HashSet<>()) + .add(HostAndPort.fromParts(csi.getHost(), csi.getPort()))); + + final Set<String> emptyQueues = new HashSet<>(); + + // Remove all of the compactors that are running a compaction + RUNNING_CACHE.values().forEach(rc -> { + Set<HostAndPort> busyCompactors = allCompactors.get(rc.getGroup().canonical()); + 
if (busyCompactors != null + && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) { + if (busyCompactors.isEmpty()) { + emptyQueues.add(rc.getGroup().canonical()); + } + } + }); + // Remove entries with empty queues + emptyQueues.forEach(e -> allCompactors.remove(e)); + return allCompactors; + } + + protected void startDeadCompactionDetector() { + deadCompactionDetector.start(); + } + + protected long getMissingCompactorWarningTime() { + return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3; + } + + public long getNumRunningCompactions() { + return RUNNING_CACHE.size(); + } + + /** + * Return the next compaction job from the queue to a Compactor + * + * @param groupName group + * @param compactorAddress compactor address + * @throws ThriftSecurityException when permission error + * @return compaction job + */ + @Override + public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, + String groupName, String compactorAddress, String externalCompactionId) + throws ThriftSecurityException { + + // do not expect users to call this directly, expect compactors to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + ResourceGroupId groupId = ResourceGroupId.of(groupName); + LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); + TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); + + TExternalCompactionJob result = null; + + ResolvedCompactionJob rcJob = (ResolvedCompactionJob) jobQueues.poll(groupId); + + while (rcJob != null) { + + Optional<CompactionConfig> compactionConfig = getCompactionConfig(rcJob); + + // this method may reread the metadata, do not use the metadata in rcJob for anything after + // this method + CompactionMetadata ecm = null; + + var kind = rcJob.getKind(); + 
+ // Only reserve user compactions when the config is present. When compactions are canceled the + // config is deleted. + var cid = ExternalCompactionId.from(externalCompactionId); + if (kind == CompactionKind.SYSTEM + || (kind == CompactionKind.USER && compactionConfig.isPresent())) { + ecm = reserveCompaction(rcJob, compactorAddress, cid); + } + + if (ecm != null) { + result = createThriftJob(externalCompactionId, ecm, rcJob, compactionConfig); + // It is possible that by the time this added that the the compactor that made this request + // is dead. In this cases the compaction is not actually running. + RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), + new RunningCompaction(result, compactorAddress, groupId)); + TabletLogger.compacting(rcJob.getExtent(), rcJob.getSelectedFateId(), cid, compactorAddress, + rcJob, ecm.getCompactTmpName()); + break; + } else { + LOG.debug( + "Unable to reserve compaction job for {}, pulling another off the queue for group {}", + rcJob.getExtent(), groupName); + rcJob = (ResolvedCompactionJob) jobQueues.poll(ResourceGroupId.of(groupName)); + } + } + + if (rcJob == null) { + LOG.trace("No jobs found in group {} ", groupName); + } + + if (result == null) { + LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName, + compactorAddress); + result = new TExternalCompactionJob(); + } + + return new TNextCompactionJob(result, compactorCounts.get(groupId)); + } + + private void checkTabletDir(KeyExtent extent, Path path) { + try { + if (tabletDirCache.getIfPresent(path) == null) { + FileStatus[] files = null; + try { + files = ctx.getVolumeManager().listStatus(path); + } catch (FileNotFoundException ex) { + // ignored + } + + if (files == null) { + LOG.debug("Tablet {} had no dir, creating {}", extent, path); + + ctx.getVolumeManager().mkdirs(path); + } + tabletDirCache.put(path, 1); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + protected 
CompactionMetadata createExternalCompactionMetadata(ResolvedCompactionJob job, + String compactorAddress, ExternalCompactionId externalCompactionId) { + boolean propDels = !job.isCompactingAll(); + FateId fateId = job.getSelectedFateId(); + + Consumer<String> directoryCreator = dir -> checkTabletDir(job.getExtent(), new Path(dir)); + ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, + job.getExtent(), job.getTabletDir(), directoryCreator, externalCompactionId); + + return new CompactionMetadata(job.getJobFiles(), newFile, compactorAddress, job.getKind(), + job.getPriority(), job.getGroup(), propDels, fateId); + + } + + private class ReserveCompactionTask implements Supplier<CompactionMetadata> { + private final ResolvedCompactionJob rcJob; + private final String compactorAddress; + private final ExternalCompactionId externalCompactionId; + + private ReserveCompactionTask(ResolvedCompactionJob rcJob, String compactorAddress, + ExternalCompactionId externalCompactionId) { + Preconditions.checkArgument( + rcJob.getKind() == CompactionKind.SYSTEM || rcJob.getKind() == CompactionKind.USER); + this.rcJob = Objects.requireNonNull(rcJob); + this.compactorAddress = Objects.requireNonNull(compactorAddress); + this.externalCompactionId = Objects.requireNonNull(externalCompactionId); + Preconditions.checkState(activeCompactorReservationRequest.add(compactorAddress), + "compactor %s already on has a reservation in flight, cannot process %s", + compactorAddress, externalCompactionId); + } + + @Override + public CompactionMetadata get() { + if (ctx.getTableState(rcJob.getExtent().tableId()) != TableState.ONLINE) { + return null; + } + + try { + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var extent = rcJob.getExtent(); + var jobFiles = rcJob.getJobFiles(); + long selectedExpirationDuration = ctx.getTableConfiguration(extent.tableId()) + .getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION); + var 
reservationCheck = new CompactionReservationCheck(rcJob.getKind(), jobFiles, + rcJob.getSelectedFateId(), rcJob.isOverlapsSelectedFiles(), manager.getSteadyTime(), + selectedExpirationDuration); + var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireCheckSuccess(reservationCheck); + + var ecm = createExternalCompactionMetadata(rcJob, compactorAddress, externalCompactionId); + + if (rcJob.isOverlapsSelectedFiles()) { + // There is corresponding code in CompactionReservationCheck that ensures this delete is + // safe to do. + tabletMutator.deleteSelectedFiles(); + } + tabletMutator.putExternalCompaction(externalCompactionId, ecm); + + tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId), + () -> "compaction reservation"); + + var result = tabletsMutator.process().get(extent); + + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + return ecm; + } else { + return null; + } + } + } finally { + Preconditions.checkState(activeCompactorReservationRequest.remove(compactorAddress), + "compactorAddress:%s", compactorAddress); + } + } + } + + protected CompactionMetadata reserveCompaction(ResolvedCompactionJob rcJob, + String compactorAddress, ExternalCompactionId externalCompactionId) { + + if (activeCompactorReservationRequest.contains(compactorAddress)) { + // In this case the compactor has a previously submitted reservation request that is still + // processing. Do not want to let it queue up another reservation request. One possible cause + // of this is that compactor timed out waiting for its last request to process and is now + // making another request. The previously submitted request can not be used because the + // compactor generates a new uuid for each request it makes. So the best thing to do is to + // return null and wait for this situation to resolve. 
This will likely happen when some part + // of the distributed system is not working well, so at this point want to avoid making + // problems worse instead of trying to reserve a job. + LOG.warn( + "Ignoring request from {} to reserve compaction job because it has a reservation request in progress.", + compactorAddress); + return null; + } + + var dataLevel = DataLevel.of(rcJob.getExtent().tableId()); + var future = CompletableFuture.supplyAsync( + new ReserveCompactionTask(rcJob, compactorAddress, externalCompactionId), + reservationPools.get(dataLevel)); + return future.join(); + } + + protected TExternalCompactionJob createThriftJob(String externalCompactionId, + CompactionMetadata ecm, ResolvedCompactionJob rcJob, + Optional<CompactionConfig> compactionConfig) { + + // Only reach out to metadata table and get these if requested, usually not needed unless + // plugiun requests it. + Supplier<Set<CompactableFile>> selectedFiles = Suppliers.memoize(() -> { + if (rcJob.getKind() == CompactionKind.SYSTEM) { + return Set.of(); + } else { + var tabletMetadata = + ctx.getAmple().readTablet(rcJob.getExtent(), SELECTED, FILES, PREV_ROW); + Preconditions.checkState( + tabletMetadata.getSelectedFiles().getFateId().equals(rcJob.getSelectedFateId())); + return tabletMetadata.getSelectedFiles().getFiles().stream() + .map(file -> new CompactableFileImpl(file, tabletMetadata.getFilesMap().get(file))) + .collect(Collectors.toUnmodifiableSet()); + } + }); + + Map<String,String> overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx, + rcJob.getExtent(), rcJob.getFiles(), selectedFiles, ecm.getCompactTmpName()); + + IteratorConfig iteratorSettings = SystemIteratorUtil + .toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of())); + + var files = rcJob.getJobFilesMap().entrySet().stream().map(e -> { + StoredTabletFile file = e.getKey(); + DataFileValue dfv = e.getValue(); + return new InputFile(file.getMetadata(), dfv.getSize(), 
dfv.getNumEntries(), dfv.getTime()); + }).collect(toList()); + + // The fateId here corresponds to the Fate transaction that is driving a user initiated + // compaction. A system initiated compaction has no Fate transaction driving it so its ok to set + // it to null. If anything tries to use the id for a system compaction and triggers a NPE it's + // probably a bug that needs to be fixed. + FateId fateId = null; + if (rcJob.getKind() == CompactionKind.USER) { + fateId = rcJob.getSelectedFateId(); + } + + return new TExternalCompactionJob(externalCompactionId, rcJob.getExtent().toThrift(), files, + iteratorSettings, ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), + TCompactionKind.valueOf(ecm.getKind().name()), fateId == null ? null : fateId.toThrift(), + overrides); + } + + @Override + public void registerMetrics(MeterRegistry registry) { + queueMetrics.registerMetrics(registry); + } + + public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { + ArrayList<CompactionJob> resolvedJobs = new ArrayList<>(jobs.size()); + for (var job : jobs) { + resolvedJobs.add(new ResolvedCompactionJob(job, tabletMetadata)); + } + + jobQueues.add(tabletMetadata.getExtent(), resolvedJobs); + } + + public CompactionCoordinatorService.Iface getThriftService() { + return this; + } + + private Optional<CompactionConfig> getCompactionConfig(ResolvedCompactionJob rcJob) { + if (rcJob.getKind() == CompactionKind.USER) { + var cconf = compactionConfigCache.get(rcJob.getSelectedFateId()); + return Optional.ofNullable(cconf); + } + return Optional.empty(); + } + + /** + * Compactors calls this method when they have finished a compaction. This method does the + * following. + * + * <ol> + * <li>Reads the tablets metadata and determines if the compaction can commit. Its possible that + * things changed while the compaction was running and it can no longer commit.</li> + * <li>Commit the compaction using a conditional mutation. 
If the tablets files or location + * changed since reading the tablets metadata, then conditional mutation will fail. When this + * happens it will reread the metadata and go back to step 1 conceptually. When committing a + * compaction the compacted files are removed and scan entries are added to the tablet in case the + * files are in use, this prevents GC from deleting the files between updating tablet metadata and + * refreshing the tablet. The scan entries are only added when a tablet has a location.</li> + * <li>After successful commit a refresh request is sent to the tablet if it has a location. This + * will cause the tablet to start using the newly compacted files for future scans. Also the + * tablet can delete the scan entries if there are no active scans using them.</li> + * </ol> + * + * <p> + * User compactions will be refreshed as part of the fate operation. The user compaction fate + * operation will see the compaction was committed after this code updates the tablet metadata, + * however if it were to rely on this code to do the refresh it would not be able to know when the + * refresh was actually done. Therefore, user compactions will refresh as part of the fate + * operation so that it's known to be done before the fate operation returns. Since the fate + * operation will do it, there is no need to do it here for user compactions. 
+ * + * @param tinfo trace info + * @param credentials tcredentials object + * @param externalCompactionId compaction id + * @param textent tablet extent + * @param stats compaction stats + * @throws ThriftSecurityException when permission error + */ + @Override + public void compactionCompleted(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent textent, TCompactionStats stats) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + // maybe fate has not started yet + var extent = KeyExtent.fromThrift(textent); + var fateType = FateInstanceType.fromTableId(extent.tableId()); + var localFate = fateClients.apply(fateType); + + LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats, + extent); + final var ecid = ExternalCompactionId.of(externalCompactionId); + captureSuccess(ecid, extent); + var tabletMeta = + ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID); + + var tableState = manager.getContext().getTableState(extent.tableId()); + if (tableState != TableState.ONLINE) { + // Its important this check is done after the compaction id is set in the metadata table to + // avoid race conditions with the client code that waits for tables to go offline. That code + // looks for compaction ids in the metadata table after setting the table state. When that + // client code sees nothing for a tablet its important that nothing will changes the tablets + // files after that point in time which this check ensure. 
+ LOG.debug("Not committing compaction {} for {} because of table state {}", ecid, extent, + tableState); + // cleanup metadata table and files related to the compaction + compactionsFailed(Map.of(ecid, extent)); + return; + } + + if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) { + return; + } + + // Start a fate transaction to commit the compaction. + CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid); + var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats)); + localFate.seedTransaction(Fate.FateOperation.COMMIT_COMPACTION, + FateKey.forCompactionCommit(ecid), renameOp, true); + } + + @Override + public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, + TKeyExtent extent, String exceptionMessage, TCompactionState failureState) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + if (failureState != TCompactionState.CANCELLED || failureState != TCompactionState.FAILED) { + LOG.error("Unexpected failure state sent to compactionFailed: {}. 
This is likely a bug.", + failureState); + } + KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent); + LOG.info("Compaction {}: id: {}, extent: {}, compactor exception:{}", failureState, + externalCompactionId, fromThriftExtent, exceptionMessage); + final var ecid = ExternalCompactionId.of(externalCompactionId); + if (failureState == TCompactionState.FAILED) { + captureFailure(ecid, fromThriftExtent); + } + compactionsFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); + } + + private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) { + var rc = RUNNING_CACHE.get(ecid); + if (rc != null) { + failingQueues.compute(rc.getGroup(), FailureCounts::incrementFailure); + final String compactor = rc.getCompactorAddress(); + failingCompactors.compute(compactor, FailureCounts::incrementFailure); + } + failingTables.compute(extent.tableId(), FailureCounts::incrementFailure); + } + + protected void startQueueRunningSummaryLogging() { + CoordinatorSummaryLogger summaryLogger = + new CoordinatorSummaryLogger(ctx, this.jobQueues, this.RUNNING_CACHE, compactorCounts); + + ScheduledFuture<?> future = ctx.getScheduledExecutor() + .scheduleWithFixedDelay(summaryLogger::logSummary, 0, 1, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + protected void startFailureSummaryLogging() { + ScheduledFuture<?> future = + ctx.getScheduledExecutor().scheduleWithFixedDelay(this::printStats, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + private <T> void printStats(String logPrefix, ConcurrentHashMap<T,FailureCounts> failureCounts, + boolean logSuccessAtTrace) { + for (var key : failureCounts.keySet()) { + failureCounts.compute(key, (k, counts) -> { + if (counts != null) { - Level level; ++ ConditionalLogAction logAction = Logger::debug; + if (counts.failures > 0) { - level = Level.WARN; ++ logAction = Logger::warn; + } else if (logSuccessAtTrace) { - level = Level.TRACE; - } else { - level = Level.DEBUG; ++ 
logAction = Logger::trace; + } + - LOG.atLevel(level).log("{} {} failures:{} successes:{} since last time this was logged ", ++ logAction.log(LOG, "{} {} failures:{} successes:{} since last time this was logged ", + logPrefix, k, counts.failures, counts.successes); + } + + // clear the counts so they can start building up for the next logging if this key is ever + // used again + return null; + }); + } + } + + private void printStats() { + // Remove down compactors from failing list + Map<String,Set<HostAndPort>> allCompactors = ExternalCompactionUtil.getCompactorAddrs(ctx); + Set<String> allCompactorAddrs = new HashSet<>(); + allCompactors.values().forEach(l -> l.forEach(c -> allCompactorAddrs.add(c.toString()))); + failingCompactors.keySet().retainAll(allCompactorAddrs); + printStats("Queue", failingQueues, false); + printStats("Table", failingTables, false); + printStats("Compactor", failingCompactors, true); + } + + private void captureSuccess(ExternalCompactionId ecid, KeyExtent extent) { + var rc = RUNNING_CACHE.get(ecid); + if (rc != null) { + failingQueues.compute(rc.getGroup(), FailureCounts::incrementSuccess); + final String compactor = rc.getCompactorAddress(); + failingCompactors.compute(compactor, FailureCounts::incrementSuccess); + } + failingTables.compute(extent.tableId(), FailureCounts::incrementSuccess); + } + + void compactionsFailed(Map<ExternalCompactionId,KeyExtent> compactions) { + // Need to process each level by itself because the conditional tablet mutator does not support + // mutating multiple data levels at the same time. Also the conditional tablet mutator does not + // support submitting multiple mutations for a single tablet, so need to group by extent. 
+ + Map<DataLevel,Map<KeyExtent,Set<ExternalCompactionId>>> groupedCompactions = + new EnumMap<>(DataLevel.class); + + compactions.forEach((ecid, extent) -> { + groupedCompactions.computeIfAbsent(DataLevel.of(extent.tableId()), dl -> new HashMap<>()) + .computeIfAbsent(extent, e -> new HashSet<>()).add(ecid); + }); + + groupedCompactions + .forEach((dataLevel, levelCompactions) -> compactionFailedForLevel(levelCompactions)); + } + + void compactionFailedForLevel(Map<KeyExtent,Set<ExternalCompactionId>> compactions) { + + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + compactions.forEach((extent, ecids) -> { + try { + ctx.requireNotDeleted(extent.tableId()); + var mutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation(); + ecids.forEach(mutator::requireCompaction); + ecids.forEach(mutator::deleteExternalCompaction); + mutator.submit(new RejectionHandler() { + @Override + public boolean callWhenTabletDoesNotExists() { + return true; + } + + @Override + public boolean test(TabletMetadata tabletMetadata) { + return tabletMetadata == null + || Collections.disjoint(tabletMetadata.getExternalCompactions().keySet(), ecids); + } + + }); + } catch (TableDeletedException e) { + LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.", + extent.tableId()); + } + }); + + final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>(); + tabletsMutator.process().forEach((extent, result) -> { + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + + // this should try again later when the dead compaction detector runs, lets log it in case + // its a persistent problem + if (LOG.isDebugEnabled()) { + LOG.debug("Unable to remove failed compaction {} {}", extent, compactions.get(extent)); + } + } else { + // compactionFailed is called from the Compactor when either a compaction fails or + // is cancelled and it's called from the DeadCompactionDetector. 
This block is + // entered when the conditional mutator above successfully deletes an ecid from + // the tablet metadata. Remove compaction tmp files from the tablet directory + // that have a corresponding ecid in the name. + + ecidsForTablet.clear(); + ecidsForTablet.addAll(compactions.get(extent)); + + if (!ecidsForTablet.isEmpty()) { + final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR); + if (tm != null) { + final Collection<Volume> vols = ctx.getVolumeManager().getVolumes(); + for (Volume vol : vols) { + try { + final String volPath = + vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName(); + final FileSystem fs = vol.getFileSystem(); + for (ExternalCompactionId ecid : ecidsForTablet) { + final String fileSuffix = "_tmp_" + ecid.canonical(); + FileStatus[] files = null; + try { + files = fs.listStatus(new Path(volPath), + (path) -> path.getName().endsWith(fileSuffix)); + } catch (FileNotFoundException e) { + LOG.trace("Failed to list tablet dir {}", volPath, e); + } + if (files != null) { + for (FileStatus file : files) { + if (!fs.delete(file.getPath(), false)) { + LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath()); + } else { + LOG.debug("Deleted ecid tmp file: {}", file.getPath()); + } + } + } + } + } catch (IOException e) { + LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e); + } + } + } else { + // TabletMetadata does not exist for the extent. This could be due to a merge or + // split operation. 
Use the utility to find tmp files at the table level + deadCompactionDetector.addTableId(extent.tableId()); + } + } + } + }); + } + + compactions.values().forEach(ecids -> ecids.forEach(this::recordCompletion)); + } + + /** + * Compactor calls to update the status of the assigned compaction + * + * @param tinfo trace info + * @param credentials tcredentials object + * @param externalCompactionId compaction id + * @param update compaction status update + * @param timestamp timestamp of the message + * @throws ThriftSecurityException when permission error + */ + @Override + public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TCompactionStatusUpdate update, long timestamp) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId, + timestamp, update); + final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); + if (null != rc) { + rc.addUpdate(timestamp, update); + switch (update.state) { + case STARTED: + LONG_RUNNING_COMPACTIONS_BY_RG.computeIfAbsent(rc.getGroup().canonical(), + k -> new TimeOrderedRunningCompactionSet()).add(rc); + break; + case CANCELLED: + case FAILED: + case SUCCEEDED: + var compactionSet = LONG_RUNNING_COMPACTIONS_BY_RG.get(rc.getGroup().canonical()); + if (compactionSet != null) { + compactionSet.remove(rc); + } + break; + case ASSIGNED: + case IN_PROGRESS: + default: + // do nothing + break; + + } + } + } + + public void recordCompletion(ExternalCompactionId ecid) { + var rc = RUNNING_CACHE.remove(ecid); + if (rc != null) { + completed.put(ecid, rc); + var compactionSet = 
LONG_RUNNING_COMPACTIONS_BY_RG.get(rc.getGroup().canonical()); + if (compactionSet != null) { + compactionSet.remove(rc); + } + } + } + + protected Set<ExternalCompactionId> readExternalCompactionIds() { + try (TabletsMetadata tabletsMetadata = + this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER) + .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) { + return tabletsMetadata.stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream()) + .collect(Collectors.toSet()); + } + } + + /** + * Return information about running compactions + * + * @param tinfo trace info + * @param credentials tcredentials object + * @return map of ECID to TExternalCompaction objects + * @throws ThriftSecurityException permission error + */ + @Override + public TExternalCompactionMap getRunningCompactions(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + final TExternalCompactionMap result = new TExternalCompactionMap(); + RUNNING_CACHE.forEach((ecid, rc) -> { + TExternalCompaction trc = new TExternalCompaction(); + trc.setGroupName(rc.getGroup().canonical()); + trc.setCompactor(rc.getCompactorAddress()); + trc.setUpdates(rc.getUpdates()); + trc.setJob(rc.getJob()); + result.putToCompactions(ecid.canonical(), trc); + }); + return result; + } + + /** + * Return top 50 longest running compactions for each resource group + * + * @param tinfo trace info + * @param credentials tcredentials object + * @return map of group name to list of up to 50 compactions in sorted order, oldest compaction + * first. 
+ * @throws ThriftSecurityException permission error + */ + @Override + public Map<String,TExternalCompactionList> getLongRunningCompactions(TInfo tinfo, + TCredentials credentials) throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + final Map<String,TExternalCompactionList> result = new HashMap<>(); + + for (Entry<String,TimeOrderedRunningCompactionSet> e : LONG_RUNNING_COMPACTIONS_BY_RG + .entrySet()) { + final TExternalCompactionList compactions = new TExternalCompactionList(); + Iterator<RunningCompaction> iter = e.getValue().iterator(); + while (iter.hasNext()) { + RunningCompaction rc = iter.next(); + TExternalCompaction trc = new TExternalCompaction(); + trc.setGroupName(rc.getGroup().canonical()); + trc.setCompactor(rc.getCompactorAddress()); + trc.setUpdates(rc.getUpdates()); + trc.setJob(rc.getJob()); + compactions.addToCompactions(trc); + } + result.put(e.getKey(), compactions); + } + return result; + } + + /** + * Return information about recently completed compactions + * + * @param tinfo trace info + * @param credentials tcredentials object + * @return map of ECID to TExternalCompaction objects + * @throws ThriftSecurityException permission error + */ + @Override + public TExternalCompactionMap getCompletedCompactions(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + final TExternalCompactionMap result = new TExternalCompactionMap(); + completed.asMap().forEach((ecid, rc) -> { + TExternalCompaction 
trc = new TExternalCompaction(); + trc.setGroupName(rc.getGroup().canonical()); + trc.setCompactor(rc.getCompactorAddress()); + trc.setJob(rc.getJob()); + trc.setUpdates(rc.getUpdates()); + result.putToCompactions(ecid.canonical(), trc); + }); + return result; + } + + /** + * Cancels a running external compaction. The compaction is looked up in RUNNING_CACHE, the + * caller's permission to compact the owning table is checked, and the compactor running it is + * then asked to cancel. + * + * @param tinfo trace info + * @param credentials tcredentials object + * @param externalCompactionId compaction id + * @throws TException when no running compaction is tracked for the id, on permission error, or + * when the table cannot be found + */ + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) + throws TException { + var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); + if (runningCompaction == null) { + /* the id is not (or no longer) tracked; fail clearly instead of with a NullPointerException */ + throw new TException("No running external compaction found for id " + externalCompactionId); + } + var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent()); + try { + NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId()); + if (!security.canCompact(credentials, extent.tableId(), nsId)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + } catch (TableNotFoundException e) { + throw new ThriftTableOperationException(extent.tableId().canonical(), null, + TableOperation.COMPACT_CANCEL, TableOperationExceptionType.NOTFOUND, e.getMessage()); + } + + cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId); + } + + /* Method exists to be called from test */ + public CompactionJobQueues getJobQueues() { + return jobQueues; + } + + /* Method exists to be overridden in test to hide static method */ + protected List<RunningCompaction> getCompactionsRunningOnCompactors() { + return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx); + } + + /* Method exists to be overridden in test to hide static method */ + protected Set<ServerId> getRunningCompactors() { + return ctx.instanceOperations().getServers(ServerId.Type.COMPACTOR); + } + + /* Method exists to be overridden in test to hide static method */ + protected void cancelCompactionOnCompactor(String address, String externalCompactionId) { + HostAndPort hostPort = HostAndPort.fromString(address); + ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId); + } + + private 
void deleteEmpty(ZooReaderWriter zoorw, String path) + throws KeeperException, InterruptedException { + try { + LOG.debug("Deleting empty ZK node {}", path); + zoorw.delete(path); + } catch (KeeperException.NotEmptyException e) { + LOG.debug("Failed to delete {} its not empty, likely an expected race condition.", path); + } + } + + /** + * Walks the children of Constants.ZCOMPACTORS and deletes group nodes that have no compactor + * children and compactor nodes that have no children of their own. For a group with no + * compactors, the group's job queue (if one exists) is also asked to clear itself via + * clearIfInactive with a 10 minute duration. ZooKeeper and runtime failures are logged and + * abandon the pass; interruption restores the interrupt flag and throws IllegalStateException. + */ + private void cleanUpEmptyCompactorPathInZK() { + + final var zoorw = this.ctx.getZooSession().asReaderWriter(); + + try { + var groups = zoorw.getChildren(Constants.ZCOMPACTORS); + + for (String group : groups) { + final String qpath = Constants.ZCOMPACTORS + "/" + group; + final ResourceGroupId cgid = ResourceGroupId.of(group); + final var compactors = zoorw.getChildren(qpath); + + if (compactors.isEmpty()) { + deleteEmpty(zoorw, qpath); + // Group has no compactors, we can clear its + // associated priority queue of jobs + CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid); + if (queue != null) { + queue.clearIfInactive(Duration.ofMinutes(10)); + } + } else { + for (String compactor : compactors) { + /* build the compactor path once and use it for both the listing and the delete */ + final String cpath = qpath + "/" + compactor; + var lockNodes = zoorw.getChildren(cpath); + if (lockNodes.isEmpty()) { + deleteEmpty(zoorw, cpath); + } + } + } + } + } catch (KeeperException | RuntimeException e) { + LOG.warn("Failed to clean up compactors", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + private Set<ResourceGroupId> getCompactionServicesConfigurationGroups() + throws ReflectiveOperationException, IllegalArgumentException, SecurityException { + + Set<ResourceGroupId> groups = new HashSet<>(); + AccumuloConfiguration config = ctx.getConfiguration(); + CompactionServicesConfig servicesConfig = new CompactionServicesConfig(config); + + for (var entry : servicesConfig.getPlanners().entrySet()) { + String serviceId = entry.getKey(); + String plannerClassName = 
entry.getValue(); + + Class<? extends CompactionPlanner> plannerClass = + Class.forName(plannerClassName).asSubclass(CompactionPlanner.class); + CompactionPlanner planner = plannerClass.getDeclaredConstructor().newInstance(); + + var initParams = new CompactionPlannerInitParams(CompactionServiceId.of(serviceId), + servicesConfig.getPlannerPrefix(serviceId), servicesConfig.getOptions().get(serviceId), + new ServiceEnvironmentImpl(ctx)); + + planner.init(initParams); + + groups.addAll(initParams.getRequestedGroups()); + } + return groups; + } + + /** + * Reconciles the coordinator's in-memory compaction tracking with the metadata table and the + * current compaction services configuration, and logs warnings about groups and compactors + * that do not line up with that configuration. Also resizes the reservation thread pools from + * the current configuration. The numbered list at the top of the method body gives the steps. + */ + public void cleanUpInternalState() { + + // This method does the following: + // + // 1. Removes entries from RUNNING_CACHE and LONG_RUNNING_COMPACTIONS_BY_RG that are not really + // running + // 2. Cancels running compactions for groups that are not in the current configuration + // 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED + // 4. Log groups with no compactors + // 5. Log compactors with no groups + // 6. Log groups with compactors and queued jobs that have not checked in + + var config = ctx.getConfiguration(); + ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config, + Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT); + ThreadPools.resizePool(reservationPools.get(DataLevel.METADATA), config, + Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META); + ThreadPools.resizePool(reservationPools.get(DataLevel.USER), config, + Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER); + + // grab a snapshot of the ids in the set before reading the metadata table. This is done to + // avoid removing things that are added while reading the metadata. 
+ final Set<ExternalCompactionId> idsSnapshot = Set.copyOf(Sets.union(RUNNING_CACHE.keySet(), + LONG_RUNNING_COMPACTIONS_BY_RG.values().stream() + .flatMap(TimeOrderedRunningCompactionSet::stream) + .map(rc -> rc.getJob().getExternalCompactionId()).map(ExternalCompactionId::of) + .collect(Collectors.toSet()))); + + // grab the ids that are listed as running in the metadata table. It is important that this is done + // after getting the snapshot. + final Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds(); + LOG.trace("Current ECIDs in metadata: {}", idsInMetadata.size()); + LOG.trace("Current ECIDs in running cache: {}", idsSnapshot.size()); + + final Set<ExternalCompactionId> idsToRemove = Sets.difference(idsSnapshot, idsInMetadata); + + // remove ids that are in the running set but not in the metadata table + idsToRemove.forEach(this::recordCompletion); + if (idsToRemove.size() > 0) { + LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove); + } + + // Get the set of groups being referenced in the current configuration + Set<ResourceGroupId> groupsInConfiguration = null; + try { + groupsInConfiguration = getCompactionServicesConfigurationGroups(); + } catch (RuntimeException | ReflectiveOperationException e) { + LOG.error( + "Error getting groups from the compaction services configuration. Unable to clean up internal state.", + e); + return; + } + + // Compaction jobs are created in the TabletGroupWatcher and added to the Coordinator + // via the addJobs method which adds the job to the CompactionJobQueues object. 
+ final Set<ResourceGroupId> groupsWithJobs = jobQueues.getQueueIds(); + + final Set<ResourceGroupId> jobGroupsNotInConfiguration = + Sets.difference(groupsWithJobs, groupsInConfiguration); + + if (jobGroupsNotInConfiguration != null && !jobGroupsNotInConfiguration.isEmpty()) { + RUNNING_CACHE.values().forEach(rc -> { + if (jobGroupsNotInConfiguration.contains(ResourceGroupId.of(rc.getGroup().canonical()))) { + LOG.warn( + "External compaction {} running in group {} on compactor {}," + + " but group not found in current configuration. Failing compaction...", + rc.getJob().getExternalCompactionId(), rc.getGroup(), rc.getCompactorAddress()); + cancelCompactionOnCompactor(rc.getCompactorAddress(), + rc.getJob().getExternalCompactionId()); + } + }); + + final Set<ResourceGroupId> trackedGroups = Set.copyOf(TIME_COMPACTOR_LAST_CHECKED.keySet()); + TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(groupsInConfiguration); + LOG.debug("No longer tracking compactor check-in times for groups: {}", + Sets.difference(trackedGroups, TIME_COMPACTOR_LAST_CHECKED.keySet())); + } + + final Set<ServerId> runningCompactors = getRunningCompactors(); + + final Set<ResourceGroupId> runningCompactorGroups = new HashSet<>(); + runningCompactors.forEach( + c -> runningCompactorGroups.add(ResourceGroupId.of(c.getResourceGroup().canonical()))); + + final Set<ResourceGroupId> groupsWithNoCompactors = + Sets.difference(groupsInConfiguration, runningCompactorGroups); + if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) { + for (ResourceGroupId group : groupsWithNoCompactors) { + long queuedJobCount = jobQueues.getQueuedJobs(group); + if (queuedJobCount > 0) { + LOG.warn("Compactor group {} has {} queued compactions but no running compactors", group, + queuedJobCount); + } + } + } + + final Set<ResourceGroupId> compactorsWithNoGroups = + Sets.difference(runningCompactorGroups, groupsInConfiguration); + if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) { 
+ LOG.warn( + "The following groups have running compactors, but are not in the current configuration: {}", + compactorsWithNoGroups); + } + + final long now = System.currentTimeMillis(); + final long warningTime = getMissingCompactorWarningTime(); + Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors(runningCompactors); + for (ResourceGroupId groupName : groupsInConfiguration) { + long lastCheckTime = + TIME_COMPACTOR_LAST_CHECKED.getOrDefault(groupName, coordinatorStartTime); + if ((now - lastCheckTime) > warningTime && jobQueues.getQueuedJobs(groupName) > 0 + && idleCompactors.containsKey(groupName.canonical())) { + LOG.warn( + "The group {} has queued jobs and {} idle compactors, however none have checked in " + + "with coordinator for {}ms", + groupName, idleCompactors.get(groupName.canonical()).size(), warningTime); + } + } + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 1bc3f51303,ca9901a7ae..186a53e299 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@@ -328,8 -247,8 +327,8 @@@ public class UpgradeCoordinator throw new IllegalStateException("Error checking properties", e); } try { - CheckCompactionConfig.validate(context.getConfiguration(), Level.INFO); - CheckCompactionConfig.validate(context.getConfiguration()); - } catch (SecurityException | IllegalArgumentException | ReflectiveOperationException e) { ++ CheckCompactionConfig.validate(context.getConfiguration(), Logger::info); + } catch (RuntimeException | ReflectiveOperationException e) { throw new IllegalStateException("Error validating compaction configuration", e); } }
