This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push: new 91884df fixes #1024 Do less work in the wait command (#1031) 91884df is described below commit 91884df193b9819830bf21444ba1f42c09ba9c68 Author: Keith Turner <ke...@deenlo.com> AuthorDate: Tue Apr 17 10:32:43 2018 -0400 fixes #1024 Do less work in the wait command (#1031) --- .../java/org/apache/fluo/command/FluoWait.java | 113 +++++++++++++-------- .../java/org/apache/fluo/core/util/ScanUtil.java | 11 +- .../core/worker/finder/hash/PartitionManager.java | 2 +- .../fluo/core/worker/finder/hash/TableRange.java | 23 +++-- .../worker/finder/hash/PartitionManagerTest.java | 2 +- .../core/worker/finder/hash/TableRangeTest.java | 5 +- 6 files changed, 99 insertions(+), 57 deletions(-) diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java index 2bba0c7..9c0b191 100644 --- a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java +++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. 
See the License for the specific language governing permissions and limitations under @@ -15,72 +15,105 @@ package org.apache.fluo.command; -import com.google.common.collect.Iterables; +import java.util.Collections; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.exceptions.FluoException; import org.apache.fluo.core.client.FluoAdminImpl; import org.apache.fluo.core.impl.Environment; import org.apache.fluo.core.impl.Notification; +import org.apache.fluo.core.util.UtilWaitThread; +import org.apache.fluo.core.worker.finder.hash.TableRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.concurrent.TimeUnit.MINUTES; + public class FluoWait { private static final Logger log = LoggerFactory.getLogger(FluoWait.class); - private static final long MIN_SLEEP_SEC = 10; - private static final long MAX_SLEEP_SEC = 300; - - private static long calculateSleep(long notifyCount) { - long sleep = notifyCount / 500; - if (sleep < MIN_SLEEP_SEC) { - return MIN_SLEEP_SEC; - } else if (sleep > MAX_SLEEP_SEC) { - return MAX_SLEEP_SEC; - } - return sleep; + private static final long MIN_SLEEP_MS = 250; + private static final long MAX_SLEEP_MS = MINUTES.toMillis(5); + + private static List<TableRange> getRanges(Environment env) + throws TableNotFoundException, AccumuloSecurityException, AccumuloException { + List<TableRange> ranges = + TableRange.fromTexts(env.getConnector().tableOperations().listSplits(env.getTable())); + Collections.shuffle(ranges); + return ranges; } - private static long countNotifications(Environment env) { - Scanner scanner; + private static boolean hasNotifications(Environment env, TableRange range) + throws TableNotFoundException { + Scanner 
scanner = null; try { scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations()); - } catch (TableNotFoundException e) { - log.error("An exception was thrown -", e); - throw new FluoException(e); + scanner.setRange(range.getRange()); + Notification.configureScanner(scanner); + + return scanner.iterator().hasNext(); + } finally { + if (scanner != null) { + scanner.close(); + } } + } + + /** + * Wait until a range has no notifications. + * + * @return true if notifications were ever seen while waiting + */ + private static boolean waitTillNoNotifications(Environment env, TableRange range) + throws TableNotFoundException { + boolean sawNotifications = false; + long retryTime = MIN_SLEEP_MS; - Notification.configureScanner(scanner); + log.debug("Scanning tablet {} for notifications", range); - return Iterables.size(scanner); + long start = System.currentTimeMillis(); + while (hasNotifications(env, range)) { + sawNotifications = true; + long sleepTime = Math.max(System.currentTimeMillis() - start, retryTime); + log.debug("Tablet {} had notfications, will rescan in {}ms", range, sleepTime); + UtilWaitThread.sleep(sleepTime); + retryTime = Math.min(MAX_SLEEP_MS, (long) (retryTime * 1.5)); + start = System.currentTimeMillis(); + } + + return sawNotifications; } + /** + * Wait until a scan of the table completes without seeing notifications AND without the Oracle + * issuing any timestamps during the scan. 
+ */ private static void waitUntilFinished(FluoConfiguration config) { try (Environment env = new Environment(config)) { - log.info("The wait command will exit when all notifications are processed"); - while (true) { + List<TableRange> ranges = getRanges(env); + + outer: while (true) { long ts1 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp(); - long ntfyCount = countNotifications(env); - long ts2 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp(); - if (ntfyCount == 0 && ts1 == (ts2 - 1)) { - log.info("All processing has finished!"); - break; + for (TableRange range : ranges) { + boolean sawNotifications = waitTillNoNotifications(env, range); + if (sawNotifications) { + ranges = getRanges(env); + // This range had notifications. Processing those notifications may have created + // notifications in previously scanned ranges, so start over. + continue outer; + } } + long ts2 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp(); - try { - long sleepSec = calculateSleep(ntfyCount); - log.info("{} notifications are still outstanding. Will try again in {} seconds...", - ntfyCount, sleepSec); - Thread.sleep(1000 * sleepSec); - } catch (InterruptedException e) { - log.error("Sleep was interrupted! Exiting..."); - System.exit(-1); + // Check to ensure the Oracle issued no timestamps during the scan for notifications. 
+ if (ts2 - ts1 == 1) { + break; } } - } catch (FluoException e) { - log.error(e.getMessage()); - System.exit(-1); } catch (Exception e) { log.error("An exception was thrown -", e); System.exit(-1); diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java index 4690fd5..3cb9d49 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java +++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java @@ -26,6 +26,11 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import com.google.common.collect.Iterables; +import com.google.gson.FieldNamingPolicy; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonIOException; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.security.Authorizations; @@ -40,12 +45,6 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumnValue; import org.apache.fluo.api.data.Span; -import com.google.common.collect.Iterables; -import com.google.gson.FieldNamingPolicy; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonIOException; - public class ScanUtil { public static final String FLUO_VALUE = "value"; public static final String FLUO_COLUMN_VISIBILITY = "visibility"; diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java index 4474c78..89647f6 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java +++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java @@ -199,7 +199,7 @@ public class PartitionManager { List<Bytes> zkSplits = new ArrayList<>(); 
SerializedSplits.deserialize(zkSplits::add, zkSplitData); - Collection<TableRange> tableRanges = TableRange.toTabletRanges(zkSplits); + Collection<TableRange> tableRanges = TableRange.fromBytes(zkSplits); PartitionInfo newPI = getGroupInfo(me, children, tableRanges, groupSize); setPartitionInfo(newPI); diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java index a7517f8..a3c70bb 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java +++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java @@ -4,9 +4,9 @@ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under @@ -20,10 +20,12 @@ import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.stream.Stream; import org.apache.accumulo.core.data.Range; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.core.util.ByteUtil; +import org.apache.fluo.core.util.Hex; import org.apache.hadoop.io.Text; import static java.util.stream.Collectors.toList; @@ -69,12 +71,14 @@ public class TableRange implements Comparable<TableRange> { @Override public String toString() { - return getPrevEndRow() + " " + getEndRow(); - } + String per = prevEndRow == null ? "-INF" : Hex.encNonAscii(prevEndRow); + String er = endRow == null ? 
"+INF" : Hex.encNonAscii(endRow); + return "(" + per + " " + er + "]"; + } - public static Collection<TableRange> toTabletRanges(Collection<Bytes> rows) { - List<Bytes> sortedRows = rows.stream().sorted().collect(toList()); + private static List<TableRange> fromStream(Stream<Bytes> stream) { + List<Bytes> sortedRows = stream.sorted().collect(toList()); List<TableRange> tablets = new ArrayList<>(sortedRows.size() + 1); for (int i = 0; i < sortedRows.size(); i++) { tablets.add(new TableRange(i == 0 ? null : sortedRows.get(i - 1), sortedRows.get(i))); @@ -83,9 +87,16 @@ public class TableRange implements Comparable<TableRange> { tablets.add(new TableRange( sortedRows.size() == 0 ? null : sortedRows.get(sortedRows.size() - 1), null)); return tablets; + } + public static List<TableRange> fromBytes(Collection<Bytes> rows) { + return fromStream(rows.stream()); + } + public static List<TableRange> fromTexts(Collection<Text> rows) { + return fromStream(rows.stream().map(ByteUtil::toBytes)); + } public Range getRange() { Text tper = Optional.ofNullable(prevEndRow).map(ByteUtil::toText).orElse(null); diff --git a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java index 3bf5307..8b38065 100644 --- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java +++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java @@ -50,7 +50,7 @@ public class PartitionManagerTest { Collection<Bytes> rows = IntStream.iterate(0, i -> i + 1000).limit(numSplits) .mapToObj(i -> String.format("r%06d", i)).map(Bytes::of).collect(toList()); - Collection<TableRange> tablets = TableRange.toTabletRanges(rows); + Collection<TableRange> tablets = TableRange.fromBytes(rows); Set<String> idCombos = new HashSet<>(); Map<Integer, RangeSet> groupTablets = new HashMap<>(); diff --git 
a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java index 637186f..0018d09 100644 --- a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java +++ b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java @@ -69,8 +69,7 @@ public class TableRangeTest { Bytes sp2 = Bytes.of("m1"); Bytes sp3 = Bytes.of("r1"); - Collection<TableRange> trc1 = - new HashSet<>(TableRange.toTabletRanges(Arrays.asList(sp2, sp3, sp1))); + Collection<TableRange> trc1 = new HashSet<>(TableRange.fromBytes(Arrays.asList(sp2, sp3, sp1))); Assert.assertEquals(4, trc1.size()); Assert.assertTrue(trc1.contains(new TableRange(null, sp1))); @@ -78,7 +77,7 @@ public class TableRangeTest { Assert.assertTrue(trc1.contains(new TableRange(sp2, sp3))); Assert.assertTrue(trc1.contains(new TableRange(sp3, null))); - Collection<TableRange> trc2 = new HashSet<>(TableRange.toTabletRanges(Collections.emptyList())); + Collection<TableRange> trc2 = new HashSet<>(TableRange.fromBytes(Collections.emptyList())); Assert.assertEquals(1, trc2.size()); Assert.assertTrue(trc2.contains(new TableRange(null, null))); } -- To stop receiving notification emails like this one, please contact ktur...@apache.org.