Repository: cxf Updated Branches: refs/heads/master ec4435d70 -> 250dc87fc
[CXF-5582] add StreamingResponse-based delivery to AtomPushBean Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/250dc87f Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/250dc87f Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/250dc87f Branch: refs/heads/master Commit: 250dc87fc7e0b0f6caffb5d7500feac9db048bbb Parents: ec4435d Author: Akitoshi Yoshida <[email protected]> Authored: Mon Sep 1 19:05:54 2014 +0200 Committer: Akitoshi Yoshida <[email protected]> Committed: Mon Sep 1 19:07:01 2014 +0200 ---------------------------------------------------------------------- .../web/logging/atom/AtomPullServer.java | 26 +- .../atom/AtomPushOverWebSocketServer.java | 242 +++++++++++++++++++ 2 files changed, 259 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/250dc87f/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java ---------------------------------------------------------------------- diff --git a/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java index d133971..07a64ec 100644 --- a/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java +++ b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPullServer.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.OutputStream; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -35,6 +34,7 @@ import java.util.WeakHashMap; import java.util.logging.Handler; import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; import javax.ws.rs.Path; import javax.ws.rs.PathParam; 
import javax.ws.rs.Produces; @@ -73,8 +73,7 @@ public class AtomPullServer extends AbstractAtomBean { private volatile int recordsSize; private volatile boolean alreadyClosed; private SearchCondition<LogRecord> readableStorageCondition; - //TODO register streams using specific keys so that they can be individually unregistered - private List<Object> activeStreams; + private Map<String, Object> activeStreams; @Context private MessageContext context; @@ -123,7 +122,7 @@ public class AtomPullServer extends AbstractAtomBean { } readableStorageCondition = list.size() == 0 ? null : new OrSearchCondition<LogRecord>(list); } - activeStreams = Collections.synchronizedList(new ArrayList<Object>()); + activeStreams = Collections.synchronizedMap(new HashMap<String, Object>()); initBusProperty(); } @@ -206,13 +205,14 @@ public class AtomPullServer extends AbstractAtomBean { @GET @Produces({"text/html", "application/xhtml+xml" }) @Path("subscribe/alternate") - public StreamingOutput getAlternateContinuousFeed() { + public StreamingOutput getAlternateContinuousFeed(@HeaderParam("requestId") String reqid) { + final String key = reqid == null ? "*" : reqid; return new StreamingOutput() { public void write(final OutputStream out) throws IOException, WebApplicationException { // return the last entry out.write(convertEntryToHtmlFragment(((LinkedList<LogRecord>)records).getLast()).getBytes()); - activeStreams.add(out); + activeStreams.put(key, out); } }; } @@ -221,7 +221,8 @@ public class AtomPullServer extends AbstractAtomBean { @GET @Produces("application/atom+xml;type=entry") @Path("subscribe") - public StreamingResponse<Entry> getXmlContinuousFeed() { + public StreamingResponse<Entry> getXmlContinuousFeed(@HeaderParam("requestId") String reqid) { + final String key = reqid == null ? 
"*" : reqid; return new StreamingResponse<Entry>() { public void writeTo(final StreamingResponse.Writer<Entry> out) throws IOException { // return the last entry @@ -231,12 +232,19 @@ public class AtomPullServer extends AbstractAtomBean { .convert(Collections.singletonList(((LinkedList<LogRecord>)records).getLast())).get(0); out.write(entry); - activeStreams.add(out); + activeStreams.put(key, out); } }; } @GET + @Produces("text/plain") + @Path("unsubscribe/{key}") + public Boolean unsubscribeContinuousFeed(@PathParam("key") String key) { + return activeStreams.remove(key) != null; + } + + @GET @Path("entry/{id}") @Produces("application/atom+xml;type=entry") public Entry getEntry(@PathParam("id") int index) { @@ -394,7 +402,7 @@ public class AtomPullServer extends AbstractAtomBean { if (activeStreams.size() > 0) { byte[] rbytes = null; Entry rentry = null; - for (Iterator<Object> it = activeStreams.iterator(); it.hasNext();) { + for (Iterator<Object> it = activeStreams.values().iterator(); it.hasNext();) { Object out = it.next(); try { if (out instanceof OutputStream) { http://git-wip-us.apache.org/repos/asf/cxf/blob/250dc87f/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPushOverWebSocketServer.java ---------------------------------------------------------------------- diff --git a/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPushOverWebSocketServer.java b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPushOverWebSocketServer.java new file mode 100644 index 0000000..7a51445 --- /dev/null +++ b/rt/management-web/src/main/java/org/apache/cxf/management/web/logging/atom/AtomPushOverWebSocketServer.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.management.web.logging.atom; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.logging.Handler; + +import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; + +import org.apache.abdera.model.Element; +import org.apache.abdera.model.Feed; +import org.apache.commons.lang.Validate; +import org.apache.cxf.jaxrs.ext.StreamingResponse; +import org.apache.cxf.management.web.logging.atom.converter.Converter; +import org.apache.cxf.management.web.logging.atom.deliverer.Deliverer; + +/** + * Bean used to configure {@link AtomPushHandler JUL handler} with Spring instead of properties file. See + * {@link AtomPushHandler} class for detailed description of parameters. Next to configuration of handler, + * Spring bean offers simple configuration of associated loggers that share ATOM push-style handler. 
+ * <p> + * General rules: + * <ul> + * <li>When {@link #setConverter(Converter) converter} property is not set explicitly, default converter is + * created.</li> + * <li>When {@link #setLoggers(String) loggers} property is used, it overrides pair of + * {@link #setLogger(String) logger} and {@link #setLevel(String) level} properties; and vice versa.</li> + * <li>When logger is not set, handler is attached to root logger (named ""); when level is not set for + * logger, default "INFO" level is used.</li> + * <li>When {@link #setBatchSize(String) batchSize} property is not set or set to wrong value, default batch + * size of "1" is used.</li> + * <li>When deliverer property is NOT set, use of "retryXxx" properties causes creation of retrying default + * deliverer.</li> + * </ul> + * Examples: + * <p> + * ATOM push handler registered with root logger for all levels of log events, pushing one feed per event + * over the connected websocket, using default conversion methods: + * + * <pre> + * <bean class="org.apache.cxf.management.web.logging.atom.AtomPushOverWebSocketServer" + * init-method="init"> + * <property name="level" value="ALL" /> + * </bean> + * </pre> + * + * ATOM push handler registered with multiple loggers and listening for different levels (see + * {@link #setLoggers(String) loggers} property description for syntax details). Custom deliverer will take + * care of feeds, each of which carries batch of 10 log events: + * + * <pre> + * ... 
+ * <bean class="org.apache.cxf.management.web.logging.atom.AtomPushOverWebSocketServer" + * init-method="init"> + * <property name="loggers" value=" + * org.apache.cxf:DEBUG, + * org.apache.cxf.jaxrs, + * org.apache.cxf.bus:ERROR" /> + * <property name="batchSize" value="10" /> + * </bean> + * </pre> + */ +//REVISIT we will move the common part into AbstractAtomPushBean so that it can be shared by both AtomPushBean and this +@Path("/logs2") +public final class AtomPushOverWebSocketServer extends AbstractAtomBean { + private AtomPushEngineConfigurator conf = new AtomPushEngineConfigurator(); + private Map<String, Object> activeStreams; + + /** + * Creates unconfigured and uninitialized bean. To configure setters must be used, then {@link #init()} + * must be called. + */ + public AtomPushOverWebSocketServer() { + conf.setDeliverer(new WebSocketDeliverer()); + } + + @Override + public void init() { + super.init(); + activeStreams = Collections.synchronizedMap(new HashMap<String, Object>()); + } + + /** + * Set initialized converter. + */ + public void setConverter(Converter converter) { + checkInit(); + Validate.notNull(converter, "converter is null"); + conf.setConverter(converter); + } + + /** + * Size of batch; empty string for default one element batch. + */ + public void setBatchSize(String batchSize) { + checkInit(); + Validate.notNull(batchSize, "batchSize is null"); + conf.setBatchSize(batchSize); + } + + /** + * Batch cleanup time in minutes + */ + public void setBatchCleanupTime(String batchCleanupTime) { + checkInit(); + Validate.notNull(batchCleanupTime, "batchCleanup is null"); + conf.setBatchCleanupTime(batchCleanupTime); + } + + /** + * Retry pause calculation strategy, either "linear" or "exponential". + */ + public void setRetryPause(String retryPause) { + checkInit(); + Validate.notNull(retryPause, "retryPause is null"); + conf.setRetryPause(retryPause); + } + + /** + * Retry pause time (in seconds). 
+ */ + public void setRetryPauseTime(String time) { + checkInit(); + Validate.notNull(time, "time is null"); + conf.setRetryPauseTime(time); + } + + /** + * Retry timeout (in seconds). + */ + public void setRetryTimeout(String timeout) { + checkInit(); + Validate.notNull(timeout, "timeout is null"); + conf.setRetryTimeout(timeout); + } + + /** + * Conversion output type: "feed" or "entry". + */ + public void setOutput(String output) { + checkInit(); + Validate.notNull(output, "output is null"); + conf.setOutput(output); + } + + /** + * Multiplicity of subelement of output: "one" or "many". + */ + public void setMultiplicity(String multiplicity) { + checkInit(); + Validate.notNull(multiplicity, "multiplicity is null"); + conf.setMultiplicity(multiplicity); + } + + /** + * Entry data format: "content" or "extension". + */ + public void setFormat(String format) { + checkInit(); + Validate.notNull(format, "format is null"); + conf.setFormat(format); + } + + protected Handler createHandler() { + return new AtomPushHandler(conf.createEngine()); + } + + @GET + @Produces("application/atom+xml") + @Path("subscribe") + public StreamingResponse<Feed> subscribeXmlFeed(@HeaderParam("requestId") String reqid) { + final String key = reqid == null ? 
"*" : reqid; + return new StreamingResponse<Feed>() { + public void writeTo(final StreamingResponse.Writer<Feed> out) throws IOException { + activeStreams.put(key, out); + } + }; + } + + @GET + @Produces("text/plain") + @Path("unsubscribe/{key}") + public Boolean unsubscribeXmlFeed(@PathParam("key") String key) { + return activeStreams.remove(key) != null; + } + + private class WebSocketDeliverer implements Deliverer { + + @Override + public boolean deliver(Element element) throws InterruptedException { + if (activeStreams.size() > 0) { + for (Iterator<Object> it = activeStreams.values().iterator(); it.hasNext();) { + Object out = it.next(); + try { + if (out instanceof StreamingResponse.Writer) { + ((StreamingResponse.Writer)out).write(element); + } + } catch (Throwable t) { + // REVISIT + // the reason for not logging anything here is to not further clog the logger + // with this log broadcasting failure. + System.err.print("ERROR | AtomPushOverWebSocketServer | " + t + "; Unregistering " + out); + it.remove(); + } + } + } + + return true; + } + + @Override + public String getEndpointAddress() { + //REVISIT return something else? + return null; + } + + } +}
