Repository: cxf Updated Branches: refs/heads/master e54e39740 -> fcc8bcb38
Add total data read/written to metrics. Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/fcc8bcb3 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/fcc8bcb3 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/fcc8bcb3 Branch: refs/heads/master Commit: fcc8bcb38f33febf5c73c5f5f56d877272f27e20 Parents: e54e397 Author: Daniel Kulp <[email protected]> Authored: Wed Mar 11 15:47:46 2015 -0400 Committer: Daniel Kulp <[email protected]> Committed: Wed Mar 11 15:47:46 2015 -0400 ---------------------------------------------------------------------- .../codahale/CountingInputStream.java | 76 ++++++++++++++++++++ .../codahale/CountingOutputStream.java | 49 +++++++++++++ .../apache/cxf/management/codahale/Metrics.java | 48 +++++++++++++ 3 files changed, 173 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/fcc8bcb3/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java ---------------------------------------------------------------------- diff --git a/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java new file mode 100644 index 0000000..84d691d --- /dev/null +++ b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.management.codahale; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +public final class CountingInputStream extends FilterInputStream { + + private long count; + private long mark = -1; + + public CountingInputStream(InputStream in) { + super(in); + } + + public long getCount() { + return count; + } + + public int read() throws IOException { + int result = in.read(); + if (result != -1) { + count++; + } + return result; + } + + public int read(byte[] b, int off, int len) throws IOException { + int result = in.read(b, off, len); + if (result != -1) { + count += result; + } + return result; + } + + public long skip(long n) throws IOException { + long result = in.skip(n); + count += result; + return result; + } + + public synchronized void mark(int readlimit) { + in.mark(readlimit); + mark = count; + } + + public synchronized void reset() throws IOException { + if (!in.markSupported()) { + throw new IOException("Mark not supported"); + } + if (mark == -1) { + throw new IOException("Mark not set"); + } + + in.reset(); + count = mark; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/fcc8bcb3/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java ---------------------------------------------------------------------- diff --git a/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java new file mode 100644 index 0000000..bc5cffd --- /dev/null +++ b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.management.codahale; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +public final class CountingOutputStream extends FilterOutputStream { + private long count; + + public CountingOutputStream(OutputStream out) { + super(out); + } + + public long getCount() { + return count; + } + + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + count += len; + } + + public void write(int b) throws IOException { + out.write(b); + count++; + } + + public void close() throws IOException { + out.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/fcc8bcb3/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java ---------------------------------------------------------------------- diff --git a/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java b/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java index 7bb5517..05830e1 100644 --- a/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java +++ b/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java @@ -22,6 +22,8 @@ package org.apache.cxf.management.codahale; import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.concurrent.TimeUnit; import javax.management.MalformedObjectNameException; @@ -29,6 +31,7 @@ import javax.management.ObjectName; import com.codahale.metrics.Counter; import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.ObjectNameFactory; import com.codahale.metrics.Timer; @@ -36,6 +39,7 @@ import com.codahale.metrics.Timer; import org.apache.cxf.Bus; import org.apache.cxf.endpoint.Endpoint; import org.apache.cxf.interceptor.AttachmentInInterceptor; +import org.apache.cxf.interceptor.AttachmentOutInterceptor; import org.apache.cxf.interceptor.Fault; import org.apache.cxf.interceptor.MessageSenderInterceptor; import org.apache.cxf.interceptor.ServiceInvokerInterceptor; @@ -95,11 +99,14 @@ public class Metrics { ResponseTimeMessageInInterceptor in = new ResponseTimeMessageInInterceptor(); ResponseTimeMessageInOneWayInterceptor oneway = new ResponseTimeMessageInOneWayInterceptor(); ResponseTimeMessageOutInterceptor out = new ResponseTimeMessageOutInterceptor(); + CountingOutInterceptor countingOut = new CountingOutInterceptor(); //ResponseTimeMessageInvokerInterceptor invoker = new ResponseTimeMessageInvokerInterceptor(); bus.getInInterceptors().add(in); bus.getInInterceptors().add(oneway); + bus.getOutInterceptors().add(countingOut); bus.getOutInterceptors().add(out); + bus.getOutFaultInterceptors().add(countingOut); bus.getOutFaultInterceptors().add(out); //bus.setExtension(this, CounterRepository.class); @@ -115,6 +122,8 @@ public class Metrics { Timer checkedApplicationFaults; Timer runtimeFaults; Timer logicalRuntimeFaults; + Meter incomingData; + Meter outgoingData; Context start() { inFlight.inc(); @@ -209,6 +218,8 @@ public class Metrics { ti.runtimeFaults = registry.timer(baseName + "Attribute=Runtime Faults"); ti.logicalRuntimeFaults = registry.timer(baseName + "Attribute=Logical Runtime Faults"); ti.inFlight = registry.counter(baseName + "Attribute=In Flight"); + ti.incomingData = registry.meter(baseName + "Attribute=Data Read"); + ti.outgoingData = registry.meter(baseName + "Attribute=Data Written"); endpoint.put(TimerInfo.class.getName(), ti); endpoint.addCleanupHook(new Closeable() { public void close() throws IOException { @@ -219,6 +230,8 @@ public class Metrics { registry.remove(baseName + "Attribute=Runtime Faults"); registry.remove(baseName + "Attribute=Logical Runtime Faults"); registry.remove(baseName + "Attribute=In Flight"); + registry.remove(baseName + "Attribute=Data Read"); + registry.remove(baseName + "Attribute=Data Written"); endpoint.remove(TimerInfo.class.getName()); System.out.println(endpoint.getBinding().getBindingInfo().getOperations()); for (BindingOperationInfo boi : endpoint.getBinding().getBindingInfo().getOperations()) { @@ -264,6 +277,15 @@ public class Metrics { BindingOperationInfo bi = m.getExchange().getBindingOperationInfo(); FaultMode fm = m.getExchange().get(FaultMode.class); TimerInfo op = null; + CountingInputStream in = m.getExchange().get(CountingInputStream.class); + if (in != null) { + ctx.info.incomingData.mark(in.getCount()); + } + CountingOutputStream out = m.getExchange().get(CountingOutputStream.class); + if (out != null) { + ctx.info.outgoingData.mark(out.getCount()); + } + if (bi != null) { op = getTimerInfo(m, bi); op.totals.update(l, TimeUnit.NANOSECONDS); @@ -300,6 +322,12 @@ public class Metrics { } else { TimerInfo.Context ctx = ti.start(); message.getExchange().put(TimerInfo.Context.class, ctx); + InputStream in = message.getContent(InputStream.class); + if (in != null) { + CountingInputStream newIn = new CountingInputStream(in); + message.setContent(InputStream.class, newIn); + message.getExchange().put(CountingInputStream.class, newIn); + } } } public void handleFault(Message message) { @@ -308,6 +336,26 @@ public class Metrics { } } }; + + class CountingOutInterceptor extends AbstractPhaseInterceptor<Message> { + public CountingOutInterceptor() { + super(Phase.PRE_STREAM); + addBefore(AttachmentOutInterceptor.class.getName()); + } + public void handleMessage(Message message) throws Fault { + if (isRequestor(message)) { + // + } else { + OutputStream out = message.getContent(OutputStream.class); + if (out != null) { + CountingOutputStream newOut = new CountingOutputStream(out); + message.setContent(OutputStream.class, newOut); + message.getExchange().put(CountingOutputStream.class, newOut); + } + + } + } + }; class ResponseTimeMessageOutInterceptor extends AbstractPhaseInterceptor<Message> { public ResponseTimeMessageOutInterceptor() {
