http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java deleted file mode 100644 index 460c43a..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/configuration/EipConfigurator.java +++ /dev/null @@ -1,201 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.streams.messaging.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Spring component that exposes the messaging (EIP) settings as a bean.
 *
 * <p>Every field is populated from the property placeholder named in its
 * {@code @Value} annotation. Setters are kept so the values can still be
 * overridden programmatically.
 */
@Component
public class EipConfigurator {

    /** Endpoint URI prefix selecting the jetty HTTP endpoint. */
    public static final String ENDPOINT_PROTOCOL_JETTY = "jetty:http://";

    /** Endpoint URI prefix selecting the servlet endpoint. */
    public static final String ENDPOINT_PROTOCOL_SERVLET = "servlet:///";

    @Value("${consumer.inRouteHost}")
    private String consumerInRouteHost;

    @Value("${consumer.inRoutePort}")
    private String consumerInRoutePort;

    @Value("${subscriber.inRouteHost}")
    private String subscriberInRouteHost;

    @Value("${subscriber.inRoutePort}")
    private String subscriberInRoutePort;

    @Value("${consumer.activityQUri}")
    private String consumerActivityQUri;

    @Value("${consumer.publisherEndpointProtocol}")
    private String publisherEndpointProtocol;

    @Value("${consumer.publisherEndpointUrlResource}")
    private String publisherEndpointUrlResource;

    @Value("${consumer.receiveMethod}")
    private String consumerReceiveMethod;

    @Value("${consumer.splitMethod}")
    private String consumerSplitMethod;

    @Value("${subscriber.subscriberEndpointProtocol}")
    private String subscriberEndpointProtocol;

    @Value("${subscriber.subscriberEndpointUrlResource}")
    private String subscriberEndpointUrlResource;

    @Value("${subscriber.receiveMethod}")
    private String subscriberReceiveMethod;

    @Value("${subscriber.postMethod}")
    private String subscriberPostMethod;

    @Value("${subscriber.getMethod}")
    private String subscriberGetMethod;

    @Value("${servlet.baseUrlPath}")
    private String baseUrlPath;

    /** @return value of {@code consumer.inRouteHost} */
    public String getConsumerInRouteHost() {
        return consumerInRouteHost;
    }

    /** @return value of {@code consumer.inRoutePort} */
    public String getConsumerInRoutePort() {
        return consumerInRoutePort;
    }

    /** @return value of {@code consumer.activityQUri} */
    public String getConsumerActivityQUri() {
        return consumerActivityQUri;
    }

    public void setConsumerActivityQUri(String consumerActivityQUri) {
        this.consumerActivityQUri = consumerActivityQUri;
    }

    public void setConsumerInRoutePort(String consumerInRoutePort) {
        this.consumerInRoutePort = consumerInRoutePort;
    }

    public void setConsumerInRouteHost(String consumerInRouteHost) {
        this.consumerInRouteHost = consumerInRouteHost;
    }

    /** @return value of {@code subscriber.inRouteHost} */
    public String getSubscriberInRouteHost() {
        return subscriberInRouteHost;
    }

    public void setSubscriberInRouteHost(String subscriberInRouteHost) {
        this.subscriberInRouteHost = subscriberInRouteHost;
    }

    /** @return value of {@code subscriber.inRoutePort} */
    public String getSubscriberInRoutePort() {
        return subscriberInRoutePort;
    }

    public void setSubscriberInRoutePort(String subscriberInRoutePort) {
        this.subscriberInRoutePort = subscriberInRoutePort;
    }

    /** @return value of {@code consumer.publisherEndpointProtocol} */
    public String getPublisherEndpointProtocol() {
        return publisherEndpointProtocol;
    }

    public void setPublisherEndpointProtocol(String publisherEndpointProtocol) {
        this.publisherEndpointProtocol = publisherEndpointProtocol;
    }

    /** @return value of {@code consumer.publisherEndpointUrlResource} */
    public String getPublisherEndpointUrlResource() {
        return publisherEndpointUrlResource;
    }

    public void setPublisherEndpointUrlResource(String publisherEndpointUrlResource) {
        this.publisherEndpointUrlResource = publisherEndpointUrlResource;
    }

    /** @return value of {@code consumer.receiveMethod} */
    public String getConsumerReceiveMethod() {
        return consumerReceiveMethod;
    }

    public void setConsumerReceiveMethod(String consumerReceiveMethod) {
        this.consumerReceiveMethod = consumerReceiveMethod;
    }

    /** @return value of {@code consumer.splitMethod} */
    public String getConsumerSplitMethod() {
        return consumerSplitMethod;
    }

    public void setConsumerSplitMethod(String consumerSplitMethod) {
        this.consumerSplitMethod = consumerSplitMethod;
    }

    /** @return value of {@code subscriber.subscriberEndpointProtocol} */
    public String getSubscriberEndpointProtocol() {
        return subscriberEndpointProtocol;
    }

    public void setSubscriberEndpointProtocol(String subscriberEndpointProtocol) {
        this.subscriberEndpointProtocol = subscriberEndpointProtocol;
    }

    /** @return value of {@code subscriber.subscriberEndpointUrlResource} */
    public String getSubscriberEndpointUrlResource() {
        return subscriberEndpointUrlResource;
    }

    public void setSubscriberEndpointUrlResource(String subscriberEndpointUrlResource) {
        this.subscriberEndpointUrlResource = subscriberEndpointUrlResource;
    }

    /** @return value of {@code subscriber.receiveMethod} */
    public String getSubscriberReceiveMethod() {
        return subscriberReceiveMethod;
    }

    public void setSubscriberReceiveMethod(String subscriberReceiveMethod) {
        this.subscriberReceiveMethod = subscriberReceiveMethod;
    }

    /** @return value of {@code subscriber.postMethod} */
    public String getSubscriberPostMethod() {
        return subscriberPostMethod;
    }

    public void setSubscriberPostMethod(String subscriberPostMethod) {
        this.subscriberPostMethod = subscriberPostMethod;
    }

    /** @return value of {@code subscriber.getMethod} */
    public String getSubscriberGetMethod() {
        return subscriberGetMethod;
    }

    public void setSubscriberGetMethod(String subscriberGetMethod) {
        this.subscriberGetMethod = subscriberGetMethod;
    }

    /** @return value of {@code servlet.baseUrlPath} */
    public String getBaseUrlPath() {
        return baseUrlPath;
    }

    public void setBaseUrlPath(String baseUrlPath) {
        this.baseUrlPath = baseUrlPath;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java deleted file mode 100644 index 741a63c..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityPublisherRegistrationProcessor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.messaging.processors; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.map.ObjectMapper; - - -public class ActivityPublisherRegistrationProcessor implements Processor{ - private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRegistrationProcessor.class); - public void process(Exchange exchange){ - //add the necessary headers to the message so that the activity registration component - //can do a lookup to either make a new processor and endpoint, or pass the message to the right one - String httpMethod = exchange.getIn().getHeader("CamelHttpMethod").toString(); - - if (!httpMethod.equals("POST")){ - //reject anything that isn't a post...Camel 2.10 solves needing this check, however, SM 4.4 doesn't have the latest - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,405); - } else { - - //for now...just expect a post with a uri in the body...should have some checking here with http response codes - // authentication, all that good stuff...happens in the registration module - - String body = exchange.getIn().getBody(String.class); - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false); - - try { - - // read from file, convert it to user class - ActivityConsumer configuration = mapper.readValue(body, ActivityConsumer.class); - if (configuration.getSrc()==null){ - LOG.info("configuration src is null"); - throw new Exception(); - } - - exchange.getOut().setBody(configuration); - - } catch (Exception e) { - LOG.info("error: " + e); - exchange.getOut().setFault(true); - 
exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400); - exchange.getOut().setBody("POST should contain a valid JSON configuration for registering as an Activity Publisher (check that src element is valid)."); - } - } - - - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java deleted file mode 100644 index 201eebd..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/processors/ActivityStreamsSubscriberRegistrationProcessor.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.messaging.processors; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.streams.messaging.service.SubscriptionService; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.map.ObjectMapper; - - -public class ActivityStreamsSubscriberRegistrationProcessor implements Processor{ - private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRegistrationProcessor.class); - private SubscriptionService subscriptionService; - - public ActivityStreamsSubscriberRegistrationProcessor(SubscriptionService subscriptionService){ - this.subscriptionService = subscriptionService; - } - - public void process(Exchange exchange){ - LOG.info("processing the subscriber..."); - //add the necessary headers to the message so that the activity registration component - //can do a lookup to either make a new processor and endpoint, or pass the message to the right one - String httpMethod = exchange.getIn().getHeader("CamelHttpMethod").toString(); - - if (!httpMethod.equals("POST")){ - //reject anything that isn't a post...Camel 2.10 solves needing this check, however, SM 4.4 doesn't have the latest - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,405); - } else { - - //for now...just expect a post with a uri in the body...should have some checking here with http response codes - // authentication, all that good stuff...happens in the registration module - - - String body = exchange.getIn().getBody(String.class); - - LOG.info("receiving the subscriber: "+body); - //OAuth token? What does subscriber post to init a subscription URL? 
- //maybe its a list of URLs to subscribe to subscriptions=1,2,3,4&auth_token=XXXX - - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false); - - try { - - // read from file, convert it to user class - ActivityStreamsSubscription configuration = mapper.readValue(body, ActivityStreamsSubscription.class); - if(configuration.getFilters() == null){ - configuration.setFilters(subscriptionService.getFilters(configuration.getAuthToken())); - }else{ - subscriptionService.saveFilters(configuration); - } - exchange.getOut().setBody(configuration); - - } catch (Exception e) { - LOG.info("exception" + e); - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,400); - exchange.getOut().setBody("POST should contain a valid Subscription configuration object."); - } - - - - //just pass this on to the route creator, body will be the dedicated URL for this subscriber - - } - - - - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java deleted file mode 100644 index dea8781..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityConsumerRouteBuilder.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.routers; - - - -import org.apache.camel.Exchange; -import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer; - - -public interface ActivityConsumerRouteBuilder { - - - void createNewRouteForConsumer(Exchange exchange, ActivityConsumer activityConsumer); - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java deleted file mode 100644 index 6947722..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/ActivityStreamsSubscriberRouteBuilder.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.routers; - - - -import org.apache.camel.Exchange; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber; - - -public interface ActivityStreamsSubscriberRouteBuilder { - - - void createNewRouteForSubscriber(Exchange exchange, ActivityStreamsSubscriber activityStreamsSubscriber); - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java deleted file mode 100644 index 20b8246..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityConsumerRouter.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.streams.messaging.routers.impl;

import org.apache.camel.builder.RouteBuilder;
import org.apache.streams.messaging.routers.ActivityConsumerRouteBuilder;

import org.apache.streams.osgi.components.activityconsumer.ActivityConsumerWarehouse;
import org.apache.streams.osgi.components.activityconsumer.ActivityConsumer;
import org.apache.streams.messaging.configuration.EipConfigurator;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.camel.Exchange;
import org.apache.camel.CamelContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.UUID;

/**
 * Creates, at runtime, the inbound route a registered activity publisher
 * posts to, and answers the registration request with that route's URL.
 */
public class ActivityConsumerRouter extends RouteBuilder implements ActivityConsumerRouteBuilder {
    private static final Log LOG = LogFactory.getLog(ActivityConsumerRouter.class);

    @Autowired
    private EipConfigurator configuration;

    protected CamelContext camelContext;

    private ActivityConsumerWarehouse activityConsumerWarehouse;

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    public void setActivityConsumerWarehouse(ActivityConsumerWarehouse activityConsumerWarehouse) {
        this.activityConsumerWarehouse = activityConsumerWarehouse;
    }

    /**
     * Registers the consumer and creates its inbound route, or returns the URL
     * of the route that already exists for the consumer's {@code src}.
     *
     * <p>Outcome written to the exchange's out message:
     * <ul>
     *   <li>consumer not authenticated: fault, HTTP 401</li>
     *   <li>consumer already in the warehouse: URL of its existing route</li>
     *   <li>new consumer: URL of the newly created route</li>
     *   <li>any failure while creating the route: HTTP 500 and an error text</li>
     * </ul>
     *
     * @param exchange         exchange of the registration request
     * @param activityConsumer consumer to create the route for
     */
    public void createNewRouteForConsumer(Exchange exchange, ActivityConsumer activityConsumer) {
        //todo: add some better scheme then getCount for URL...
        //todo: make the route again if consumer exists...and context doesn't have route
        if (!activityConsumer.isAuthenticated()) {
            exchange.getOut().setFault(true);
            exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 401);
            exchange.getOut().setBody("Authentication failed.");
            return;
        }

        ActivityConsumer existingConsumer =
                activityConsumerWarehouse.findConsumerBySrc(activityConsumer.getSrc().toASCIIString());

        if (existingConsumer != null) {
            // Answer with the same URL form a first-time registration gets for
            // the configured protocol (previously always the servlet form).
            exchange.getOut().setBody(publicUrlFor(existingConsumer.getInRoute()));
            return;
        }

        try {
            // Constant-first comparison: an unset protocol falls through to the
            // explicit "not supported" error below rather than an NPE.
            String protocol = configuration.getPublisherEndpointProtocol();

            if (EipConfigurator.ENDPOINT_PROTOCOL_JETTY.equals(protocol)) {
                activityConsumer.setInRoute(configuration.getConsumerInRouteHost() + ":"
                        + configuration.getConsumerInRoutePort() + "/"
                        + configuration.getPublisherEndpointUrlResource() + "/" + UUID.randomUUID());
            } else if (EipConfigurator.ENDPOINT_PROTOCOL_SERVLET.equals(protocol)) {
                activityConsumer.setInRoute(configuration.getPublisherEndpointUrlResource() + "/" + UUID.randomUUID());
            } else {
                // Raw Exception kept on purpose: its text is echoed to the client below.
                throw new Exception("No supported endpoint protocol is configured.");
            }

            //set the body to the url the producer should post to
            exchange.getOut().setBody(publicUrlFor(activityConsumer.getInRoute()));

            //setup a message queue for this consumer.getInRoute()
            camelContext.addRoutes(new DynamicConsumerRouteBuilder(configuration, camelContext,
                    configuration.getPublisherEndpointProtocol() + activityConsumer.getInRoute(), activityConsumer));

            LOG.info("all messages sent from " + activityConsumer.getSrc() + " must be posted to " + activityConsumer.getInRoute());
            //only add the route to the warehouse after its been created in messaging system...
            activityConsumerWarehouse.register(activityConsumer);
        } catch (Exception e) {
            exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 500);
            exchange.getOut().setBody("error creating route: " + e);
            // Pass the throwable so the stack trace is logged as well.
            LOG.error("error creating route: " + e, e);
        }
    }

    /**
     * Builds the URL reported to the publisher for the given in-route: the
     * {@code http://} form for the jetty protocol, otherwise the configured
     * base URL path followed by the in-route.
     */
    private String publicUrlFor(String inRoute) {
        if (EipConfigurator.ENDPOINT_PROTOCOL_JETTY.equals(configuration.getPublisherEndpointProtocol())) {
            return "http://" + inRoute;
        }
        return configuration.getBaseUrlPath() + inRoute;
    }

    /** Intentionally empty: routes are added dynamically per consumer. */
    public void configure() throws java.lang.Exception {
        //nothing...set the context?
    }

    /**
     * This route builder is a skeleton to add new routes at runtime
     */
    private static final class DynamicConsumerRouteBuilder extends RouteBuilder {
        private final String from;
        private final ActivityConsumer activityConsumer;
        private final EipConfigurator configuration;

        private DynamicConsumerRouteBuilder(EipConfigurator configuration, CamelContext context, String from, ActivityConsumer activityConsumer) {
            super(context);
            this.from = from;
            this.activityConsumer = activityConsumer;
            this.configuration = configuration;
        }

        /**
         * Routes messages from the consumer's endpoint through the configured
         * receive method, splits them with the configured split method and
         * forwards each part to the configured activity queue URI.
         */
        @Override
        public void configure() throws Exception {
            from(from)
                    .bean(activityConsumer, configuration.getConsumerReceiveMethod()).setBody(body())
                    .split()
                    .method(activityConsumer, configuration.getConsumerSplitMethod())
                    .to(configuration.getConsumerActivityQUri());
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java
b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java deleted file mode 100644 index 68ef0a5..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/routers/impl/ActivityStreamsSubscriberRouter.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.messaging.routers.impl; - - -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.builder.RouteBuilder; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.streams.messaging.aggregation.ActivityAggregator; -import org.apache.streams.messaging.configuration.EipConfigurator; -import org.apache.streams.messaging.routers.ActivityStreamsSubscriberRouteBuilder; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriber; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.UUID; - - -public class ActivityStreamsSubscriberRouter extends RouteBuilder implements ActivityStreamsSubscriberRouteBuilder { - private static final transient Log LOG = LogFactory.getLog(ActivityStreamsSubscriberRouter.class); - - @Autowired - private EipConfigurator configuration; - - protected CamelContext camelContext; - - @Autowired - private ActivityAggregator activityAggregator; - - private ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse; - - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - public void setActivityStreamsSubscriberWarehouse(ActivityStreamsSubscriberWarehouse activityStreamsSubscriberWarehouse) { - this.activityStreamsSubscriberWarehouse = activityStreamsSubscriberWarehouse; - } - - - public void createNewRouteForSubscriber(Exchange exchange, ActivityStreamsSubscriber activityStreamsSubscriber){ - - //todo: add some better scheme then getCount for URL... 
- //todo: make the route again if subscriber exists...and context doesn't have route - if (activityStreamsSubscriber.isAuthenticated()){ - - try{ - - if (configuration.getSubscriberEndpointProtocol().equals(EipConfigurator.ENDPOINT_PROTOCOL_JETTY)){ - activityStreamsSubscriber.setInRoute(configuration.getSubscriberInRouteHost()+ ":" + configuration.getSubscriberInRoutePort() +"/" + configuration.getSubscriberEndpointUrlResource() + "/" + UUID.randomUUID()); - //set the body to the url the producer should post to - exchange.getOut().setBody("http://" + activityStreamsSubscriber.getInRoute()); - }else if (configuration.getSubscriberEndpointProtocol().equals(EipConfigurator.ENDPOINT_PROTOCOL_SERVLET)){ - activityStreamsSubscriber.setInRoute( configuration.getSubscriberEndpointUrlResource() + "/" + UUID.randomUUID()); - //set the body to the url the producer should post to - exchange.getOut().setBody(configuration.getBaseUrlPath() + activityStreamsSubscriber.getInRoute()); - } else{ - throw new Exception("No supported endpoint protocol is configured."); - } - - //setup a message queue for this consumer.getInRoute() - camelContext.addRoutes(new DynamicSubscriberRouteBuilder(configuration,camelContext, configuration.getSubscriberEndpointProtocol() + activityStreamsSubscriber.getInRoute(), activityStreamsSubscriber)); - - activityAggregator.updateSubscriber(activityStreamsSubscriber); - activityStreamsSubscriberWarehouse.register(activityStreamsSubscriber); - }catch (Exception e){ - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,500); - exchange.getOut().setBody("error creating route: " + e); - LOG.error("error creating route: " + e); - } - - }else{ - exchange.getOut().setFault(true); - exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE,401); - exchange.getOut().setBody("Authentication failed."); - } - - } - - - - - public void configure() throws Exception{ - //nothing...set the context? 
- - } - - /** - * This route builder is a skeleton to add new routes at runtime - */ - private static final class DynamicSubscriberRouteBuilder extends RouteBuilder { - private final String from; - private ActivityStreamsSubscriber activityStreamsSubscriber; - - - private EipConfigurator configuration; - - private DynamicSubscriberRouteBuilder(EipConfigurator configuration, CamelContext context, String from, ActivityStreamsSubscriber activityStreamsSubscriber) { - super(context); - this.from = from; - this.activityStreamsSubscriber = activityStreamsSubscriber; - this.configuration = configuration; - } - - @Override - public void configure() throws Exception { - - - from(from) - .choice() - .when(header("CamelHttpMethod").isEqualTo("POST")) - //when its a post...it goes to adding a new src - .bean(activityStreamsSubscriber, configuration.getSubscriberPostMethod()).setBody(body()) - .when(header("CamelHttpMethod").isEqualTo("GET")) - // when its a GET it goes to getStream() - .bean(activityStreamsSubscriber, configuration.getSubscriberGetMethod()) ; - - - - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/ActivityService.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/ActivityService.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/ActivityService.java deleted file mode 100644 index 0c85974..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/ActivityService.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.service; - -import org.apache.camel.Exchange; - -import java.util.Date; -import java.util.List; - -public interface ActivityService { - - void receiveExchange(Exchange exchange); - - List<String> getActivitiesForFilters(List<String> filters, Date lastUpdated); -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java deleted file mode 100644 index 98f585d..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/SubscriptionService.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.service; - -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription; - -import java.util.List; - -public interface SubscriptionService { - - List<String> getFilters(String authToken); - void saveFilters(ActivityStreamsSubscription subscription); -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java deleted file mode 100644 index 89f71ab..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraActivityService.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.service.impl; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.camel.Exchange; -import org.apache.rave.model.ActivityStreamsEntry; -import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry; -import org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository; -import org.apache.streams.messaging.service.ActivityService; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.map.ObjectMapper; -import org.springframework.beans.factory.annotation.Autowired; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; - -public class CassandraActivityService implements ActivityService { - - private static final transient Log LOG = LogFactory.getLog(CassandraActivityService.class); - - private CassandraActivityStreamsRepository cassandraActivityStreamsRepository; - private ObjectMapper mapper; - - @Autowired - public CassandraActivityService(CassandraActivityStreamsRepository cassandraActivityStreamsRepository, ObjectMapper mapper) { - this.cassandraActivityStreamsRepository = cassandraActivityStreamsRepository; - this.mapper = mapper; - mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - @Override - public void receiveExchange(Exchange exchange) { - - //receive the 
exchange as a list - List<Exchange> grouped = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); - - for (Exchange e : grouped) { - //get activity off of exchange - LOG.info("Exchange: " + e); - - //extract the ActivityStreamsEntry object and save it in the database - LOG.info("About to preform the translation to JSON Object"); - String activityJson = e.getIn().getBody(String.class); - - try { - ActivityStreamsEntry streamsEntry = mapper.readValue(activityJson, CassandraActivityStreamsEntry.class); - streamsEntry.setPublished(new Date()); - cassandraActivityStreamsRepository.save(streamsEntry); - } catch (IOException err) { - LOG.error("there was an error while converting the json string to an object and saving to the database", err); - } - - } - } - - @Override - public List<String> getActivitiesForFilters(List<String> filters, Date lastUpdated) { - List<CassandraActivityStreamsEntry> activityObjects = cassandraActivityStreamsRepository.getActivitiesForFilters(filters, lastUpdated); - Collections.sort(activityObjects, Collections.reverseOrder()); - //TODO: make the number of streams returned configurable - return getJsonList(activityObjects.subList(0,Math.min(activityObjects.size(),10))); - } - - private List<String> getJsonList(List<CassandraActivityStreamsEntry> activities) { - List<String> jsonList = new ArrayList<String>(); - for (ActivityStreamsEntry entry : activities) { - try { - jsonList.add(mapper.writeValueAsString(entry)); - } catch (IOException e) { - LOG.error("There was an error while trying to convert the java object to a string: " + entry, e); - } - } - return jsonList; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java ---------------------------------------------------------------------- diff --git 
a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java b/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java deleted file mode 100644 index 8972d1e..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/java/org/apache/streams/messaging/service/impl/CassandraSubscriptionService.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.messaging.service.impl; - -import org.apache.streams.cassandra.repository.impl.CassandraSubscriptionRepository; -import org.apache.streams.messaging.service.SubscriptionService; -import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription; - -import java.util.Arrays; -import java.util.List; - -public class CassandraSubscriptionService implements SubscriptionService { - - private CassandraSubscriptionRepository repository; - - public CassandraSubscriptionService(CassandraSubscriptionRepository repository){ - this.repository = repository; - } - - public List<String> getFilters(String authToken){ - return Arrays.asList(repository.getFilters(authToken).split(" ")); - } - - public void saveFilters(ActivityStreamsSubscription subscription){ - repository.save(subscription); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/propertiesLoader.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/propertiesLoader.xml b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/propertiesLoader.xml deleted file mode 100644 index 60a3f1f..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/propertiesLoader.xml +++ /dev/null @@ -1,35 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. 
The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. - --> - -<!--<beans--> - <!--xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"--> - <!--xmlns="http://www.springframework.org/schema/beans"--> - <!--xmlns:context="http://www.springframework.org/schema/context"--> - <!--xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd--> - <!--http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">--> - - - - <!--<context:component-scan base-package="org.apache.streams.messaging" annotation-config="true"/>--> - - <!--<context:property-placeholder location="/META-INF/streams.properties"/>--> - - <!--<bean id="configuration" class="org.apache.streams.messaging.configuration.EipConfigurator" />--> - - -<!--</beans>--> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml deleted file mode 100644 index a9b97a7..0000000 --- 
a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-applicationContext.xml +++ /dev/null @@ -1,113 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. - --> - -<beans - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns="http://www.springframework.org/schema/beans" - xmlns:context="http://www.springframework.org/schema/context" - xmlns:task="http://www.springframework.org/schema/task" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd - http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> - - - <bean id="activityConsumerRouter" class="org.apache.streams.messaging.routers.impl.ActivityConsumerRouter"> - <property name="activityConsumerWarehouse" ref="activityConsumerWarehouse"/> - <property name="camelContext" ref="context"/> - </bean> - - <bean id="activityRegistrationProcessor" - class="org.apache.streams.messaging.processors.ActivityPublisherRegistrationProcessor"/> - - - <bean id="activityStreamsSubscriberRouter" - 
class="org.apache.streams.messaging.routers.impl.ActivityStreamsSubscriberRouter"> - <property name="activityStreamsSubscriberWarehouse" ref="activityStreamsSubscriberWarehouse"/> - <property name="camelContext" ref="context"/> - </bean> - - <bean id="subscriberRegistrationProcessor" - class="org.apache.streams.messaging.processors.ActivityStreamsSubscriberRegistrationProcessor"> - <constructor-arg ref="subscriptionService"/> - </bean> - - <bean id="cassandraKeyspace" class="org.apache.streams.cassandra.repository.impl.CassandraKeyspace"> - <constructor-arg ref="cassandraConfig"/> - </bean> - - <bean id="cassandraActivityStreamsRepository" - class="org.apache.streams.cassandra.repository.impl.CassandraActivityStreamsRepository"> - <constructor-arg ref="cassandraKeyspace"/> - <constructor-arg ref="cassandraConfig"/> - </bean> - - <bean id="cassandraSubscriptionRepository" - class="org.apache.streams.cassandra.repository.impl.CassandraSubscriptionRepository"> - <constructor-arg ref="cassandraKeyspace"/> - <constructor-arg ref="cassandraConfig"/> - </bean> - - <bean id="objectMapper" class="org.codehaus.jackson.map.ObjectMapper"/> - - <bean id="subscriptionService" class="org.apache.streams.messaging.service.impl.CassandraSubscriptionService"> - <constructor-arg ref="cassandraSubscriptionRepository"/> - </bean> - - <bean id="activityService" class="org.apache.streams.messaging.service.impl.CassandraActivityService"> - <constructor-arg ref="cassandraActivityStreamsRepository"/> - <constructor-arg ref="objectMapper"/> - </bean> - - <!--<bean id="stromActivityAggregator" class="org.apache.streams.messaging.storm.StormActivityAggregator">--> - <!--<constructor-arg ref="bolt"/>--> - <!--<constructor-arg ref="spout"/>--> - <!--</bean>--> - - <!--<bean id="bolt" class="org.apache.streams.messaging.storm.StormSubscriberBolt"/>--> - <!--<bean id="spout" class="org.apache.streams.messaging.storm.StormSubscriberSpout"/>--> - - <bean id="activityAggregator" 
class="org.apache.streams.messaging.aggregation.ActivityAggregator"> - <property name="activityService" ref="activityService"/> - <property name="activityStreamsSubscriberWarehouse" ref="activityStreamsSubscriberWarehouse"/> - </bean> - - <task:annotation-driven/> - - <bean id="jmsConnectionFactory" - class="org.apache.activemq.ActiveMQConnectionFactory"> - <property name="brokerURL" value="${activemq.jmsConnectionFactoryUrl}"/> - </bean> - - <bean id="pooledConnectionFactory" - class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> - <property name="maxConnections" value="8"/> - <property name="connectionFactory" ref="jmsConnectionFactory"/> - </bean> - - <bean id="jmsConfig" - class="org.apache.camel.component.jms.JmsConfiguration"> - <property name="connectionFactory" ref="pooledConnectionFactory"/> - <property name="concurrentConsumers" value="10"/> - </bean> - - <bean id="activemq" - class="org.apache.activemq.camel.component.ActiveMQComponent"> - <property name="configuration" ref="jmsConfig"/> - </bean> - - -</beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml deleted file mode 100644 index 9066206..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streams-eip-osgi-component-import.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. 
See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. - --> - -<beans xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns:osgi="http://www.springframework.org/schema/osgi" - xsi:schemaLocation="http://www.springframework.org/schema/beans - http://www.springframework.org/schema/beans/spring-beans.xsd - http://www.springframework.org/schema/osgi - http://www.springframework.org/schema/osgi/spring-osgi.xsd"> - - - <osgi:reference id="activityPublisherRegistration" interface="org.apache.streams.osgi.components.ActivityPublisherRegistration" /> - <osgi:reference id="activityConsumerWarehouse" interface="org.apache.streams.osgi.components.activityconsumer.ActivityConsumerWarehouse" /> - - <osgi:reference id="activityStreamsSubscriberRegistration" interface="org.apache.streams.osgi.components.ActivityStreamsSubscriberRegistration" /> - <osgi:reference id="activityStreamsSubscriberWarehouse" interface="org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscriberWarehouse" /> - - - - -</beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streamsCamelContext.xml ---------------------------------------------------------------------- diff --git 
a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streamsCamelContext.xml b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streamsCamelContext.xml deleted file mode 100644 index 4360f39..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/spring/streamsCamelContext.xml +++ /dev/null @@ -1,96 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. 
- --> - -<beans xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xmlns:osgi="http://www.springframework.org/schema/osgi" - xsi:schemaLocation=" - http://www.springframework.org/schema/beans - http://www.springframework.org/schema/beans/spring-beans-2.5.xsd - http://camel.apache.org/schema/spring - http://camel.apache.org/schema/spring/camel-spring-2.0.0.xsd - http://www.springframework.org/schema/osgi - http://www.springframework.org/schema/osgi/spring-osgi.xsd"> - - - - - <camelContext id="context" xmlns="http://camel.apache.org/schema/spring"> - - <endpoint id="consumerRegistrationEndpoint" uri="${consumer.registrationEndpoint}"/> - <endpoint id="subscriberRegistrationEndpoint" uri="${subscriber.registrationEndpoint}"/> - <!--publisher registration route setup --> - <route> - <from uri="ref:consumerRegistrationEndpoint"/> - <bean ref="activityRegistrationProcessor" /> - <to uri="direct:publisher-register"/> - </route> - - <route> - <from uri="direct:publisher-register"/> - <bean ref="activityPublisherRegistration" method="register"/> - <to uri="direct:add-publisher-route"/> - </route> - - <route> - <from uri="direct:add-publisher-route"/> - <bean ref="activityConsumerRouter" method="createNewRouteForConsumer"/> - <to uri="log:ExampleLog"/> - </route> - - <!--split activities on Q, waiting for aggregation --> - <route> - <from uri="direct:activityQ"/> - <inOnly uri="activemq:queue:activities"/> - </route> - - <route> - <from uri="activemq:queue:activities"/> - <aggregate completionInterval="500" groupExchanges="true"> - <correlationExpression> - <constant>true</constant> - </correlationExpression> - <bean ref="activityService" method="receiveExchange"/> - </aggregate> - </route> - - - <!-- register as a subscriber - returned the endpoint to poll and add to subscription sources - GET/POST --> - <route> - <from uri="ref:subscriberRegistrationEndpoint"/> - <bean ref="subscriberRegistrationProcessor" /> - 
<to uri="direct:subscriber-register"/> - </route> - - <route> - <from uri="direct:subscriber-register"/> - <bean ref="activityStreamsSubscriberRegistration" method="register"/> - <to uri="direct:add-subscriber-route"/> - </route> - - <route> - <from uri="direct:add-subscriber-route"/> - <bean ref="activityStreamsSubscriberRouter" method="createNewRouteForSubscriber"/> - <to uri="log:ExampleLog"/> - </route> - - - </camelContext> - -</beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/streams.properties ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/streams.properties b/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/streams.properties deleted file mode 100644 index b7bbfce..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/main/resources/META-INF/streams.properties +++ /dev/null @@ -1,41 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -servlet.baseUrlPath=http://localhost:8080/streams-web/ - -consumer.inRouteHost=localhost -consumer.inRoutePort=8000 -consumer.activityQUri = direct:activityQ - -consumer.publisherEndpointProtocol=jetty:http:// -consumer.publisherEndpointUrlResource=streams/publish -consumer.receiveMethod=receive -consumer.splitMethod=split - -consumer.registrationEndpoint=jetty:http://localhost:8000/streams/publisher/register - -subscriber.inRouteHost=localhost -subscriber.inRoutePort=8000 -subscriber.subscriberEndpointUrlResource=streams/subscriber -subscriber.receiveMethod=receive -subscriber.postMethod=updateActivityStreamsSubscriberConfiguration -subscriber.getMethod=getStream -subscriber.registrationEndpoint=jetty:http://localhost:8000/streams/subscriber/register -subscriber.subscriberEndpointProtocol=jetty:http:// - -activemq.jmsConnectionFactoryUrl=tcp://localhost:61616 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce7aa24d/streams-runtimes/streams-runtime-webapp/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-webapp/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java b/streams-runtimes/streams-runtime-webapp/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java deleted file mode 100644 index 0812c47..0000000 --- a/streams-runtimes/streams-runtime-webapp/src/test/java/org/apache/streams/messaging/service/impl/CassandraActivityServiceTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.messaging.service.impl; - -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import static org.easymock.EasyMock.*; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.List; - -public class CassandraActivityServiceTest { - - private CassandraActivityService cassandraActivityService; - - @Before - public void setup(){ -// cassandraActivityService = new CassandraActivityService(); - } - - @Ignore - @Test - public void getActivititiesForFilterTest(){ - List<String> activities = cassandraActivityService.getActivitiesForFilters(Arrays.asList("r501"), new Date(0)); - } - - @Ignore - @Test - public void receiveExchangeTest(){ - Exchange e = createMock(Exchange.class); - List<Exchange> grouped = new ArrayList<Exchange>(); - Exchange e2 = createMock(Exchange.class); - grouped.add(e2); - Message m = createMock(Message.class); - - String activityJson = "{\n" + - "\"id\":\"id2\",\n" + - "\"verb\":\"verb2\",\n" + - "\"displayName\":\"displayname2\",\n" + - "\"target\":{\n" + - "\t\"id\":\"targetid2\",\n" + - "\t\"displayName\":\"targetname2\"\n" + - "\t},\n" + - "\t\"object\":{\n" + - "\t\"id\":\"objectid2\",\n" + - "\t\"displayName\":\"objectname2\"\n" + - "\t},\n" + - "\t\"actor\":{\n" + - "\t\"id\":\"actorid2\",\n" + - "\t\"displayName\":\"actorname2\"\n" + - "\t}\n" + - "\t\n" + 
- "\t}"; - - expect(e.getProperty(Exchange.GROUPED_EXCHANGE, List.class)).andReturn(grouped); - expect(e2.getIn()).andReturn(m); - expect(m.getBody(String.class)).andReturn(activityJson); - - replay(e, e2, m); - - cassandraActivityService.receiveExchange(e); - //List<String> myTest = cassandraActivityService.getActivitiesForQuery("select * from coltest"); - } -}