Default the URI of the App Data pub/sub operators to ws://<GATEWAY_CONNECT_ADDRESS>/pubsub when no URI is explicitly set.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0be7372b Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0be7372b Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0be7372b Branch: refs/heads/master Commit: 0be7372b1129b616d33ccaddc4c05441b52a6968 Parents: 93ce29c Author: Timothy Farkas <[email protected]> Authored: Fri Jul 24 20:07:41 2015 -0700 Committer: Chetan Narsude <[email protected]> Committed: Tue Aug 4 09:42:07 2015 -0700 ---------------------------------------------------------------------- .../lib/io/PubSubWebSocketAppDataQuery.java | 48 ++++++++++++- .../lib/io/PubSubWebSocketAppDataResult.java | 27 ++++++- .../lib/io/WebSocketInputOperator.java | 4 +- .../lib/io/WebSocketOutputOperator.java | 4 +- .../io/PubSubWebSocketAppDataOperatorTest.java | 74 ++++++++++++++++++++ .../lib/io/PubSubWebSocketAppDataQueryTest.java | 66 ++++++++++++++--- .../io/PubSubWebSocketAppDataResultTest.java | 20 ++---- 7 files changed, 212 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java index f2b1fdb..14a2d2b 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java @@ -16,6 +16,9 @@ package com.datatorrent.lib.io; +import java.net.URI; +import java.net.URISyntaxException; + import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import 
org.codehaus.jettison.json.JSONObject; @@ -23,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; import com.datatorrent.common.experimental.AppData; import com.datatorrent.common.util.PubSubMessage; @@ -50,9 +54,51 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St @Override public void setup(OperatorContext context) { + this.uri = uriHelper(context, uri); + logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic()); super.setup(context); + } - logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic()); + public static URI uriHelper(OperatorContext context, URI uri) + { + if (uri == null) { + if (context.getValue(DAG.GATEWAY_CONNECT_ADDRESS) == null) { + throw new IllegalArgumentException("The uri property is not set and the dt.attr.GATEWAY_CONNECT_ADDRESS is not defined"); + } + + try { + uri = new URI("ws://" + + context.getValue(DAG.GATEWAY_CONNECT_ADDRESS) + + "/pubsub"); + } catch (URISyntaxException ex) { + throw new RuntimeException(ex); + } + } + + return uri; + } + + /** + * Gets the URI for WebSocket connection. + * + * @return the URI + */ + @Override + public URI getUri() + { + return uri; + } + + /** + * The URI for WebSocket connection. If this is not set, the value of the dt.attr.GATEWAY_CONNECT_ADDRESS DAG attribute is used. If neither this + * property or dt.attr.GATEWAY_CONNECT_ADDRESS attribute is set, then this operator will fail with an {@link IllegalArgumentException}. 
+ * + * @param uri + */ + @Override + public void setUri(URI uri) + { + this.uri = uri; } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java index 3401233..5f0b947 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java @@ -26,6 +26,7 @@ import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.common.experimental.AppData; import com.datatorrent.common.util.PubSubMessage.PubSubMessageType; +import java.net.URI; /** * This is an app data pub sub result operator. This operator is used to send results to * App Data dashboards produced by App Data store operators. @@ -47,8 +48,9 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator< @Override public void setup(OperatorContext context) { + this.uri = PubSubWebSocketAppDataQuery.uriHelper(context, uri); + logger.debug("Setting up:\nuri:{}\ntopic:{}",this.getUri(), this.getTopic()); super.setup(context); - logger.debug("Setting up: "); } @Override @@ -57,6 +59,29 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator< return "pubsub"; } + /** + * Gets the URI for WebSocket connection. + * + * @return the URI + */ + @Override + public URI getUri() + { + return uri; + } + + /** + * The URI for WebSocket connection. If this is not set, the value of the dt.attr.GATEWAY_CONNECT_ADDRESS DAG attribute is used. If neither this + * property or dt.attr.GATEWAY_CONNECT_ADDRESS attribute is set, then this operator will fail with an {@link IllegalArgumentException}. 
+ * + * @param uri + */ + @Override + public void setUri(URI uri) + { + this.uri = uri; + } + @Override public String convertMapToMessage(String t) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java index dabcacb..a8cfa6e 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java @@ -51,8 +51,8 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> * Timeout interval for reading from server. 0 or negative indicates no timeout. */ public int readTimeoutMillis = 0; - @NotNull - private URI uri; + //Do not make this @NotNull since null is a valid value for some child classes + protected URI uri; private transient AsyncHttpClient client; private transient final JsonFactory jsonFactory = new JsonFactory(); protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java index a92f8b3..a7ab3bd 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketOutputOperator.java @@ -49,8 +49,8 @@ import com.datatorrent.api.DefaultInputPort; public class WebSocketOutputOperator<T> extends BaseOperator { private static final 
Logger LOG = LoggerFactory.getLogger(WebSocketOutputOperator.class); - @NotNull - private URI uri; + //Do not make this @NotNull since null is a valid value for some child classes + protected URI uri; private transient AsyncHttpClient client; private transient final JsonFactory jsonFactory = new JsonFactory(); protected transient final ObjectMapper mapper = new ObjectMapper(jsonFactory); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java new file mode 100644 index 0000000..bc379ce --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2015 DataTorrent + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.datatorrent.lib.io; + +import java.lang.reflect.Method; +import java.net.URI; +import java.net.URISyntaxException; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.common.experimental.AppData; + +public abstract class PubSubWebSocketAppDataOperatorTest +{ + public static final String GATEWAY_CONNECT_ADDRESS_STRING = "my.gateway.com"; + public static final String URI_ADDRESS_STRING = "ws://localhost:6666/pubsub"; + public static final URI GATEWAY_CONNECT_ADDRESS; + public static final URI URI_ADDRESS; + + static + { + try { + GATEWAY_CONNECT_ADDRESS = new URI("ws://" + GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub"); + URI_ADDRESS = new URI(URI_ADDRESS_STRING); + } catch (URISyntaxException ex) { + throw new RuntimeException(ex); + } + } + + public abstract AppData.ConnectionInfoProvider getOperator(); + + @Test + public void testGetAppDataURL() throws Exception + { + String topic = "test"; + String correct = "pubsub"; + + AppData.ConnectionInfoProvider pubsub = getOperator(); + + setUri(pubsub, URI_ADDRESS); + setTopic(pubsub, topic); + + Assert.assertEquals("The url is incorrect.", correct, pubsub.getAppDataURL()); + } + + public void setUri(Object o, URI uri) throws Exception + { + Class<?> clazz = o.getClass(); + Method m = clazz.getMethod("setUri", URI.class); + m.invoke(o, uri); + } + + public void setTopic(Object o, String topic) throws Exception + { + Class<?> clazz = o.getClass(); + Method m = clazz.getMethod("setTopic", String.class); + m.invoke(o, topic); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java index 894ed72..c2aa0da 100644 --- 
a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java @@ -15,23 +15,69 @@ */ package com.datatorrent.lib.io; -import java.net.URI; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; -public class PubSubWebSocketAppDataQueryTest +import com.datatorrent.lib.helper.OperatorContextTestHelper; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; + +import com.datatorrent.common.experimental.AppData.ConnectionInfoProvider; + +public class PubSubWebSocketAppDataQueryTest extends PubSubWebSocketAppDataOperatorTest { + private static OperatorContext context; + private static OperatorContext emptyContext; + + @BeforeClass + public static void setupContext() throws Exception + { + Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, GATEWAY_CONNECT_ADDRESS_STRING); + context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + + attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + emptyContext = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + } + + @Override + public ConnectionInfoProvider getOperator() + { + return new PubSubWebSocketAppDataQuery(); + } + @Test - public void testGetAppDataURL() throws Exception + public void testURISet() throws Exception { - URI uri = URI.create("ws://localhost:6666/pubsub"); - String topic = "test"; - String correct = "pubsub"; + Assert.assertEquals(URI_ADDRESS, PubSubWebSocketAppDataQuery.uriHelper(emptyContext, URI_ADDRESS)); + } - PubSubWebSocketAppDataQuery pubsub = new PubSubWebSocketAppDataQuery(); - pubsub.setUri(uri); - pubsub.setTopic(topic); + @Test + public void testNoURISet() throws Exception + { + boolean threwException = false; + + try { + 
PubSubWebSocketAppDataQuery.uriHelper(emptyContext, null); + } catch (Exception e) { + threwException = e instanceof IllegalArgumentException; + } - Assert.assertEquals("The url is incorrect.", correct, pubsub.getAppDataURL()); + Assert.assertTrue(threwException); + } + + @Test + public void testAttrSet() throws Exception + { + Assert.assertEquals(GATEWAY_CONNECT_ADDRESS, PubSubWebSocketAppDataQuery.uriHelper(context, null)); + } + + @Test + public void testAttrAndURISet() throws Exception + { + Assert.assertEquals(URI_ADDRESS, PubSubWebSocketAppDataQuery.uriHelper(context, URI_ADDRESS)); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0be7372b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java index e38e439..7afa211 100644 --- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResultTest.java @@ -15,23 +15,13 @@ */ package com.datatorrent.lib.io; -import java.net.URI; -import org.junit.Assert; -import org.junit.Test; +import com.datatorrent.common.experimental.AppData.ConnectionInfoProvider; -public class PubSubWebSocketAppDataResultTest +public class PubSubWebSocketAppDataResultTest extends PubSubWebSocketAppDataOperatorTest { - @Test - public void testGetAppDataURL() throws Exception + @Override + public ConnectionInfoProvider getOperator() { - URI uri = URI.create("ws://localhost:6666/pubsub"); - String topic = "test"; - String correct = "pubsub"; - - PubSubWebSocketAppDataResult pubsub = new PubSubWebSocketAppDataResult(); - pubsub.setUri(uri); - pubsub.setTopic(topic); - - Assert.assertEquals("The url is incorrect.", correct, pubsub.getAppDataURL()); + 
return new PubSubWebSocketAppDataResult(); } }
