This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a8df9a  Support dispatching and prioritizing by user scan types (#972)
4a8df9a is described below

commit 4a8df9af165a211626e1fc8ad836251f11390153
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Feb 21 12:02:27 2019 -0500

    Support dispatching and prioritizing by user scan types (#972)
    
    While working on apache/fluo#1055 I found it cumbersome to configure
    Fluo and Accumulo to dispatch Fluo notification scans to a dedicated
    executor. With these changes it's much simpler to do that. Fluo can
    set an execution hint like `scan_type=fluo-ntfy`, and Accumulo can be
    configured to execute the scan based on that type without changing
    Fluo config or source code.
    
    Before these changes, Fluo would have required either a custom
    dispatcher or custom Fluo config for Accumulo executors. With these
    changes, nothing needs to be done in Fluo.
---
 .../apache/accumulo/core/client/ScannerBase.java   |  5 ++
 .../core/spi/scan/HintScanPrioritizer.java         | 45 ++++++++--
 .../core/spi/scan/SimpleScanDispatcher.java        | 96 +++++++---------------
 .../core/spi/scan/HintScanPrioritizerTest.java     | 82 ++++++++++++++++++
 .../core/spi/scan/SimpleScanDispatcherTest.java    | 29 ++-----
 .../accumulo/core/spi/scan/TestScanInfo.java       |  7 ++
 6 files changed, 168 insertions(+), 96 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java 
b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index 328c066..59acc0c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -341,6 +341,11 @@ public interface ScannerBase extends 
Iterable<Entry<Key,Value>>, AutoCloseable {
    * scan, only how quickly it is returned.
    *
    * <p>
+   * Using the hint {@code scan_type=<type>} and documenting all of the types 
for your application
+   * is one strategy to consider. This allows administrators to adjust 
executor and prioritizer
+   * config for your application scan types without having to change the 
application source code.
+   *
+   * <p>
    * The default configuration for Accumulo will ignore hints. See {@link 
HintScanPrioritizer} and
    * {@link SimpleScanDispatcher} for examples of classes that can react to 
hints.
    *
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
index 22063ef..f713861 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
@@ -24,14 +24,18 @@ import org.apache.accumulo.core.client.ScannerBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+
 /**
  * When configured for a scan executor, this prioritizer allows scanners to 
set priorities as
- * integers.
+ * integers. Lower integers result in higher priority.
  *
  * <p>
- * Scanners should put the key/value {@code priority=<integer>} in the map 
passed to
- * {@link ScannerBase#setExecutionHints(Map)} to set the priority. Lower 
integers result in higher
- * priority.
+ * Scanners can put the key/values {@code priority=<integer>} and/or {@code 
scan_type=<type>} in the
+ * map passed to {@link ScannerBase#setExecutionHints(Map)} to set the 
priority. When a
+ * {@code priority} hint is set it takes precedence and the value is used as 
the priority. When a
+ * {@code scan_type} hint is set the priority is looked up using the value.
  *
  * <p>
  * This prioritizer accepts the option {@code default_priority=<integer>} 
which determines what
@@ -45,6 +49,10 @@ import org.slf4j.LoggerFactory;
  * option silently ignores invalid hints.
  *
  * <p>
+ * This prioritizer accepts the option {@code priority.<type>=<integer>} which 
maps a scan type hint
+ * to a priority.
+ *
+ * <p>
  * When two scans have the same priority, the scan is prioritized based on 
last run time and then
  * creation time.
  *
@@ -61,11 +69,14 @@ public class HintScanPrioritizer implements ScanPrioritizer 
{
 
   private static final Logger log = 
LoggerFactory.getLogger(HintScanPrioritizer.class);
 
+  private final String PRIO_PREFIX = "priority.";
+
   private enum HintProblemAction {
     NONE, LOG, FAIL
   }
 
-  private static int getPriority(ScanInfo si, int defaultPriority, 
HintProblemAction hpa) {
+  private static int getPriority(ScanInfo si, int defaultPriority, 
HintProblemAction hpa,
+      Map<String,Integer> typePriorities) {
     String prio = si.getExecutionHints().get("priority");
     if (prio != null) {
       try {
@@ -86,6 +97,16 @@ public class HintScanPrioritizer implements ScanPrioritizer {
       }
     }
 
+    if (!typePriorities.isEmpty()) {
+      String scanType = si.getExecutionHints().get("scan_type");
+      if (scanType != null) {
+        Integer typePrio = typePriorities.get(scanType);
+        if (typePrio != null) {
+          return typePrio;
+        }
+      }
+    }
+
     return defaultPriority;
   }
 
@@ -94,10 +115,22 @@ public class HintScanPrioritizer implements 
ScanPrioritizer {
     int defaultPriority = Integer
         .parseInt(params.getOptions().getOrDefault("default_priority", 
Integer.MAX_VALUE + ""));
 
+    Builder<String,Integer> tpb = ImmutableMap.builder();
+
+    params.getOptions().forEach((k, v) -> {
+      if (k.startsWith(PRIO_PREFIX)) {
+        String type = k.substring(PRIO_PREFIX.length());
+        tpb.put(type, Integer.parseInt(v));
+      }
+    });
+
+    ImmutableMap<String,Integer> typePriorities = tpb.build();
+
     HintProblemAction hpa = HintProblemAction.valueOf(params.getOptions()
         .getOrDefault("bad_hint_action", 
HintProblemAction.LOG.name()).toUpperCase());
 
-    Comparator<ScanInfo> cmp = Comparator.comparingInt(si -> getPriority(si, 
defaultPriority, hpa));
+    Comparator<ScanInfo> cmp = Comparator
+        .comparingInt(si -> getPriority(si, defaultPriority, hpa, 
typePriorities));
 
     return cmp.thenComparingLong(si -> si.getLastRunTime().orElse(0))
         .thenComparingLong(ScanInfo::getCreationTime);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
index 9653f4f..4cba357 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -20,12 +20,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.ScannerBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 
 /**
  * If no options are given, then this will dispatch to an executor named 
{@code default}. This
@@ -38,21 +36,9 @@ import com.google.common.collect.Sets;
  * scans to the named executor.</LI>
  * <LI>{@code table.scan.dispatcher.opts.single_executor=<scan executor name>} 
: dispatches regular
  * scans to the named executor.</LI>
- * <LI>{@code table.scan.dispatcher.opts.heed_hints=true|false} : This option 
defaults to false, so
- * by default execution hints are ignored. When set to true, the executor can 
be set on the scanner.
- * This is done by putting the key/value {@code executor=<scan executor name>} 
in the map passed to
- * {@link ScannerBase#setExecutionHints(Map)}
- * <LI>{@code table.scan.dispatcher.opts.bad_hint_action=none|log|fail} : When
- * {@code heed_hints=true}, this option determines what to do if the executor 
in a hint does not
- * exist. The possible values for this option are {@code none}, {@code log}, 
or {@code error}.
- * Setting {@code none} will silently ignore invalid hints. Setting {@code 
log} will log a warning
- * for invalid hints. Setting {@code fail} will throw an exception likely 
causing the scan to fail.
- * For {@code log} and {@code none}, when there is an invalid hint it will 
fall back to the table
- * configuration. The default is {@code log}.
- * <LI>{@code table.scan.dispatcher.opts.ignored_hint_action=none|log|fail} : 
When
- * {@code heed_hints=false}, this option determines what to do if a hint 
specifies an executor. The
- * possible values for this option are {@code none}, {@code log}, or {@code 
fail}. The default is
- * {@code log}.
+ * <LI>{@code table.scan.dispatcher.opts.executor.<type>=<scan executor name>} 
: dispatches scans
+ * that set the hint {@code scan_type=<type>} to the named executor. If this 
setting matches then it
+ * takes precedence over all other settings. See {@link 
ScannerBase#setExecutionHints(Map)}</LI>
  *
  * </UL>
  *
@@ -62,74 +48,52 @@ import com.google.common.collect.Sets;
 
 public class SimpleScanDispatcher implements ScanDispatcher {
 
+  private final String EXECUTOR_PREFIX = "executor.";
+
   private final Set<String> VALID_OPTS = ImmutableSet.of("executor", 
"multi_executor",
-      "single_executor", "heed_hints", "bad_hint_action", 
"ignored_hint_action");
+      "single_executor");
   private String multiExecutor;
   private String singleExecutor;
-  private boolean heedHints;
-  private HintProblemAction badHintAction;
-  private HintProblemAction ignoredHintHaction;
-
-  public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
 
-  private static final Logger log = 
LoggerFactory.getLogger(SimpleScanDispatcher.class);
+  private Map<String,String> typeExecutors;
 
-  private enum HintProblemAction {
-    NONE, LOG, FAIL
-  }
+  public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
 
   @Override
   public void init(InitParameters params) {
     Map<String,String> options = params.getOptions();
-    Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
-    Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options : 
%s", invalidOpts);
+
+    Builder<String,String> teb = ImmutableMap.builder();
+
+    options.forEach((k, v) -> {
+      if (k.startsWith(EXECUTOR_PREFIX)) {
+        String type = k.substring(EXECUTOR_PREFIX.length());
+        teb.put(type, v);
+      } else if (!VALID_OPTS.contains(k)) {
+        throw new IllegalArgumentException("Invalid option " + k);
+      }
+    });
+
+    typeExecutors = teb.build();
 
     String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME);
     multiExecutor = options.getOrDefault("multi_executor", base);
     singleExecutor = options.getOrDefault("single_executor", base);
-    heedHints = Boolean.parseBoolean(options.getOrDefault("heed_hints", 
"false"));
-    badHintAction = HintProblemAction.valueOf(
-        options.getOrDefault("bad_hint_action", 
HintProblemAction.LOG.name()).toUpperCase());
-    ignoredHintHaction = HintProblemAction.valueOf(
-        options.getOrDefault("ignored_hint_action", 
HintProblemAction.LOG.name()).toUpperCase());
+
   }
 
   @Override
   public String dispatch(DispatchParmaters params) {
     ScanInfo scanInfo = params.getScanInfo();
-    if (heedHints) {
-      String executor = scanInfo.getExecutionHints().get("executor");
-      if (executor != null) {
-        if (params.getScanExecutors().containsKey(executor)) {
+
+    if (!typeExecutors.isEmpty()) {
+      String scanType = scanInfo.getExecutionHints().get("scan_type");
+      if (scanType != null) {
+        String executor = typeExecutors.get(scanType);
+        if (executor != null) {
           return executor;
-        } else {
-          switch (badHintAction) {
-            case FAIL:
-              throw new IllegalArgumentException(
-                  "Scan execution hint contained unknown executor " + 
executor);
-            case LOG:
-              log.warn("Scan execution hint contained unknown executor {} ", 
executor);
-              break;
-            case NONE:
-              break;
-            default:
-              throw new IllegalStateException();
-          }
         }
       }
-    } else if (ignoredHintHaction != HintProblemAction.NONE
-        && scanInfo.getExecutionHints().containsKey("executor")) {
-      String executor = scanInfo.getExecutionHints().get("executor");
-      switch (ignoredHintHaction) {
-        case FAIL:
-          throw new IllegalArgumentException(
-              "Scan execution hint contained executor " + executor + " when 
heed_hints=false");
-        case LOG:
-          log.warn("Scan execution hint contained executor {} when 
heed_hints=false", executor);
-          break;
-        default:
-          throw new IllegalStateException();
-      }
     }
 
     switch (scanInfo.getScanType()) {
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java
new file mode 100644
index 0000000..0faa2f0
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class HintScanPrioritizerTest {
+  @Test
+  public void testSort() {
+    long now = System.currentTimeMillis();
+
+    List<TestScanInfo> scans = new ArrayList<>();
+
+    // Two following have never run, so oldest should go first
+    scans.add(new TestScanInfo("a", Type.SINGLE, now - 7));
+    scans.add(
+        new TestScanInfo("b", Type.SINGLE, now - 
3).setExecutionHints("scan_type", "background"));
+    scans.add(
+        new TestScanInfo("c", Type.SINGLE, now - 
4).setExecutionHints("scan_type", "background"));
+    scans.add(new TestScanInfo("d", Type.SINGLE, now - 
3).setExecutionHints("scan_type", "isbn"));
+    scans.add(new TestScanInfo("e", Type.SINGLE, now - 
5).setExecutionHints("scan_type", "isbn"));
+    scans.add(new TestScanInfo("f", Type.SINGLE, now - 
1).setExecutionHints("priority", "35"));
+    scans.add(new TestScanInfo("g", Type.SINGLE, now - 
2).setExecutionHints("priority", "25"));
+    scans.add(new TestScanInfo("h", Type.SINGLE, now - 
3).setExecutionHints("priority", "15"));
+    scans.add(new TestScanInfo("i", Type.SINGLE, now - 
4).setExecutionHints("priority", "5"));
+
+    Collections.shuffle(scans);
+
+    Comparator<ScanInfo> comparator = new HintScanPrioritizer()
+        .createComparator(new ScanPrioritizer.CreateParameters() {
+
+          @Override
+          public Map<String,String> getOptions() {
+            return ImmutableMap.of("priority.isbn", "10", 
"priority.background", "30",
+                "default_priority", "20");
+          }
+
+          @Override
+          public ServiceEnvironment getServiceEnv() {
+            throw new UnsupportedOperationException();
+          }
+        });
+
+    Collections.sort(scans, comparator);
+
+    assertEquals("i", scans.get(0).testId);
+    assertEquals("e", scans.get(1).testId);
+    assertEquals("d", scans.get(2).testId);
+    assertEquals("h", scans.get(3).testId);
+    assertEquals("a", scans.get(4).testId);
+    assertEquals("g", scans.get(5).testId);
+    assertEquals("c", scans.get(6).testId);
+    assertEquals("b", scans.get(7).testId);
+    assertEquals("f", scans.get(8).testId);
+  }
+}
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
index a35e527..432e6dd 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -125,29 +125,10 @@ public class SimpleScanDispatcherTest {
 
   @Test
   public void testHints() {
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"), 
ImmutableMap.of(), "E1", "E1");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"),
-        ImmutableMap.of("executor", "E2"), "E2", "E2");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"),
-        ImmutableMap.of("executor", "E5"), "E1", "E1");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", 
"ignored_hint_action", "fail"),
-        ImmutableMap.of("executor", "E5"), "E1", "E1");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", 
"bad_hint_action", "fail",
-        "ignored_hint_action", "fail"), ImmutableMap.of("executor", "E2"), 
"E2", "E2");
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false"),
-        ImmutableMap.of("executor", "E2"), "E1", "E1");
-    runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("executor", 
"E2"), "E1", "E1");
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBadHint() {
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", 
"bad_hint_action", "fail"),
-        ImmutableMap.of("executor", "E5"), "E2", "E2");
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testIgnoredHint() {
-    runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false", 
"ignored_hint_action", "fail"),
-        ImmutableMap.of("executor", "E2"), "E1", "E1");
+    runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("scan_type", 
"quick"), "E1", "E1");
+    runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2"),
+        ImmutableMap.of("scan_type", "quick"), "E2", "E2");
+    runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2", 
"executor.slow", "E3"),
+        ImmutableMap.of("scan_type", "slow"), "E3", "E3");
   }
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
index a9c72f8..4e14099 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
@@ -29,6 +29,8 @@ import 
org.apache.accumulo.core.spi.common.IteratorConfiguration;
 import org.apache.accumulo.core.spi.common.Stats;
 import org.apache.accumulo.core.util.Stat;
 
+import com.google.common.collect.ImmutableMap;
+
 public class TestScanInfo implements ScanInfo {
 
   String testId;
@@ -56,6 +58,11 @@ public class TestScanInfo implements ScanInfo {
     }
   }
 
+  TestScanInfo setExecutionHints(String k, String v) {
+    this.executionHints = ImmutableMap.of(k, v);
+    return this;
+  }
+
   @Override
   public Type getScanType() {
     return scanType;

Reply via email to