Repository: ignite Updated Branches: refs/heads/ignite-query-cont [created] 27434889d
IGNITE Added probe for benchmark. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27434889 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27434889 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27434889 Branch: refs/heads/ignite-query-cont Commit: 27434889dae7355947077d3464890a82f4847c34 Parents: 15da54b Author: nikolay_tikhonov <[email protected]> Authored: Fri Oct 30 13:05:44 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Oct 30 13:05:44 2015 +0300 ---------------------------------------------------------------------- .../yardstick/cache/CacheEntryEventProbe.java | 156 +++++++++++++++++++ 1 file changed, 156 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/27434889/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java new file mode 100644 index 0000000..e42479a --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/CacheEntryEventProbe.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriver; +import org.yardstickframework.BenchmarkProbe; +import org.yardstickframework.BenchmarkProbePoint; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.yardstickframework.BenchmarkUtils.errorHelp; +import static org.yardstickframework.BenchmarkUtils.println; + +/** + * Probe which calculate continuous query events. + */ +public class CacheEntryEventProbe implements BenchmarkProbe { + /** */ + private BenchmarkConfiguration cfg; + + /** Counter. */ + private AtomicLong cnt = new AtomicLong(0); + + /** Collected points. */ + private Collection<BenchmarkProbePoint> collected = new ArrayList<>(); + + /** Query cursor. */ + private QueryCursor qryCur; + + /** Service building probe points. */ + private ExecutorService buildingService; + + /** {@inheritDoc} */ + @Override public void start(BenchmarkDriver drv, BenchmarkConfiguration cfg) throws Exception { + this.cfg = cfg; + + if (drv instanceof IgniteCacheAbstractBenchmark) { + IgniteCacheAbstractBenchmark drv0 = (IgniteCacheAbstractBenchmark)drv; + + if (drv0.cache() != null) { + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> + events) throws CacheEntryListenerException { + int size = 0; + + for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) + ++size; + + cnt.addAndGet(size); + } + }); + + qryCur = drv0.cache().query(qry); + + buildingService = Executors.newSingleThreadExecutor(); + + buildingService.submit(new Runnable() { + @Override public void run() { + try { + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(1000); + + long evts = cnt.getAndSet(0); + + BenchmarkProbePoint pnt = new BenchmarkProbePoint( + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), + new double[] {evts}); + + collectPoint(pnt); + } + } + catch (InterruptedException e) { + // No-op. + } + } + }); + + println(cfg, getClass().getSimpleName() + " probe is started."); + } + } + + if (qryCur == null) + errorHelp(cfg, "Can not start " + getClass().getSimpleName() + + " probe. Probably, the driver doesn't provide \"cache()\" method."); + } + + /** {@inheritDoc} */ + @Override public void stop() throws Exception { + if (qryCur != null) { + qryCur.close(); + + qryCur = null; + + buildingService.shutdownNow(); + + buildingService.awaitTermination(1, MINUTES); + + println(cfg, getClass().getSimpleName() + " is stopped."); + } + } + + /** {@inheritDoc} */ + @Override public Collection<String> metaInfo() { + return Arrays.asList("Time, sec", "Received events/sec (more is better)"); + } + + /** {@inheritDoc} */ + @Override public synchronized Collection<BenchmarkProbePoint> points() { + Collection<BenchmarkProbePoint> ret = collected; + + collected = new ArrayList<>(ret.size() + 5); + + return ret; + } + + /** {@inheritDoc} */ + @Override public void buildPoint(long time) { + // No-op. + } + + /** + * @param pnt Probe point. + */ + private synchronized void collectPoint(BenchmarkProbePoint pnt) { + collected.add(pnt); + } +}
