[CALCITE-1119] Additional metrics instrumentation for request processing
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/aecefef8 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/aecefef8 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/aecefef8 Branch: refs/heads/master Commit: aecefef8a4420e780737a8f49b72fd146679f2e2 Parents: 1d3a26d Author: Josh Elser <[email protected]> Authored: Wed Mar 2 17:43:46 2016 -0500 Committer: Josh Elser <[email protected]> Committed: Wed Mar 2 23:26:08 2016 -0500 ---------------------------------------------------------------------- .../calcite/avatica/remote/LocalService.java | 139 ++++++++++++------- 1 file changed, 89 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/aecefef8/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java index 11d15c9..c070ec0 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java @@ -17,25 +17,54 @@ package org.apache.calcite.avatica.remote; import org.apache.calcite.avatica.Meta; + import org.apache.calcite.avatica.MetaImpl; import org.apache.calcite.avatica.MissingResultsException; import org.apache.calcite.avatica.NoSuchStatementException; +import org.apache.calcite.avatica.metrics.MetricsSystem; +import org.apache.calcite.avatica.metrics.Timer; +import org.apache.calcite.avatica.metrics.Timer.Context; +import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import static org.apache.calcite.avatica.remote.MetricsHelper.concat; + /** * Implementation of {@link Service} that talks to a local {@link Meta}. */ public class LocalService implements Service { final Meta meta; + final MetricsSystem metrics; + + private final Timer executeTimer; + private final Timer commitTimer; + private final Timer prepareTimer; + private final Timer prepareAndExecuteTimer; + private final Timer connectionSyncTimer; private RpcMetadataResponse serverLevelRpcMetadata; public LocalService(Meta meta) { + this(meta, NoopMetricsSystem.getInstance()); + } + + public LocalService(Meta meta, MetricsSystem metrics) { this.meta = meta; + this.metrics = Objects.requireNonNull(metrics); + + this.executeTimer = this.metrics.getTimer(name("Execute")); + this.commitTimer = this.metrics.getTimer(name("Commit")); + this.prepareTimer = this.metrics.getTimer(name("Prepare")); + this.prepareAndExecuteTimer = this.metrics.getTimer(name("PrepareAndExecute")); + this.connectionSyncTimer = this.metrics.getTimer(name("ConnectionSync")); + } + + private static String name(String timer) { + return concat(LocalService.class, timer); } @Override public void setRpcMetadata(RpcMetadataResponse serverLevelRpcMetadata) { @@ -172,42 +201,46 @@ public class LocalService implements Service { } public PrepareResponse apply(PrepareRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.StatementHandle h = - meta.prepare(ch, request.sql, request.maxRowCount); - return new PrepareResponse(h, serverLevelRpcMetadata); + try (final Context ctx = prepareTimer.start()) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.StatementHandle h = + meta.prepare(ch, request.sql, request.maxRowCount); + return new PrepareResponse(h, serverLevelRpcMetadata); + } } public ExecuteResponse apply(PrepareAndExecuteRequest request) { - final Meta.StatementHandle sh = - new Meta.StatementHandle(request.connectionId, request.statementId, null); - try { - final Meta.ExecuteResult executeResult = - meta.prepareAndExecute(sh, request.sql, request.maxRowCount, - new Meta.PrepareCallback() { - @Override public Object getMonitor() { - return LocalService.class; - } - - @Override public void clear() { - } - - @Override public void assign(Meta.Signature signature, - Meta.Frame firstFrame, long updateCount) { - } - - @Override public void execute() { - } - }); - final List<ResultSetResponse> results = new ArrayList<>(); - for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) { - results.add(toResponse(metaResultSet)); + try (final Context ctx = prepareAndExecuteTimer.start()) { + final Meta.StatementHandle sh = + new Meta.StatementHandle(request.connectionId, request.statementId, null); + try { + final Meta.ExecuteResult executeResult = + meta.prepareAndExecute(sh, request.sql, request.maxRowCount, + new Meta.PrepareCallback() { + @Override public Object getMonitor() { + return LocalService.class; + } + + @Override public void clear() { + } + + @Override public void assign(Meta.Signature signature, + Meta.Frame firstFrame, long updateCount) { + } + + @Override public void execute() { + } + }); + final List<ResultSetResponse> results = new ArrayList<>(); + for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) { + results.add(toResponse(metaResultSet)); + } + return new ExecuteResponse(results, false, serverLevelRpcMetadata); + } catch (NoSuchStatementException e) { + // The Statement doesn't exist anymore, bubble up this information + return new ExecuteResponse(null, true, serverLevelRpcMetadata); } - return new ExecuteResponse(results, false, serverLevelRpcMetadata); - } catch (NoSuchStatementException e) { - // The Statement doesn't exist anymore, bubble up this information - return new ExecuteResponse(null, true, serverLevelRpcMetadata); } } @@ -229,17 +262,19 @@ public class LocalService implements Service { } public ExecuteResponse apply(ExecuteRequest request) { - try { - final Meta.ExecuteResult executeResult = meta.execute(request.statementHandle, - request.parameterValues, request.maxRowCount); - - final List<ResultSetResponse> results = new ArrayList<>(executeResult.resultSets.size()); - for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) { - results.add(toResponse(metaResultSet)); + try (final Context ctx = executeTimer.start()) { + try { + final Meta.ExecuteResult executeResult = meta.execute(request.statementHandle, + request.parameterValues, request.maxRowCount); + + final List<ResultSetResponse> results = new ArrayList<>(executeResult.resultSets.size()); + for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) { + results.add(toResponse(metaResultSet)); + } + return new ExecuteResponse(results, false, serverLevelRpcMetadata); + } catch (NoSuchStatementException e) { + return new ExecuteResponse(null, true, serverLevelRpcMetadata); } - return new ExecuteResponse(results, false, serverLevelRpcMetadata); - } catch (NoSuchStatementException e) { - return new ExecuteResponse(null, true, serverLevelRpcMetadata); } } @@ -272,11 +307,13 @@ public class LocalService implements Service { } public ConnectionSyncResponse apply(ConnectionSyncRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.ConnectionProperties connProps = - meta.connectionSync(ch, request.connProps); - return new ConnectionSyncResponse(connProps, serverLevelRpcMetadata); + try (final Context ctx = connectionSyncTimer.start()) { + final Meta.ConnectionHandle ch = + new Meta.ConnectionHandle(request.connectionId); + final Meta.ConnectionProperties connProps = + meta.connectionSync(ch, request.connProps); + return new ConnectionSyncResponse(connProps, serverLevelRpcMetadata); + } } public DatabasePropertyResponse apply(DatabasePropertyRequest request) { @@ -302,10 +339,12 @@ public class LocalService implements Service { } public CommitResponse apply(CommitRequest request) { - meta.commit(new Meta.ConnectionHandle(request.connectionId)); + try (final Context ctx = commitTimer.start()) { + meta.commit(new Meta.ConnectionHandle(request.connectionId)); - // If commit() errors, let the ErrorResponse be sent back via an uncaught Exception. - return new CommitResponse(); + // If commit() errors, let the ErrorResponse be sent back via an uncaught Exception. + return new CommitResponse(); + } } public RollbackResponse apply(RollbackRequest request) {
