Repository: cxf Updated Branches: refs/heads/3.0.x-fixes 83edbb3b9 -> 985b6be4c
[CXF-5582] adding push-over-stream to AtomPullServer Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6ac3db83 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6ac3db83 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6ac3db83 Branch: refs/heads/3.0.x-fixes Commit: 6ac3db837134c8cce0a1b8d43db7c169150616ed Parents: 83edbb3 Author: Akitoshi Yoshida <[email protected]> Authored: Thu Aug 28 11:01:09 2014 +0200 Committer: Akitoshi Yoshida <[email protected]> Committed: Tue Sep 2 11:07:03 2014 +0200 ---------------------------------------------------------------------- .../samples/logbrowser-blueprint/pom.xml | 7 +- .../resources/OSGI-INF/blueprint/context.xml | 6 +- rt/management-web/pom.xml | 3 + .../web/logging/atom/AtomPullServer.java | 138 ++++++++++++++++--- 4 files changed, 130 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/6ac3db83/distribution/src/main/release/samples/logbrowser-blueprint/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/logbrowser-blueprint/pom.xml b/distribution/src/main/release/samples/logbrowser-blueprint/pom.xml index 2395d81..2947f48 100644 --- a/distribution/src/main/release/samples/logbrowser-blueprint/pom.xml +++ b/distribution/src/main/release/samples/logbrowser-blueprint/pom.xml @@ -41,8 +41,8 @@ <Import-Package> org.apache.cxf.jaxrs.servlet, org.apache.abdera.parser.stax, - com.ctc.wstx.stax, org.apache.abdera, + com.ctc.wstx.stax, * </Import-Package> </instructions> @@ -71,6 +71,11 @@ <artifactId>cxf-rt-transports-http-jetty</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-transports-websocket</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.abdera</groupId> <artifactId>abdera-core</artifactId> http://git-wip-us.apache.org/repos/asf/cxf/blob/6ac3db83/distribution/src/main/release/samples/logbrowser-blueprint/src/main/resources/OSGI-INF/blueprint/context.xml ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/logbrowser-blueprint/src/main/resources/OSGI-INF/blueprint/context.xml b/distribution/src/main/release/samples/logbrowser-blueprint/src/main/resources/OSGI-INF/blueprint/context.xml index 43d5530..54c9c94 100644 --- a/distribution/src/main/release/samples/logbrowser-blueprint/src/main/resources/OSGI-INF/blueprint/context.xml +++ b/distribution/src/main/release/samples/logbrowser-blueprint/src/main/resources/OSGI-INF/blueprint/context.xml @@ -37,7 +37,9 @@ <bean id="aep" class="org.apache.cxf.jaxrs.provider.atom.AtomEntryProvider" /> - <jaxrs:server id="serviceEndpoint" address="/samples/logbrowser"> + <bean id="srp" class="org.apache.cxf.jaxrs.provider.StreamingResponseProvider" /> + + <jaxrs:server id="serviceEndpoint" address="/samples/logbrowser" transportId="http://cxf.apache.org/transports/websocket"> <jaxrs:serviceBeans> <ref component-id="aps" /> <ref component-id="bs" /> @@ -48,7 +50,7 @@ <ref component-id="scp" /> <ref component-id="afp" /> <ref component-id="aep" /> + <ref component-id="srp" /> </jaxrs:providers> </jaxrs:server> - </blueprint> http://git-wip-us.apache.org/repos/asf/cxf/blob/6ac3db83/rt/management-web/pom.xml ---------------------------------------------------------------------- diff --git a/rt/management-web/pom.xml b/rt/management-web/pom.xml index 1f05112..d4cfe60 100644 --- a/rt/management-web/pom.xml +++ b/rt/management-web/pom.xml @@ -33,6 +33,9 @@ <properties> <cxf.osgi.import> !com.google.*, + org.apache.abdera.parser.stax, + org.apache.abdera, + com.ctc.wstx.stax, * </cxf.osgi.import> <gwt.version>2.6.0</gwt.version> http://git-wip-us.apache.org/repos/asf/cxf/blob/6ac3db83/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java ---------------------------------------------------------------------- diff --git a/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java index e083019..d133971 100644 --- a/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java +++ b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java @@ -18,12 +18,16 @@ */ package org.apache.cxf.management.web.logging.atom; +import java.io.IOException; +import java.io.OutputStream; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -34,13 +38,16 @@ import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; +import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriBuilder; import org.apache.abdera.model.Entry; import org.apache.abdera.model.Feed; import org.apache.cxf.Bus; import org.apache.cxf.jaxrs.ext.MessageContext; +import org.apache.cxf.jaxrs.ext.StreamingResponse; import org.apache.cxf.jaxrs.ext.search.ConditionType; import org.apache.cxf.jaxrs.ext.search.OrSearchCondition; import org.apache.cxf.jaxrs.ext.search.PrimitiveStatement; @@ -66,7 +73,9 @@ public class AtomPullServer extends AbstractAtomBean { private volatile int recordsSize; private volatile boolean alreadyClosed; private SearchCondition<LogRecord> readableStorageCondition; - + //TODO register streams using specific keys so that they can be individually unregistered + private List<Object> activeStreams; + @Context private MessageContext context; @@ -114,9 +123,11 @@ public class AtomPullServer extends AbstractAtomBean { } readableStorageCondition = list.size() == 0 ? null : new OrSearchCondition<LogRecord>(list); } + activeStreams = Collections.synchronizedList(new ArrayList<Object>()); initBusProperty(); } + @Override protected Handler createHandler() { return new AtomPullHandler(this); @@ -193,6 +204,39 @@ public class AtomPullServer extends AbstractAtomBean { } @GET + @Produces({"text/html", "application/xhtml+xml" }) + @Path("subscribe/alternate") + public StreamingOutput getAlternateContinuousFeed() { + return new StreamingOutput() { + public void write(final OutputStream out) throws IOException, WebApplicationException { + // return the last entry + out.write(convertEntryToHtmlFragment(((LinkedList<LogRecord>)records).getLast()).getBytes()); + + activeStreams.add(out); + } + }; + } + + + @GET + @Produces("application/atom+xml;type=entry") + @Path("subscribe") + public StreamingResponse<Entry> getXmlContinuousFeed() { + return new StreamingResponse<Entry>() { + public void writeTo(final StreamingResponse.Writer<Entry> out) throws IOException { + // return the last entry + Entry entry = (Entry) new StandardConverter(StandardConverter.Output.ENTRY, + StandardConverter.Multiplicity.ONE, + StandardConverter.Format.CONTENT) + .convert(Collections.singletonList(((LinkedList<LogRecord>)records).getLast())).get(0); + out.write(entry); + + activeStreams.add(out); + } + }; + } + + @GET @Path("entry/{id}") @Produces("application/atom+xml;type=entry") public Entry getEntry(@PathParam("id") int index) { @@ -341,8 +385,50 @@ public class AtomPullServer extends AbstractAtomBean { records.add(record); ++recordsSize; } + submit(record); + } + + private void submit(final LogRecord record) { + //TODO use an executor to broadcast the record asynchronously + //TODO take the search condition in consideration to filter out those non-matching entries + if (activeStreams.size() > 0) { + byte[] rbytes = null; + Entry rentry = null; + for (Iterator<Object> it = activeStreams.iterator(); it.hasNext();) { + Object out = it.next(); + try { + if (out instanceof OutputStream) { + if (rbytes == null) { + rbytes = convertEntryToHtmlFragment(record).getBytes(); + } + ((OutputStream)out).write(rbytes); + } else if (out instanceof StreamingResponse.Writer) { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + try { + // switch TCCL as abdera uses TCCL to load various abdera classes during serialization + Thread.currentThread().setContextClassLoader(StandardConverter.class.getClassLoader()); + if (rentry == null) { + rentry = (Entry) new StandardConverter(StandardConverter.Output.ENTRY, + StandardConverter.Multiplicity.ONE, + StandardConverter.Format.CONTENT) + .convert(Collections.singletonList(record)).get(0); + } + ((StreamingResponse.Writer<Entry>)out).write(rentry); + } finally { + Thread.currentThread().setContextClassLoader(cl); + } + } + } catch (Throwable t) { + // REVISIT + // the reason for not logging anything here is to not further clog the logger + // with this log broadcasting failure. + System.err.print("ERROR | AtomPullServer | " + t + "; Unregistering " + out); + it.remove(); + } + } + } } - + public void setPageSize(int size) { pageSize = size; } @@ -369,6 +455,7 @@ public class AtomPullServer extends AbstractAtomBean { records.clear(); recordsSize = 0; feeds.clear(); + activeStreams.clear(); } // TODO : this all can be done later on in a simple xslt template @@ -387,35 +474,44 @@ public class AtomPullServer extends AbstractAtomBean { sb.append("</body></html>"); return sb.toString(); } + private String convertEntryToHtmlFragment(LogRecord r) { + StringBuilder sb = new StringBuilder(); + DateFormat df = new SimpleDateFormat("dd/MM/yy HH:mm:ss"); + addRecordToTable(sb, df, r, false); + return sb.toString(); + } private void addRecordToTable(StringBuilder sb, List<LogRecord> list, boolean forFeed) { DateFormat df = new SimpleDateFormat("dd/MM/yy HH:mm:ss"); sb.append("<table border=\"1\">"); sb.append("<tr><th>Date</th><th>Level</th><th>Logger</th><th>Message</th></tr>"); for (LogRecord lr : list) { - sb.append("<tr>"); - sb.append("<td>" + df.format(lr.getDate()) + "</td>"); - sb.append("<td>" + lr.getLevel().toString() + "</td>"); - sb.append("<td>" + lr.getLoggerName() + "</td>"); - String message = null; - if (lr.getMessage().length() > 0) { - message = lr.getThrowable().length() > 0 ? lr.getMessage() + " : " + lr.getThrowable() - : lr.getMessage(); - } else if (lr.getThrowable().length() > 0) { - message = lr.getThrowable(); - } else { - message = " "; - } - if (forFeed && lr.getThrowable().length() > 0) { - message = message.substring(0, message.length() / 2); - } - sb.append("<td>" + message + "</td>"); - sb.append("</tr>"); + addRecordToTable(sb, df, lr, forFeed); } sb.append("</table><br/><br/>"); - } + private void addRecordToTable(StringBuilder sb, DateFormat df, LogRecord lr, boolean forFeed) { + sb.append("<tr>"); + sb.append("<td>" + df.format(lr.getDate()) + "</td>"); + sb.append("<td>" + lr.getLevel().toString() + "</td>"); + sb.append("<td>" + lr.getLoggerName() + "</td>"); + String message = null; + if (lr.getMessage().length() > 0) { + message = lr.getThrowable().length() > 0 ? lr.getMessage() + " : " + lr.getThrowable() + : lr.getMessage(); + } else if (lr.getThrowable().length() > 0) { + message = lr.getThrowable(); + } else { + message = " "; + } + if (forFeed && lr.getThrowable().length() > 0) { + message = message.substring(0, message.length() / 2); + } + sb.append("<td>" + message + "</td>"); + sb.append("</tr>"); + } + private void startHtmlHeadAndBody(StringBuilder sb, String title) { sb.append("<html xmlns=\"http://www.w3.org/1999/xhtml\">"); sb.append("<head>");
