This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new 4a8df9a Support dispatching and prioritizing by user scan types (#972) 4a8df9a is described below commit 4a8df9af165a211626e1fc8ad836251f11390153 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Feb 21 12:02:27 2019 -0500 Support dispatching and prioritizing by user scan types (#972) While working on apache/fluo#1055 I found it cumbersome to configure Fluo and Accumulo to dispatch Fluo notification scans to a dedicated executor. With these changes it's much simpler to do that. Fluo can set an execution hint like `scan_type=fluo-ntfy` and Accumulo can be configured to execute the scan based on the type without changing Fluo config or source code. Before these changes, Fluo would have required either a custom dispatcher or custom Fluo config for Accumulo executors. With these changes nothing needs to be done in Fluo. --- .../apache/accumulo/core/client/ScannerBase.java | 5 ++ .../core/spi/scan/HintScanPrioritizer.java | 45 ++++++++-- .../core/spi/scan/SimpleScanDispatcher.java | 96 +++++++--------------- .../core/spi/scan/HintScanPrioritizerTest.java | 82 ++++++++++++++++++ .../core/spi/scan/SimpleScanDispatcherTest.java | 29 ++----- .../accumulo/core/spi/scan/TestScanInfo.java | 7 ++ 6 files changed, 168 insertions(+), 96 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java index 328c066..59acc0c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java @@ -341,6 +341,11 @@ public interface ScannerBase extends Iterable<Entry<Key,Value>>, AutoCloseable { * scan, only how quickly it is returned. * * <p> + * Using the hint {@code scan_type=<type>} and documenting all of the types for your application + * is one strategy to consider. 
This allows administrators to adjust executor and prioritizer + * config for your application scan types without having to change the application source code. + * + * <p> * The default configuration for Accumulo will ignore hints. See {@link HintScanPrioritizer} and * {@link SimpleScanDispatcher} for examples of classes that can react to hints. * diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java index 22063ef..f713861 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java @@ -24,14 +24,18 @@ import org.apache.accumulo.core.client.ScannerBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; + /** * When configured for a scan executor, this prioritizer allows scanners to set priorities as - * integers. + * integers. Lower integers result in higher priority. * * <p> - * Scanners should put the key/value {@code priority=<integer>} in the map passed to - * {@link ScannerBase#setExecutionHints(Map)} to set the priority. Lower integers result in higher - * priority. + * Scanners can put the key/values {@code priority=<integer>} and/or {@code scan_type=<type>} in the + * map passed to {@link ScannerBase#setExecutionHints(Map)} to set the priority. When a + * {@code priority} hint is set it takes precedence and the value is used as the priority. When a + * {@code scan_type} hint is set the priority is looked up using the value. * * <p> * This prioritizer accepts the option {@code default_priority=<integer>} which determines what @@ -45,6 +49,10 @@ import org.slf4j.LoggerFactory; * option silently ignores invalid hints. 
* * <p> + * This prioritizer accepts the option {@code priority.<type>=<integer>} which maps a scan type hint + * to a priority. + * + * <p> * When two scans have the same priority, the scan is prioritized based on last run time and then * creation time. * @@ -61,11 +69,14 @@ public class HintScanPrioritizer implements ScanPrioritizer { private static final Logger log = LoggerFactory.getLogger(HintScanPrioritizer.class); + private final String PRIO_PREFIX = "priority."; + private enum HintProblemAction { NONE, LOG, FAIL } - private static int getPriority(ScanInfo si, int defaultPriority, HintProblemAction hpa) { + private static int getPriority(ScanInfo si, int defaultPriority, HintProblemAction hpa, + Map<String,Integer> typePriorities) { String prio = si.getExecutionHints().get("priority"); if (prio != null) { try { @@ -86,6 +97,16 @@ public class HintScanPrioritizer implements ScanPrioritizer { } } + if (!typePriorities.isEmpty()) { + String scanType = si.getExecutionHints().get("scan_type"); + if (scanType != null) { + Integer typePrio = typePriorities.get(scanType); + if (typePrio != null) { + return typePrio; + } + } + } + return defaultPriority; } @@ -94,10 +115,22 @@ public class HintScanPrioritizer implements ScanPrioritizer { int defaultPriority = Integer .parseInt(params.getOptions().getOrDefault("default_priority", Integer.MAX_VALUE + "")); + Builder<String,Integer> tpb = ImmutableMap.builder(); + + params.getOptions().forEach((k, v) -> { + if (k.startsWith(PRIO_PREFIX)) { + String type = k.substring(PRIO_PREFIX.length()); + tpb.put(type, Integer.parseInt(v)); + } + }); + + ImmutableMap<String,Integer> typePriorities = tpb.build(); + HintProblemAction hpa = HintProblemAction.valueOf(params.getOptions() .getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase()); - Comparator<ScanInfo> cmp = Comparator.comparingInt(si -> getPriority(si, defaultPriority, hpa)); + Comparator<ScanInfo> cmp = Comparator + .comparingInt(si -> getPriority(si, 
defaultPriority, hpa, typePriorities)); return cmp.thenComparingLong(si -> si.getLastRunTime().orElse(0)) .thenComparingLong(ScanInfo::getCreationTime); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java index 9653f4f..4cba357 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java @@ -20,12 +20,10 @@ import java.util.Map; import java.util.Set; import org.apache.accumulo.core.client.ScannerBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; /** * If no options are given, then this will dispatch to an executor named {@code default}. This @@ -38,21 +36,9 @@ import com.google.common.collect.Sets; * scans to the named executor.</LI> * <LI>{@code table.scan.dispatcher.opts.single_executor=<scan executor name>} : dispatches regular * scans to the named executor.</LI> - * <LI>{@code table.scan.dispatcher.opts.heed_hints=true|false} : This option defaults to false, so - * by default execution hints are ignored. When set to true, the executor can be set on the scanner. - * This is done by putting the key/value {@code executor=<scan executor name>} in the map passed to - * {@link ScannerBase#setExecutionHints(Map)} - * <LI>{@code table.scan.dispatcher.opts.bad_hint_action=none|log|fail} : When - * {@code heed_hints=true}, this option determines what to do if the executor in a hint does not - * exist. The possible values for this option are {@code none}, {@code log}, or {@code error}. - * Setting {@code none} will silently ignore invalid hints. 
Setting {@code log} will log a warning - * for invalid hints. Setting {@code fail} will throw an exception likely causing the scan to fail. - * For {@code log} and {@code none}, when there is an invalid hint it will fall back to the table - * configuration. The default is {@code log}. - * <LI>{@code table.scan.dispatcher.opts.ignored_hint_action=none|log|fail} : When - * {@code heed_hints=false}, this option determines what to do if a hint specifies an executor. The - * possible values for this option are {@code none}, {@code log}, or {@code fail}. The default is - * {@code log}. + * <LI>{@code table.scan.dispatcher.opts.executor.<type>=<scan executor name>} : dispatches scans + * that set the hint {@code scan_type=<type>} to the named executor. If this setting matches then it + * takes precedence over all other settings. See {@link ScannerBase#setExecutionHints(Map)}</LI> * * </UL> * @@ -62,74 +48,52 @@ import com.google.common.collect.Sets; public class SimpleScanDispatcher implements ScanDispatcher { + private final String EXECUTOR_PREFIX = "executor."; + private final Set<String> VALID_OPTS = ImmutableSet.of("executor", "multi_executor", - "single_executor", "heed_hints", "bad_hint_action", "ignored_hint_action"); + "single_executor"); private String multiExecutor; private String singleExecutor; - private boolean heedHints; - private HintProblemAction badHintAction; - private HintProblemAction ignoredHintHaction; - - public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default"; - private static final Logger log = LoggerFactory.getLogger(SimpleScanDispatcher.class); + private Map<String,String> typeExecutors; - private enum HintProblemAction { - NONE, LOG, FAIL - } + public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default"; @Override public void init(InitParameters params) { Map<String,String> options = params.getOptions(); - Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS); - Preconditions.checkArgument(invalidOpts.size() == 
0, "Invalid options : %s", invalidOpts); + + Builder<String,String> teb = ImmutableMap.builder(); + + options.forEach((k, v) -> { + if (k.startsWith(EXECUTOR_PREFIX)) { + String type = k.substring(EXECUTOR_PREFIX.length()); + teb.put(type, v); + } else if (!VALID_OPTS.contains(k)) { + throw new IllegalArgumentException("Invalid option " + k); + } + }); + + typeExecutors = teb.build(); String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME); multiExecutor = options.getOrDefault("multi_executor", base); singleExecutor = options.getOrDefault("single_executor", base); - heedHints = Boolean.parseBoolean(options.getOrDefault("heed_hints", "false")); - badHintAction = HintProblemAction.valueOf( - options.getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase()); - ignoredHintHaction = HintProblemAction.valueOf( - options.getOrDefault("ignored_hint_action", HintProblemAction.LOG.name()).toUpperCase()); + } @Override public String dispatch(DispatchParmaters params) { ScanInfo scanInfo = params.getScanInfo(); - if (heedHints) { - String executor = scanInfo.getExecutionHints().get("executor"); - if (executor != null) { - if (params.getScanExecutors().containsKey(executor)) { + + if (!typeExecutors.isEmpty()) { + String scanType = scanInfo.getExecutionHints().get("scan_type"); + if (scanType != null) { + String executor = typeExecutors.get(scanType); + if (executor != null) { return executor; - } else { - switch (badHintAction) { - case FAIL: - throw new IllegalArgumentException( - "Scan execution hint contained unknown executor " + executor); - case LOG: - log.warn("Scan execution hint contained unknown executor {} ", executor); - break; - case NONE: - break; - default: - throw new IllegalStateException(); - } } } - } else if (ignoredHintHaction != HintProblemAction.NONE - && scanInfo.getExecutionHints().containsKey("executor")) { - String executor = scanInfo.getExecutionHints().get("executor"); - switch (ignoredHintHaction) { - case 
FAIL: - throw new IllegalArgumentException( - "Scan execution hint contained executor " + executor + " when heed_hints=false"); - case LOG: - log.warn("Scan execution hint contained executor {} when heed_hints=false", executor); - break; - default: - throw new IllegalStateException(); - } } switch (scanInfo.getScanType()) { diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java new file mode 100644 index 0000000..0faa2f0 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.spi.scan; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.scan.ScanInfo.Type; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class HintScanPrioritizerTest { + @Test + public void testSort() { + long now = System.currentTimeMillis(); + + List<TestScanInfo> scans = new ArrayList<>(); + + // Two following have never run, so oldest should go first + scans.add(new TestScanInfo("a", Type.SINGLE, now - 7)); + scans.add( + new TestScanInfo("b", Type.SINGLE, now - 3).setExecutionHints("scan_type", "background")); + scans.add( + new TestScanInfo("c", Type.SINGLE, now - 4).setExecutionHints("scan_type", "background")); + scans.add(new TestScanInfo("d", Type.SINGLE, now - 3).setExecutionHints("scan_type", "isbn")); + scans.add(new TestScanInfo("e", Type.SINGLE, now - 5).setExecutionHints("scan_type", "isbn")); + scans.add(new TestScanInfo("f", Type.SINGLE, now - 1).setExecutionHints("priority", "35")); + scans.add(new TestScanInfo("g", Type.SINGLE, now - 2).setExecutionHints("priority", "25")); + scans.add(new TestScanInfo("h", Type.SINGLE, now - 3).setExecutionHints("priority", "15")); + scans.add(new TestScanInfo("i", Type.SINGLE, now - 4).setExecutionHints("priority", "5")); + + Collections.shuffle(scans); + + Comparator<ScanInfo> comparator = new HintScanPrioritizer() + .createComparator(new ScanPrioritizer.CreateParameters() { + + @Override + public Map<String,String> getOptions() { + return ImmutableMap.of("priority.isbn", "10", "priority.background", "30", + "default_priority", "20"); + } + + @Override + public ServiceEnvironment getServiceEnv() { + throw new UnsupportedOperationException(); + } + }); + + Collections.sort(scans, comparator); + + 
assertEquals("i", scans.get(0).testId); + assertEquals("e", scans.get(1).testId); + assertEquals("d", scans.get(2).testId); + assertEquals("h", scans.get(3).testId); + assertEquals("a", scans.get(4).testId); + assertEquals("g", scans.get(5).testId); + assertEquals("c", scans.get(6).testId); + assertEquals("b", scans.get(7).testId); + assertEquals("f", scans.get(8).testId); + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java index a35e527..432e6dd 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java @@ -125,29 +125,10 @@ public class SimpleScanDispatcherTest { @Test public void testHints() { - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"), ImmutableMap.of(), "E1", "E1"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"), - ImmutableMap.of("executor", "E2"), "E2", "E2"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"), - ImmutableMap.of("executor", "E5"), "E1", "E1"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "ignored_hint_action", "fail"), - ImmutableMap.of("executor", "E5"), "E1", "E1"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "bad_hint_action", "fail", - "ignored_hint_action", "fail"), ImmutableMap.of("executor", "E2"), "E2", "E2"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false"), - ImmutableMap.of("executor", "E2"), "E1", "E1"); - runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("executor", "E2"), "E1", "E1"); - } - - @Test(expected = IllegalArgumentException.class) - public void testBadHint() { - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "bad_hint_action", "fail"), - ImmutableMap.of("executor", "E5"), "E2", "E2"); - } - - @Test(expected = 
IllegalArgumentException.class) - public void testIgnoredHint() { - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false", "ignored_hint_action", "fail"), - ImmutableMap.of("executor", "E2"), "E1", "E1"); + runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("scan_type", "quick"), "E1", "E1"); + runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2"), + ImmutableMap.of("scan_type", "quick"), "E2", "E2"); + runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2", "executor.slow", "E3"), + ImmutableMap.of("scan_type", "slow"), "E3", "E3"); } } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java index a9c72f8..4e14099 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java @@ -29,6 +29,8 @@ import org.apache.accumulo.core.spi.common.IteratorConfiguration; import org.apache.accumulo.core.spi.common.Stats; import org.apache.accumulo.core.util.Stat; +import com.google.common.collect.ImmutableMap; + public class TestScanInfo implements ScanInfo { String testId; @@ -56,6 +58,11 @@ public class TestScanInfo implements ScanInfo { } } + TestScanInfo setExecutionHints(String k, String v) { + this.executionHints = ImmutableMap.of(k, v); + return this; + } + @Override public Type getScanType() { return scanType;