This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new 91884df  fixes #1024 Do less work in the wait command (#1031)
91884df is described below

commit 91884df193b9819830bf21444ba1f42c09ba9c68
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Tue Apr 17 10:32:43 2018 -0400

    fixes #1024 Do less work in the wait command (#1031)
---
 .../java/org/apache/fluo/command/FluoWait.java     | 113 +++++++++++++--------
 .../java/org/apache/fluo/core/util/ScanUtil.java   |  11 +-
 .../core/worker/finder/hash/PartitionManager.java  |   2 +-
 .../fluo/core/worker/finder/hash/TableRange.java   |  23 +++--
 .../worker/finder/hash/PartitionManagerTest.java   |   2 +-
 .../core/worker/finder/hash/TableRangeTest.java    |   5 +-
 6 files changed, 99 insertions(+), 57 deletions(-)

diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
index 2bba0c7..9c0b191 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -15,72 +15,105 @@
 
 package org.apache.fluo.command;
 
-import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.core.client.FluoAdminImpl;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.apache.fluo.core.worker.finder.hash.TableRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+
 public class FluoWait {
 
   private static final Logger log = LoggerFactory.getLogger(FluoWait.class);
-  private static final long MIN_SLEEP_SEC = 10;
-  private static final long MAX_SLEEP_SEC = 300;
-
-  private static long calculateSleep(long notifyCount) {
-    long sleep = notifyCount / 500;
-    if (sleep < MIN_SLEEP_SEC) {
-      return MIN_SLEEP_SEC;
-    } else if (sleep > MAX_SLEEP_SEC) {
-      return MAX_SLEEP_SEC;
-    }
-    return sleep;
+  private static final long MIN_SLEEP_MS = 250;
+  private static final long MAX_SLEEP_MS = MINUTES.toMillis(5);
+
+  private static List<TableRange> getRanges(Environment env)
+      throws TableNotFoundException, AccumuloSecurityException, 
AccumuloException {
+    List<TableRange> ranges =
+        
TableRange.fromTexts(env.getConnector().tableOperations().listSplits(env.getTable()));
+    Collections.shuffle(ranges);
+    return ranges;
   }
 
-  private static long countNotifications(Environment env) {
-    Scanner scanner;
+  private static boolean hasNotifications(Environment env, TableRange range)
+      throws TableNotFoundException {
+    Scanner scanner = null;
     try {
       scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
-    } catch (TableNotFoundException e) {
-      log.error("An exception was thrown -", e);
-      throw new FluoException(e);
+      scanner.setRange(range.getRange());
+      Notification.configureScanner(scanner);
+
+      return scanner.iterator().hasNext();
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
     }
+  }
+
+  /**
+   * Wait until a range has no notifications.
+   *
+   * @return true if notifications were ever seen while waiting
+   */
+  private static boolean waitTillNoNotifications(Environment env, TableRange 
range)
+      throws TableNotFoundException {
+    boolean sawNotifications = false;
+    long retryTime = MIN_SLEEP_MS;
 
-    Notification.configureScanner(scanner);
+    log.debug("Scanning tablet {} for notifications", range);
 
-    return Iterables.size(scanner);
+    long start = System.currentTimeMillis();
+    while (hasNotifications(env, range)) {
+      sawNotifications = true;
+      long sleepTime = Math.max(System.currentTimeMillis() - start, retryTime);
+      log.debug("Tablet {} had notfications, will rescan in {}ms", range, 
sleepTime);
+      UtilWaitThread.sleep(sleepTime);
+      retryTime = Math.min(MAX_SLEEP_MS, (long) (retryTime * 1.5));
+      start = System.currentTimeMillis();
+    }
+
+    return sawNotifications;
   }
 
+  /**
+   * Wait until a scan of the table completes without seeing notifications AND 
without the Oracle
+   * issuing any timestamps during the scan.
+   */
   private static void waitUntilFinished(FluoConfiguration config) {
     try (Environment env = new Environment(config)) {
-      log.info("The wait command will exit when all notifications are 
processed");
-      while (true) {
+      List<TableRange> ranges = getRanges(env);
+
+      outer: while (true) {
         long ts1 = 
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
-        long ntfyCount = countNotifications(env);
-        long ts2 = 
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
-        if (ntfyCount == 0 && ts1 == (ts2 - 1)) {
-          log.info("All processing has finished!");
-          break;
+        for (TableRange range : ranges) {
+          boolean sawNotifications = waitTillNoNotifications(env, range);
+          if (sawNotifications) {
+            ranges = getRanges(env);
+            // This range had notifications. Processing those notifications 
may have created
+            // notifications in previously scanned ranges, so start over.
+            continue outer;
+          }
         }
+        long ts2 = 
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
 
-        try {
-          long sleepSec = calculateSleep(ntfyCount);
-          log.info("{} notifications are still outstanding.  Will try again in 
{} seconds...",
-              ntfyCount, sleepSec);
-          Thread.sleep(1000 * sleepSec);
-        } catch (InterruptedException e) {
-          log.error("Sleep was interrupted!  Exiting...");
-          System.exit(-1);
+        // Check to ensure the Oracle issued no timestamps during the scan for 
notifications.
+        if (ts2 - ts1 == 1) {
+          break;
         }
       }
-    } catch (FluoException e) {
-      log.error(e.getMessage());
-      System.exit(-1);
     } catch (Exception e) {
       log.error("An exception was thrown -", e);
       System.exit(-1);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 4690fd5..3cb9d49 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -26,6 +26,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
+import com.google.common.collect.Iterables;
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonIOException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.security.Authorizations;
@@ -40,12 +45,6 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 
-import com.google.common.collect.Iterables;
-import com.google.gson.FieldNamingPolicy;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonIOException;
-
 public class ScanUtil {
   public static final String FLUO_VALUE = "value";
   public static final String FLUO_COLUMN_VISIBILITY = "visibility";
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
index 4474c78..89647f6 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
@@ -199,7 +199,7 @@ public class PartitionManager {
       List<Bytes> zkSplits = new ArrayList<>();
       SerializedSplits.deserialize(zkSplits::add, zkSplitData);
 
-      Collection<TableRange> tableRanges = TableRange.toTabletRanges(zkSplits);
+      Collection<TableRange> tableRanges = TableRange.fromBytes(zkSplits);
       PartitionInfo newPI = getGroupInfo(me, children, tableRanges, groupSize);
 
       setPartitionInfo(newPI);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
index a7517f8..a3c70bb 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -20,10 +20,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.data.Range;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.core.util.ByteUtil;
+import org.apache.fluo.core.util.Hex;
 import org.apache.hadoop.io.Text;
 
 import static java.util.stream.Collectors.toList;
@@ -69,12 +71,14 @@ public class TableRange implements Comparable<TableRange> {
 
   @Override
   public String toString() {
-    return getPrevEndRow() + " " + getEndRow();
-  }
+    String per = prevEndRow == null ? "-INF" : Hex.encNonAscii(prevEndRow);
+    String er = endRow == null ? "+INF" : Hex.encNonAscii(endRow);
 
+    return "(" + per + " " + er + "]";
+  }
 
-  public static Collection<TableRange> toTabletRanges(Collection<Bytes> rows) {
-    List<Bytes> sortedRows = rows.stream().sorted().collect(toList());
+  private static List<TableRange> fromStream(Stream<Bytes> stream) {
+    List<Bytes> sortedRows = stream.sorted().collect(toList());
     List<TableRange> tablets = new ArrayList<>(sortedRows.size() + 1);
     for (int i = 0; i < sortedRows.size(); i++) {
       tablets.add(new TableRange(i == 0 ? null : sortedRows.get(i - 1), 
sortedRows.get(i)));
@@ -83,9 +87,16 @@ public class TableRange implements Comparable<TableRange> {
     tablets.add(new TableRange(
         sortedRows.size() == 0 ? null : sortedRows.get(sortedRows.size() - 1), 
null));
     return tablets;
+
   }
 
+  public static List<TableRange> fromBytes(Collection<Bytes> rows) {
+    return fromStream(rows.stream());
+  }
 
+  public static List<TableRange> fromTexts(Collection<Text> rows) {
+    return fromStream(rows.stream().map(ByteUtil::toBytes));
+  }
 
   public Range getRange() {
     Text tper = 
Optional.ofNullable(prevEndRow).map(ByteUtil::toText).orElse(null);
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
index 3bf5307..8b38065 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
@@ -50,7 +50,7 @@ public class PartitionManagerTest {
 
           Collection<Bytes> rows = IntStream.iterate(0, i -> i + 
1000).limit(numSplits)
               .mapToObj(i -> String.format("r%06d", 
i)).map(Bytes::of).collect(toList());
-          Collection<TableRange> tablets = TableRange.toTabletRanges(rows);
+          Collection<TableRange> tablets = TableRange.fromBytes(rows);
 
           Set<String> idCombos = new HashSet<>();
           Map<Integer, RangeSet> groupTablets = new HashMap<>();
diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
index 637186f..0018d09 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
+++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
@@ -69,8 +69,7 @@ public class TableRangeTest {
     Bytes sp2 = Bytes.of("m1");
     Bytes sp3 = Bytes.of("r1");
 
-    Collection<TableRange> trc1 =
-        new HashSet<>(TableRange.toTabletRanges(Arrays.asList(sp2, sp3, sp1)));
+    Collection<TableRange> trc1 = new 
HashSet<>(TableRange.fromBytes(Arrays.asList(sp2, sp3, sp1)));
 
     Assert.assertEquals(4, trc1.size());
     Assert.assertTrue(trc1.contains(new TableRange(null, sp1)));
@@ -78,7 +77,7 @@ public class TableRangeTest {
     Assert.assertTrue(trc1.contains(new TableRange(sp2, sp3)));
     Assert.assertTrue(trc1.contains(new TableRange(sp3, null)));
 
-    Collection<TableRange> trc2 = new 
HashSet<>(TableRange.toTabletRanges(Collections.emptyList()));
+    Collection<TableRange> trc2 = new 
HashSet<>(TableRange.fromBytes(Collections.emptyList()));
     Assert.assertEquals(1, trc2.size());
     Assert.assertTrue(trc2.contains(new TableRange(null, null)));
   }

-- 
To stop receiving notification emails like this one, please contact
ktur...@apache.org.

Reply via email to