This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit aaa757e4372a22b17201c1bee96b243b620345d3 Author: Keith Turner <[email protected]> AuthorDate: Tue Jul 3 17:08:40 2018 -0400 ACCUMULO-4074 Added support for multiple scan executors (#549) * Created plugin API for prioritizing scans that avoids accessing internal types. This should make user written comparators more stable over time. * Made it possible to configure a comparator for each scan executor. Was previously a single comparator type for all executors. * Made it possible to pass options for comparator creation * Made it possible to configure a per table dispatcher. This dispatcher chooses which scan executor to use. Dispatchers are user-written plugins. * Fixed comparator type casting. I don't think the comparators in #510 worked because the Runnables were never the type expected. Also, abstracted all of this away from a user who may write a comparator. --- core/pom.xml | 19 ++ .../accumulo/core/conf/AccumuloConfiguration.java | 136 +++++++++ .../accumulo/core/conf/DefaultConfiguration.java | 5 + .../org/apache/accumulo/core/conf/Property.java | 55 +++- .../accumulo/core/conf/SiteConfiguration.java | 10 + .../core/spi/common/IteratorConfiguration.java | 30 +- .../common/Stats.java} | 36 ++- .../core/spi/scan/IdleRatioScanPrioritizer.java | 51 ++++ .../accumulo/core/spi/scan/ScanDispatcher.java | 52 ++++ .../accumulo/core/spi/scan/ScanExecutor.java | 60 ++++ .../apache/accumulo/core/spi/scan/ScanInfo.java | 116 ++++++++ .../accumulo/core/spi/scan/ScanPrioritizer.java | 22 +- .../core/spi/scan/SimpleScanDispatcher.java | 74 +++++ .../util/AccumuloUncaughtExceptionHandler.java | 1 - .../accumulo/core/util/NamingThreadFactory.java | 16 +- .../java/org/apache/accumulo/core/util/Stat.java | 75 ++--- .../core/conf/AccumuloConfigurationTest.java | 112 +++++++- .../spi/scan/IdleRatioScanPrioritizerTest.java | 61 +++++ .../core/spi/scan/SimpleScanDispatcherTest.java | 62 +++++ .../accumulo/core/spi/scan/TestScanInfo.java | 101 +++++++ 
.../org/apache/accumulo/core/util/StatTest.java | 40 ++- .../accumulo/server/conf/TableConfiguration.java | 37 +++ .../accumulo/server/conf/ZooConfiguration.java | 13 +- .../org/apache/accumulo/tserver/TabletServer.java | 41 ++- .../tserver/TabletServerResourceManager.java | 304 +++++++++++---------- .../accumulo/tserver/scan/NextBatchTask.java | 7 +- .../tserver/session/DefaultSessionComparator.java | 67 ----- .../accumulo/tserver/session/MultiScanSession.java | 32 +-- .../accumulo/tserver/session/ScanSession.java | 162 ++++++++--- .../apache/accumulo/tserver/session/Session.java | 18 +- .../accumulo/tserver/session/SessionManager.java | 11 +- .../{ScanSession.java => SingleScanSession.java} | 29 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 3 +- .../tserver/session/SessionComparatorTest.java | 170 ------------ .../org/apache/accumulo/test/IMMLGBenchmark.java | 2 +- .../test/performance/scan/CollectTabletStats.java | 5 +- 36 files changed, 1411 insertions(+), 624 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index cf45283..4535f50 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -212,6 +212,25 @@ </allows> </configuration> </execution> + <execution> + <id>apilyzer-spi</id> + <goals> + <goal>analyze</goal> + </goals> + <configuration> + <outputFile>${project.build.directory}/apilyzer-spi.txt</outputFile> + <includes> + <include>org[.]apache[.]accumulo[.]core[.]spi[.].*</include> + </includes> + <excludes /> + <allows> + <allow>org[.]apache[.]hadoop[.]io[.]Text</allow> + <allow>org[.]apache[.]accumulo[.]core[.]client(?!.*[.](impl|thrift)[.].*)[.].*</allow> + <allow>org[.]apache[.]accumulo[.]core[.]data(?!.*[.](impl|thrift)[.].*)[.].*</allow> + <allow>org[.]apache[.]accumulo[.]core[.]security(?!.*[.](crypto)[.].*)[.].*</allow> + </allows> + </configuration> + </execution> </executions> </plugin> <plugin> diff --git a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java 
b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java index 5a9ce21..5427d79 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java @@ -16,12 +16,17 @@ */ package org.apache.accumulo.core.conf; +import java.util.ArrayList; +import java.util.Collection; import java.util.EnumMap; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -29,10 +34,12 @@ import java.util.function.Predicate; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.PropertyType.PortRange; +import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher; import org.apache.accumulo.core.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; /** @@ -348,6 +355,135 @@ public abstract class AccumuloConfiguration implements Iterable<Entry<String,Str return maxFilesPerTablet; } + public class ScanExecutorConfig { + public final String name; + public final int maxThreads; + public final OptionalInt priority; + public final Optional<String> prioritizerClass; + public final Map<String,String> prioritizerOpts; + + public ScanExecutorConfig(String name, int maxThreads, OptionalInt priority, + Optional<String> comparatorFactory, Map<String,String> comparatorFactoryOpts) { + this.name = name; + this.maxThreads = maxThreads; + this.priority = priority; + this.prioritizerClass = comparatorFactory; + this.prioritizerOpts = comparatorFactoryOpts; + } + + /** + * Re-reads the max threads from the configuration that created this class + */ + public int getCurrentMaxThreads() { 
+ Integer depThreads = getDeprecatedScanThreads(name); + if (depThreads != null) { + return depThreads; + } + + String prop = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + name + "." + SCAN_EXEC_THREADS; + String val = getAllPropertiesWithPrefix(Property.TSERV_SCAN_EXECUTORS_PREFIX).get(prop); + return Integer.parseInt(val); + } + } + + public boolean isPropertySet(Property prop) { + throw new UnsupportedOperationException(); + } + + @SuppressWarnings("deprecation") + Integer getDeprecatedScanThreads(String name) { + + Property prop; + Property deprecatedProp; + + if (name.equals(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME)) { + prop = Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS; + deprecatedProp = Property.TSERV_READ_AHEAD_MAXCONCURRENT; + } else if (name.equals("meta")) { + prop = Property.TSERV_SCAN_EXECUTORS_META_THREADS; + deprecatedProp = Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT; + } else { + return null; + } + + if (!isPropertySet(prop) && isPropertySet(deprecatedProp)) { + log.warn("Property {} is deprecated, use {} instead.", prop.getKey(), + deprecatedProp.getKey()); + return Integer.valueOf(get(deprecatedProp)); + } else if (isPropertySet(prop) && isPropertySet(deprecatedProp)) { + log.warn("Deprecated property {} ignored because {} is set", deprecatedProp.getKey(), + prop.getKey()); + } + + return null; + } + + private static final String SCAN_EXEC_THREADS = "threads"; + private static final String SCAN_EXEC_PRIORITY = "priority"; + private static final String SCAN_EXEC_PRIORITIZER = "prioritizer"; + private static final String SCAN_EXEC_PRIORITIZER_OPTS = "prioritizer.opts."; + + public Collection<ScanExecutorConfig> getScanExecutors() { + + Map<String,Map<String,String>> propsByName = new HashMap<>(); + + List<ScanExecutorConfig> scanResources = new ArrayList<>(); + + for (Entry<String,String> entry : getAllPropertiesWithPrefix( + Property.TSERV_SCAN_EXECUTORS_PREFIX).entrySet()) { + + String suffix = entry.getKey() + 
.substring(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey().length()); + String[] tokens = suffix.split("\\.", 2); + String name = tokens[0]; + + propsByName.computeIfAbsent(name, k -> new HashMap<>()).put(tokens[1], entry.getValue()); + } + + for (Entry<String,Map<String,String>> entry : propsByName.entrySet()) { + String name = entry.getKey(); + Integer threads = null; + Integer prio = null; + String prioritizerClass = null; + Map<String,String> prioritizerOpts = new HashMap<>(); + + for (Entry<String,String> subEntry : entry.getValue().entrySet()) { + String opt = subEntry.getKey(); + String val = subEntry.getValue(); + + if (opt.equals(SCAN_EXEC_THREADS)) { + Integer depThreads = getDeprecatedScanThreads(name); + if (depThreads == null) { + threads = Integer.parseInt(val); + } else { + threads = depThreads; + } + } else if (opt.equals(SCAN_EXEC_PRIORITY)) { + prio = Integer.parseInt(val); + } else if (opt.equals(SCAN_EXEC_PRIORITIZER)) { + prioritizerClass = val; + } else if (opt.startsWith(SCAN_EXEC_PRIORITIZER_OPTS)) { + String key = opt.substring(SCAN_EXEC_PRIORITIZER_OPTS.length()); + if (key.isEmpty()) { + throw new IllegalStateException("Invalid scan executor option : " + opt); + } + prioritizerOpts.put(key, val); + } else { + throw new IllegalStateException("Unkown scan executor option : " + opt); + } + } + + Preconditions.checkArgument(threads != null && threads > 0, + "Scan resource %s incorrectly specified threads", name); + + scanResources.add(new ScanExecutorConfig(name, threads, + prio == null ? OptionalInt.empty() : OptionalInt.of(prio), + Optional.ofNullable(prioritizerClass), prioritizerOpts)); + } + + return scanResources; + } + /** * Invalidates the <code>ZooCache</code> used for storage and quick retrieval of properties for * this configuration. 
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java index fc2891e..54b87cc 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java @@ -52,4 +52,9 @@ public class DefaultConfiguration extends AccumuloConfiguration { resolvedProps.entrySet().stream().filter(p -> filter.test(p.getKey())) .forEach(e -> props.put(e.getKey(), e.getValue())); } + + @Override + public boolean isPropertySet(Property prop) { + return false; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 98402ef..b7ff210 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -30,6 +30,9 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.spi.scan.ScanDispatcher; +import org.apache.accumulo.core.spi.scan.ScanPrioritizer; +import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher; import org.apache.accumulo.core.util.format.DefaultFormatter; import org.apache.accumulo.core.util.interpret.DefaultScanInterpreter; import org.apache.accumulo.start.classloader.AccumuloClassLoader; @@ -430,14 +433,37 @@ public enum Property { "When a tablet server's SimpleTimer thread triggers to check idle" + " sessions, this configurable option will be used to evaluate update" + " sessions to determine if they can be closed due to inactivity"), + @Deprecated TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", PropertyType.COUNT, - "The maximum number of concurrent 
read ahead that will execute. This effectively" - + " limits the number of long running scans that can run concurrently per tserver."), - TSERV_READ_AHEAD_PREFIX("tserver.readahead.concurrent.table.", null, PropertyType.PREFIX, - "Properties in this category allow overriding of table specific read ahead pools"), + "This property is deprecated since 2.0.0, use tserver.scan.executors.default.threads " + + "instead. The maximum number of concurrent read ahead that will execute. This " + + "effectively limits the number of long running scans that can run concurrently " + + "per tserver.\""), + @Deprecated TSERV_METADATA_READ_AHEAD_MAXCONCURRENT("tserver.metadata.readahead.concurrent.max", "8", PropertyType.COUNT, - "The maximum number of concurrent metadata read ahead that will execute."), + "This property is deprecated since 2.0.0, use tserver.scan.executors.meta.threads instead. " + + "The maximum number of concurrent metadata read ahead that will execute."), + TSERV_SCAN_EXECUTORS_PREFIX("tserver.scan.executors.", null, PropertyType.PREFIX, + "Prefix for defining executors to service scans. For each executor the number of threads, " + + "thread priority, and an optional prioritizer can be configured. The prioritizer " + + "determines which scan an executor should run first and must implement " + + ScanPrioritizer.class.getName() + ". Tables can select an executor by setting" + + " table.scan.dispatcher. To configure a new executor, set " + + "tserver.scan.executors.<name>.threads=<number>. 
Optionally, can also set " + + "tserver.scan.executors.<name>.priority=<number 1 to 10>, " + + "tserver.scan.executors.<name>.prioritizer=<class name>, and " + + "tserver.scan.executors.<name>.prioritizer.opts.<key>=<value>"), + TSERV_SCAN_EXECUTORS_DEFAULT_THREADS("tserver.scan.executors.default.threads", "16", + PropertyType.COUNT, + "The number of threads for the scan executor that tables use by default."), + TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER("tserver.scan.executors.default.prioritizer", "", + PropertyType.STRING, + "Prioritizer for the default scan executor. Defaults to none which " + + "results in FIFO priority. Set to a class that implements " + + ScanPrioritizer.class.getName() + " to configure one."), + TSERV_SCAN_EXECUTORS_META_THREADS("tserver.scan.executors.meta.threads", "8", PropertyType.COUNT, + "The number of threads for the metadata table scan executor."), TSERV_MIGRATE_MAXCONCURRENT("tserver.migrations.concurrent.max", "1", PropertyType.COUNT, "The maximum number of concurrent tablet migrations for a tablet server"), TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", PropertyType.COUNT, @@ -541,9 +567,6 @@ public enum Property { TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", PropertyType.COUNT, "The number of threads on each tablet server available to retrieve" + " summary data, that is not currently in cache, from RFiles."), - TSERV_SESSION_COMPARATOR_CLASS("tserver.summary.comparator.class", "", PropertyType.CLASSNAME, - "A customizable Scan session comparator. Note that by default, the value is empty" - + " and thus uses no session comparator"), // accumulo garbage collector properties GC_PREFIX("gc.", null, PropertyType.PREFIX, @@ -669,6 +692,19 @@ public enum Property { PropertyType.BYTES, "The max RFile size used for a merging minor compaction. 
The default" + " value of 0 disables a max file size."), + TABLE_SCAN_DISPATCHER("table.scan.dispatcher", SimpleScanDispatcher.class.getName(), + PropertyType.CLASSNAME, + "This class is used to dynamically dispatch scans to configured scan executors. This setting" + + " defaults to " + SimpleScanDispatcher.class.getSimpleName() + + " which dispatches to an executor" + + " named 'default' when it is optionless. Setting the option " + + "'table.scan.dispatcher.opts.executor=<name>' causes " + + SimpleScanDispatcher.class.getSimpleName() + " to dispatch to the specified executor. " + + "It has more options listed in its javadoc. Configured classes must implement " + + ScanDispatcher.class.getName() + ". This property is ignored for the root and metadata" + + " table. The metadata table always dispatches to a scan executor named `meta`."), + TABLE_SCAN_DISPATCHER_OPTS("table.scan.dispatcher.opts.", null, PropertyType.PREFIX, + "Options for the table scan dispatcher"), TABLE_SCAN_MAXMEM("table.scan.max.memory", "512K", PropertyType.BYTES, "The maximum amount of memory that will be used to cache results of a client query/scan. 
" + "Once this limit is reached, the buffered data is sent to the client."), @@ -1153,7 +1189,8 @@ public enum Property { || key.startsWith(Property.TABLE_REPLICATION_TARGET.getKey()) || key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey()) || key.startsWith(TABLE_SAMPLER_OPTS.getKey()) - || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey()))); + || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey()) + || key.startsWith(TABLE_SCAN_DISPATCHER_OPTS.getKey()))); } /** diff --git a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java index 001b1db..cf0fe25 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java @@ -33,6 +33,8 @@ import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * An {@link AccumuloConfiguration} which first loads any properties set on the command-line (using * the -o option) and then from an XML file, usually accumulo-site.xml. 
This implementation supports @@ -165,6 +167,14 @@ public class SiteConfiguration extends AccumuloConfiguration { } @Override + public boolean isPropertySet(Property prop) { + Preconditions.checkArgument(!prop.isSensitive(), + "This method not implemented for sensitive props"); + return CliConfiguration.get(prop) != null || staticConfigs.containsKey(prop.getKey()) + || getXmlConfig().get(prop.getKey()) != null || parent.isPropertySet(prop); + } + + @Override public void getProperties(Map<String,String> props, Predicate<String> filter) { getProperties(props, filter, true); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java similarity index 58% rename from server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java rename to core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java index 28e6ef2..40f05e5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java @@ -14,23 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.tserver.session; +package org.apache.accumulo.core.spi.common; -public class SingleRangePriorityComparator extends DefaultSessionComparator { +import java.util.Map; - @Override - public int compareSession(Session sessionA, Session sessionB) { - int priority = super.compareSession(sessionA, sessionB); +/** + * Provides information about a configured Accumulo Iterator + * + * @since 2.0.0 + */ +public interface IteratorConfiguration { + String getIteratorClass(); + + String getName(); + + int getPriority(); - if (sessionA instanceof MultiScanSession && sessionB instanceof ScanSession) { - if (priority < 0) { - priority *= -1; - } - } else if (sessionB instanceof MultiScanSession && sessionA instanceof ScanSession) { - if (priority > 0) { - priority *= -1; - } - } - return priority; - } + Map<String,String> getOptions(); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java similarity index 58% copy from core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java copy to core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java index f688010..a1b9322 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java @@ -14,21 +14,35 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.core.util; +package org.apache.accumulo.core.spi.common; -import java.lang.Thread.UncaughtExceptionHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * @since 2.0.0 + */ +public interface Stats { -public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandler { + /** + * @return the minimum data point seen, or 0 if no data was seen + */ + long min(); - private static final Logger log = LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class); + /** + * @return the maximum data point seen, or 0 if no data was seen + */ + long max(); - @Override - public void uncaughtException(Thread t, Throwable e) { + /** + * @return the mean of the data points seen, or {@link Double#NaN} if no data was seen + */ + double mean(); - log.error(String.format("Caught an exception in %s. Shutting down.", t), e); - } + /** + * @return the sum of the data points seen, or 0 if no data was seen + */ + long sum(); + /** + * @return the number of data points seen + */ + long num(); } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java new file mode 100644 index 0000000..c567460 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.spi.scan; + +import java.util.Comparator; +import java.util.Map; + +import com.google.common.base.Preconditions; + +/** + * Prioritize scans based on the ratio of runTime/idleTime. Scans with a lower ratio have a higher + * priority. When the ratio is equal, the scan with the oldest last run time has the highest + * priority. If neither have run, then the oldest gets priority. + * + * @since 2.0.0 + */ +public class IdleRatioScanPrioritizer implements ScanPrioritizer { + private static double idleRatio(long currTime, ScanInfo si) { + double totalRunTime = si.getRunTimeStats().sum(); + double totalIdleTime = Math.max(1, si.getIdleTimeStats(currTime).sum()); + return totalRunTime / totalIdleTime; + } + + @Override + public Comparator<ScanInfo> createComparator(Map<String,String> options) { + Preconditions.checkArgument(options.isEmpty()); + + Comparator<ScanInfo> c1 = (si1, si2) -> { + long currTime = System.currentTimeMillis(); + return Double.compare(idleRatio(currTime, si1), idleRatio(currTime, si2)); + }; + + return c1.thenComparingLong(si -> si.getLastRunTime().orElse(0)) + .thenComparingLong(si -> si.getCreationTime()); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java new file mode 100644 index 0000000..a3bc3f1 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.spi.scan; + +import java.util.Map; + +import com.google.common.base.Preconditions; + +/** + * A per table scan dispatcher that decides which executor should be used to processes a scan. For + * information about configuring, find the documentation for the {@code table.scan.dispatcher} and + * {@code table.scan.dispatcher.opts.} properties. + * + * @since 2.0.0 + */ +public interface ScanDispatcher { + /** + * This method is called once after a ScanDispatcher is instantiated. + * + * @param options + * The configured options. For example if the table properties + * {@code table.scan.dispatcher.opts.p1=abc} and + * {@code table.scan.dispatcher.opts.p9=123} were set, then this map would contain + * {@code p1=abc} and {@code p9=123}. + */ + public default void init(Map<String,String> options) { + Preconditions.checkArgument(options.isEmpty(), "No options expected"); + } + + /** + * @param scanInfo + * Information about the scan. + * @param scanExecutors + * Information about the currently configured executors. 
+ * @return Should return one of the executors named in scanExecutors.keySet() + */ + String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors); +} diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java new file mode 100644 index 0000000..a55b090 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.spi.scan; + +import java.util.Map; +import java.util.Optional; + +/** + * Interface for obtaining information about a scan executor + * + * @since 2.0.0 + */ +public interface ScanExecutor { + + interface Config { + /** + * @return the unique name used to identified executor in config + */ + String getName(); + + /** + * @return the max number of threads that were configured + */ + int getMaxThreads(); + + /** + * @return the prioritizer that was configured + */ + Optional<String> getPrioritizerClass(); + + /** + * @return the prioritizer options + */ + Map<String,String> getPrioritizerOptions(); + } + + /** + * @return The number of task queued for the executor + */ + int getQueued(); + + /** + * @return The configuration used to create the executor + */ + Config getConfig(); +} diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java new file mode 100644 index 0000000..7961c21 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.spi.scan; + +import java.util.Collection; +import java.util.OptionalLong; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.spi.common.IteratorConfiguration; +import org.apache.accumulo.core.spi.common.Stats; + +/** + * Provides information about an active Accumulo scan against a tablet. Accumulo scans operate by + * repeatedly gathering batches of data and returning those to the client. + * + * <p> + * All times are in milliseconds and obtained using System.currentTimeMillis(). + * + * @since 2.0.0 + */ +public interface ScanInfo { + + enum Type { + /** + * A single range scan started using a {@link Scanner} + */ + SINGLE, + /** + * A multi range scan started using a {@link BatchScanner} + */ + MULTI + } + + Type getScanType(); + + String getTableId(); + + /** + * Returns the first time a tablet knew about a scan over its portion of data. + */ + long getCreationTime(); + + /** + * If the scan has run, returns the last run time. + */ + OptionalLong getLastRunTime(); + + /** + * Returns timing statistics about running and gathering a batches of data. + */ + Stats getRunTimeStats(); + + /** + * Returns statistics about the time between running. These stats are only about the idle times + * before the last run time. The idle time after the last run time are not included. If the scan + * has never run, then there are no stats. + */ + Stats getIdleTimeStats(); + + /** + * This method is similar to {@link #getIdleTimeStats()}, but it also includes the time period + * between the last run time and now in the stats. If the scan has never run, then the stats are + * computed using only {@code currentTime - creationTime}. + */ + Stats getIdleTimeStats(long currentTime); + + /** + * This method returns what column were fetched by a scan. 
When a family is fetched, a Column + * object where everything but the family is null is in the set. + * + * <p> + * The following example code shows how this method can be used to check if a family was fetched + * or a family+qualifier was fetched. If continually checking for the same column, should probably + * create a constant. + * + * <pre> + * <code> + * boolean wasFamilyFetched(ScanInfo si, byte[] fam) { + * Column family = new Column(fam, null, null); + * return si.getFetchedColumns().contains(family); + * } + * + * boolean wasColumnFetched(ScanInfo si, byte[] fam, byte[] qual) { + * Column col = new Column(fam, qual, null); + * return si.getFetchedColumns().contains(col); + * } + * </code> + * </pre> + * + * + * @return The family and family+qualifier pairs fetched. + */ + Set<Column> getFetchedColumns(); + + /** + * @return iterators that where configured on the client side scanner + */ + Collection<IteratorConfiguration> getClientScanIterators(); +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java similarity index 65% rename from server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java rename to core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java index dcfa1d4..51a4254 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java @@ -14,19 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.accumulo.tserver.session; +package org.apache.accumulo.core.spi.scan; import java.util.Comparator; +import java.util.Map; -public abstract class SessionComparator implements Comparator<Runnable> { - - @Override - public int compare(Runnable sessionA, Runnable sessionB) { - if (sessionA instanceof Session && sessionB instanceof Session) - return compareSession((Session) sessionA, (Session) sessionB); - else - return 0; - } - - public abstract int compareSession(final Session sessionA, final Session sessionB); +/** + * A factory for creating comparators used for prioritizing scans. For information about + * configuring, find the documentation for the {@code tserver.scan.executors.} property. + * + * @since 2.0.0 + */ +public interface ScanPrioritizer { + Comparator<ScanInfo> createComparator(Map<String,String> options); } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java new file mode 100644 index 0000000..96e7d2c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.spi.scan; + +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +/** + * If no options are given, then this will dispatch to an executor named {@code default}. This + * dispatcher supports the following options. + * + * <UL> + * <LI>{@code table.scan.dispatcher.opts.executor=<scan executor name>} : dispatches all scans to + * the named executor.</LI> + * <LI>{@code table.scan.dispatcher.opts.multi_executor=<scan executor name>} : dispatches batch + * scans to the named executor.</LI> + * <LI>{@code table.scan.dispatcher.opts.single_executor=<scan executor name>} : dispatches regular + * scans to the named executor.</LI> + * </UL> + * + * The {@code multi_executor} and {@code single_executor} options override the {@code executor} + * option. + */ + +public class SimpleScanDispatcher implements ScanDispatcher { + + private static final Set<String> VALID_OPTS = ImmutableSet.of("executor", "multi_executor", + "single_executor"); + private String multiExecutor; + private String singleExecutor; + + public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default"; + + @Override + public void init(Map<String,String> options) { + Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS); + Preconditions.checkArgument(invalidOpts.isEmpty(), "Invalid options : %s", invalidOpts); + + String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME); + multiExecutor = options.getOrDefault("multi_executor", base); + singleExecutor = options.getOrDefault("single_executor", base); + } + + @Override + public String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors) { + switch (scanInfo.getScanType()) { + case MULTI: + return multiExecutor; + case SINGLE: + return singleExecutor; + default: + throw new IllegalArgumentException("Unexpected scan type " + scanInfo.getScanType());
+ } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java index f688010..ed9c150 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java +++ b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java @@ -27,7 +27,6 @@ public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandle @Override public void uncaughtException(Thread t, Throwable e) { - log.error(String.format("Caught an exception in %s. Shutting down.", t), e); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java index 0b4730c..57d429b 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.core.util; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.OptionalInt; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; @@ -26,20 +28,30 @@ import org.slf4j.LoggerFactory; public class NamingThreadFactory implements ThreadFactory { private static final Logger log = LoggerFactory.getLogger(NamingThreadFactory.class); - private static final AccumuloUncaughtExceptionHandler uncaughtHandler = new AccumuloUncaughtExceptionHandler(); + private static final UncaughtExceptionHandler UEH = new AccumuloUncaughtExceptionHandler(); private AtomicInteger threadNum = new AtomicInteger(1); private String name; + private OptionalInt priority; public NamingThreadFactory(String name) { this.name = name; + this.priority = OptionalInt.empty(); + } + + public NamingThreadFactory(String name, OptionalInt priority) { + this.name = name; + this.priority = priority; } @Override
public Thread newThread(Runnable r) { Thread thread = new Daemon(new LoggingRunnable(log, r), name + " " + threadNum.getAndIncrement()); - thread.setUncaughtExceptionHandler(uncaughtHandler); + thread.setUncaughtExceptionHandler(UEH); + if (priority.isPresent()) { + thread.setPriority(priority.getAsInt()); + } return thread; } diff --git a/core/src/main/java/org/apache/accumulo/core/util/Stat.java b/core/src/main/java/org/apache/accumulo/core/util/Stat.java index 8bdeb63..704622b 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/Stat.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Stat.java @@ -16,67 +16,70 @@ */ package org.apache.accumulo.core.util; -import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic; +import org.apache.accumulo.core.spi.common.Stats; import org.apache.commons.math3.stat.descriptive.moment.Mean; -import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; -import org.apache.commons.math3.stat.descriptive.rank.Max; -import org.apache.commons.math3.stat.descriptive.rank.Min; -import org.apache.commons.math3.stat.descriptive.summary.Sum; -public class Stat { - Min min; - Max max; - Sum sum; +public class Stat implements Stats { + long min; + long max; + long sum; Mean mean; - StandardDeviation sd; - - StorelessUnivariateStatistic[] stats; public Stat() { - min = new Min(); - max = new Max(); - sum = new Sum(); mean = new Mean(); - sd = new StandardDeviation(); - - stats = new StorelessUnivariateStatistic[] {min, max, sum, mean, sd}; + clear(); } public void addStat(long stat) { - for (StorelessUnivariateStatistic statistic : stats) { - statistic.increment(stat); - } + min = Math.min(min, stat); + max = Math.max(max, stat); + sum += stat; + mean.increment(stat); } - public long getMin() { - return (long) min.getResult(); + @Override + public long min() { + return num() == 0 ? 
0L : min; } - public long getMax() { - return (long) max.getResult(); + @Override + public long max() { + return num() == 0 ? 0L : max; } - public long getSum() { - return (long) sum.getResult(); + @Override + public long sum() { + return sum; } - public double getAverage() { + @Override + public double mean() { return mean.getResult(); } - public double getStdDev() { - return sd.getResult(); - } - @Override public String toString() { - return String.format("%,d %,d %,.2f %,d", getMin(), getMax(), getAverage(), mean.getN()); + return String.format("%,d %,d %,.2f %,d", min(), max(), mean(), mean.getN()); } public void clear() { - for (StorelessUnivariateStatistic statistic : stats) { - statistic.clear(); - } + min = Long.MAX_VALUE; + max = Long.MIN_VALUE; + sum = 0; + mean.clear(); + } + + @Override + public long num() { + return mean.getN(); } + public Stat copy() { + Stat stat = new Stat(); + stat.min = this.min; + stat.max = this.max; + stat.sum = this.sum; + stat.mean = this.mean.copy(); + return stat; + } } diff --git a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java index 9f77834..0fcfebd 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java @@ -21,11 +21,15 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.function.Predicate; +import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig; +import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -131,6 +135,15 @@ public class 
AccumuloConfigurationTest { private HashMap<String,String> props = new HashMap<>(); private int upCount = 0; + private AccumuloConfiguration parent; + + TestConfiguration() { + parent = null; + } + + TestConfiguration(AccumuloConfiguration parent) { + this.parent = parent; + } public void set(String p, String v) { props.put(p, v); @@ -138,17 +151,29 @@ public class AccumuloConfigurationTest { } @Override + public boolean isPropertySet(Property prop) { + return props.containsKey(prop.getKey()); + } + + @Override public long getUpdateCount() { return upCount; } @Override public String get(Property property) { - return props.get(property.getKey()); + String v = props.get(property.getKey()); + if (v == null && parent != null) { + v = parent.get(property); + } + return v; } @Override public void getProperties(Map<String,String> output, Predicate<String> filter) { + if (parent != null) { + parent.getProperties(output, filter); + } for (Entry<String,String> entry : props.entrySet()) { if (filter.test(entry.getKey())) { output.put(entry.getKey(), entry.getValue()); @@ -267,4 +292,89 @@ public class AccumuloConfigurationTest { Map<String,String> pmL = tc.getAllPropertiesWithPrefix(Property.TABLE_ITERATOR_SCAN_PREFIX); assertSame(pmG, pmL); } + + @Test + public void testScanExecutors() { + String defName = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME; + + TestConfiguration tc = new TestConfiguration(DefaultConfiguration.getInstance()); + + Collection<ScanExecutorConfig> executors = tc.getScanExecutors(); + + Assert.assertEquals(2, executors.size()); + + ScanExecutorConfig sec = executors.stream().filter(c -> c.name.equals(defName)).findFirst() + .get(); + Assert.assertEquals( + Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()), + sec.maxThreads); + Assert.assertFalse(sec.priority.isPresent()); + Assert.assertTrue(sec.prioritizerClass.get().isEmpty()); + Assert.assertTrue(sec.prioritizerOpts.isEmpty()); + + // ensure deprecated prop is read
if nothing else is set + tc.set("tserver.readahead.concurrent.max", "6"); + Assert.assertEquals(6, sec.getCurrentMaxThreads()); + Assert.assertEquals( + Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()), + sec.maxThreads); + ScanExecutorConfig sec2 = tc.getScanExecutors().stream().filter(c -> c.name.equals(defName)) + .findFirst().get(); + Assert.assertEquals(6, sec2.maxThreads); + + // ensure new prop overrides deprecated prop + tc.set(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey(), "9"); + Assert.assertEquals(9, sec.getCurrentMaxThreads()); + Assert.assertEquals( + Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()), + sec.maxThreads); + ScanExecutorConfig sec3 = tc.getScanExecutors().stream().filter(c -> c.name.equals(defName)) + .findFirst().get(); + Assert.assertEquals(9, sec3.maxThreads); + + ScanExecutorConfig sec4 = executors.stream().filter(c -> c.name.equals("meta")).findFirst() + .get(); + Assert.assertEquals( + Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getDefaultValue()), + sec4.maxThreads); + Assert.assertFalse(sec4.priority.isPresent()); + Assert.assertFalse(sec4.prioritizerClass.isPresent()); + Assert.assertTrue(sec4.prioritizerOpts.isEmpty()); + + tc.set("tserver.metadata.readahead.concurrent.max", "2"); + Assert.assertEquals(2, sec4.getCurrentMaxThreads()); + ScanExecutorConfig sec5 = tc.getScanExecutors().stream().filter(c -> c.name.equals("meta")) + .findFirst().get(); + Assert.assertEquals(2, sec5.maxThreads); + + tc.set(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getKey(), "3"); + Assert.assertEquals(3, sec4.getCurrentMaxThreads()); + ScanExecutorConfig sec6 = tc.getScanExecutors().stream().filter(c -> c.name.equals("meta")) + .findFirst().get(); + Assert.assertEquals(3, sec6.maxThreads); + + String prefix = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey(); + tc.set(prefix + "hulksmash.threads", "66"); + tc.set(prefix + "hulksmash.priority", "3"); +
tc.set(prefix + "hulksmash.prioritizer", "com.foo.ScanPrioritizer"); + tc.set(prefix + "hulksmash.prioritizer.opts.k1", "v1"); + tc.set(prefix + "hulksmash.prioritizer.opts.k2", "v3"); + + executors = tc.getScanExecutors(); + Assert.assertEquals(3, executors.size()); + ScanExecutorConfig sec7 = executors.stream().filter(c -> c.name.equals("hulksmash")).findFirst() + .get(); + Assert.assertEquals(66, sec7.maxThreads); + Assert.assertEquals(3, sec7.priority.getAsInt()); + Assert.assertEquals("com.foo.ScanPrioritizer", sec7.prioritizerClass.get()); + Assert.assertEquals(ImmutableMap.of("k1", "v1", "k2", "v3"), sec7.prioritizerOpts); + + tc.set(prefix + "hulksmash.threads", "44"); + Assert.assertEquals(66, sec7.maxThreads); + Assert.assertEquals(44, sec7.getCurrentMaxThreads()); + + ScanExecutorConfig sec8 = tc.getScanExecutors().stream().filter(c -> c.name.equals("hulksmash")) + .findFirst().get(); + Assert.assertEquals(44, sec8.maxThreads); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java new file mode 100644 index 0000000..0128b20 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.spi.scan; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import org.apache.accumulo.core.spi.scan.ScanInfo.Type; +import org.junit.Assert; +import org.junit.Test; + +public class IdleRatioScanPrioritizerTest { + + @Test + public void testSort() { + long now = System.currentTimeMillis(); + + List<TestScanInfo> scans = new ArrayList<>(); + + // Two following have never run, so oldest should go first + scans.add(new TestScanInfo("a", Type.SINGLE, now - 3)); + scans.add(new TestScanInfo("b", Type.SINGLE, now - 8)); + // Two following have different idle ratio and same last run times + scans.add(new TestScanInfo("c", Type.SINGLE, now - 16, 2, 10)); + scans.add(new TestScanInfo("d", Type.SINGLE, now - 16, 5, 10)); + // Two following have same idle ratio and different last run times + scans.add(new TestScanInfo("e", Type.SINGLE, now - 12, 5, 9)); + scans.add(new TestScanInfo("f", Type.SINGLE, now - 12, 3, 7)); + + Collections.shuffle(scans); + + Comparator<ScanInfo> comparator = new IdleRatioScanPrioritizer() + .createComparator(Collections.emptyMap()); + + Collections.sort(scans, comparator); + + Assert.assertEquals("b", scans.get(0).testId); + Assert.assertEquals("a", scans.get(1).testId); + Assert.assertEquals("f", scans.get(2).testId); + Assert.assertEquals("e", scans.get(3).testId); + Assert.assertEquals("d", scans.get(4).testId); + Assert.assertEquals("c", scans.get(5).testId); + } +} diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java new file mode 100644 index 0000000..b59858a --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.spi.scan; + +import java.util.Collections; +import java.util.Map; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.spi.scan.ScanInfo.Type; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class SimpleScanDispatcherTest { + @Test + public void testProps() { + Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey() + .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + ".threads")); + Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER.getKey() + .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + ".prioritizer")); + } + + private void runTest(Map<String,String> opts, String expectedSingle, String expectedMulti) { + ScanInfo msi = new TestScanInfo("a", Type.MULTI, 4); + ScanInfo ssi = new TestScanInfo("a", Type.SINGLE, 4); + + SimpleScanDispatcher ssd1 = new SimpleScanDispatcher(); + ssd1.init(opts); + Assert.assertEquals(expectedMulti, ssd1.dispatch(msi, null)); + Assert.assertEquals(expectedSingle, ssd1.dispatch(ssi, null)); + } + + @Test + public void testBasic() { + String dname = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME; + + runTest(Collections.emptyMap(), dname, dname); + runTest(ImmutableMap.of("executor", "E1"), "E1", "E1"); + runTest(ImmutableMap.of("single_executor", "E2"), "E2", dname); + runTest(ImmutableMap.of("multi_executor", "E3"), dname, "E3"); + runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2"), "E2", "E1"); + runTest(ImmutableMap.of("executor", "E1", "multi_executor", "E3"), "E1", "E3"); + runTest(ImmutableMap.of("single_executor", "E2", "multi_executor", "E3"), "E2", "E3"); + runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2", "multi_executor", "E3"), + "E2", "E3"); + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java new 
file mode 100644 index 0000000..68a9650 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.spi.scan; + +import java.util.Collection; +import java.util.OptionalLong; +import java.util.Set; + +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.spi.common.IteratorConfiguration; +import org.apache.accumulo.core.spi.common.Stats; +import org.apache.accumulo.core.util.Stat; + +public class TestScanInfo implements ScanInfo { + + String testId; + Type scanType; + long creationTime; + OptionalLong lastRunTime = OptionalLong.empty(); + Stat runTimeStats = new Stat(); + Stat idleTimeStats = new Stat(); + + TestScanInfo(String testId, Type scanType, long creationTime, int... times) { + this.testId = testId; + this.scanType = scanType; + this.creationTime = creationTime; + + for (int i = 0; i < times.length; i += 2) { + long idleDuration = times[i] - (i == 0 ? 
0 : times[i - 1]); + long runDuration = times[i + 1] - times[i]; + runTimeStats.addStat(runDuration); + idleTimeStats.addStat(idleDuration); + } + + if (times.length > 0) { + lastRunTime = OptionalLong.of(times[times.length - 1] + creationTime); + } + } + + @Override + public Type getScanType() { + return scanType; + } + + @Override + public String getTableId() { + throw new UnsupportedOperationException(); + } + + @Override + public long getCreationTime() { + return creationTime; + } + + @Override + public OptionalLong getLastRunTime() { + return lastRunTime; + } + + @Override + public Stats getRunTimeStats() { + return runTimeStats; + } + + @Override + public Stats getIdleTimeStats() { + return idleTimeStats; + } + + @Override + public Stats getIdleTimeStats(long currentTime) { + Stat copy = idleTimeStats.copy(); + copy.addStat(currentTime - lastRunTime.orElse(creationTime)); + return copy; + } + + @Override + public Set<Column> getFetchedColumns() { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<IteratorConfiguration> getClientScanIterators() { + throw new UnsupportedOperationException(); + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java index 69df38d..a2986ac 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java @@ -43,32 +43,26 @@ public class StatTest { @Test public void testGetMin() { - assertEquals(0, zero.getMin()); - assertEquals(3677, stat.getMin()); + assertEquals(0, zero.min()); + assertEquals(3677, stat.min()); } @Test public void testGetMax() { - assertEquals(0, zero.getMax()); - assertEquals(9792, stat.getMax()); + assertEquals(0, zero.max()); + assertEquals(9792, stat.max()); } @Test public void testGetAverage() { - assertEquals(0, zero.getAverage(), delta); - assertEquals(5529, stat.getAverage(), delta); - } - - @Test - public 
void testGetStdDev() { - assertEquals(0, zero.getStdDev(), delta); - assertEquals(2073.7656569632, stat.getStdDev(), delta); + assertEquals(0, zero.mean(), delta); + assertEquals(5529, stat.mean(), delta); } @Test public void testGetSum() { - assertEquals(0, zero.getSum()); - assertEquals(38703, stat.getSum()); + assertEquals(0, zero.sum()); + assertEquals(38703, stat.sum()); } @Test @@ -76,16 +70,14 @@ public class StatTest { zero.clear(); stat.clear(); - assertEquals(0, zero.getMax()); - assertEquals(zero.getMax(), stat.getMax()); - assertEquals(0, zero.getMin()); - assertEquals(zero.getMin(), stat.getMin()); - assertEquals(0, zero.getSum()); - assertEquals(zero.getSum(), stat.getSum()); + assertEquals(0, zero.max()); + assertEquals(zero.max(), stat.max()); + assertEquals(0, zero.min()); + assertEquals(zero.min(), stat.min()); + assertEquals(0, zero.sum()); + assertEquals(zero.sum(), stat.sum()); - assertEquals(Double.NaN, zero.getAverage(), 0); - assertEquals(zero.getAverage(), stat.getAverage(), 0); - assertEquals(Double.NaN, zero.getStdDev(), 0); - assertEquals(zero.getStdDev(), stat.getStdDev(), 0); + assertEquals(Double.NaN, zero.mean(), 0); + assertEquals(zero.mean(), stat.mean(), 0); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java index dbb7ca4..cc67feb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java @@ -19,6 +19,7 @@ package org.apache.accumulo.server.conf; import static java.util.Objects.requireNonNull; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.thrift.IterInfo; import 
org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.spi.scan.ScanDispatcher; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; @@ -216,4 +218,39 @@ public class TableConfiguration extends ObservableConfiguration { return pic; } + + public static class TablesScanDispatcher { + public final ScanDispatcher dispatcher; + public final long count; + + public TablesScanDispatcher(ScanDispatcher dispatcher, long count) { + this.dispatcher = dispatcher; + this.count = count; + } + } + + private AtomicReference<TablesScanDispatcher> scanDispatcherRef = new AtomicReference<>(); + + public ScanDispatcher getScanDispatcher() { + long count = getUpdateCount(); + TablesScanDispatcher currRef = scanDispatcherRef.get(); + if (currRef == null || currRef.count != count) { + ScanDispatcher newDispatcher = Property.createTableInstanceFromPropertyName(this, + Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null); + + Map<String,String> opts = new HashMap<>(); + getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v) -> { + String optKey = k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length()); + opts.put(optKey, v); + }); + + newDispatcher.init(Collections.unmodifiableMap(opts)); + + TablesScanDispatcher newRef = new TablesScanDispatcher(newDispatcher, count); + scanDispatcherRef.compareAndSet(currRef, newRef); + currRef = newRef; + } + + return currRef.dispatcher; + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java index 1973fd8..ab41141 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java +++ 
b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java @@ -81,11 +81,12 @@ public class ZooConfiguration extends AccumuloConfiguration { @Override public String get(Property property) { if (Property.isFixedZooPropertyKey(property)) { - if (fixedProps.containsKey(property.getKey())) { - return fixedProps.get(property.getKey()); + String val = fixedProps.get(property.getKey()); + if (val != null) { + return val; } else { synchronized (fixedProps) { - String val = _get(property); + val = _get(property); fixedProps.put(property.getKey(), val); return val; } @@ -96,6 +97,12 @@ public class ZooConfiguration extends AccumuloConfiguration { } } + @Override + public boolean isPropertySet(Property prop) { + return fixedProps.containsKey(prop.getKey()) || getRaw(prop.getKey()) != null + || parent.isPropertySet(prop); + } + private String getRaw(String key) { String zPath = propPathPrefix + "/" + key; byte[] v = propCache.get(zPath); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 6e67cf0..14c0306 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -130,6 +130,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.spi.scan.ScanDispatcher; import org.apache.accumulo.core.summary.Gatherer; import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver; import org.apache.accumulo.core.summary.SummaryCollection; @@ -250,9 +251,9 @@ import org.apache.accumulo.tserver.scan.NextBatchTask; import org.apache.accumulo.tserver.scan.ScanRunState; import 
org.apache.accumulo.tserver.session.ConditionalSession; import org.apache.accumulo.tserver.session.MultiScanSession; -import org.apache.accumulo.tserver.session.ScanSession; import org.apache.accumulo.tserver.session.Session; import org.apache.accumulo.tserver.session.SessionManager; +import org.apache.accumulo.tserver.session.SingleScanSession; import org.apache.accumulo.tserver.session.SummarySession; import org.apache.accumulo.tserver.session.UpdateSession; import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner; @@ -543,6 +544,16 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } + private ScanDispatcher getScanDispatcher(KeyExtent extent) { + if (extent.isRootTablet() || extent.isMeta()) { + // dispatcher is only for user tables + return null; + } + + return getServerConfigurationFactory().getTableConfiguration(extent.getTableId()) + .getScanDispatcher(); + } + @Override public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList, @@ -587,13 +598,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable { if (tablet == null) throw new NotServingTabletException(textent); - Set<Column> columnSet = new HashSet<>(); + HashSet<Column> columnSet = new HashSet<>(); for (TColumn tcolumn : columns) { columnSet.add(new Column(tcolumn)); } - final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio, - new Authorizations(authorizations), readaheadThreshold, batchTimeOut, context); + final SingleScanSession scanSession = new SingleScanSession(credentials, extent, columnSet, + ssiList, ssio, new Authorizations(authorizations), readaheadThreshold, batchTimeOut, + context); scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated, scanSession.interruptFlag, 
SamplerConfigurationImpl.fromThrift(tSamplerConfig), scanSession.batchTimeOut, @@ -619,7 +631,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { throws NoSuchScanIDException, NotServingTabletException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException, TSampleNotPresentException { - ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID); + SingleScanSession scanSession = (SingleScanSession) sessionManager.reserveSession(scanID); if (scanSession == null) { throw new NoSuchScanIDException(); } @@ -631,7 +643,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } - private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) + private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException, TSampleNotPresentException { @@ -639,7 +651,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable { if (scanSession.nextBatchTask == null) { scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag); - resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask); + resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent), + scanSession, scanSession.nextBatchTask); } ScanBatch bresult; @@ -694,7 +707,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable { // to client scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag); - resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask); + resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent), + scanSession, scanSession.nextBatchTask); } if (!scanResult.more) @@ -705,14 +719,14 @@ public class TabletServer extends 
AccumuloServerContext implements Runnable { @Override public void closeScan(TInfo tinfo, long scanID) { - final ScanSession ss = (ScanSession) sessionManager.removeSession(scanID); + final SingleScanSession ss = (SingleScanSession) sessionManager.removeSession(scanID); if (ss != null) { long t2 = System.currentTimeMillis(); if (log.isTraceEnabled()) { log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), ss.extent.getTableId(), ss.entriesReturned, - (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString())); + (t2 - ss.startTime) / 1000.0, ss.runStats.toString())); } if (scanMetrics.isEnabled()) { @@ -818,7 +832,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable { if (session.lookupTask == null) { session.lookupTask = new LookupTask(TabletServer.this, scanID); - resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask); + resourceManager.executeReadAhead(session.threadPoolExtent, + getScanDispatcher(session.threadPoolExtent), session, session.lookupTask); } try { @@ -1174,8 +1189,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable { String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates, (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), - us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0, - us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0)); + us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0, + us.commitTimes.sum() / 1000.0)); } if (us.failures.size() > 0) { Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 9a91e87..5ab6e27 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -21,10 +21,15 @@ import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Queue; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; @@ -36,12 +41,13 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.IntSupplier; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.NamespaceNotFoundException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; @@ -50,6 +56,11 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheType; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.spi.scan.ScanDispatcher; +import 
org.apache.accumulo.core.spi.scan.ScanExecutor; +import org.apache.accumulo.core.spi.scan.ScanInfo; +import org.apache.accumulo.core.spi.scan.ScanPrioritizer; +import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.fate.util.LoggingRunnable; @@ -67,15 +78,18 @@ import org.apache.accumulo.tserver.compaction.CompactionStrategy; import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy; import org.apache.accumulo.tserver.compaction.MajorCompactionReason; import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; -import org.apache.accumulo.tserver.session.SessionComparator; +import org.apache.accumulo.tserver.session.ScanSession; import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.htrace.wrappers.TraceExecutorService; +import org.apache.htrace.wrappers.TraceRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; /** * ResourceManager is responsible for managing the resources of all tablets within a tablet server. 
@@ -94,14 +108,13 @@ public class TabletServerResourceManager { private final ExecutorService migrationPool; private final ExecutorService assignmentPool; private final ExecutorService assignMetaDataPool; - private final ExecutorService readAheadThreadPool; - private final ExecutorService defaultReadAheadThreadPool; private final ExecutorService summaryRetrievalPool; private final ExecutorService summaryParitionPool; private final ExecutorService summaryRemotePool; private final Map<String,ExecutorService> threadPools = new TreeMap<>(); - private final Map<String,ExecutorService> tableThreadPools = new TreeMap<>(); + private final Map<String,ExecutorService> scanExecutors; + private final Map<String,ScanExecutor> scanExecutorChoices; private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments; @@ -128,46 +141,15 @@ public class TabletServerResourceManager { return tp; } - private ExecutorService addEs(final Property maxThreads, String name, - final ThreadPoolExecutor tp) { + private ExecutorService addEs(IntSupplier maxThreads, String name, final ThreadPoolExecutor tp) { ExecutorService result = addEs(name, tp); SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() { @Override public void run() { try { - int max = tserver.getConfiguration().getCount(maxThreads); + int max = maxThreads.getAsInt(); if (tp.getMaximumPoolSize() != max) { - log.info("Changing {} to {}", maxThreads.getKey(), max); - tp.setCorePoolSize(max); - tp.setMaximumPoolSize(max); - } - } catch (Throwable t) { - log.error("Failed to change thread pool size", t); - } - } - - }, 1000, 10 * 1000); - return result; - } - - private ExecutorService addEs(final int maxThreads, final Property prefix, - final String propertyName, String name, final ThreadPoolExecutor tp) { - ExecutorService result = addEs(name, tp); - SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() { - @Override - public void run() { - try { - int max = maxThreads; - for 
(Entry<String,String> entry : conf.getSystemConfiguration() - .getAllPropertiesWithPrefix(prefix).entrySet()) { - if (entry.getKey().equals(propertyName)) { - if (null != entry.getValue() && entry.getValue().length() != 0) - max = Integer.parseInt(entry.getValue()); - break; - } - } - if (tp.getMaximumPoolSize() != max) { - log.info("Changing {} to {}", maxThreads, max); + log.info("Changing max threads for {} to {}", name, max); tp.setCorePoolSize(max); tp.setMaximumPoolSize(max); } @@ -187,7 +169,7 @@ public class TabletServerResourceManager { ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, timeout, timeUnit, queue, new NamingThreadFactory(name)); tp.allowCoreThreadTimeOut(true); - return addEs(max, name, tp); + return addEs(() -> conf.getSystemConfiguration().getCount(max), name, tp); } private ExecutorService createEs(int max, String name) { @@ -198,83 +180,52 @@ public class TabletServerResourceManager { return createEs(max, name, new LinkedBlockingQueue<>()); } - private ExecutorService createEs(int maxThreads, Property prefix, String propertyName, - String name, BlockingQueue<Runnable> queue) { - ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, - TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name)); - return addEs(maxThreads, prefix, propertyName, name, tp); - } + private ExecutorService createPriorityExecutor(ScanExecutorConfig sec, + Map<String,Queue<?>> scanExecQueues) { - /** - * If we cannot instantiate the comparator we will default to the linked blocking queue comparator - * - * @param max - * max number of threads - * @param comparator - * comparator property - * @param name - * name passed to the thread factory - * @return priority executor - */ - private ExecutorService createPriorityExecutor(Property prefix, String propertyName, - final int maxThreads, Property comparator, String name) { - - String comparatorClazz = conf.getSystemConfiguration().get(comparator); + BlockingQueue<Runnable> 
queue; - if (null == comparatorClazz || comparatorClazz.length() == 0) { - log.debug("Using no comparator"); - return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>()); + if (sec.prioritizerClass.orElse("").isEmpty()) { + queue = new LinkedBlockingQueue<>(); } else { - SessionComparator comparatorObj = Property.createInstanceFromPropertyName( - conf.getSystemConfiguration(), comparator, SessionComparator.class, null); - if (null != comparatorObj) { - log.debug("Using priority based scheduler {}", comparatorClazz); - return createEs(maxThreads, prefix, propertyName, name, - new PriorityBlockingQueue<>(maxThreads, comparatorObj)); - } else { - log.debug("Using no comparator"); - return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>()); + ScanPrioritizer factory = null; + try { + factory = ConfigurationTypeHelper.getClassInstance(null, sec.prioritizerClass.get(), + ScanPrioritizer.class); + } catch (Exception e) { + throw new RuntimeException(e); } - } - } - /** - * If we cannot instantiate the comparator we will default to the linked blocking queue comparator - * - * @param max - * max number of threads - * @param comparator - * comparator property - * @param name - * name passed to the thread factory - * @return priority executor - */ - private ExecutorService createPriorityExecutor(Property max, Property comparator, String name) { - int maxThreads = conf.getSystemConfiguration().getCount(max); + if (factory == null) { + queue = new LinkedBlockingQueue<>(); + } else { + Comparator<ScanInfo> comparator = factory.createComparator(sec.prioritizerOpts); - String comparatorClazz = conf.getSystemConfiguration().get(comparator); + // function to extract scan scan session from runnable + Function<Runnable,ScanInfo> extractor = r -> ((ScanSession.ScanMeasurer) ((TraceRunnable) r) + .getRunnable()).getScanInfo(); - if (null == comparatorClazz || comparatorClazz.length() == 0) { - log.debug("Using no comparator"); - 
return createEs(max, name, new LinkedBlockingQueue<>()); - } else { - SessionComparator comparatorObj = Property.createInstanceFromPropertyName( - conf.getSystemConfiguration(), comparator, SessionComparator.class, null); - if (null != comparatorObj) { - log.debug("Using priority based scheduler {}", comparatorClazz); - return createEs(max, name, new PriorityBlockingQueue<>(maxThreads, comparatorObj)); - } else { - log.debug("Using no comparator"); - return createEs(max, name, new LinkedBlockingQueue<>()); + queue = new PriorityBlockingQueue<>(sec.maxThreads, + Comparator.comparing(extractor, comparator)); } } + + scanExecQueues.put(sec.name, queue); + + return createEs(() -> sec.getCurrentMaxThreads(), "scan-" + sec.name, queue, sec.priority); } - private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) { - int maxThreads = conf.getSystemConfiguration().getCount(max); + private ExecutorService createEs(IntSupplier maxThreadsSupplier, String name, + BlockingQueue<Runnable> queue, OptionalInt priority) { + int maxThreads = maxThreadsSupplier.getAsInt(); ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, - TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name)); - return addEs(max, name, tp); + TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name, priority)); + return addEs(maxThreadsSupplier, name, tp); + } + + private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) { + IntSupplier maxThreadsSupplier = () -> conf.getSystemConfiguration().getCount(max); + return createEs(maxThreadsSupplier, name, queue, OptionalInt.empty()); } private ExecutorService createEs(int min, int max, int timeout, String name) { @@ -282,33 +233,78 @@ public class TabletServerResourceManager { new LinkedBlockingQueue<>(), new NamingThreadFactory(name))); } - /** - * Creates table specific thread pool for executing scan threads - * - * @param instance - * ZK instance. 
- * @param acuConf - * accumulo configuration. - * @throws NamespaceNotFoundException - * Error thrown by tables.getTableId when a name space is not found. - * @throws TableNotFoundException - * Error thrown by tables.getTableId when a table is not found. - */ - protected void createTablePools(Instance instance, AccumuloConfiguration acuConf) - throws NamespaceNotFoundException, TableNotFoundException { - for (Entry<String,String> entry : acuConf - .getAllPropertiesWithPrefix(Property.TSERV_READ_AHEAD_PREFIX).entrySet()) { - final String tableName = entry.getKey() - .substring(Property.TSERV_READ_AHEAD_PREFIX.getKey().length()); - if (null == entry.getValue() || entry.getValue().length() == 0) { - throw new RuntimeException("Read ahead prefix is inproperly configured"); + protected Map<String,ExecutorService> createScanExecutors(Instance instance, + Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) { + Builder<String,ExecutorService> builder = ImmutableMap.builder(); + + for (ScanExecutorConfig sec : scanExecCfg) { + builder.put(sec.name, createPriorityExecutor(sec, scanExecQueues)); + } + + return builder.build(); + } + + private static class ScanExecutorImpl implements ScanExecutor { + + private static class ConfigImpl implements ScanExecutor.Config { + + final ScanExecutorConfig cfg; + + public ConfigImpl(ScanExecutorConfig sec) { + this.cfg = sec; + } + + @Override + public String getName() { + return cfg.name; } - final int maxThreads = Integer.parseInt(entry.getValue()); - final String tableId = Tables.getTableId(instance, tableName).canonicalID(); - tableThreadPools.put(tableId, - createPriorityExecutor(Property.TSERV_READ_AHEAD_PREFIX, entry.getKey(), maxThreads, - Property.TSERV_SESSION_COMPARATOR_CLASS, tableName + " specific read ahead")); + + @Override + public int getMaxThreads() { + return cfg.maxThreads; + } + + @Override + public Optional<String> getPrioritizerClass() { + return cfg.prioritizerClass; + } + + @Override 
+ public Map<String,String> getPrioritizerOptions() { + return cfg.prioritizerOpts; + } + } + + private final ConfigImpl config; + private final Queue<?> queue; + + ScanExecutorImpl(ScanExecutorConfig sec, Queue<?> q) { + this.config = new ConfigImpl(sec); + this.queue = q; + } + + @Override + public int getQueued() { + return queue.size(); + } + + @Override + public Config getConfig() { + return config; + } + + } + + private Map<String,ScanExecutor> createScanExecutorChoices( + Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) { + Builder<String,ScanExecutor> builder = ImmutableMap.builder(); + + for (ScanExecutorConfig sec : scanExecCfg) { + builder.put(sec.name, new ScanExecutorImpl(sec, scanExecQueues.get(sec.name))); + } + + return builder.build(); } public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) { @@ -390,11 +386,6 @@ public class TabletServerResourceManager { activeAssignments = new ConcurrentHashMap<>(); - readAheadThreadPool = createPriorityExecutor(Property.TSERV_READ_AHEAD_MAXCONCURRENT, - Property.TSERV_SESSION_COMPARATOR_CLASS, "tablet read ahead"); - defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, - "metadata tablets read ahead"); - summaryRetrievalPool = createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS, "summary file retriever", 60, TimeUnit.SECONDS); summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS, "summary remote", 60, @@ -402,13 +393,11 @@ public class TabletServerResourceManager { summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS, "summary partition", 60, TimeUnit.SECONDS); - try { - createTablePools(tserver.getInstance(), acuConf); - } catch (NamespaceNotFoundException e) { - throw new RuntimeException(e); - } catch (TableNotFoundException e) { - throw new RuntimeException(e); - } + Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors(); + Map<String,Queue<?>> 
scanExecQueues = new HashMap<>(); + scanExecutors = createScanExecutors(tserver.getInstance(), scanExecCfg, scanExecQueues); + scanExecutorChoices = createScanExecutorChoices(scanExecCfg, scanExecQueues); + int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES); Cache<String,Long> fileLenCache = CacheBuilder.newBuilder() @@ -938,16 +927,29 @@ public class TabletServerResourceManager { } } - public void executeReadAhead(KeyExtent tablet, Runnable task) { - ExecutorService service = tableThreadPools.get(tablet.getTableId().canonicalID()); - if (null != service) { - service.execute(task); - } else if (tablet.isRootTablet()) { + public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, ScanSession scanInfo, + Runnable task) { + + task = ScanSession.wrap(scanInfo, task); + + if (tablet.isRootTablet()) { task.run(); } else if (tablet.isMeta()) { - defaultReadAheadThreadPool.execute(task); + scanExecutors.get("meta").execute(task); } else { - readAheadThreadPool.execute(task); + String scanExecutorName = dispatcher.dispatch(scanInfo, scanExecutorChoices); + ExecutorService executor = scanExecutors.get(scanExecutorName); + if (executor == null) { + log.warn( + "For table id {}, {} dispatched to non-existant executor {} Using default executor.", + tablet.getTableId(), dispatcher.getClass().getName(), scanExecutorName); + executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME); + } else if ("meta".equals(scanExecutorName)) { + log.warn("For table id {}, {} dispatched to meta executor. 
Using default executor.", + tablet.getTableId(), dispatcher.getClass().getName()); + executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME); + } + executor.execute(task); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java index aee4477..16b9c55 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java @@ -23,7 +23,7 @@ import org.apache.accumulo.core.iterators.IterationInterruptedException; import org.apache.accumulo.server.util.Halt; import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.TooManyFilesException; -import org.apache.accumulo.tserver.session.ScanSession; +import org.apache.accumulo.tserver.session.SingleScanSession; import org.apache.accumulo.tserver.tablet.ScanBatch; import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.accumulo.tserver.tablet.TabletClosedException; @@ -48,7 +48,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> { @Override public void run() { - final ScanSession scanSession = (ScanSession) server.getSession(scanID); + final SingleScanSession scanSession = (SingleScanSession) server.getSession(scanID); String oldThreadName = Thread.currentThread().getName(); try { @@ -69,10 +69,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> { return; } - long t1 = System.currentTimeMillis(); ScanBatch batch = scanSession.scanner.read(); - long t2 = System.currentTimeMillis(); - scanSession.nbTimes.addStat(t2 - t1); // there should only be one thing on the queue at a time, so // it should be ok to call add() diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java deleted 
file mode 100644 index d584242..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver.session; - -public class DefaultSessionComparator extends SessionComparator { - - @Override - public int compareSession(Session sessionA, Session sessionB) { - - final long startTimeFirst = sessionA.startTime; - final long startTimeSecond = sessionB.startTime; - - // use the lowest max idle time - final long maxIdle = sessionA.maxIdleAccessTime < sessionB.maxIdleAccessTime - ? sessionA.maxIdleAccessTime - : sessionB.maxIdleAccessTime; - - final long currentTime = System.currentTimeMillis(); - - /* - * Multiply by -1 so that we have a sensical comparison. This means that if comparison < 0, - * sessionA is newer. 
If comparison > 0, this means that session B is newer - */ - int comparison = -1 * Long.compare(startTimeFirst, startTimeSecond); - - if (!(sessionA.lastExecTime == -1 && sessionB.lastExecTime == -1)) { - if (comparison >= 0) { - long idleTimeA = currentTime - sessionA.lastExecTime; - - /* - * If session B is newer, let's make sure that we haven't reached the max idle time, where - * we have to begin aging A - */ - if (idleTimeA > sessionA.maxIdleAccessTime) { - comparison = -1 * Long.valueOf(idleTimeA - maxIdle).intValue(); - } - } else { - long idleTimeB = currentTime - sessionB.lastExecTime; - - /* - * If session A is newer, let's make sure that B hasn't reached the max idle time, where we - * have to begin aging A - */ - if (idleTimeB > sessionA.maxIdleAccessTime) { - comparison = 1 * Long.valueOf(idleTimeB - maxIdle).intValue(); - } - } - } - - return comparison; - } - -} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java index 981832a..f8db1f4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.data.thrift.IterInfo; @@ -30,13 +29,9 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.tserver.scan.ScanTask; -public class MultiScanSession extends Session { +public class MultiScanSession extends ScanSession { public final KeyExtent threadPoolExtent; - public final HashSet<Column> columnSet = 
new HashSet<>(); public final Map<KeyExtent,List<Range>> queries; - public final List<IterInfo> ssiList; - public final Map<String,Map<String,String>> ssio; - public final Authorizations auths; public final SamplerConfiguration samplerConfig; public final long batchTimeOut; public final String context; @@ -53,11 +48,8 @@ public class MultiScanSession extends Session { Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations, SamplerConfiguration samplerConfig, long batchTimeOut, String context) { - super(credentials); + super(credentials, new HashSet<>(), ssiList, ssio, authorizations); this.queries = queries; - this.ssiList = ssiList; - this.ssio = ssio; - this.auths = authorizations; this.threadPoolExtent = threadPoolExtent; this.samplerConfig = samplerConfig; this.batchTimeOut = batchTimeOut; @@ -65,20 +57,20 @@ public class MultiScanSession extends Session { } @Override + public Type getScanType() { + return Type.MULTI; + } + + @Override + public String getTableId() { + return threadPoolExtent.getTableId().canonicalID(); + } + + @Override public boolean cleanup() { if (lookupTask != null) lookupTask.cancel(true); // the cancellation should provide us the safety to return true here return true; } - - /** - * Ensure that the runnable actually runs - */ - @Override - public void run() { - super.run(); - lookupTask.run(); - } - } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java index a55de1e..5cde06c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java @@ -16,64 +16,148 @@ */ package org.apache.accumulo.tserver.session; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import 
java.util.Map; +import java.util.OptionalLong; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.data.Column; -import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.data.thrift.IterInfo; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.spi.common.IteratorConfiguration; +import org.apache.accumulo.core.spi.common.Stats; +import org.apache.accumulo.core.spi.scan.ScanInfo; import org.apache.accumulo.core.util.Stat; -import org.apache.accumulo.tserver.scan.ScanTask; -import org.apache.accumulo.tserver.tablet.ScanBatch; -import org.apache.accumulo.tserver.tablet.Scanner; - -public class ScanSession extends Session { - public final Stat nbTimes = new Stat(); - public final KeyExtent extent; - public final Set<Column> columnSet; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +public abstract class ScanSession extends Session implements ScanInfo { + + public static class ScanMeasurer implements Runnable { + + private ScanSession session; + private Runnable task; + + ScanMeasurer(ScanSession session, Runnable task) { + this.session = session; + this.task = task; + } + + @Override + public void run() { + long t1 = System.currentTimeMillis(); + task.run(); + long t2 = System.currentTimeMillis(); + session.finishedRun(t1, t2); + } + + public ScanInfo getScanInfo() { + return session; + } + } + + public static ScanMeasurer wrap(ScanSession scanInfo, Runnable r) { + return new ScanMeasurer(scanInfo, r); + } + + private OptionalLong lastRunTime = OptionalLong.empty(); + private Stat idleStats = new Stat(); + public Stat runStats = new Stat(); + + public final HashSet<Column> columnSet; public final List<IterInfo> ssiList; public final Map<String,Map<String,String>> ssio; public final Authorizations auths; - public final AtomicBoolean interruptFlag = 
new AtomicBoolean(); - public long entriesReturned = 0; - public long batchCount = 0; - public volatile ScanTask<ScanBatch> nextBatchTask; - public Scanner scanner; - public final long readaheadThreshold; - public final long batchTimeOut; - public final String context; - - public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet, - List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations, - long readaheadThreshold, long batchTimeOut, String context) { + + ScanSession(TCredentials credentials, HashSet<Column> cols, List<IterInfo> ssiList, + Map<String,Map<String,String>> ssio, Authorizations auths) { super(credentials); - this.extent = extent; - this.columnSet = columnSet; + this.columnSet = cols; this.ssiList = ssiList; this.ssio = ssio; - this.auths = authorizations; - this.readaheadThreshold = readaheadThreshold; - this.batchTimeOut = batchTimeOut; - this.context = context; + this.auths = auths; + } + + @Override + public long getCreationTime() { + return startTime; + } + + @Override + public OptionalLong getLastRunTime() { + return lastRunTime; } @Override - public boolean cleanup() { - final boolean ret; - try { - if (nextBatchTask != null) - nextBatchTask.cancel(true); - } finally { - if (scanner != null) - ret = scanner.close(); - else - ret = true; + public Stats getRunTimeStats() { + return runStats; + } + + @Override + public Stats getIdleTimeStats() { + return idleStats; + } + + @Override + public Stats getIdleTimeStats(long currentTime) { + long idleTime = currentTime - getLastRunTime().orElse(getCreationTime()); + Preconditions.checkArgument(idleTime >= 0); + Stat copy = idleStats.copy(); + copy.addStat(idleTime); + return copy; + } + + @Override + public Set<Column> getFetchedColumns() { + return Collections.unmodifiableSet(columnSet); + } + + private class IterConfImpl implements IteratorConfiguration { + + private IterInfo ii; + + IterConfImpl(IterInfo ii) { + this.ii = ii; + } + + 
@Override + public String getIteratorClass() { + return ii.className; + } + + @Override + public String getName() { + return ii.iterName; + } + + @Override + public int getPriority() { + return ii.priority; + } + + @Override + public Map<String,String> getOptions() { + Map<String,String> opts = ssio.get(ii.iterName); + return opts == null || opts.isEmpty() ? Collections.emptyMap() + : Collections.unmodifiableMap(opts); } - return ret; } + @Override + public Collection<IteratorConfiguration> getClientScanIterators() { + return Lists.transform(ssiList, IterConfImpl::new); + } + + public void finishedRun(long start, long finish) { + long idleTime = start - getLastRunTime().orElse(getCreationTime()); + long runTime = finish - start; + lastRunTime = OptionalLong.of(finish); + idleStats.addStat(idleTime); + runStats.addStat(runTime); + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java index 32f7e34..6bd52e4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java @@ -19,7 +19,7 @@ package org.apache.accumulo.tserver.session; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.server.rpc.TServerUtils; -public abstract class Session implements Runnable { +public class Session { enum State { NEW, UNRESERVED, RESERVED, REMOVED @@ -27,9 +27,7 @@ public abstract class Session implements Runnable { public final String client; public long lastAccessTime; - protected volatile long lastExecTime = -1; public long startTime; - public long maxIdleAccessTime; State state = State.NEW; private final TCredentials credentials; @@ -49,18 +47,4 @@ public abstract class Session implements Runnable { public boolean cleanup() { return true; } - - @Override - public void run() { - lastExecTime = 
System.currentTimeMillis(); - } - - public void setLastExecutionTime(long lastExecTime) { - this.lastExecTime = lastExecTime; - } - - public long getLastExecutionTime() { - return lastExecTime; - } - } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index c485698..1ba2491 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -231,9 +231,6 @@ public class SessionManager { configuredIdle = maxUpdateIdle; } long idleTime = System.currentTimeMillis() - session.lastAccessTime; - if (idleTime > session.maxIdleAccessTime) { - session.maxIdleAccessTime = idleTime; - } if (idleTime > configuredIdle) { log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(), session.client, idleTime); @@ -318,8 +315,8 @@ public class SessionManager { ScanTask nbt = null; Table.ID tableID = null; - if (session instanceof ScanSession) { - ScanSession ss = (ScanSession) session; + if (session instanceof SingleScanSession) { + SingleScanSession ss = (SingleScanSession) session; nbt = ss.nextBatchTask; tableID = ss.extent.getTableId(); } else if (session instanceof MultiScanSession) { @@ -365,8 +362,8 @@ public class SessionManager { for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { Session session = entry.getValue(); - if (session instanceof ScanSession) { - ScanSession ss = (ScanSession) session; + if (session instanceof SingleScanSession) { + SingleScanSession ss = (SingleScanSession) session; ScanState state = ScanState.RUNNING; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java similarity index 80% copy from 
server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java copy to server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java index a55de1e..fb3e29f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java @@ -16,9 +16,9 @@ */ package org.apache.accumulo.tserver.session; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.data.Column; @@ -26,18 +26,12 @@ import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.data.thrift.IterInfo; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.thrift.TCredentials; -import org.apache.accumulo.core.util.Stat; import org.apache.accumulo.tserver.scan.ScanTask; import org.apache.accumulo.tserver.tablet.ScanBatch; import org.apache.accumulo.tserver.tablet.Scanner; -public class ScanSession extends Session { - public final Stat nbTimes = new Stat(); +public class SingleScanSession extends ScanSession { public final KeyExtent extent; - public final Set<Column> columnSet; - public final List<IterInfo> ssiList; - public final Map<String,Map<String,String>> ssio; - public final Authorizations auths; public final AtomicBoolean interruptFlag = new AtomicBoolean(); public long entriesReturned = 0; public long batchCount = 0; @@ -47,21 +41,27 @@ public class ScanSession extends Session { public final long batchTimeOut; public final String context; - public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet, + public SingleScanSession(TCredentials credentials, KeyExtent extent, HashSet<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations, long readaheadThreshold, long batchTimeOut, 
String context) { - super(credentials); + super(credentials, columnSet, ssiList, ssio, authorizations); this.extent = extent; - this.columnSet = columnSet; - this.ssiList = ssiList; - this.ssio = ssio; - this.auths = authorizations; this.readaheadThreshold = readaheadThreshold; this.batchTimeOut = batchTimeOut; this.context = context; } @Override + public Type getScanType() { + return Type.SINGLE; + } + + @Override + public String getTableId() { + return extent.getTableId().canonicalID(); + } + + @Override public boolean cleanup() { final boolean ret; try { @@ -75,5 +75,4 @@ public class ScanSession extends Session { } return ret; } - } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 18da8a4..dc5d684 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1540,7 +1540,8 @@ public class Tablet implements TabletCommitter { try { getTabletResources().executeMajorCompaction(getExtent(), new CompactionRunner(this, reason)); } catch (RuntimeException t) { - log.debug("removing {} because we encountered an exception enqueing the CompactionRunner", reason, t); + log.debug("removing {} because we encountered an exception enqueing the CompactionRunner", + reason, t); majorCompactionQueued.remove(reason); throw t; } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java deleted file mode 100644 index f7cc5cd..0000000 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver.session; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -public class SessionComparatorTest { - - @Test - public void testSingleScanMultiScanNoRun() { - long time = System.currentTimeMillis(); - ScanSession sessionA = emptyScanSession(); - sessionA.lastAccessTime = 0; - sessionA.maxIdleAccessTime = 0; - sessionA.startTime = time - 1000; - - MultiScanSession sessionB = emptyMultiScanSession(); - sessionB.lastAccessTime = 0; - sessionB.maxIdleAccessTime = 1000; - sessionB.startTime = time - 800; - - ScanSession sessionC = emptyScanSession(); - sessionC.lastAccessTime = 0; - sessionC.maxIdleAccessTime = 1000; - sessionC.startTime = time - 800; - - // a has never run, so it should be given priority - SingleRangePriorityComparator comparator = new SingleRangePriorityComparator(); - assertEquals(-1, comparator.compareSession(sessionA, sessionB)); - - // b is before a in queue, b has never run, but because a is single - // we should be given priority - assertEquals(1, comparator.compareSession(sessionB, sessionA)); - - // now let's assume they have been executed - - assertEquals(1, comparator.compareSession(sessionA, sessionC)); - - assertEquals(0, 
comparator.compareSession(sessionC, sessionC)); - - } - - @Test - public void testSingleScanRun() { - long time = System.currentTimeMillis(); - ScanSession sessionA = emptyScanSession(); - sessionA.lastAccessTime = 0; - sessionA.setLastExecutionTime(time); - sessionA.maxIdleAccessTime = 1000; - sessionA.startTime = time - 1000; - - ScanSession sessionB = emptyScanSession(); - sessionB.lastAccessTime = 0; - sessionB.setLastExecutionTime(time - 2000); - sessionB.maxIdleAccessTime = 1000; - sessionB.startTime = time - 800; - - // b is newer - SingleRangePriorityComparator comparator = new SingleRangePriorityComparator(); - assertEquals(1, comparator.compareSession(sessionA, sessionB)); - - // b is before a in queue, b has never run, but because a is single - // we should be given priority - assertTrue(comparator.compareSession(sessionB, sessionA) < 0); - - sessionB.setLastExecutionTime(time); - sessionA.setLastExecutionTime(time - 2000); - - assertTrue(comparator.compareSession(sessionA, sessionB) < 0); - - // b is before a in queue, b has never run, but because a is single - // we should be given priority - int comp = comparator.compareSession(sessionB, sessionA); - assertTrue("comparison is " + comp, comp >= 1); - } - - @Test - public void testSingleScanMultiScanRun() { - long time = System.currentTimeMillis(); - ScanSession sessionA = emptyScanSession(); - sessionA.lastAccessTime = 0; - sessionA.setLastExecutionTime(time); - sessionA.maxIdleAccessTime = 1000; - sessionA.startTime = time - 1000; - - MultiScanSession sessionB = emptyMultiScanSession(); - sessionB.lastAccessTime = 0; - sessionB.setLastExecutionTime(time - 2000); - sessionB.maxIdleAccessTime = 1000; - sessionB.startTime = time - 800; - - // b is newer - SingleRangePriorityComparator comparator = new SingleRangePriorityComparator(); - assertEquals(-1, comparator.compareSession(sessionA, sessionB)); - - // b is before a in queue, b has never run, but because a is single - // we should be given priority - 
assertTrue(comparator.compareSession(sessionB, sessionA) > 0); - - sessionB.setLastExecutionTime(time); - sessionA.setLastExecutionTime(time - 2000); - - assertTrue(comparator.compareSession(sessionA, sessionB) < 0); - - // b is before a in queue, b has never run, but because a is single - // we should be given priority - int comp = comparator.compareSession(sessionB, sessionA); - assertTrue("comparison is " + comp, comp > 0); - } - - @Test - public void testMultiScanRun() { - long time = System.currentTimeMillis(); - ScanSession sessionA = emptyScanSession(); - sessionA.lastAccessTime = 0; - sessionA.setLastExecutionTime(time); - sessionA.maxIdleAccessTime = 1000; - sessionA.startTime = time - 1000; - - ScanSession sessionB = emptyScanSession(); - sessionB.lastAccessTime = 0; - sessionB.setLastExecutionTime(time - 2000); - sessionB.maxIdleAccessTime = 1000; - sessionB.startTime = time - 800; - - // b is newer - SingleRangePriorityComparator comparator = new SingleRangePriorityComparator(); - assertEquals(1, comparator.compareSession(sessionA, sessionB)); - - // b is before a in queue, b has never run, but because a is single - // we should be given priority - assertTrue(comparator.compareSession(sessionB, sessionA) < 0); - - sessionB.setLastExecutionTime(time); - sessionA.setLastExecutionTime(time - 2000); - - - assertTrue(comparator.compareSession(sessionA, sessionB) < 0); - - // b is before a in queue, b has never run, but because a is single - // we should be given priority - int comp = comparator.compareSession(sessionB, sessionA); - assertTrue("comparison is " + comp, comp >= 1); - } - - private static ScanSession emptyScanSession() { - return new ScanSession(null, null, null, null, null, null, 0, 0, null); - } - - private static MultiScanSession emptyMultiScanSession() { - return new MultiScanSession(null, null, null, null, null, null, null, 0, null); - } -} diff --git a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java 
b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java index e807dfb..320b9c2 100644 --- a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java +++ b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java @@ -68,7 +68,7 @@ public class IMMLGBenchmark { } for (Entry<String,Stat> entry : stats.entrySet()) { - System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().getAverage()); + System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().mean()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java index cbeb1e4..907f2c6 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java @@ -606,9 +606,8 @@ public class CollectTabletStats { } private static void printStat(String desc, Stat s) { - System.out.printf( - "\t\tDescription: [%30s] average: %,6.2f std dev: %,6.2f min: %,d max: %,d %n", desc, - s.getAverage(), s.getStdDev(), s.getMin(), s.getMax()); + System.out.printf("\t\tDescription: [%30s] average: %,6.2f min: %,d max: %,d %n", desc, + s.mean(), s.min(), s.max()); }
