[
https://issues.apache.org/jira/browse/NIFI-1002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664804#comment-15664804
]
ASF GitHub Bot commented on NIFI-1002:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1184#discussion_r87873512
--- Diff:
nifi-nar-bundles/nifi-websocket-bundle/nifi-websocket-services-jetty/src/main/java/org/apache/nifi/websocket/jetty/JettyWebSocketServer.java
---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.websocket.jetty;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.websocket.WebSocketConfigurationException;
+import org.apache.nifi.websocket.WebSocketMessageRouter;
+import org.apache.nifi.websocket.WebSocketServerService;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHandler;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketPolicy;
+import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Tags({"WebSocket", "Jetty", "server"})
+@CapabilityDescription("Implementation of WebSocketServerService." +
+ " This service uses Jetty WebSocket server module to provide" +
+ " WebSocket session management throughout the application.")
+public class JettyWebSocketServer extends AbstractJettyWebSocketService
implements WebSocketServerService {
+
+ /**
+ * A global map to refer a controller service instance by requested
port number.
+ */
+ private static final Map<Integer, JettyWebSocketServer>
portToControllerService = new ConcurrentHashMap<>();
+
+ // Allowable values for client auth
+ public static final AllowableValue CLIENT_NONE = new
AllowableValue("no", "No Authentication",
+ "Processor will not authenticate clients. Anyone can
communicate with this Processor anonymously");
+ public static final AllowableValue CLIENT_WANT = new
AllowableValue("want", "Want Authentication",
+ "Processor will try to verify the client but if unable to
verify will allow the client to communicate anonymously");
+ public static final AllowableValue CLIENT_NEED = new
AllowableValue("need", "Need Authentication",
+ "Processor will reject communications from any client unless
the client provides a certificate that is trusted by the TrustStore"
+ + "specified in the SSL Context Service");
+
+ public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
+ .name("client-authentication")
+ .displayName("Client Authentication")
+ .description("Specifies whether or not the Processor should
authenticate clients. This value is ignored if the <SSL Context Service> "
+ + "Property is not specified or the SSL Context
provided uses only a KeyStore and not a TrustStore.")
+ .required(true)
+ .allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
+ .defaultValue(CLIENT_NONE.getValue())
+ .build();
+
+ public static final PropertyDescriptor LISTEN_PORT = new
PropertyDescriptor.Builder()
+ .name("listen-port")
+ .displayName("Listen Port")
+ .description("The port number on which this WebSocketServer
listens to.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+
+ private static final List<PropertyDescriptor> properties;
+
+ static {
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.addAll(getAbstractPropertyDescriptors());
+ props.add(LISTEN_PORT);
+ props.add(SSL_CONTEXT);
+ props.add(CLIENT_AUTH);
+
+ properties = Collections.unmodifiableList(props);
+ }
+
+ private WebSocketPolicy configuredPolicy;
+ private Server server;
+ private Integer listenPort;
+ private ServletHandler servletHandler;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+
+ public static class JettyWebSocketServlet extends WebSocketServlet
implements WebSocketCreator {
+ @Override
+ public void configure(WebSocketServletFactory
webSocketServletFactory) {
+ webSocketServletFactory.setCreator(this);
+ }
+
+ @Override
+ public Object createWebSocket(ServletUpgradeRequest
servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
+ final URI requestURI = servletUpgradeRequest.getRequestURI();
+ final int port = requestURI.getPort();
+ final JettyWebSocketServer service =
portToControllerService.get(port);
+
+ if (service == null) {
+ throw new RuntimeException("No controller service is bound
with port: " + port);
+ }
+
+ final String path = requestURI.getPath();
+ final WebSocketMessageRouter router;
+ try {
+ router = service.routers.getRouterOrFail(path);
+ } catch (WebSocketConfigurationException e) {
+ throw new IllegalStateException("Failed to get router due
to: " + e, e);
+ }
+
+ final RoutingWebSocketListener listener = new
RoutingWebSocketListener(router) {
+ @Override
+ public void onWebSocketConnect(Session session) {
+ final WebSocketPolicy currentPolicy =
session.getPolicy();
+
currentPolicy.setInputBufferSize(service.configuredPolicy.getInputBufferSize());
+
currentPolicy.setMaxTextMessageSize(service.configuredPolicy.getMaxTextMessageSize());
+
currentPolicy.setMaxBinaryMessageSize(service.configuredPolicy.getMaxBinaryMessageSize());
+ super.onWebSocketConnect(session);
+ }
+ };
+
+ return listener;
+ }
+ }
+
+ @OnEnabled
+ @Override
+ public void startServer(final ConfigurationContext context) throws
Exception {
+
+ if (server != null && server.isRunning()) {
+ getLogger().info("A WebSocket server is already running. {}",
new Object[]{server});
+ return;
+ }
+
+ configuredPolicy = WebSocketPolicy.newServerPolicy();
+ configurePolicy(context, configuredPolicy);
+
+ server = new Server();
+
+ final ContextHandlerCollection handlerCollection = new
ContextHandlerCollection();
+
+ final ServletContextHandler contextHandler = new
ServletContextHandler();
+ servletHandler = new ServletHandler();
+ contextHandler.insertHandler(servletHandler);
+
+ handlerCollection.setHandlers(new Handler[]{contextHandler});
+
+ server.setHandler(handlerCollection);
+
+ listenPort = context.getProperty(LISTEN_PORT).asInteger();
+ final SslContextFactory sslContextFactory =
createSslFactory(context);
+
+ final ServerConnector serverConnector =
createConnector(sslContextFactory, listenPort);
+
+ server.setConnectors(new Connector[] {serverConnector});
+
+ servletHandler.addServletWithMapping(JettyWebSocketServlet.class,
"/*");
+
+ // Need to specify classloader, otherwise since the callstack
doesn't have any nifi specific class, so it can't use nar.
+ try (NarCloseable closeable =
NarCloseable.withComponentNarLoader(WebSocketServerFactory.class)) {
--- End diff --
This no longer compiles after recent work on dynamic class loading by
@bbende
> support for Listen WebSocket processor
> ---------------------------------------
>
> Key: NIFI-1002
> URL: https://issues.apache.org/jira/browse/NIFI-1002
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 0.4.0
> Reporter: sumanth chinthagunta
> Priority: Minor
> Labels: newbie
>
> A WebSocket listen processor will be helpful for IoT data ingestion.
> I am playing with embedded Vert.X for WebSocket and also ability to put
> FlowFiles back to WebSocket client via Vert.X EventBus.
> https://github.com/xmlking/nifi-websocket
> I am new to NiFi. any advise can be helpful.
> PS: I feel forcing Interfaces for Controller Services is unnecessary as in
> many cases Controller Services are only used by a set of Processors and
> developers usually bundle them together.
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)