backport burn test refactor

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd4a9d18
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd4a9d18
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd4a9d18

Branch: refs/heads/cassandra-2.2
Commit: bd4a9d18e1317dcb8542bd4adc5a9f99b108d6c6
Parents: 8a56868
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Sun Jun 28 11:38:22 2015 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Sun Jun 28 11:38:22 2015 +0100

----------------------------------------------------------------------
 build.xml                                       |   7 +
 .../cassandra/concurrent/LongOpOrderTest.java   | 240 +++++++++
 .../concurrent/LongSharedExecutorPoolTest.java  | 226 +++++++++
 .../apache/cassandra/utils/LongBTreeTest.java   | 502 +++++++++++++++++++
 .../cassandra/concurrent/LongOpOrderTest.java   | 240 ---------
 .../concurrent/LongSharedExecutorPoolTest.java  | 228 ---------
 .../apache/cassandra/utils/LongBTreeTest.java   | 401 ---------------
 7 files changed, 975 insertions(+), 869 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 73e76e5..18ad49f 100644
--- a/build.xml
+++ b/build.xml
@@ -93,6 +93,7 @@
 
     <property name="test.timeout" value="60000" />
     <property name="test.long.timeout" value="600000" />
+    <property name="test.burn.timeout" value="600000" />
 
     <!-- default for cql tests. Can be override by 
-Dcassandra.test.use_prepared=false -->
     <property name="cassandra.test.use_prepared" value="true" />
@@ -1258,6 +1259,12 @@
     </testmacro>
   </target>
 
+  <target name="test-burn" depends="build-test" description="Execute 
functional tests">
+    <testmacro suitename="burn" inputdir="${test.burn.src}"
+               timeout="${test.burn.timeout}"> <!-- runs the burn suite from test.burn.src under the test.burn.timeout limit -->
+    </testmacro>
+  </target> <!-- NOTE(review): the description text is identical to that of the long-test target that follows; confirm that is intended -->
+
   <target name="long-test" depends="build-test" description="Execute 
functional tests">
     <testmacro suitename="long" inputdir="${test.long.src}"
                timeout="${test.long.timeout}">

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java 
b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
new file mode 100644
index 0000000..d7105df
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongOpOrderTest.java
@@ -0,0 +1,240 @@
+package org.apache.cassandra.concurrent;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.junit.Assert.assertTrue;
+
+// TODO: we don't currently test SAFE functionality at all!
+// TODO: should also test markBlocking and SyncOrdered
+public class LongOpOrderTest // burn test: producers open OpOrder groups while consumers issue barriers and cross-check the counts
+{
+
+    private static final Logger logger = 
LoggerFactory.getLogger(LongOpOrderTest.class);
+
+    static final int CONSUMERS = 4; // number of TestOrdering instances created by testOrdering()
+    static final int PRODUCERS = 32; // each TestOrdering starts PRODUCERS / CONSUMERS Producer tasks
+
+    static final long RUNTIME = TimeUnit.MINUTES.toMillis(5); // how long each TestOrdering.run() loop keeps going
+    static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1); // minimum gap between progress log lines
+
+    static final Thread.UncaughtExceptionHandler handler = new 
Thread.UncaughtExceptionHandler()
+    {
+        @Override
+        public void uncaughtException(Thread t, Throwable e)
+        {
+            System.err.println(t.getName() + ": " + e.getMessage());
+            e.printStackTrace();
+        }
+    }; // installed as the default uncaught-exception handler by testOrdering()
+
+    final OpOrder order = new OpOrder(); // the single OpOrder shared by every producer and consumer
+    final AtomicInteger errors = new AtomicInteger(); // bumped by State.check() for each inconsistency it logs
+
+    class TestOrdering implements Runnable // one consumer (this run() loop) together with the producers it starts
+    {
+
+        final int[] waitNanos = new int[1 << 16]; // precomputed random pauses, each below 5000ns
+        volatile State state = new State(); // State that producers try first; replaced after every barrier
+        final ScheduledExecutorService sched;
+
+        TestOrdering(ExecutorService exec, ScheduledExecutorService sched) // fills waitNanos, then submits the producers and finally this consumer to exec
+        {
+            this.sched = sched;
+            final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            for (int i = 0 ; i < waitNanos.length ; i++)
+                waitNanos[i] = rnd.nextInt(5000);
+            for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
+                exec.execute(new Producer());
+            exec.execute(this);
+        }
+
+        @Override
+        public void run() // issues barriers in a loop until RUNTIME has elapsed, checking each finished State twice
+        {
+            final long until = System.currentTimeMillis() + RUNTIME;
+            long lastReport = System.currentTimeMillis();
+            long count = 0; // barriers issued so far
+            long opCount = 0; // operations accepted across all finished States
+            while (true)
+            {
+                long now = System.currentTimeMillis();
+                if (now > until)
+                    break;
+                if (now > lastReport + REPORT_INTERVAL)
+                {
+                    lastReport = now;
+                    logger.info(String.format("%s: Executed %d barriers with 
%d operations. %.0f%% complete.",
+                            Thread.currentThread().getName(), count, opCount, 
100 * (1 - ((until - now) / (double) RUNTIME))));
+                }
+                try
+                {
+                    Thread.sleep(0, waitNanos[((int) (count & 
(waitNanos.length - 1)))]); // the array length is a power of two, so the mask wraps the index
+                } catch (InterruptedException e)
+                {
+                    e.printStackTrace(); // NOTE(review): the interrupt is only printed; the loop carries on
+                }
+
+                final State s = state;
+                s.barrier = order.newBarrier(); // from here on accept() consults this barrier
+                s.replacement = new State(); // producers refused by s move on to this State
+                s.barrier.issue();
+                s.barrier.await(); // NOTE(review): presumably blocks until earlier groups have closed; confirm against OpOrder.Barrier
+                s.check(); // first call: records the total accepted by s
+                opCount += s.totalCount();
+                state = s.replacement; // producers now start from the fresh State
+                sched.schedule(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        s.check(); // second call: the total must not have moved since the first check
+                    }
+                }, 1, TimeUnit.SECONDS);
+                count++;
+            }
+        }
+
+        class State // per-barrier bookkeeping: counts accepted operations by OpOrder.Group
+        {
+
+            volatile OpOrder.Barrier barrier; // null until the consumer creates a barrier for this State
+            volatile State replacement; // next State in the chain, set together with the barrier
+            final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new 
NonBlockingHashMap<>();
+            int checkCount = -1; // -1 until the first check() stores the observed total
+
+            boolean accept(OpOrder.Group opGroup) // returns false when a barrier is set and barrier.isAfter(opGroup) is false; otherwise counts the operation
+            {
+                if (barrier != null && !barrier.isAfter(opGroup))
+                    return false;
+                AtomicInteger c;
+                if (null == (c = count.get(opGroup)))
+                {
+                    count.putIfAbsent(opGroup, new AtomicInteger()); // racing threads end up sharing whichever counter was stored first
+                    c = count.get(opGroup);
+                }
+                c.incrementAndGet();
+                return true;
+            }
+
+            int totalCount() // sum of all per-group counters of this State
+            {
+                int c = 0;
+                for (AtomicInteger v : count.values())
+                    c += v.intValue();
+                return c;
+            }
+
+            void check() // first call stores the total; later calls verify it is unchanged; every call cross-checks against the producer-side map
+            {
+                boolean delete;
+                if (checkCount >= 0) // a total was already recorded, so this is a repeat check
+                {
+                    if (checkCount != totalCount())
+                    {
+                        errors.incrementAndGet();
+                        logger.error("Received size changed after barrier 
finished: {} vs {}", checkCount, totalCount());
+                    }
+                    delete = true;
+                }
+                else
+                {
+                    checkCount = totalCount();
+                    delete = false;
+                }
+                for (Map.Entry<OpOrder.Group, AtomicInteger> e : 
count.entrySet())
+                {
+                    if (e.getKey().compareTo(barrier.getSyncPoint()) > 0) // an accepted group sorts after the barrier sync point
+                    {
+                        errors.incrementAndGet();
+                        logger.error("Received an operation that was created 
after the barrier was issued.");
+                    }
+                    if (TestOrdering.this.count.get(e.getKey()).intValue() != 
e.getValue().intValue())
+                    {
+                        errors.incrementAndGet();
+                        logger.error("Missing registered operations. {} vs 
{}", TestOrdering.this.count.get(e.getKey()).intValue(), 
e.getValue().intValue());
+                    }
+                    if (delete)
+                        TestOrdering.this.count.remove(e.getKey()); // entries leave the producer-side map only on a repeat check
+                }
+            }
+
+        }
+
+        final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new 
NonBlockingHashMap<>(); // producer-side tally of operations per group, compared with each State in check()
+
+        class Producer implements Runnable // endlessly opens a group, registers it, and hands it to the first State that accepts it
+        {
+            public void run()
+            {
+                while (true) // no exit condition: producers run until the JVM stops them
+                {
+                    AtomicInteger c;
+                    try (OpOrder.Group opGroup = order.start())
+                    {
+                        if (null == (c = count.get(opGroup)))
+                        {
+                            count.putIfAbsent(opGroup, new AtomicInteger());
+                            c = count.get(opGroup);
+                        }
+                        c.incrementAndGet(); // register with the producer-side map before offering to a State
+                        State s = state;
+                        while (!s.accept(opGroup))
+                            s = s.replacement; // walk the replacement chain until some State accepts the group
+                    }
+                }
+            }
+        }
+
+    }
+
+    @Test
+    public void testOrdering() throws InterruptedException // starts CONSUMERS TestOrdering instances, waits about RUNTIME * 1.1, then requires zero recorded errors
+    {
+        errors.set(0);
+        Thread.setDefaultUncaughtExceptionHandler(handler);
+        final ExecutorService exec = Executors.newCachedThreadPool(new 
NamedThreadFactory("checker")); // NOTE(review): uses the same thread-name prefix as the scheduled pool on the next statement
+        final ScheduledExecutorService checker = 
Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
+        for (int i = 0 ; i < CONSUMERS ; i++)
+            new TestOrdering(exec, checker); // the constructor submits its own tasks to exec
+        exec.shutdown();
+        exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS); // bounded wait; the boolean result is ignored
+        assertTrue(exec.isShutdown()); // NOTE(review): isShutdown() is already true once shutdown() has been called above
+        assertTrue(errors.get() == 0);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git 
a/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java 
b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
new file mode 100644
index 0000000..fe464c7
--- /dev/null
+++ b/test/burn/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.BitSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.math3.distribution.WeibullDistribution;
+import org.junit.Test;
+
+public class LongSharedExecutorPoolTest // burn test: submits random batches of timed park tasks to SharedExecutorPool executors and asserts they finish by a computed deadline
+{
+
+    private static final class WaitTask implements Runnable // task that parks its thread for a fixed number of nanoseconds
+    {
+        final long nanos;
+
+        private WaitTask(long nanos)
+        {
+            this.nanos = nanos;
+        }
+
+        public void run()
+        {
+            LockSupport.parkNanos(nanos);
+        }
+    }
+
+    private static final class Result implements Comparable<Result> // one submitted task: its future and the time it was forecast to finish
+    {
+        final Future<?> future;
+        final long forecastedCompletion;
+
+        private Result(Future<?> future, long forecastedCompletion)
+        {
+            this.future = future;
+            this.forecastedCompletion = forecastedCompletion;
+        }
+
+        public int compareTo(Result that) // orders by forecast time, then by the hash codes of the results and of their futures
+        {
+            int c = Long.compare(this.forecastedCompletion, 
that.forecastedCompletion);
+            if (c != 0)
+                return c;
+            c = Integer.compare(this.hashCode(), that.hashCode()); // tie-break so that equal forecasts rarely compare as 0
+            if (c != 0)
+                return c;
+            return Integer.compare(this.future.hashCode(), 
that.future.hashCode());
+        }
+    }
+
+    private static final class Batch implements Comparable<Batch> // the results of one submission round to one executor, plus the deadline they are checked against
+    {
+        final TreeSet<Result> results;
+        final long timeout; // nanoTime deadline computed when the batch was submitted
+        final int executorIndex; // index into the executors array of the owning executor
+
+        private Batch(TreeSet<Result> results, long timeout, int executorIndex)
+        {
+            this.results = results;
+            this.timeout = timeout;
+            this.executorIndex = executorIndex;
+        }
+
+        public int compareTo(Batch that) // orders by deadline, then batch size, then hash code
+        {
+            int c = Long.compare(this.timeout, that.timeout);
+            if (c != 0)
+                return c;
+            c = Integer.compare(this.results.size(), that.results.size());
+            if (c != 0)
+                return c;
+            return Integer.compare(this.hashCode(), that.hashCode());
+        }
+    }
+
+    @Test
+    public void testPromptnessOfExecution() throws InterruptedException, 
ExecutionException
+    {
+        testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f); // two minutes per load level, multiplier raised by 0.5 per level
+    }
+
+    private void testPromptnessOfExecution(long intervalNanos, float 
loadIncrement) throws InterruptedException, ExecutionException
+    { // intervalNanos: time spent at each load level; loadIncrement: step added to the load multiplier per level
+        final int executorCount = 4;
+        int threadCount = 8;
+        int maxQueued = 1024;
+        final WeibullDistribution workTime = new WeibullDistribution(3, 
200000);
+        final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1); // lower clamp for a single task
+        final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1); // upper clamp for a single task
+
+        final int[] threadCounts = new int[executorCount];
+        final WeibullDistribution[] workCount = new 
WeibullDistribution[executorCount];
+        final ExecutorService[] executors = new ExecutorService[executorCount];
+        for (int i = 0 ; i < executors.length ; i++)
+        {
+            executors[i] = SharedExecutorPool.SHARED.newExecutor(threadCount, 
maxQueued, "test" + i, "test" + i);
+            threadCounts[i] = threadCount;
+            workCount[i] = new WeibullDistribution(2, maxQueued);
+            threadCount *= 2; // each successive executor gets twice the threads
+            maxQueued *= 2; // and twice the queue size
+        }
+
+        long runs = 0; // batches submitted in total
+        long events = 0; // tasks submitted during the current load level
+        final TreeSet<Batch> pending = new TreeSet<>(); // batches not yet confirmed complete, earliest deadline first
+        final BitSet executorsWithWork = new BitSet(executorCount); // bit i is set while executor i has a pending batch
+        long until = 0;
+        // basic idea is to go through different levels of load on the 
executor service; initially is all small batches
+        // (mostly within max queue size) of very short operations, moving to 
progressively larger batches
+        // (beyond max queued size), and longer operations
+        for (float multiplier = 0f ; multiplier < 2.01f ; ) // multiplier only advances inside the timed branch below
+        {
+            if (System.nanoTime() > until) // NOTE(review): nanoTime() values are compared directly here and below rather than via differences; confirm that is acceptable
+            {
+                System.out.println(String.format("Completed %.0fK batches with 
%.1fM events", runs * 0.001f, events * 0.000001f));
+                events = 0;
+                until = System.nanoTime() + intervalNanos;
+                multiplier += loadIncrement;
+                System.out.println(String.format("Running for %ds with load 
multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
+            }
+
+            // wait a random amount of time so we submit new tasks in various 
stages of
+            long timeout;
+            if (pending.isEmpty()) timeout = 0;
+            else if (Math.random() > 0.98) timeout = Long.MAX_VALUE; // roughly 2% of the time: wait for every pending batch
+            else if (pending.size() == executorCount) timeout = 
pending.first().timeout;
+            else timeout = (long) (Math.random() * pending.last().timeout);
+
+            while (!pending.isEmpty() && timeout > System.nanoTime())
+            {
+                Batch first = pending.first();
+                boolean complete = false;
+                try
+                {
+                    for (Result result : first.results.descendingSet()) // latest forecast first
+                        result.future.get(timeout - System.nanoTime(), 
TimeUnit.NANOSECONDS);
+                    complete = true;
+                }
+                catch (TimeoutException e) // the wait budget ran out; complete stays false
+                {
+                }
+                if (!complete && System.nanoTime() > first.timeout) // past the batch deadline: every future must already be done
+                {
+                    for (Result result : first.results)
+                        if (!result.future.isDone())
+                            throw new AssertionError();
+                    complete = true;
+                }
+                if (complete)
+                {
+                    pending.pollFirst();
+                    executorsWithWork.clear(first.executorIndex); // the executor may be given a new batch
+                }
+            }
+
+            // if we've emptied the executors, give all our threads an 
opportunity to spin down
+            if (timeout == Long.MAX_VALUE)
+                Uninterruptibles.sleepUninterruptibly(10, 
TimeUnit.MILLISECONDS);
+
+            // submit a random batch to the first free executor service
+            int executorIndex = executorsWithWork.nextClearBit(0);
+            if (executorIndex >= executorCount)
+                continue; // every executor still has a pending batch
+            executorsWithWork.set(executorIndex);
+            ExecutorService executor = executors[executorIndex];
+            TreeSet<Result> results = new TreeSet<>();
+            int count = (int) (workCount[executorIndex].sample() * multiplier); // batch size scales with the load multiplier
+            long targetTotalElapsed = 0; // sum of the park times of all tasks in this batch
+            long start = System.nanoTime();
+            long baseTime;
+            if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() 
* multiplier);
+            else  baseTime = 0; // 0 means every task samples its own duration from workTime
+            for (int j = 0 ; j < count ; j++)
+            {
+                long time;
+                if (baseTime == 0) time = (long) (workTime.sample() * 
multiplier);
+                else time = (long) (baseTime * Math.random());
+                if (time < minWorkTime)
+                    time = minWorkTime;
+                if (time > maxWorkTime)
+                    time = maxWorkTime;
+                targetTotalElapsed += time;
+                Future<?> future = executor.submit(new WaitTask(time));
+                results.add(new Result(future, System.nanoTime() + time));
+            }
+            long end = start + (long) Math.ceil(targetTotalElapsed / (double) 
threadCounts[executorIndex])
+                       + TimeUnit.MILLISECONDS.toNanos(100L); // deadline: total work divided across the executor threads, plus 100ms
+            long now = System.nanoTime();
+            if (runs++ > executorCount && now > end) // after the first few batches, submission alone must not overrun the deadline
+                throw new AssertionError();
+            events += results.size();
+            pending.add(new Batch(results, end, executorIndex));
+//            System.out.println(String.format("Submitted batch to executor %d 
with %d items and %d permitted millis", executorIndex, count, 
TimeUnit.NANOSECONDS.toMillis(end - start)));
+        }
+    }
+
+    public static void main(String[] args) throws InterruptedException, 
ExecutionException
+    {
+        // do longer test
+        new 
LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L),
 0.1f);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java 
b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
new file mode 100644
index 0000000..9641930
--- /dev/null
+++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+// TODO : should probably lower fan-factor for tests to make them more 
intensive
+public class LongBTreeTest
+{
+
+    private static final MetricRegistry metrics = new MetricRegistry();
+    private static final Timer BTREE_TIMER = 
metrics.timer(MetricRegistry.name(BTree.class, "BTREE")); // times the BTree.update calls made by doOneTestInsertions
+    private static final Timer TREE_TIMER = 
metrics.timer(MetricRegistry.name(BTree.class, "TREE")); // times the matching TreeMap.putAll calls
+    private static final ExecutorService MODIFY = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new 
NamedThreadFactory("MODIFY")); // testSearchIterator submits its workers to this pool
+    private static final ExecutorService COMPARE = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new 
NamedThreadFactory("COMPARE"));
+    private static final RandomAbort<Integer> SPORADIC_ABORT = new 
RandomAbort<>(new Random(), 0.0001f); // handed to BTree.update, which doOneTestInsertions retries while it returns null
+
+    static
+    {
+        System.setProperty("cassandra.btree.fanfactor", "4"); // NOTE(review): presumably read when BTree initialises; confirm this runs before BTree is loaded
+    }
+
+    @Test
+    public void testOversizedMiddleInsert() // builds a two-key tree, updates it with 10M keys in one call, then compares it with a TreeSet
+    {
+        TreeSet<Integer> canon = new TreeSet<>();
+        for (int i = 0 ; i < 10000000 ; i++)
+            canon.add(i);
+        Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, 
Integer.MAX_VALUE), ICMP, true, null); // start from just the two extreme keys
+        btree = BTree.update(btree, ICMP, canon, true); // every inserted key lies between the two existing ones
+        canon.add(Integer.MIN_VALUE); // add the boundary keys so canon holds the expected tree contents
+        canon.add(Integer.MAX_VALUE);
+        Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
+        testEqual("Oversize", BTree.<Integer>slice(btree, true), 
canon.iterator());
+    }
+
+    @Test
+    public void testIndividualInsertsSmallOverlappingRange() throws 
ExecutionException, InterruptedException
+    {
+        testInsertions(10000000, 50, 1, 1, true); // 10M ops in total, 50 per test, key range 50 * 1, batches of about 1, quick equality check
+    }
+
+    @Test
+    public void testBatchesSmallOverlappingRange() throws ExecutionException, 
InterruptedException
+    {
+        testInsertions(10000000, 50, 1, 5, true); // 10M ops in total, 50 per test, key range 50 * 1, batches of about 5, quick equality check
+    }
+
+    @Test
+    public void testIndividualInsertsMediumSparseRange() throws 
ExecutionException, InterruptedException
+    {
+        testInsertions(10000000, 500, 10, 1, true); // 10M ops in total, 500 per test, key range 500 * 10, batches of about 1, quick equality check
+    }
+
+    @Test
+    public void testBatchesMediumSparseRange() throws ExecutionException, 
InterruptedException
+    {
+        testInsertions(10000000, 500, 10, 10, true); // 10M ops in total, 500 per test, key range 500 * 10, batches of about 10, quick equality check
+    }
+
+    @Test
+    public void testLargeBatchesLargeRange() throws ExecutionException, 
InterruptedException
+    {
+        testInsertions(100000000, 5000, 3, 100, true); // 100M ops in total, 5000 per test, key range 5000 * 3, batches of about 100, quick equality check
+    }
+
+    @Test
+    public void testSlicingSmallRandomTrees() throws ExecutionException, 
InterruptedException
+    {
+        testInsertions(10000, 50, 10, 10, false); // 10K ops in total, 50 per test, key range 50 * 10, batches of about 10; false selects the testAllSlices path instead of quick equality
+    }
+
+    @Test
+    public void testSearchIterator() throws InterruptedException // runs one worker per processor on MODIFY, each checking BTreeSearchIterator lookups on random trees
+    {
+        int threads = Runtime.getRuntime().availableProcessors();
+        final CountDownLatch latch = new CountDownLatch(threads); // counted down once by each worker when it finishes
+        final AtomicLong errors = new AtomicLong();
+        final AtomicLong count = new AtomicLong(); // tree selections completed so far, across all workers
+        final int perThreadTrees = 100;
+        final int perTreeSelections = 100;
+        final long totalCount = threads * perThreadTrees * perTreeSelections; // value count reaches once every worker is done
+        for (int t = 0 ; t < threads ; t++)
+        {
+            MODIFY.execute(new Runnable()
+            {
+                public void run()
+                {
+                    ThreadLocalRandom random = ThreadLocalRandom.current();
+                    for (int i = 0 ; i < perThreadTrees ; i++)
+                    {
+                        Object[] tree = randomTree(10000, random);
+                        for (int j = 0 ; j < perTreeSelections ; j++)
+                        {
+                            BTreeSearchIterator<Integer, Integer, Integer> 
searchIterator = new BTreeSearchIterator<>(tree, ICMP);
+                            for (Integer key : randomSelection(tree, random))
+                                if (key != searchIterator.next(key)) // NOTE(review): compares Integer references, not values; presumably next() returns the stored instance
+                                    errors.incrementAndGet();
+                            searchIterator = new BTreeSearchIterator<Integer, 
Integer, Integer>(tree, ICMP);
+                            for (Integer key : randomMix(tree, random))
+                                if (key != searchIterator.next(key))
+                                    if (BTree.find(tree, ICMP, key) == key) // a mismatch only counts when find() does return this key
+                                        errors.incrementAndGet();
+                            count.incrementAndGet();
+                        }
+                    }
+                    latch.countDown(); // NOTE(review): not in a finally block, so a worker that throws never releases the latch
+                }
+            });
+        }
+        while (latch.getCount() > 0) // report progress every 10 seconds until all workers have finished
+        {
+            latch.await(10L, TimeUnit.SECONDS);
+            System.out.println(String.format("%.0f%% complete %s", 100 * 
count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + 
errors.get()) : ""));
+            assert errors.get() == 0; // NOTE(review): plain assert statement, only evaluated when assertions are enabled
+        }
+    }
+
+    private static void testInsertions(int totalCount, int perTestCount, int 
testKeyRatio, int modificationBatchSize, boolean quickEquality) throws 
ExecutionException, InterruptedException
+    { // obtains one task per test from doOneTestInsertions, waits for them chunk by chunk, then prints the timer percentiles
+        int batchesPerTest = perTestCount / modificationBatchSize; // passed to doOneTestInsertions as iterations
+        int maximumRunLength = 100; // passed to doOneTestInsertions as maxRunLength
+        int testKeyRange = perTestCount * testKeyRatio; // passed to doOneTestInsertions as upperBound
+        int tests = totalCount / perTestCount; // NOTE(review): the chunk loop always schedules chunkSize tests per chunk, so more than this may run when chunkSize does not divide it; chunkSize also becomes 0 once perTestCount squared exceeds 100000 with quickEquality false
+        System.out.println(String.format("Performing %d tests of %d 
operations, with %.2f max size/key-range ratio in batches of ~%d ops",
+                tests, perTestCount, 1 / (float) testKeyRatio, 
modificationBatchSize));
+
+        // if we're not doing quick-equality, we can spam with garbage for all 
the checks we perform, so we'll split the work into smaller chunks
+        int chunkSize = quickEquality ? tests : (int) (100000 / 
Math.pow(perTestCount, 2));
+        for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
+        {
+            final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer 
= new ArrayList<>();
+            for (int i = 0 ; i < chunkSize ; i++)
+            {
+                outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, 
modificationBatchSize, batchesPerTest, quickEquality));
+            }
+
+            final List<ListenableFuture<?>> inner = new ArrayList<>();
+            int complete = 0;
+            int reportInterval = totalCount / 100;
+            int lastReportAt = 0;
+            for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
+            {
+                inner.addAll(f.get());
+                complete += perTestCount;
+                if (complete - lastReportAt >= reportInterval)
+                {
+                    System.out.println(String.format("Completed %d of %d 
operations", (chunk * perTestCount) + complete, totalCount));
+                    lastReportAt = complete;
+                }
+            }
+            Futures.allAsList(inner).get();
+        }
+        Snapshot snap = BTREE_TIMER.getSnapshot();
+        System.out.println(String.format("btree   : %.2fns, %.2fns, %.2fns", 
snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+        snap = TREE_TIMER.getSnapshot();
+        System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", 
snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+        System.out.println("Done");
+    }
+    /**
+     * Creates one randomized insertion test, submits it to the MODIFY executor and returns its task.
+     * <p>
+     * The task starts from an empty btree and an empty reference TreeMap and runs {@code iterations} rounds.
+     * Each round collects a batch of keys in a scratch TreeMap (duplicate keys collapse, so the batch may
+     * hold fewer distinct keys than were drawn), applies the batch to the reference map and to the btree,
+     * checks {@code BTree.isWellFormed} and then compares the btree with the reference contents.
+     * <p>
+     * The reference update is timed with TREE_TIMER; the btree update, including any retries, is timed with
+     * BTREE_TIMER. NOTE(review): SPORADIC_ABORT is declared outside this chunk - presumably it is what makes
+     * {@code BTree.update} return null from time to time; confirm against its declaration.
+     *
+     * @param upperBound              exclusive bound for the first key of each run; must be positive
+     *                                ({@code Random.nextInt(int)} rejects non-positive bounds)
+     * @param maxRunLength            caps the length of a run of consecutive keys
+     * @param averageModsPerIteration sizes each batch: {@code (avg >> 1) + 1 + rnd.nextInt(avg)} keys are
+     *                                drawn per round; must be positive
+     * @param iterations              number of batches applied by the task
+     * @param quickEquality           when true a single iterator comparison is done per round and the returned
+     *                                list stays empty; when false {@code testAllSlices} is called every round
+     *                                and its futures are collected
+     * @return the already-submitted task; its result is the list of comparison futures it scheduled
+     */
+    private static ListenableFutureTask<List<ListenableFuture<?>>> 
doOneTestInsertions(final int upperBound, final int maxRunLength, final int 
averageModsPerIteration, final int iterations, final boolean quickEquality)
+    {
+        ListenableFutureTask<List<ListenableFuture<?>>> f = 
ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
+        {
+            @Override
+            public List<ListenableFuture<?>> call()
+            {
+                final List<ListenableFuture<?>> r = new ArrayList<>();
+                NavigableMap<Integer, Integer> canon = new TreeMap<>();
+                Object[] btree = BTree.empty();
+                final TreeMap<Integer, Integer> buffer = new TreeMap<>();
+                final Random rnd = new Random();
+                for (int i = 0 ; i < iterations ; i++)
+                {
+                    buffer.clear();
+                    int mods = (averageModsPerIteration >> 1) + 1 + 
rnd.nextInt(averageModsPerIteration);
+                    while (mods > 0) // fill the batch with runs of consecutive keys
+                    {
+                        int v = rnd.nextInt(upperBound);
+                        int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
+                        int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc)); // run length, at least 1
+                        for (int j = 0 ; j < c ; j++)
+                        {
+                            buffer.put(v, v);
+                            v++; // not clamped: a run may step past upperBound
+                        }
+                        mods -= c;
+                    }
+                    Timer.Context ctxt;
+                    ctxt = TREE_TIMER.time();
+                    canon.putAll(buffer);
+                    ctxt.stop();
+                    ctxt = BTREE_TIMER.time();
+                    Object[] next = null;
+                    while (next == null) // a null result means the update did not complete: retry the same batch
+                        next = BTree.update(btree, ICMP, buffer.keySet(), 
true, SPORADIC_ABORT);
+                    btree = next;
+                    ctxt.stop();
+                    // structural check first, then content comparison against the reference map
+                    if (!BTree.isWellFormed(btree, ICMP))
+                    {
+                        System.out.println("ERROR: Not well formed");
+                        throw new AssertionError("Not well formed!");
+                    }
+                    if (quickEquality)
+                        testEqual("", BTree.<Integer>slice(btree, true), 
canon.keySet().iterator());
+                    else
+                        r.addAll(testAllSlices("RND", btree, new 
TreeSet<>(canon.keySet())));
+                }
+                return r;
+            }
+        });
+        MODIFY.execute(f);
+        return f;
+    }
+    /**
+     * Grows a tree one key at a time (keys 0 through 127) and, before each insertion, compares every slice
+     * of the current tree with a TreeSet holding the same keys, blocking until all scheduled comparisons
+     * have finished. Tree sizes 0 through 127 are therefore checked; the tree produced by the last
+     * insertion is not.
+     *
+     * @throws ExecutionException   propagated from waiting on the comparison futures
+     * @throws InterruptedException if the wait for the comparison futures is interrupted
+     */
+    @Test
+    public void testSlicingAllSmallTrees() throws ExecutionException, 
InterruptedException
+    {
+        Object[] cur = BTree.empty();
+        TreeSet<Integer> canon = new TreeSet<>();
+        // we set FAN_FACTOR to 4, so 128 items is four levels deep, three 
fully populated
+        for (int i = 0 ; i < 128 ; i++)
+        {
+            String id = String.format("[0..%d)", canon.size());
+            System.out.println("Testing " + id);
+            Futures.allAsList(testAllSlices(id, cur, canon)).get();
+            Object[] next = null;
+            while (next == null) // keep retrying until update returns a tree
+                next = BTree.update(cur, ICMP, Arrays.asList(i), true, 
SPORADIC_ABORT);
+            cur = next;
+            canon.add(i);
+        }
+    }
+    /** Comparator used for every tree in this test; orders Integers by {@code Integer.compare}. */
+    static final Comparator<Integer> ICMP = new Comparator<Integer>()
+    {
+        @Override
+        public int compare(Integer o1, Integer o2)
+        {
+            return Integer.compare(o1, o2);
+        }
+    };
+    /**
+     * Schedules slice comparisons for {@code btree} in both directions: once for a BTreeSet over the tree
+     * against {@code canon} (label suffix " ASC"), and once for the descending views of both (suffix " DSC").
+     *
+     * @param id    label prefix used in comparison output
+     * @param btree raw tree to wrap in a BTreeSet ordered by ICMP
+     * @param canon reference set to compare against
+     * @return the futures of every comparison scheduled by the two passes
+     */
+    private static List<ListenableFuture<?>> testAllSlices(String id, Object[] 
btree, NavigableSet<Integer> canon)
+    {
+        List<ListenableFuture<?>> waitFor = new ArrayList<>();
+        testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, 
waitFor);
+        testAllSlices(id + " DSC", new BTreeSet<>(btree, 
ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
+        return waitFor;
+    }
+
+    /**
+     * Schedules a comparison of the whole set plus every head, tail and sub-set view of {@code btree}
+     * against the same view of {@code canon}, appending the resulting futures to {@code results}.
+     * <p>
+     * Bounds come from {@code range(canon.size(), ...)}: when {@code ascending} they run from -1 up to
+     * {@code canon.size()}, otherwise from {@code canon.size()} down to -1. Upper bounds start at the
+     * current lower bound and continue in the same direction.
+     * <p>
+     * Slice labels follow the iteration order of the views being compared: {@code (..x]} / {@code (..x)}
+     * name inclusive / exclusive head sets and {@code [x..)} / {@code (x..)} inclusive / exclusive tail sets.
+     *
+     * @param id        label prefix identifying the tree under test in comparison output
+     * @param btree     set view under test
+     * @param canon     reference view to compare against
+     * @param ascending direction in which bounds are generated
+     * @param results   receives one future per scheduled comparison
+     */
+    private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
+    {
+        testOneSlice(id, btree, canon, results);
+        for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
+        {
+            // test head/tail sets
+            testOneSlice(String.format("%s->(..%d]", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
+            testOneSlice(String.format("%s->(..%d)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
+            testOneSlice(String.format("%s->[%d..)", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
+            testOneSlice(String.format("%s->(%d..)", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
+            for (Integer ub : range(canon.size(), lb, ascending))
+            {
+                // test subsets
+                testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
+                testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
+                testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
+                testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
+            }
+        }
+    }
+    /**
+     * Schedules one comparison of {@code test} against {@code canon} on the COMPARE executor and records
+     * its future in {@code results}.
+     * <p>
+     * The comparison checks the element count, then the forward iterator, the descending iterator, the
+     * iterator of the descending set and the descending iterator of the descending set.
+     *
+     * @param id      label used in comparison output
+     * @param test    view under test
+     * @param canon   reference view
+     * @param results receives the future of the scheduled comparison
+     */
+    private static void testOneSlice(final String id, final 
NavigableSet<Integer> test, final NavigableSet<Integer> canon, 
List<ListenableFuture<?>> results)
+    {
+        ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
+        {
+
+            @Override
+            public void run()
+            {
+                test(id + " Count", test.size(), canon.size());
+                testEqual(id, test.iterator(), canon.iterator());
+                testEqual(id + "->DSCI", test.descendingIterator(), 
canon.descendingIterator());
+                testEqual(id + "->DSCS", test.descendingSet().iterator(), 
canon.descendingSet().iterator());
+                testEqual(id + "->DSCS->DSCI", 
test.descendingSet().descendingIterator(), 
canon.descendingSet().descendingIterator());
+            }
+        }, null);
+        results.add(f);
+        COMPARE.execute(f);
+    }
+    /**
+     * Prints a message when {@code test} differs from {@code expect}.
+     * <p>
+     * NOTE(review): a mismatch is only printed, never thrown, so on its own it does not fail the enclosing
+     * task - confirm that relying on the iterator comparisons that follow is intended.
+     *
+     * @param id     label printed in front of the message
+     * @param test   value observed
+     * @param expect value expected
+     */
+    private static void test(String id, int test, int expect)
+    {
+        if (test != expect)
+        {
+            System.out.println(String.format("%s: Expected %d, Got %d", id, 
expect, test));
+        }
+    }
+    /**
+     * Walks both iterators in lockstep and fails if they do not yield equal elements in the same order.
+     * <p>
+     * Every differing pair is printed, as is every element left over in either iterator once the other is
+     * exhausted; only after both iterators have been drained is the failure raised. Messages use the
+     * {@code %d} conversion, so elements must be integral; elements must also be non-null because they are
+     * compared with {@code equals}.
+     *
+     * @param id    label printed in front of each message
+     * @param btree iterator under test
+     * @param canon iterator producing the expected sequence
+     * @throws AssertionError if any difference was found
+     */
+    private static <V> void testEqual(String id, Iterator<V> btree, 
Iterator<V> canon)
+    {
+        boolean equal = true;
+        while (btree.hasNext() && canon.hasNext())
+        {
+            Object i = btree.next();
+            Object j = canon.next();
+            if (!i.equals(j))
+            {
+                System.out.println(String.format("%s: Expected %d, Got %d", 
id, j, i));
+                equal = false;
+            }
+        }
+        while (btree.hasNext()) // surplus elements in the iterator under test
+        {
+            System.out.println(String.format("%s: Expected <Nil>, Got %d", id, 
btree.next()));
+            equal = false;
+        }
+        while (canon.hasNext()) // expected elements the iterator under test never produced
+        {
+            System.out.println(String.format("%s: Expected %d, Got Nil", id, 
canon.next()));
+            equal = false;
+        }
+        if (!equal)
+            throw new AssertionError("Not equal");
+    }
+
+    /**
+     * Produces the bound values used when slicing; should only be called on sets that range from 0->N or
+     * N->0.
+     * <p>
+     * Ascending: yields {@code from}, or -1 when {@code from} is {@code Integer.MIN_VALUE}, up to and
+     * including {@code size}. Descending: yields {@code from}, or {@code size} when {@code from} is
+     * {@code Integer.MIN_VALUE}, down to and including -1.
+     * <p>
+     * The cursor is a field of the returned Iterable rather than of its iterators, so every iterator
+     * obtained from one Iterable shares it: a second call to {@code iterator()} continues where the first
+     * stopped instead of starting over. Termination is tested with {@code !=}, so {@code from} must lie on
+     * the path towards the end value.
+     *
+     * @param size      size of the set being sliced
+     * @param from      first value to yield, or {@code Integer.MIN_VALUE} to start just outside the set
+     * @param ascending true to count upwards, false to count downwards
+     * @return a single-use Iterable over the bound values
+     */
+    private static final Iterable<Integer> range(final int size, final int 
from, final boolean ascending)
+    {
+        return new Iterable<Integer>()
+        {
+            int cur;
+            int delta;
+            int end;
+            {
+                if (ascending)
+                {
+                    end = size + 1; // one past the last value yielded (size)
+                    cur = from == Integer.MIN_VALUE ? -1 : from;
+                    delta = 1;
+                }
+                else
+                {
+                    end = -2; // one past the last value yielded (-1)
+                    cur = from == Integer.MIN_VALUE ? size : from;
+                    delta = -1;
+                }
+            }
+            @Override
+            public Iterator<Integer> iterator()
+            {
+                return new Iterator<Integer>()
+                {
+                    @Override
+                    public boolean hasNext()
+                    {
+                        return cur != end;
+                    }
+
+                    @Override
+                    public Integer next()
+                    {
+                        Integer r = cur;
+                        cur += delta;
+                        return r;
+                    }
+
+                    @Override
+                    public void remove()
+                    {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+    /**
+     * Builds a btree from random ints. The number of draws is {@code random.nextInt(maxSize)}, i.e. between
+     * 0 and {@code maxSize - 1}; draws are gathered in a TreeSet first, so duplicates collapse and the tree
+     * may hold fewer keys than were drawn.
+     *
+     * @param maxSize exclusive upper bound on the number of draws; must be positive
+     * @param random  source of both the size and the keys
+     * @return the tree returned by {@code BTree.build} for the collected keys, ordered by ICMP
+     */
+    private static Object[] randomTree(int maxSize, Random random)
+    {
+        TreeSet<Integer> build = new TreeSet<>();
+        int size = random.nextInt(maxSize);
+        for (int i = 0 ; i < size ; i++)
+        {
+            build.add(random.nextInt());
+        }
+        return BTree.build(build, ICMP, true, 
UpdateFunction.NoOp.<Integer>instance());
+    }
+    /**
+     * Returns a filtered view over the keys of {@code iter}. A keep-probability is drawn once per call;
+     * each time the predicate is evaluated a fresh float is drawn from {@code rnd} and the key is kept when
+     * that float is below the probability, so the selection depends on the state of {@code rnd} at
+     * iteration time.
+     *
+     * @param iter raw tree whose keys are selected from
+     * @param rnd  source of the keep-probability and of the per-key decisions
+     * @return the filtered view, in the order of a BTreeSet over {@code iter} using ICMP
+     */
+    private static Iterable<Integer> randomSelection(Object[] iter, final 
Random rnd)
+    {
+        final float proportion = rnd.nextFloat();
+        return Iterables.filter(new BTreeSet<>(iter, ICMP), new 
Predicate<Integer>()
+        {
+            public boolean apply(Integer integer)
+            {
+                return rnd.nextFloat() < proportion;
+            }
+        });
+    }
+    /**
+     * Returns a transformed view over the keys of {@code iter}. A pass-through probability is drawn once
+     * per call; each key is either returned unchanged (when a fresh float from {@code rnd} is below that
+     * probability) or replaced by {@code (int)((v - last) / 2)}, where {@code last} is the key previously
+     * passed to the function (initially {@code Integer.MIN_VALUE}) and the subtraction is done in long
+     * arithmetic.
+     * <p>
+     * NOTE(review): the replacement is half the gap to the previous key, not a midpoint between the two -
+     * confirm this is the intended way to produce keys that are absent from the tree.
+     *
+     * @param iter raw tree whose keys are transformed
+     * @param rnd  source of the pass-through probability and of the per-key decisions
+     * @return the transformed view; the function is stateful, so results depend on iteration history
+     */
+    private static Iterable<Integer> randomMix(Object[] iter, final Random rnd)
+    {
+        final float proportion = rnd.nextFloat();
+        return Iterables.transform(new BTreeSet<>(iter, ICMP), new 
Function<Integer, Integer>()
+        {
+            long last = Integer.MIN_VALUE;
+
+            public Integer apply(Integer v)
+            {
+                long last = this.last; // previous key, read before the field is overwritten
+                this.last = v;
+                if (rnd.nextFloat() < proportion)
+                    return v;
+                return (int)((v - last) / 2);
+            }
+        });
+    }
+    /**
+     * UpdateFunction that keeps the incoming value on every apply and reports {@code abortEarly()} as true
+     * at random: each call draws a float from {@code rnd} and answers true when it is below {@code chance}.
+     */
+    private static final class RandomAbort<V> implements UpdateFunction<V>
+    {
+        final Random rnd;
+        final float chance;
+        private RandomAbort(Random rnd, float chance)
+        {
+            this.rnd = rnd;
+            this.chance = chance;
+        }
+
+        public V apply(V replacing, V update) // the update always wins over the value it replaces
+        {
+            return update;
+        }
+
+        public boolean abortEarly()
+        {
+            return rnd.nextFloat() < chance;
+        }
+
+        public void allocated(long heapSize)
+        {
+            // no-op: the reported heap size is ignored
+        }
+
+        public V apply(V v)
+        {
+            return v;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java 
b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
deleted file mode 100644
index d7105df..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.cassandra.concurrent;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-import static org.junit.Assert.assertTrue;
-
-// TODO: we don't currently test SAFE functionality at all!
-// TODO: should also test markBlocking and SyncOrdered
-public class LongOpOrderTest
-{
-
-    private static final Logger logger = 
LoggerFactory.getLogger(LongOpOrderTest.class);
-
-    static final int CONSUMERS = 4;
-    static final int PRODUCERS = 32;
-
-    static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
-    static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
-
-    static final Thread.UncaughtExceptionHandler handler = new 
Thread.UncaughtExceptionHandler()
-    {
-        @Override
-        public void uncaughtException(Thread t, Throwable e)
-        {
-            System.err.println(t.getName() + ": " + e.getMessage());
-            e.printStackTrace();
-        }
-    };
-
-    final OpOrder order = new OpOrder();
-    final AtomicInteger errors = new AtomicInteger();
-
-    class TestOrdering implements Runnable
-    {
-
-        final int[] waitNanos = new int[1 << 16];
-        volatile State state = new State();
-        final ScheduledExecutorService sched;
-
-        TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
-        {
-            this.sched = sched;
-            final ThreadLocalRandom rnd = ThreadLocalRandom.current();
-            for (int i = 0 ; i < waitNanos.length ; i++)
-                waitNanos[i] = rnd.nextInt(5000);
-            for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
-                exec.execute(new Producer());
-            exec.execute(this);
-        }
-
-        @Override
-        public void run()
-        {
-            final long until = System.currentTimeMillis() + RUNTIME;
-            long lastReport = System.currentTimeMillis();
-            long count = 0;
-            long opCount = 0;
-            while (true)
-            {
-                long now = System.currentTimeMillis();
-                if (now > until)
-                    break;
-                if (now > lastReport + REPORT_INTERVAL)
-                {
-                    lastReport = now;
-                    logger.info(String.format("%s: Executed %d barriers with 
%d operations. %.0f%% complete.",
-                            Thread.currentThread().getName(), count, opCount, 
100 * (1 - ((until - now) / (double) RUNTIME))));
-                }
-                try
-                {
-                    Thread.sleep(0, waitNanos[((int) (count & 
(waitNanos.length - 1)))]);
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
-
-                final State s = state;
-                s.barrier = order.newBarrier();
-                s.replacement = new State();
-                s.barrier.issue();
-                s.barrier.await();
-                s.check();
-                opCount += s.totalCount();
-                state = s.replacement;
-                sched.schedule(new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        s.check();
-                    }
-                }, 1, TimeUnit.SECONDS);
-                count++;
-            }
-        }
-
-        class State
-        {
-
-            volatile OpOrder.Barrier barrier;
-            volatile State replacement;
-            final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new 
NonBlockingHashMap<>();
-            int checkCount = -1;
-
-            boolean accept(OpOrder.Group opGroup)
-            {
-                if (barrier != null && !barrier.isAfter(opGroup))
-                    return false;
-                AtomicInteger c;
-                if (null == (c = count.get(opGroup)))
-                {
-                    count.putIfAbsent(opGroup, new AtomicInteger());
-                    c = count.get(opGroup);
-                }
-                c.incrementAndGet();
-                return true;
-            }
-
-            int totalCount()
-            {
-                int c = 0;
-                for (AtomicInteger v : count.values())
-                    c += v.intValue();
-                return c;
-            }
-
-            void check()
-            {
-                boolean delete;
-                if (checkCount >= 0)
-                {
-                    if (checkCount != totalCount())
-                    {
-                        errors.incrementAndGet();
-                        logger.error("Received size changed after barrier 
finished: {} vs {}", checkCount, totalCount());
-                    }
-                    delete = true;
-                }
-                else
-                {
-                    checkCount = totalCount();
-                    delete = false;
-                }
-                for (Map.Entry<OpOrder.Group, AtomicInteger> e : 
count.entrySet())
-                {
-                    if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
-                    {
-                        errors.incrementAndGet();
-                        logger.error("Received an operation that was created 
after the barrier was issued.");
-                    }
-                    if (TestOrdering.this.count.get(e.getKey()).intValue() != 
e.getValue().intValue())
-                    {
-                        errors.incrementAndGet();
-                        logger.error("Missing registered operations. {} vs 
{}", TestOrdering.this.count.get(e.getKey()).intValue(), 
e.getValue().intValue());
-                    }
-                    if (delete)
-                        TestOrdering.this.count.remove(e.getKey());
-                }
-            }
-
-        }
-
-        final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new 
NonBlockingHashMap<>();
-
-        class Producer implements Runnable
-        {
-            public void run()
-            {
-                while (true)
-                {
-                    AtomicInteger c;
-                    try (OpOrder.Group opGroup = order.start())
-                    {
-                        if (null == (c = count.get(opGroup)))
-                        {
-                            count.putIfAbsent(opGroup, new AtomicInteger());
-                            c = count.get(opGroup);
-                        }
-                        c.incrementAndGet();
-                        State s = state;
-                        while (!s.accept(opGroup))
-                            s = s.replacement;
-                    }
-                }
-            }
-        }
-
-    }
-
-    @Test
-    public void testOrdering() throws InterruptedException
-    {
-        errors.set(0);
-        Thread.setDefaultUncaughtExceptionHandler(handler);
-        final ExecutorService exec = Executors.newCachedThreadPool(new 
NamedThreadFactory("checker"));
-        final ScheduledExecutorService checker = 
Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
-        for (int i = 0 ; i < CONSUMERS ; i++)
-            new TestOrdering(exec, checker);
-        exec.shutdown();
-        exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
-        assertTrue(exec.isShutdown());
-        assertTrue(errors.get() == 0);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java 
b/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
deleted file mode 100644
index 0fd53bb..0000000
--- a/test/long/org/apache/cassandra/concurrent/LongSharedExecutorPoolTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.concurrent;
-
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.LockSupport;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.math3.distribution.WeibullDistribution;
-import org.junit.Test;
-
-public class LongSharedExecutorPoolTest
-{
-
-    private static final class WaitTask implements Runnable
-    {
-        final long nanos;
-
-        private WaitTask(long nanos)
-        {
-            this.nanos = nanos;
-        }
-
-        public void run()
-        {
-            LockSupport.parkNanos(nanos);
-        }
-    }
-
-    private static final class Result implements Comparable<Result>
-    {
-        final Future<?> future;
-        final long forecastedCompletion;
-
-        private Result(Future<?> future, long forecastedCompletion)
-        {
-            this.future = future;
-            this.forecastedCompletion = forecastedCompletion;
-        }
-
-        public int compareTo(Result that)
-        {
-            int c = Long.compare(this.forecastedCompletion, 
that.forecastedCompletion);
-            if (c != 0)
-                return c;
-            c = Integer.compare(this.hashCode(), that.hashCode());
-            if (c != 0)
-                return c;
-            return Integer.compare(this.future.hashCode(), 
that.future.hashCode());
-        }
-    }
-
-    private static final class Batch implements Comparable<Batch>
-    {
-        final TreeSet<Result> results;
-        final long timeout;
-        final int executorIndex;
-
-        private Batch(TreeSet<Result> results, long timeout, int executorIndex)
-        {
-            this.results = results;
-            this.timeout = timeout;
-            this.executorIndex = executorIndex;
-        }
-
-        public int compareTo(Batch that)
-        {
-            int c = Long.compare(this.timeout, that.timeout);
-            if (c != 0)
-                return c;
-            c = Integer.compare(this.results.size(), that.results.size());
-            if (c != 0)
-                return c;
-            return Integer.compare(this.hashCode(), that.hashCode());
-        }
-    }
-
-    @Test
-    public void testPromptnessOfExecution() throws InterruptedException, 
ExecutionException, TimeoutException
-    {
-        testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(2L), 0.5f);
-    }
-
-    private void testPromptnessOfExecution(long intervalNanos, float 
loadIncrement) throws InterruptedException, ExecutionException, TimeoutException
-    {
-        final int executorCount = 4;
-        int threadCount = 8;
-        int maxQueued = 1024;
-        final WeibullDistribution workTime = new WeibullDistribution(3, 
200000);
-        final long minWorkTime = TimeUnit.MICROSECONDS.toNanos(1);
-        final long maxWorkTime = TimeUnit.MILLISECONDS.toNanos(1);
-
-        final int[] threadCounts = new int[executorCount];
-        final WeibullDistribution[] workCount = new 
WeibullDistribution[executorCount];
-        final ExecutorService[] executors = new ExecutorService[executorCount];
-        for (int i = 0 ; i < executors.length ; i++)
-        {
-            executors[i] = 
JMXEnabledSharedExecutorPool.SHARED.newExecutor(threadCount, maxQueued, "test" 
+ i, "test" + i);
-            threadCounts[i] = threadCount;
-            workCount[i] = new WeibullDistribution(2, maxQueued);
-            threadCount *= 2;
-            maxQueued *= 2;
-        }
-
-        long runs = 0;
-        long events = 0;
-        final TreeSet<Batch> pending = new TreeSet<>();
-        final BitSet executorsWithWork = new BitSet(executorCount);
-        long until = 0;
-        // basic idea is to go through different levels of load on the 
executor service; initially is all small batches
-        // (mostly within max queue size) of very short operations, moving to 
progressively larger batches
-        // (beyond max queued size), and longer operations
-        for (float multiplier = 0f ; multiplier < 2.01f ; )
-        {
-            if (System.nanoTime() > until)
-            {
-                System.out.println(String.format("Completed %.0fK batches with 
%.1fM events", runs * 0.001f, events * 0.000001f));
-                events = 0;
-                until = System.nanoTime() + intervalNanos;
-                multiplier += loadIncrement;
-                System.out.println(String.format("Running for %ds with load 
multiplier %.1f", TimeUnit.NANOSECONDS.toSeconds(intervalNanos), multiplier));
-            }
-
-            // wait a random amount of time so we submit new tasks in various 
stages of
-            long timeout;
-            if (pending.isEmpty()) timeout = 0;
-            else if (Math.random() > 0.98) timeout = Long.MAX_VALUE;
-            else if (pending.size() == executorCount) timeout = 
pending.first().timeout;
-            else timeout = (long) (Math.random() * pending.last().timeout);
-
-            while (!pending.isEmpty() && timeout > System.nanoTime())
-            {
-                Batch first = pending.first();
-                boolean complete = false;
-                try
-                {
-                    for (Result result : first.results.descendingSet())
-                        result.future.get(timeout - System.nanoTime(), 
TimeUnit.NANOSECONDS);
-                    complete = true;
-                }
-                catch (TimeoutException e)
-                {
-                }
-                if (!complete && System.nanoTime() > first.timeout)
-                {
-                    for (Result result : first.results)
-                        if (!result.future.isDone())
-                            throw new AssertionError();
-                    complete = true;
-                }
-                if (complete)
-                {
-                    pending.pollFirst();
-                    executorsWithWork.clear(first.executorIndex);
-                }
-            }
-
-            // if we've emptied the executors, give all our threads an 
opportunity to spin down
-            if (timeout == Long.MAX_VALUE)
-                Uninterruptibles.sleepUninterruptibly(10, 
TimeUnit.MILLISECONDS);
-
-            // submit a random batch to the first free executor service
-            int executorIndex = executorsWithWork.nextClearBit(0);
-            if (executorIndex >= executorCount)
-                continue;
-            executorsWithWork.set(executorIndex);
-            ExecutorService executor = executors[executorIndex];
-            TreeSet<Result> results = new TreeSet<>();
-            int count = (int) (workCount[executorIndex].sample() * multiplier);
-            long targetTotalElapsed = 0;
-            long start = System.nanoTime();
-            long baseTime;
-            if (Math.random() > 0.5) baseTime = 2 * (long) (workTime.sample() 
* multiplier);
-            else  baseTime = 0;
-            for (int j = 0 ; j < count ; j++)
-            {
-                long time;
-                if (baseTime == 0) time = (long) (workTime.sample() * 
multiplier);
-                else time = (long) (baseTime * Math.random());
-                if (time < minWorkTime)
-                    time = minWorkTime;
-                if (time > maxWorkTime)
-                    time = maxWorkTime;
-                targetTotalElapsed += time;
-                Future<?> future = executor.submit(new WaitTask(time));
-                results.add(new Result(future, System.nanoTime() + time));
-            }
-            long end = start + (long) Math.ceil(targetTotalElapsed / (double) 
threadCounts[executorIndex])
-                       + TimeUnit.MILLISECONDS.toNanos(100L);
-            long now = System.nanoTime();
-            if (runs++ > executorCount && now > end)
-                throw new AssertionError();
-            events += results.size();
-            pending.add(new Batch(results, end, executorIndex));
-//            System.out.println(String.format("Submitted batch to executor %d 
with %d items and %d permitted millis", executorIndex, count, 
TimeUnit.NANOSECONDS.toMillis(end - start)));
-        }
-    }
-
-    public static void main(String[] args) throws InterruptedException, 
ExecutionException, TimeoutException
-    {
-        // do longer test
-        new 
LongSharedExecutorPoolTest().testPromptnessOfExecution(TimeUnit.MINUTES.toNanos(10L),
 0.1f);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd4a9d18/test/long/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBTreeTest.java 
b/test/long/org/apache/cassandra/utils/LongBTreeTest.java
deleted file mode 100644
index 76ff2bf..0000000
--- a/test/long/org/apache/cassandra/utils/LongBTreeTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
-import com.yammer.metrics.stats.Snapshot;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.btree.UpdateFunction;
-
-// TODO : should probably lower fan-factor for tests to make them more 
intensive
-public class LongBTreeTest
-{
-
-    private static final Timer BTREE_TIMER = Metrics.newTimer(BTree.class, 
"BTREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS);
-    private static final Timer TREE_TIMER = Metrics.newTimer(BTree.class, 
"TREE", TimeUnit.NANOSECONDS, TimeUnit.NANOSECONDS);
-    private static final ExecutorService MODIFY = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new 
NamedThreadFactory("MODIFY"));
-    private static final ExecutorService COMPARE = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new 
NamedThreadFactory("COMPARE"));
-    private static final RandomAbort<Integer> SPORADIC_ABORT = new 
RandomAbort<>(new Random(), 0.0001f);
-
-    static
-    {
-        System.setProperty("cassandra.btree.fanfactor", "4");
-    }
-
-    @Test
-    public void testOversizedMiddleInsert()
-    {
-        TreeSet<Integer> canon = new TreeSet<>();
-        for (int i = 0 ; i < 10000000 ; i++)
-            canon.add(i);
-        Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, 
Integer.MAX_VALUE), ICMP, true, null);
-        btree = BTree.update(btree, ICMP, canon, true);
-        canon.add(Integer.MIN_VALUE);
-        canon.add(Integer.MAX_VALUE);
-        Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
-        testEqual("Oversize", BTree.<Integer>slice(btree, true), 
canon.iterator());
-    }
-
-    @Test
-    public void testIndividualInsertsSmallOverlappingRange() throws 
ExecutionException, InterruptedException
-    {
-        testInsertions(10000000, 50, 1, 1, true);
-    }
-
-    @Test
-    public void testBatchesSmallOverlappingRange() throws ExecutionException, 
InterruptedException
-    {
-        testInsertions(10000000, 50, 1, 5, true);
-    }
-
-    @Test
-    public void testIndividualInsertsMediumSparseRange() throws 
ExecutionException, InterruptedException
-    {
-        testInsertions(10000000, 500, 10, 1, true);
-    }
-
-    @Test
-    public void testBatchesMediumSparseRange() throws ExecutionException, 
InterruptedException
-    {
-        testInsertions(10000000, 500, 10, 10, true);
-    }
-
-    @Test
-    public void testLargeBatchesLargeRange() throws ExecutionException, 
InterruptedException
-    {
-        testInsertions(100000000, 5000, 3, 100, true);
-    }
-
-    @Test
-    public void testSlicingSmallRandomTrees() throws ExecutionException, 
InterruptedException
-    {
-        testInsertions(10000, 50, 10, 10, false);
-    }
-
-    private static void testInsertions(int totalCount, int perTestCount, int 
testKeyRatio, int modificationBatchSize, boolean quickEquality) throws 
ExecutionException, InterruptedException
-    {
-        int batchesPerTest = perTestCount / modificationBatchSize;
-        int maximumRunLength = 100;
-        int testKeyRange = perTestCount * testKeyRatio;
-        int tests = totalCount / perTestCount;
-        System.out.println(String.format("Performing %d tests of %d 
operations, with %.2f max size/key-range ratio in batches of ~%d ops",
-                tests, perTestCount, 1 / (float) testKeyRatio, 
modificationBatchSize));
-
-        // if we're not doing quick-equality, we can spam with garbage for all 
the checks we perform, so we'll split the work into smaller chunks
-        int chunkSize = quickEquality ? tests : (int) (100000 / 
Math.pow(perTestCount, 2));
-        for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
-        {
-            final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer 
= new ArrayList<>();
-            for (int i = 0 ; i < chunkSize ; i++)
-            {
-                outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, 
modificationBatchSize, batchesPerTest, quickEquality));
-            }
-
-            final List<ListenableFuture<?>> inner = new ArrayList<>();
-            int complete = 0;
-            int reportInterval = totalCount / 100;
-            int lastReportAt = 0;
-            for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
-            {
-                inner.addAll(f.get());
-                complete += perTestCount;
-                if (complete - lastReportAt >= reportInterval)
-                {
-                    System.out.println(String.format("Completed %d of %d 
operations", (chunk * perTestCount) + complete, totalCount));
-                    lastReportAt = complete;
-                }
-            }
-            Futures.allAsList(inner).get();
-        }
-        Snapshot snap = BTREE_TIMER.getSnapshot();
-        System.out.println(String.format("btree   : %.2fns, %.2fns, %.2fns", 
snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
-        snap = TREE_TIMER.getSnapshot();
-        System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", 
snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
-        System.out.println("Done");
-    }
-
-    private static ListenableFutureTask<List<ListenableFuture<?>>> 
doOneTestInsertions(final int upperBound, final int maxRunLength, final int 
averageModsPerIteration, final int iterations, final boolean quickEquality)
-    {
-        ListenableFutureTask<List<ListenableFuture<?>>> f = 
ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
-        {
-            @Override
-            public List<ListenableFuture<?>> call()
-            {
-                final List<ListenableFuture<?>> r = new ArrayList<>();
-                NavigableMap<Integer, Integer> canon = new TreeMap<>();
-                Object[] btree = BTree.empty();
-                final TreeMap<Integer, Integer> buffer = new TreeMap<>();
-                final Random rnd = new Random();
-                for (int i = 0 ; i < iterations ; i++)
-                {
-                    buffer.clear();
-                    int mods = (averageModsPerIteration >> 1) + 1 + 
rnd.nextInt(averageModsPerIteration);
-                    while (mods > 0)
-                    {
-                        int v = rnd.nextInt(upperBound);
-                        int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
-                        int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
-                        for (int j = 0 ; j < c ; j++)
-                        {
-                            buffer.put(v, v);
-                            v++;
-                        }
-                        mods -= c;
-                    }
-                    TimerContext ctxt;
-                    ctxt = TREE_TIMER.time();
-                    canon.putAll(buffer);
-                    ctxt.stop();
-                    ctxt = BTREE_TIMER.time();
-                    Object[] next = null;
-                    while (next == null)
-                        next = BTree.update(btree, ICMP, buffer.keySet(), 
true, SPORADIC_ABORT);
-                    btree = next;
-                    ctxt.stop();
-
-                    if (!BTree.isWellFormed(btree, ICMP))
-                    {
-                        System.out.println("ERROR: Not well formed");
-                        throw new AssertionError("Not well formed!");
-                    }
-                    if (quickEquality)
-                        testEqual("", BTree.<Integer>slice(btree, true), 
canon.keySet().iterator());
-                    else
-                        r.addAll(testAllSlices("RND", btree, new 
TreeSet<>(canon.keySet())));
-                }
-                return r;
-            }
-        });
-        MODIFY.execute(f);
-        return f;
-    }
-
-    @Test
-    public void testSlicingAllSmallTrees() throws ExecutionException, 
InterruptedException
-    {
-        Object[] cur = BTree.empty();
-        TreeSet<Integer> canon = new TreeSet<>();
-        // we set FAN_FACTOR to 4, so 128 items is four levels deep, three 
fully populated
-        for (int i = 0 ; i < 128 ; i++)
-        {
-            String id = String.format("[0..%d)", canon.size());
-            System.out.println("Testing " + id);
-            Futures.allAsList(testAllSlices(id, cur, canon)).get();
-            Object[] next = null;
-            while (next == null)
-                next = BTree.update(cur, ICMP, Arrays.asList(i), true, 
SPORADIC_ABORT);
-            cur = next;
-            canon.add(i);
-        }
-    }
-
-    static final Comparator<Integer> ICMP = new Comparator<Integer>()
-    {
-        @Override
-        public int compare(Integer o1, Integer o2)
-        {
-            return Integer.compare(o1, o2);
-        }
-    };
-
-    private static List<ListenableFuture<?>> testAllSlices(String id, Object[] 
btree, NavigableSet<Integer> canon)
-    {
-        List<ListenableFuture<?>> waitFor = new ArrayList<>();
-        testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, 
waitFor);
-        testAllSlices(id + " DSC", new BTreeSet<>(btree, 
ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
-        return waitFor;
-    }
-
-    private static void testAllSlices(String id, NavigableSet<Integer> btree, 
NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> 
results)
-    {
-        testOneSlice(id, btree, canon, results);
-        for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
-        {
-            // test head/tail sets
-            testOneSlice(String.format("%s->[%d..)", id, lb), 
btree.headSet(lb, true), canon.headSet(lb, true), results);
-            testOneSlice(String.format("%s->(%d..)", id, lb), 
btree.headSet(lb, false), canon.headSet(lb, false), results);
-            testOneSlice(String.format("%s->(..%d]", id, lb), 
btree.tailSet(lb, true), canon.tailSet(lb, true), results);
-            testOneSlice(String.format("%s->(..%d]", id, lb), 
btree.tailSet(lb, false), canon.tailSet(lb, false), results);
-            for (Integer ub : range(canon.size(), lb, ascending))
-            {
-                // test subsets
-                testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), 
btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
-                testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), 
btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
-                testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), 
btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
-                testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), 
btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), 
results);
-            }
-        }
-    }
-
-    private static void testOneSlice(final String id, final 
NavigableSet<Integer> test, final NavigableSet<Integer> canon, 
List<ListenableFuture<?>> results)
-    {
-        ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
-        {
-
-            @Override
-            public void run()
-            {
-                test(id + " Count", test.size(), canon.size());
-                testEqual(id, test.iterator(), canon.iterator());
-                testEqual(id + "->DSCI", test.descendingIterator(), 
canon.descendingIterator());
-                testEqual(id + "->DSCS", test.descendingSet().iterator(), 
canon.descendingSet().iterator());
-                testEqual(id + "->DSCS->DSCI", 
test.descendingSet().descendingIterator(), 
canon.descendingSet().descendingIterator());
-            }
-        }, null);
-        results.add(f);
-        COMPARE.execute(f);
-    }
-
-    private static void test(String id, int test, int expect)
-    {
-        if (test != expect)
-        {
-            System.out.println(String.format("%s: Expected %d, Got %d", id, 
expect, test));
-        }
-    }
-
-    private static <V> void testEqual(String id, Iterator<V> btree, 
Iterator<V> canon)
-    {
-        boolean equal = true;
-        while (btree.hasNext() && canon.hasNext())
-        {
-            Object i = btree.next();
-            Object j = canon.next();
-            if (!i.equals(j))
-            {
-                System.out.println(String.format("%s: Expected %d, Got %d", 
id, j, i));
-                equal = false;
-            }
-        }
-        while (btree.hasNext())
-        {
-            System.out.println(String.format("%s: Expected <Nil>, Got %d", id, 
btree.next()));
-            equal = false;
-        }
-        while (canon.hasNext())
-        {
-            System.out.println(String.format("%s: Expected %d, Got Nil", id, 
canon.next()));
-            equal = false;
-        }
-        if (!equal)
-            throw new AssertionError("Not equal");
-    }
-
-    // should only be called on sets that range from 0->N or N->0
-    private static final Iterable<Integer> range(final int size, final int 
from, final boolean ascending)
-    {
-        return new Iterable<Integer>()
-        {
-            int cur;
-            int delta;
-            int end;
-            {
-                if (ascending)
-                {
-                    end = size + 1;
-                    cur = from == Integer.MIN_VALUE ? -1 : from;
-                    delta = 1;
-                }
-                else
-                {
-                    end = -2;
-                    cur = from == Integer.MIN_VALUE ? size : from;
-                    delta = -1;
-                }
-            }
-            @Override
-            public Iterator<Integer> iterator()
-            {
-                return new Iterator<Integer>()
-                {
-                    @Override
-                    public boolean hasNext()
-                    {
-                        return cur != end;
-                    }
-
-                    @Override
-                    public Integer next()
-                    {
-                        Integer r = cur;
-                        cur += delta;
-                        return r;
-                    }
-
-                    @Override
-                    public void remove()
-                    {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
-    }
-
-    private static final class RandomAbort<V> implements UpdateFunction<V>
-    {
-        final Random rnd;
-        final float chance;
-        private RandomAbort(Random rnd, float chance)
-        {
-            this.rnd = rnd;
-            this.chance = chance;
-        }
-
-        public V apply(V replacing, V update)
-        {
-            return update;
-        }
-
-        public boolean abortEarly()
-        {
-            return rnd.nextFloat() < chance;
-        }
-
-        public void allocated(long heapSize)
-        {
-
-        }
-
-        public V apply(V v)
-        {
-            return v;
-        }
-    }
-
-}

Reply via email to