This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push: new 095a2f7 APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster 095a2f7 is described below commit 095a2f789fa0221d14e92cbdf9bcdd6ba1e933c2 Author: Pramod Immaneni <pra...@datatorrent.com> AuthorDate: Tue Oct 24 17:10:56 2017 -0700 APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster --- .../datatorrent/apps/logstream/Application.java | 26 ++++----- .../apex/examples/frauddetect/Application.java | 10 ++-- .../apache/apex/examples/mobile/Application.java | 4 +- .../apex/examples/mobile/ApplicationTest.java | 6 +- .../mrmonitor/MRMonitoringApplication.java | 5 +- .../twitter/KinesisHashtagsApplication.java | 10 ++-- .../twitter/TwitterTopCounterApplication.java | 9 +-- .../wordcount/ApplicationWithQuerySupport.java | 10 +--- .../lib/io/PubSubWebSocketAppDataQuery.java | 9 +-- .../datatorrent/lib/io/WidgetOutputOperator.java | 9 +-- .../apache/apex/malhar/lib/utils/PubSubHelper.java | 67 ++++++++++++++++++++++ .../lib/io/PubSubWebSocketAppDataOperatorTest.java | 10 ++-- .../lib/io/PubSubWebSocketOperatorTest.java | 4 +- 13 files changed, 112 insertions(+), 67 deletions(-) diff --git a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java index 98dfebd..82c9214 100644 --- a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java +++ b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java @@ -18,19 +18,24 @@ */ package com.datatorrent.apps.logstream; -import java.net.URI; -import java.util.*; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator; import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition; - 
-import org.apache.commons.lang.StringUtils; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator.InputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator; +import com.datatorrent.contrib.redis.RedisMapOutputOperator; +import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator; import com.datatorrent.lib.algo.TopN; import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; @@ -43,13 +48,6 @@ import com.datatorrent.lib.streamquery.index.ColumnIndex; import com.datatorrent.lib.util.AbstractDimensionTimeBucketOperator; import com.datatorrent.lib.util.DimensionTimeBucketSumOperator; -import com.datatorrent.api.DAG; -import com.datatorrent.api.Operator.InputPort; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator; -import com.datatorrent.contrib.redis.RedisMapOutputOperator; -import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator; - /** * Log stream processing application based on Apex platform.<br> * This application consumes log data generated by running systems and services @@ -156,14 +154,12 @@ public class Application implements StreamingApplication private InputPort<Object> wsOutput(DAG dag, String operatorName) { - String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - if (!StringUtils.isEmpty(daemonAddress)) { - URI uri = URI.create("ws://" + daemonAddress + "/pubsub"); + if (PubSubHelper.isGatewayConfigured(dag)) { String appId = "appid"; //appId = dag.attrValue(DAG.APPLICATION_ID, null); // will be used once UI is able to pick applications from list and listen to corresponding application String topic = "apps.logstream." + appId + "." 
+ operatorName; PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>()); - wsOut.setUri(uri); + wsOut.setUri(PubSubHelper.getURI(dag)); wsOut.setTopic(topic); return wsOut.input; } diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java index 73c38ef..eed9a68 100644 --- a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java @@ -20,11 +20,13 @@ package org.apache.apex.examples.frauddetect; import java.io.Serializable; import java.net.URI; + import org.apache.apex.examples.frauddetect.operator.HdfsStringOutputOperator; import org.apache.apex.examples.frauddetect.operator.MongoDBOutputOperator; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context; -import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; @@ -88,11 +90,7 @@ public class Application implements StreamingApplication { try { - String gatewayAddress = dag.getValue(DAGContext.GATEWAY_CONNECT_ADDRESS); - if (gatewayAddress == null) { - gatewayAddress = "localhost:9090"; - } - URI duri = URI.create("ws://" + gatewayAddress + "/pubsub"); + URI duri = PubSubHelper.getURIWithDefault(dag, "localhost:9090"); PubSubWebSocketInputOperator userTxWsInput = getPubSubWebSocketInputOperator("userTxInput", dag, duri, "examples.app.frauddetect.submitTransaction"); PubSubWebSocketOutputOperator ccUserAlertWsOutput = getPubSubWebSocketOutputOperator("ccUserAlertQueryOutput", dag, duri, "examples.app.frauddetect.fraudAlert"); diff --git 
a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java index f719643..dd1e136 100644 --- a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java +++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java @@ -26,6 +26,7 @@ import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import org.apache.commons.lang.mutable.MutableLong; import org.apache.commons.lang3.Range; import org.apache.hadoop.conf.Configuration; @@ -157,8 +158,7 @@ public class Application implements StreamingApplication // done generating data LOG.info("Finished generating seed data."); - String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + URI uri = PubSubHelper.getURI(dag); PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("LocationResults", new PubSubWebSocketOutputOperator<Object>()); wsOut.setUri(uri); PubSubWebSocketInputOperator<Map<String, String>> wsIn = dag.addOperator("QueryLocation", new PubSubWebSocketInputOperator<Map<String, String>>()); diff --git a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java index ce6ca41..b88ed57 100644 --- a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java +++ b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java @@ -21,7 +21,6 @@ package org.apache.apex.examples.mobile; import java.net.URI; import java.util.HashMap; import java.util.Map; - import javax.servlet.Servlet; import org.eclipse.jetty.server.Connector; @@ -32,11 +31,10 @@ import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import 
org.apache.apex.malhar.lib.utils.PubSubHelper; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.LocalMode; - import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet; import com.datatorrent.lib.io.PubSubWebSocketInputOperator; import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; @@ -67,7 +65,7 @@ public class ApplicationTest server.start(); Connector[] connector = server.getConnectors(); conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort()); - URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub"); + URI uri = PubSubHelper.getURI("localhost:" + connector[0].getLocalPort()); PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>(); outputOperator.setUri(uri); diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java index 288da84..55d98aa 100644 --- a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java +++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java @@ -23,6 +23,7 @@ import java.net.URI; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; @@ -48,10 +49,8 @@ public class MRMonitoringApplication implements StreamingApplication @Override public void populateDAG(DAG dag, Configuration conf) { - String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); MRJobStatusOperator mrJobOperator = dag.addOperator("JobMonitor", new MRJobStatusOperator()); - URI uri = URI.create("ws://" + daemonAddress + "/pubsub"); - logger.info("WebSocket with daemon at {}", daemonAddress); + URI uri = PubSubHelper.getURI(dag); PubSubWebSocketInputOperator wsIn = 
dag.addOperator("Query", new PubSubWebSocketInputOperator()); wsIn.setUri(uri); diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java index be7edfb..225ea25 100644 --- a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java @@ -18,9 +18,9 @@ */ package org.apache.apex.examples.twitter; -import java.net.URI; -import org.apache.commons.lang.StringUtils; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.Operator.InputPort; @@ -174,13 +174,11 @@ public class KinesisHashtagsApplication implements StreamingApplication private InputPort<Object> consoleOutput(DAG dag, String operatorName) { - String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - if (!StringUtils.isEmpty(gatewayAddress)) { - URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + if (PubSubHelper.isGatewayConfigured(dag)) { String topic = "examples.twitter." 
+ operatorName; //LOG.info("WebSocket with gateway at: {}", gatewayAddress); PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>()); - wsOut.setUri(uri); + wsOut.setUri(PubSubHelper.getURI(dag)); wsOut.setTopic(topic); return wsOut.input; } diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java index ee43383..77384a8 100644 --- a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java @@ -22,11 +22,10 @@ import java.net.URI; import java.util.List; import java.util.Map; -import org.apache.commons.lang.StringUtils; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import org.apache.hadoop.conf.Configuration; import com.google.common.collect.Maps; - import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; @@ -34,7 +33,6 @@ import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.OutputPort; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; - import com.datatorrent.contrib.twitter.TwitterSampleInput; import com.datatorrent.lib.algo.UniqueCounter; import com.datatorrent.lib.appdata.schemas.SchemaUtils; @@ -189,9 +187,8 @@ public class TwitterTopCounterApplication implements StreamingApplication public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias) { - String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - if (!StringUtils.isEmpty(gatewayAddress)) { - URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + if (PubSubHelper.isGatewayConfigured(dag)) { + URI uri = 
PubSubHelper.getURI(dag); AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap()); diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java index 699469b..440b30a 100644 --- a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java @@ -22,15 +22,13 @@ import java.net.URI; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import org.apache.commons.lang.StringUtils; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.Operator; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; - import com.datatorrent.lib.appdata.schemas.SchemaUtils; import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; import com.datatorrent.lib.io.ConsoleOutputOperator; @@ -84,10 +82,8 @@ public class ApplicationWithQuerySupport implements StreamingApplication dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input); dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input); - String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - - if (!StringUtils.isEmpty(gatewayAddress)) { // add query support - URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + if (PubSubHelper.isGatewayConfigured(dag)) { // add query support + URI uri = PubSubHelper.getURI(dag); AppDataSnapshotServerMap snapshotServerFile = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap()); diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java index 3f2029e..9b1e0cf 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java @@ -19,14 +19,13 @@ package com.datatorrent.lib.io; import java.net.URI; -import java.net.URISyntaxException; - import javax.validation.constraints.Min; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; @@ -116,10 +115,8 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St } try { - uri = new URI("ws://" - + context.getValue(DAG.GATEWAY_CONNECT_ADDRESS) - + "/pubsub"); - } catch (URISyntaxException ex) { + uri = PubSubHelper.getURI(context); + } catch (Exception ex) { throw new RuntimeException(ex); } } diff --git a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java index b027b58..a9030d4 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WidgetOutputOperator.java @@ -20,17 +20,15 @@ package com.datatorrent.lib.io; import java.io.IOException; import java.lang.reflect.Array; -import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import org.apache.commons.lang.StringUtils; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import com.google.common.collect.Maps; - import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; @@ -129,9 +127,8 @@ 
public class WidgetOutputOperator extends BaseOperator @Override public void setup(OperatorContext context) { - String gatewayAddress = context.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - if (!StringUtils.isEmpty(gatewayAddress)) { - wsoo.setUri(URI.create("ws://" + gatewayAddress + "/pubsub")); + if (PubSubHelper.isGatewayConfigured(context)) { + wsoo.setUri(PubSubHelper.getURI(context)); wsoo.setup(context); } else { isWebSocketConnected = false; diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java new file mode 100644 index 0000000..51eaeee --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.utils; + +import java.net.URI; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Context; + +@InterfaceStability.Evolving +public class PubSubHelper +{ + private static final Logger logger = LoggerFactory.getLogger(PubSubHelper.class); + + public static boolean isGatewayConfigured(Context context) + { + return context.getValue(Context.DAGContext.GATEWAY_CONNECT_ADDRESS) != null; + } + + public static URI getURI(Context context) + { + return getURIWithDefault(context, null); + } + + public static URI getURIWithDefault(Context context, String defaultAddress) + { + String address = context.getValue(Context.DAGContext.GATEWAY_CONNECT_ADDRESS); + if (address == null) { + address = defaultAddress; + } + return getURI(address, context.getValue(Context.DAGContext.GATEWAY_USE_SSL)); + } + + public static URI getURI(String address) + { + return getURI(address, false); + } + + public static URI getURI(String address, boolean useSSL) + { + if (address == null) { + throw new NullPointerException("No address specified"); + } + String uri = (useSSL ? 
"wss://" : "ws://") + address + "/pubsub"; + logger.debug("PubSub uri {}", uri); + return URI.create(uri); + } +} diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java index 7801619..fc49bea 100644 --- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java @@ -20,25 +20,25 @@ package com.datatorrent.lib.io; import java.lang.reflect.Method; import java.net.URI; -import java.net.URISyntaxException; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.utils.PubSubHelper; import com.datatorrent.common.experimental.AppData; public abstract class PubSubWebSocketAppDataOperatorTest { public static final String GATEWAY_CONNECT_ADDRESS_STRING = "my.gateway.com"; - public static final String URI_ADDRESS_STRING = "ws://localhost:6666/pubsub"; + public static final String URI_ADDRESS_STRING = "localhost:6666"; public static final URI GATEWAY_CONNECT_ADDRESS; public static final URI URI_ADDRESS; static { try { - GATEWAY_CONNECT_ADDRESS = new URI("ws://" + GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub"); - URI_ADDRESS = new URI(URI_ADDRESS_STRING); - } catch (URISyntaxException ex) { + GATEWAY_CONNECT_ADDRESS = PubSubHelper.getURI(GATEWAY_CONNECT_ADDRESS_STRING); + URI_ADDRESS = PubSubHelper.getURI(URI_ADDRESS_STRING); + } catch (Exception ex) { throw new RuntimeException(ex); } } diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java index e165649..7f1e4dd 100644 --- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java @@ -30,6 +30,8 @@ import 
org.eclipse.jetty.servlet.ServletHolder; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.utils.PubSubHelper; + import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -51,7 +53,7 @@ public class PubSubWebSocketOperatorTest contextHandler.addServlet(sh, "/*"); server.start(); Connector[] connector = server.getConnectors(); - URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub"); + URI uri = PubSubHelper.getURI("localhost:" + connector[0].getLocalPort()); PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>(); outputOperator.setUri(uri); -- To stop receiving notification emails like this one, please contact commits@apex.apache.org.