This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch driver-35 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 3f7fe035dec7f44e6284af8cd8e3a7db1959c741 Author: stephen <[email protected]> AuthorDate: Fri Dec 6 09:26:44 2019 -0500 Minor modifications while profiling. No changes of consequence really. --- .../apache/tinkerpop/gremlin/driver/Client.java | 17 ++++++++------- .../apache/tinkerpop/gremlin/driver/Cluster.java | 2 +- .../gremlin/driver/ConnectionPoolImpl.java | 12 +++++++---- .../driver/util/ConfigurationEvaluator.java | 7 +++---- .../gremlin/driver/util/ProfilingApplication.java | 24 ++++++++++------------ 5 files changed, 33 insertions(+), 29 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index e2ea17b..987948e 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -199,7 +199,8 @@ public abstract class Client { if (initialized) return this; - logger.debug("Initializing client on cluster [{}]", cluster); + if (logger.isDebugEnabled()) + logger.debug("Initializing client on cluster [{}]", cluster); cluster.init(); initializeImplementation(); @@ -337,11 +338,11 @@ public abstract class Client { // need to call buildMessage() right away to get client specific configurations, that way request specific // ones can override as needed final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_EVAL)) - .add(Tokens.ARGS_GREMLIN, gremlin) - .add(Tokens.ARGS_BATCH_SIZE, batchSize); + .addArg(Tokens.ARGS_GREMLIN, gremlin) + .addArg(Tokens.ARGS_BATCH_SIZE, batchSize); // apply settings if they were made available - options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout)); + options.getTimeout().ifPresent(timeout -> request.addArg(Tokens.ARGS_EVAL_TIMEOUT, timeout)); options.getParameters().ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, params)); options.getAliases().ifPresent(aliases -> request.addArg(Tokens.ARGS_ALIASES, aliases)); options.getOverrideRequestId().ifPresent(request::overrideRequestId); @@ -379,7 +380,9 @@ public abstract class Client { } } else { conn.write(msg, future); - logger.debug("Submitted {} to - {}", msg, conn); + + if (logger.isDebugEnabled()) + logger.debug("Submitted {} to - {}", msg, conn); } }, this.cluster.executor()); return future; @@ -589,8 +592,8 @@ public abstract class Client { .addArg(Tokens.ARGS_GREMLIN, bytecode)); // apply settings if they were made available - options.getBatchSize().ifPresent(batchSize -> request.add(Tokens.ARGS_BATCH_SIZE, batchSize)); - options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout)); + options.getBatchSize().ifPresent(batchSize -> request.addArg(Tokens.ARGS_BATCH_SIZE, batchSize)); + options.getTimeout().ifPresent(timeout -> request.addArg(Tokens.ARGS_EVAL_TIMEOUT, timeout)); return submitAsync(request.create()); } catch (Exception ex) { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index 6d72a32..7763cb0 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -923,7 +923,7 @@ public final class Cluster { new BasicThreadFactory.Builder().namingPattern("gremlin-driver-worker-%d").build()); this.executor.setRemoveOnCancelPolicy(true); - validationRequest = () -> RequestMessage.build(Tokens.OPS_EVAL).add(Tokens.ARGS_GREMLIN, builder.validationRequest); + validationRequest = () -> RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN, builder.validationRequest); } private void validateBuilder(final Builder builder) { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java index 5f0ccae..32bf7d8 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java @@ -106,19 +106,22 @@ public class ConnectionPoolImpl implements ConnectionPool { public void channelReleased(final Channel ch) { // Note: Any operation performed here might have direct impact on the performance of the // client since, this method is called with every new request. - logger.debug("Channel released: {}", ch); + if (logger.isDebugEnabled()) + logger.debug("Channel released: {}", ch); } @Override public void channelAcquired(final Channel ch) { // Note: Any operation performed here might have direct impact on the performance of the // client since, this method is called with every new request. - logger.debug("Channel acquired: {}", ch); + if (logger.isDebugEnabled()) + logger.debug("Channel acquired: {}", ch); } @Override public void channelCreated(final Channel ch) { - logger.debug("Channel created: {}", ch); + if (logger.isDebugEnabled()) + logger.debug("Channel created: {}", ch); // Guaranteed that it is a socket channel because we set b.channel as SocketChannel final SocketChannel sch = (SocketChannel) ch; ((Channelizer.AbstractChannelizer) channelizer).initChannel(sch); @@ -127,7 +130,8 @@ public class ConnectionPoolImpl implements ConnectionPool { this.channelPool = createChannelPool(b, cluster.connectionPoolSettings(), handler); - logger.debug("Initialized {} successfully.", this); + if (logger.isDebugEnabled()) + logger.debug("Initialized {} successfully.", this); } private FixedChannelPool createChannelPool(final Bootstrap b, diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java index d145592..25affa1 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java @@ -34,21 +34,20 @@ public class ConfigurationEvaluator { private final List<Integer> workerPoolSizeRange = Arrays.asList(1,2,3,4,8,16,32); private final List<Integer> nioPoolSizeRange = Arrays.asList(1,2,4); - private final List<Integer> parallelismSizeRange = Arrays.asList(1,2,4,8,16,32); + private final List<Integer> maxConnectionPoolSizeRange = Arrays.asList(64, 128, 256, 512, 1024); public Stream<String[]> generate(final String [] args) throws Exception { final Set<String> configsTried = new HashSet<>(); // get ready for the some serious brute-force action here for (int ir = 0; ir < nioPoolSizeRange.size(); ir++) { - for (int is = 0; is < parallelismSizeRange.size(); is++) { + for (int is = 0; is < maxConnectionPoolSizeRange.size(); is++) { for (int it = 0; it < workerPoolSizeRange.size(); it++) { final String s = String.join(",", String.valueOf(ir), String.valueOf(is), String.valueOf(it)); if (!configsTried.contains(s)) { final Object[] argsToProfiler = Stream.of("nioPoolSize", nioPoolSizeRange.get(ir).toString(), - "parallelism", parallelismSizeRange.get(is).toString(), - "maxConnectionPoolSize", "15000", + "maxConnectionPoolSize", maxConnectionPoolSizeRange.get(is).toString(), "workerPoolSize", workerPoolSizeRange.get(it).toString(), "noExit", Boolean.TRUE.toString()).toArray(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java index bf675e5..ed9b6d7 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.IntStream; /** * An internal application used to test out configuration parameters for Gremlin Driver against Gremlin Server. @@ -96,22 +95,21 @@ public class ProfilingApplication { try { final CountDownLatch latch = new CountDownLatch(requests); final AtomicInteger inFlight = new AtomicInteger(0); + final AtomicInteger count = new AtomicInteger(0); client.init(); final long start = System.nanoTime(); - IntStream.range(0, requests).forEach(i -> { - final String s = exercise ? chooseScript() : script; + while (count.get() < requests) { // control number of requests in flight for testing purposes or else we'll timeout without enough // connections to service all this stuff. - while (inFlight.intValue() >= cluster.maxConnectionPoolSize()) { - try { - TimeUnit.MILLISECONDS.sleep(10L); - } catch (Exception ex) { - // wait wait wait - } + if (inFlight.intValue() >= cluster.maxConnectionPoolSize()) { + continue; } + final String s = exercise ? chooseScript() : script; + + count.incrementAndGet(); inFlight.incrementAndGet(); client.submitAsync(s).thenAcceptAsync(r -> { try { @@ -125,7 +123,7 @@ public class ProfilingApplication { latch.countDown(); } }, executor); - }); + } // finish once all requests are accounted for latch.await(); @@ -152,9 +150,9 @@ public class ProfilingApplication { public static void main(final String[] args) { final Map<String,Object> options = ElementHelper.asMap(args); final boolean noExit = Boolean.parseBoolean(options.getOrDefault("noExit", "false").toString()); - final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "16").toString()); + final int profilerPoolSize = Integer.parseInt(options.getOrDefault("profiler-pool", "2").toString()); final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("profiler-%d").build(); - final ExecutorService executor = Executors.newFixedThreadPool(parallelism, threadFactory); + final ExecutorService executor = Executors.newFixedThreadPool(profilerPoolSize, threadFactory); final String host = options.getOrDefault("host", "localhost").toString(); final int minExpectedRps = Integer.parseInt(options.getOrDefault("minExpectedRps", "200").toString()); @@ -237,7 +235,7 @@ public class ProfilingApplication { System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond)); if (f != null) { try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) { - writer.println(String.join("\t", String.valueOf(parallelism), String.valueOf(nioPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond))); + writer.println(String.join("\t", String.valueOf(profilerPoolSize), String.valueOf(nioPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond))); } }
