http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java new file mode 100644 index 0000000..6765928 --- /dev/null +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.grpc; + +import com.google.protobuf.ByteString; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.KeyStore; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; +import io.grpc.ManagedChannel; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NettyChannelBuilder; +import io.netty.handler.ssl.SslContextBuilder; + +@SupportsBatching +@Tags({"grpc", "rpc", "client"}) 
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Sends FlowFiles, optionally with content, to a configurable remote gRPC service endpoint. The remote gRPC service must abide by the service IDL defined in NiFi. " + + " gRPC isn't intended to carry large payloads, so this processor should be used only when FlowFile" + + " sizes are on the order of megabytes. The default maximum message size is 4MB.") +@WritesAttributes({ + @WritesAttribute(attribute = "invokegrpc.response.code", description = "The response code that is returned (0 = ERROR, 1 = SUCCESS, 2 = RETRY)"), + @WritesAttribute(attribute = "invokegrpc.response.body", description = "The response message that is returned"), + @WritesAttribute(attribute = "invokegrpc.service.host", description = "The remote gRPC service hostname"), + @WritesAttribute(attribute = "invokegrpc.service.port", description = "The remote gRPC service port"), + @WritesAttribute(attribute = "invokegrpc.java.exception.class", description = "The Java exception class raised when the processor fails"), + @WritesAttribute(attribute = "invokegrpc.java.exception.message", description = "The Java exception message raised when the processor fails"), +}) +public class InvokeGRPC extends AbstractProcessor { + public static final String RESPONSE_CODE = "invokegrpc.response.code"; + public static final String RESPONSE_BODY = "invokegrpc.response.body"; + public static final String SERVICE_HOST = "invokegrpc.service.host"; + public static final String SERVICE_PORT = "invokegrpc.service.port"; + public static final String EXCEPTION_CLASS = "invokegrpc.java.exception.class"; + public static final String EXCEPTION_MESSAGE = "invokegrpc.java.exception.message"; + + // properties + public static final PropertyDescriptor PROP_SERVICE_HOST = new PropertyDescriptor.Builder() + .name("Remote gRPC service hostname") + .description("Remote host which will be connected to") + .required(true) + 
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PROP_SERVICE_PORT = new PropertyDescriptor.Builder() + .name("Remote gRPC service port") + .description("Remote port which will be connected to") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + public static final PropertyDescriptor PROP_MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder() + .name("Max Message Size") + .description("The maximum size of FlowFiles that this processor will allow to be received." + + " The default is 4MB. If FlowFiles exceed this size, you should consider using another transport mechanism" + + " as gRPC isn't designed for heavy payloads.") + .defaultValue("4MB") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor PROP_USE_SECURE = new PropertyDescriptor.Builder() + .name("Use SSL/TLS") + .description("Whether or not to use SSL/TLS to send the contents of the gRPC messages.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + public static final PropertyDescriptor PROP_SEND_CONTENT = new PropertyDescriptor.Builder() + .name("Send FlowFile Content") + .description("Whether or not to include the FlowFile content in the FlowFileRequest to the gRPC service.") + .required(false) + .defaultValue("true") + .allowableValues("true", "false") + .build(); + public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new PropertyDescriptor.Builder() + .name("Penalize on \"No Retry\"") + .description("Enabling this property will penalize FlowFiles that are 
routed to the \"No Retry\" relationship.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = new PropertyDescriptor.Builder() + .name("Always Output Response") + .description("Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is " + + "or if the processor is configured to put the server response body in the request attribute. In the later configuration a request FlowFile with the " + + "response body in the attribute and a typical response FlowFile will be emitted to their respective relationships.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + PROP_SERVICE_HOST, + PROP_SERVICE_PORT, + PROP_MAX_MESSAGE_SIZE, + PROP_USE_SECURE, + PROP_SSL_CONTEXT_SERVICE, + PROP_SEND_CONTENT, + PROP_OUTPUT_RESPONSE_REGARDLESS, + PROP_PENALIZE_NO_RETRY + )); + + // relationships + public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() + .name("Original") + .description("The original FlowFile will be routed upon success. It will have new attributes detailing the " + + "success of the request.") + .build(); + public static final Relationship REL_RESPONSE = new Relationship.Builder() + .name("Response") + .description("A Response FlowFile will be routed upon success. If the 'Output Response Regardless' property " + + "is true then the response will be sent to this relationship regardless of the status code received.") + .build(); + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("Retry") + .description("The original FlowFile will be routed on any status code that can be retried. 
It will have new " + + "attributes detailing the request.") + .build(); + public static final Relationship REL_NO_RETRY = new Relationship.Builder() + .name("No Retry") + .description("The original FlowFile will be routed on any status code that should NOT be retried. " + + "It will have new attributes detailing the request.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("Failure") + .description("The original FlowFile will be routed on any type of connection failure, timeout or general exception. " + + "It will have new attributes detailing the request.") + .build(); + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS_REQ, + REL_NO_RETRY, + REL_RESPONSE, + REL_RETRY, + REL_FAILURE + ))); + + private static final String USER_AGENT_PREFIX = "NiFi_invokeGRPC"; + // NOTE: you may need to add the sources generated after running `maven clean compile` to your IDE + // configured source directories. Otherwise, the classes generated when the proto is compiled won't + // be accessible from here. For IntelliJ, open this module's settings and mark the following as source directories: + // + // * target/generated-sources/protobuf/grpc-java + // * target/generated-sources/protobuf/java + private final AtomicReference<FlowFileServiceGrpc.FlowFileServiceBlockingStub> blockingStubReference = new AtomicReference<>(); + private final AtomicReference<ManagedChannel> channelReference = new AtomicReference<>(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + /** + * Whenever this processor is triggered, we need to construct a client in order to communicate + * with the configured gRPC service. 
+ * + * @param context the processor context + */ + @OnScheduled + public void initializeClient(final ProcessContext context) throws Exception { + + channelReference.set(null); + blockingStubReference.set(null); + final ComponentLog logger = getLogger(); + + final String host = context.getProperty(PROP_SERVICE_HOST).getValue(); + final int port = context.getProperty(PROP_SERVICE_PORT).asInteger(); + final Integer maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue(); + String userAgent = USER_AGENT_PREFIX; + try { + userAgent += "_" + InetAddress.getLocalHost().getHostName(); + } catch (final UnknownHostException e) { + logger.warn("Unable to determine local hostname. Defaulting gRPC user agent to {}.", new Object[]{USER_AGENT_PREFIX}, e); + } + + final NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forAddress(host, port) + // supports both gzip and plaintext, but will compress by default. + .compressorRegistry(CompressorRegistry.getDefaultInstance()) + .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) + .maxInboundMessageSize(maxMessageSize) + .userAgent(userAgent); + + // configure whether or not we're using secure comms + final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean(); + final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslContextService == null ? 
null : sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE); + + if (useSecure && sslContext != null) { + SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient(); + if(StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) { + final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(), + sslContext.getProvider()); + final KeyStore keyStore = KeyStore.getInstance(sslContextService.getKeyStoreType()); + try (final InputStream is = new FileInputStream(sslContextService.getKeyStoreFile())) { + keyStore.load(is, sslContextService.getKeyStorePassword().toCharArray()); + } + keyManagerFactory.init(keyStore, sslContextService.getKeyStorePassword().toCharArray()); + sslContextBuilder.keyManager(keyManagerFactory); + } + + if(StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) { + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(), + sslContext.getProvider()); + final KeyStore trustStore = KeyStore.getInstance(sslContextService.getTrustStoreType()); + try (final InputStream is = new FileInputStream(sslContextService.getTrustStoreFile())) { + trustStore.load(is, sslContextService.getTrustStorePassword().toCharArray()); + } + trustManagerFactory.init(trustStore); + sslContextBuilder.trustManager(trustManagerFactory); + } + nettyChannelBuilder.sslContext(sslContextBuilder.build()); + + } else { + nettyChannelBuilder.usePlaintext(true); + } + + final ManagedChannel channel = nettyChannelBuilder.build(); + final FlowFileServiceGrpc.FlowFileServiceBlockingStub blockingStub = FlowFileServiceGrpc.newBlockingStub(channel); + channelReference.set(channel); + blockingStubReference.set(blockingStub); + } + + /** + * Perform cleanup prior to JVM shutdown + * + * @param context the processor context + * @throws InterruptedException if there's an issue cleaning up + */ + @OnShutdown + public void shutdown(final 
ProcessContext context) throws InterruptedException { + // close the channel + final ManagedChannel channel = channelReference.get(); + if (channel != null) { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile fileToProcess = null; + if (context.hasIncomingConnection()) { + fileToProcess = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. + // However, if we have no FlowFile and we have connections coming from other Processors, then + // we know that we should run only if we have a FlowFile. + if (fileToProcess == null && context.hasNonLoopConnection()) { + return; + } + } + + final ComponentLog logger = getLogger(); + final FlowFileServiceGrpc.FlowFileServiceBlockingStub blockingStub = blockingStubReference.get(); + final String host = context.getProperty(PROP_SERVICE_HOST).getValue(); + final String port = context.getProperty(PROP_SERVICE_PORT).getValue(); + fileToProcess = session.putAttribute(fileToProcess, SERVICE_HOST, host); + fileToProcess = session.putAttribute(fileToProcess, SERVICE_PORT, port); + FlowFile responseFlowFile = null; + try { + final FlowFileRequest.Builder requestBuilder = FlowFileRequest.newBuilder() + .setId(fileToProcess.getId()) + .putAllAttributes(fileToProcess.getAttributes()); + + // if the processor is configured to send the content, turn the content into bytes + // and add it to the request. 
+ final boolean sendContent = context.getProperty(PROP_SEND_CONTENT).asBoolean(); + if (sendContent) { + try (final InputStream contents = session.read(fileToProcess)) { + requestBuilder.setContent(ByteString.readFrom(contents)); + } + // emit provenance event + session.getProvenanceReporter().send(fileToProcess, getRemote(host, port), true); + } + final FlowFileRequest flowFileRequest = requestBuilder.build(); + logRequest(logger, host, port, flowFileRequest); + + final FlowFileReply flowFileReply = blockingStub.send(flowFileRequest); + logReply(logger, host, port, flowFileReply); + + final FlowFileReply.ResponseCode responseCode = flowFileReply.getResponseCode(); + final String body = flowFileReply.getBody(); + + fileToProcess = session.putAttribute(fileToProcess, RESPONSE_CODE, String.valueOf(responseCode)); + fileToProcess = session.putAttribute(fileToProcess, RESPONSE_BODY, body); + + responseFlowFile = session.create(fileToProcess); + route(fileToProcess, responseFlowFile, session, context, responseCode); + + } catch (final Exception e) { + // penalize or yield + if (fileToProcess != null) { + logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e); + fileToProcess = session.penalize(fileToProcess); + fileToProcess = session.putAttribute(fileToProcess, EXCEPTION_CLASS, e.getClass().getName()); + fileToProcess = session.putAttribute(fileToProcess, EXCEPTION_MESSAGE, e.getMessage()); + // transfer original to failure + session.transfer(fileToProcess, REL_FAILURE); + } else { + logger.error("Yielding processor due to exception encountered as a source processor: {}", e); + context.yield(); + } + + // cleanup + try { + if (responseFlowFile != null) { + session.remove(responseFlowFile); + } + } catch (final Exception e1) { + logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1); + } + } + } + + /** + * Route the {@link FlowFile} request and response appropriately, depending on the gRPC 
service + * response code. + * + * @param request the flowfile request + * @param response the flowfile response + * @param session the processor session + * @param context the processor context + * @param responseCode the gRPC service response code + */ + private void route(FlowFile request, FlowFile response, final ProcessSession session, final ProcessContext context, final FlowFileReply.ResponseCode responseCode) { + boolean responseSent = false; + if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) { + session.transfer(response, REL_RESPONSE); + responseSent = true; + } + + switch (responseCode) { + // if the rpc failed, transfer flowfile to no retry relationship and penalize flowfile + + + // if the rpc succeeded, transfer the request and response flowfiles + case SUCCESS: + session.transfer(request, REL_SUCCESS_REQ); + if (!responseSent) { + session.transfer(response, REL_RESPONSE); + } + break; + + // if the gRPC service responded requesting a retry, then penalize the request and + // transfer it to the retry relationship. The flowfile contains attributes detailing this + // rpc request. + case RETRY: + request = session.penalize(request); + session.transfer(request, REL_RETRY); + // if we haven't sent the response by this point, clean it up. + if (!responseSent) { + session.remove(response); + } + break; + + case ERROR: + case UNRECOGNIZED: // unrecognized response code returned from gRPC service + default: + final boolean penalize = context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean(); + if (penalize) { + request = session.penalize(request); + } + session.transfer(request, REL_NO_RETRY); + // if we haven't sent the response by this point, clean it up. 
+ if (!responseSent) { + session.remove(response); + } + break; + } + } + + private String getRemote(final String host, final String port) { + return host + ":" + port; + } + + private void logRequest(final ComponentLog logger, final String host, final String port, final FlowFileRequest flowFileRequest) { + logger.debug("\nRequest to remote service:\n\t{}\n{}", + new Object[]{getRemote(host, port), flowFileRequest.toString()}); + } + + private void logReply(final ComponentLog logger, final String host, final String port, final FlowFileReply flowFileReply) { + logger.debug("\nResponse from remote service:\n\t{}\n{}", + new Object[]{getRemote(host, port), flowFileReply.toString()}); + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java new file mode 100644 index 0000000..64405af --- /dev/null +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.grpc; + +import com.google.common.collect.Sets; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; +import io.grpc.Server; +import io.grpc.ServerInterceptors; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NettyServerBuilder; +import 
io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.SslContextBuilder; + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Starts a gRPC server and listens on the given port to transform the incoming messages into FlowFiles." + + " The message format is defined by the standard gRPC protobuf IDL provided by NiFi. gRPC isn't intended to carry large payloads," + + " so this processor should be used only when FlowFile sizes are on the order of megabytes. The default maximum message size is 4MB.") +@Tags({"ingest", "grpc", "rpc", "listen"}) +@WritesAttributes({ + @WritesAttribute(attribute = "listengrpc.remote.user.dn", description = "The DN of the user who sent the FlowFile to this NiFi"), + @WritesAttribute(attribute = "listengrpc.remote.host", description = "The IP of the client who sent the FlowFile to this NiFi") +}) +public class ListenGRPC extends AbstractSessionFactoryProcessor { + public static final String REMOTE_USER_DN = "listengrpc.remote.user.dn"; + public static final String REMOTE_HOST = "listengrpc.remote.host"; + + // properties + public static final PropertyDescriptor PROP_SERVICE_PORT = new PropertyDescriptor.Builder() + .name("Local gRPC service port") + .description("The local port that the gRPC service will listen on.") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + public static final PropertyDescriptor PROP_USE_SECURE = new PropertyDescriptor.Builder() + .name("Use SSL/TLS") + .description("Whether or not to use SSL/TLS to send the contents of the gRPC messages.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") + .required(false) + 
.identifiesControllerService(SSLContextService.class) + .build(); + public static final PropertyDescriptor PROP_FLOW_CONTROL_WINDOW = new PropertyDescriptor.Builder() + .name("Flow Control Window") + .description("The initial HTTP/2 flow control window for both new streams and overall connection." + + " Flow-control schemes ensure that streams on the same connection do not destructively interfere with each other." + + " The default is 1MB.") + .defaultValue("1MB") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor PROP_MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder() + .name("Max Message Size") + .description("The maximum size of FlowFiles that this processor will allow to be received." + + " The default is 4MB. If FlowFiles exceed this size, you should consider using another transport mechanism" + + " as gRPC isn't designed for heavy payloads.") + .defaultValue("4MB") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor PROP_AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder() + .name("Authorized DN Pattern") + .description("A Regular Expression to apply against the Distinguished Name of incoming connections. 
If the Pattern does not match the DN, the connection will be refused.") + .required(true) + .defaultValue(".*") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + + public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + PROP_SERVICE_PORT, + PROP_USE_SECURE, + PROP_SSL_CONTEXT_SERVICE, + PROP_FLOW_CONTROL_WINDOW, + PROP_AUTHORIZED_DN_PATTERN, + PROP_MAX_MESSAGE_SIZE + )); + + // relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("Success") + .description("The FlowFile was received successfully.") + .build(); + + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(Sets.newHashSet(Arrays.asList( + REL_SUCCESS + ))); + private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference<>(); + private volatile Server server = null; + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + + @OnScheduled + public void startServer(final ProcessContext context) throws NoSuchAlgorithmException, IOException, KeyStoreException, CertificateException, UnrecoverableKeyException { + final ComponentLog logger = getLogger(); + // gather configured properties + final Integer port = context.getProperty(PROP_SERVICE_PORT).asInteger(); + final Boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean(); + final Integer flowControlWindow = context.getProperty(PROP_FLOW_CONTROL_WINDOW).asDataSize(DataUnit.B).intValue(); + final Integer maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue(); + final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslContextService == null ? 
null : sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE); + final Pattern authorizedDnPattern = Pattern.compile(context.getProperty(PROP_AUTHORIZED_DN_PATTERN).getValue()); + final FlowFileIngestServiceInterceptor callInterceptor = new FlowFileIngestServiceInterceptor(getLogger()); + callInterceptor.enforceDNPattern(authorizedDnPattern); + + final FlowFileIngestService flowFileIngestService = new FlowFileIngestService(getLogger(), + sessionFactoryReference, + context); + NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port) + .addService(ServerInterceptors.intercept(flowFileIngestService, callInterceptor)) + // default (de)compressor registries handle both plaintext and gzip compressed messages + .compressorRegistry(CompressorRegistry.getDefaultInstance()) + .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) + .flowControlWindow(flowControlWindow) + .maxMessageSize(maxMessageSize); + + if (useSecure && sslContext != null) { + // construct key manager + if (StringUtils.isBlank(sslContextService.getKeyStoreFile())) { + throw new IllegalStateException("SSL is enabled, but no keystore has been configured. You must configure a keystore."); + } + + final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(), + sslContext.getProvider()); + final KeyStore keyStore = KeyStore.getInstance(sslContextService.getKeyStoreType()); + try (final InputStream is = new FileInputStream(sslContextService.getKeyStoreFile())) { + keyStore.load(is, sslContextService.getKeyStorePassword().toCharArray()); + } + keyManagerFactory.init(keyStore, sslContextService.getKeyStorePassword().toCharArray()); + + SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManagerFactory); + + // if the trust store is configured, then client auth is required. 
+ if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) { + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(), + sslContext.getProvider()); + final KeyStore trustStore = KeyStore.getInstance(sslContextService.getTrustStoreType()); + try (final InputStream is = new FileInputStream(sslContextService.getTrustStoreFile())) { + trustStore.load(is, sslContextService.getTrustStorePassword().toCharArray()); + } + trustManagerFactory.init(trustStore); + sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory); + sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE); + } else { + sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.NONE); + } + sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder); + serverBuilder = serverBuilder.sslContext(sslContextBuilder.build()); + } + logger.info("Starting gRPC server on port: {}", new Object[]{port.toString()}); + this.server = serverBuilder.build().start(); + } + + @OnStopped + public void stopServer(final ProcessContext context) { + if (this.server != null) { + try { + this.server.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + getLogger().warn("Unable to cleanly shutdown embedded gRPC server due to {}", new Object[]{e}); + this.server = null; + } + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + sessionFactoryReference.compareAndSet(null, sessionFactory); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..10609a7 --- /dev/null +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.grpc.InvokeGRPC +org.apache.nifi.processors.grpc.ListenGRPC http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto new file mode 100644 index 0000000..1eb18a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. 
// See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: you may need to add the sources generated when running `maven clean compile` to your IDE
// configured source directories. Otherwise, the classes generated when the proto is compiled won't
// be accessible. For IntelliJ, open this module's settings and mark the following as source directories:
//
// * target/generated-sources/protobuf/grpc-java
// * target/generated-sources/protobuf/java

syntax = "proto3";

// Generate one Java file per message/service instead of nesting everything in the outer class.
option java_multiple_files = true;
option java_package = "org.apache.nifi.processors.grpc";
option java_outer_classname = "FFSProto";
option objc_class_prefix = "FFS";

package org.apache.nifi.processors.grpc;

// The FlowFile service definition.
service FlowFileService {
    // Sends a FlowFile (blocking rpc): one FlowFileRequest in, one FlowFileReply out.
    rpc Send (FlowFileRequest) returns (FlowFileReply) {}
}

// The request message: a single FlowFile's id, attributes and content.
message FlowFileRequest {
    // tags 1-15 require one byte to encode and should be left for commonly occurring tags.
    // For that reason, tags 3-14 are left available.
    //
    // source: https://developers.google.com/protocol-buffers/docs/proto3#assigning-tags

    // presumably the id of the FlowFile being sent -- verify against the sending processor
    int64 id = 1;
    // attribute name -> attribute value
    map<string, string> attributes = 2;
    // raw content bytes; uses tag 15, the last tag in the one-byte range described above
    bytes content = 15;
}

// the reply message
message FlowFileReply {
    // Outcome reported by the service for a Send call.
    enum ResponseCode {
        // proto3 uses the zero-valued entry as the default, so a reply that never sets
        // responseCode is read as ERROR.
        ERROR = 0;
        SUCCESS = 1;
        RETRY = 2;
    }

    ResponseCode responseCode = 1;
    // free-form text accompanying the response code
    string body = 2;
}
+ */ +package org.apache.nifi.processors.grpc; + +import org.apache.nifi.ssl.StandardSSLContextService; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; + +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; +import io.grpc.ManagedChannel; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NettyChannelBuilder; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.SslContextBuilder; + +import static org.apache.nifi.processors.grpc.TestGRPCServer.NEED_CLIENT_AUTH; + +/** + * Generic gRPC channel builder for use in unit testing. Consumers should use the channel built here + * to construct the desired stubs to communicate with a gRPC service. + */ +public class TestGRPCClient { + // Used to represent the ephemeral port range. + private static final int PORT_START = 49152; + private static final int PORT_END = 65535; + + /** + * Can be used by clients to grab a random port in a range of ports + * + * @return a port to use for client/server comms + */ + public static int randomPort() { + // add 1 because upper bound is exclusive + return ThreadLocalRandom.current().nextInt(PORT_START, PORT_END + 1); + } + + /** + * Build a channel with the given host and port and optional ssl properties. 
+ * + * @param host the host to establish a connection with + * @param port the port on which to communicate with the host + * @return a constructed channel + */ + public static ManagedChannel buildChannel(final String host, final int port) + throws NoSuchAlgorithmException, KeyStoreException, IOException, CertificateException, UnrecoverableKeyException { + return buildChannel(host, port, null); + } + + /** + * Build a channel with the given host and port and optional ssl properties. + * + * @param host the host to establish a connection with + * @param port the port on which to communicate with the host + * @param sslProperties the properties by which to establish an ssl connection + * @return a constructed channel + */ + public static ManagedChannel buildChannel(final String host, final int port, final Map<String, String> sslProperties) + throws NoSuchAlgorithmException, KeyStoreException, IOException, CertificateException, UnrecoverableKeyException { + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port) + .directExecutor() + .compressorRegistry(CompressorRegistry.getDefaultInstance()) + .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) + .userAgent("testAgent"); + + if (sslProperties != null) { + SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); + + if(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) { + final KeyManagerFactory keyManager = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + final KeyStore keyStore = KeyStore.getInstance(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName())); + final String keyStoreFile = sslProperties.get(StandardSSLContextService.KEYSTORE.getName()); + final String keyStorePassword = sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()); + try (final InputStream is = new FileInputStream(keyStoreFile)) { + keyStore.load(is, keyStorePassword.toCharArray()); + } + keyManager.init(keyStore, 
keyStorePassword.toCharArray()); + sslContextBuilder = sslContextBuilder.keyManager(keyManager); + } + + if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) { + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + final KeyStore trustStore = KeyStore.getInstance(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName())); + final String trustStoreFile = sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()); + final String trustStorePassword = sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()); + try (final InputStream is = new FileInputStream(trustStoreFile)) { + trustStore.load(is, trustStorePassword.toCharArray()); + } + trustManagerFactory.init(trustStore); + sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory); + } + + final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH); + if (clientAuth == null) { + sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE); + } else { + sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth)); + } + sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder); + channelBuilder = channelBuilder.sslContext(sslContextBuilder.build()); + } else { + channelBuilder.usePlaintext(true); + } + return channelBuilder.build(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java new file mode 100644 index 0000000..7aeced8 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.grpc; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.security.KeyStore; +import java.util.Map; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; + +import org.apache.nifi.ssl.StandardSSLContextService; + +import io.grpc.BindableService; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; +import io.grpc.Server; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NettyServerBuilder; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.SslContextBuilder; + +/** + * Generic gRPC test server to assist with unit tests that require a server to be present. 
+ * + * @param <T> the gRPC service implementation + */ +public class TestGRPCServer<T extends BindableService> { + public static final String HOST = "localhost"; + public static final String NEED_CLIENT_AUTH = "needClientAuth"; + private final Class<T> clazz; + private Server server; + private Map<String, String> sslProperties; + + /** + * Create a gRPC server + * + * @param clazz the gRPC service implementation + */ + public TestGRPCServer(final Class<T> clazz) { + this(clazz, null); + } + + + /** + * Create a gRPC server + * + * @param clazz the gRPC service implementation + * @param sslProperties the keystore and truststore properties for SSL communications + */ + public TestGRPCServer(final Class<T> clazz, final Map<String, String> sslProperties) { + this.clazz = clazz; + this.sslProperties = sslProperties; + } + + /** + * Can be used by clients to grab a random port in a range of ports + * + * @return a port to use for client/server comms + */ + public static int randomPort() throws IOException { + ServerSocket socket = new ServerSocket(0); + socket.setReuseAddress(true); + final int port = socket.getLocalPort(); + socket.close(); + return port; + } + + /** + * Starts the gRPC server @localhost:port. 
+ */ + public void start(final int port) throws Exception { + final NettyServerBuilder nettyServerBuilder = NettyServerBuilder + .forPort(port) + .directExecutor() + .addService(clazz.newInstance()) + .compressorRegistry(CompressorRegistry.getDefaultInstance()) + .decompressorRegistry(DecompressorRegistry.getDefaultInstance()); + + if (this.sslProperties != null) { + if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) == null) { + throw new RuntimeException("You must configure a keystore in order to use SSL with gRPC."); + } + + final KeyManagerFactory keyManager = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + final KeyStore keyStore = KeyStore.getInstance(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName())); + final String keyStoreFile = sslProperties.get(StandardSSLContextService.KEYSTORE.getName()); + final String keyStorePassword = sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()); + try (final InputStream is = new FileInputStream(keyStoreFile)) { + keyStore.load(is, keyStorePassword.toCharArray()); + } + keyManager.init(keyStore, keyStorePassword.toCharArray()); + SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManager); + + if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) { + final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + final KeyStore trustStore = KeyStore.getInstance(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName())); + final String trustStoreFile = sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()); + final String trustStorePassword = sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()); + try (final InputStream is = new FileInputStream(trustStoreFile)) { + trustStore.load(is, trustStorePassword.toCharArray()); + } + trustManagerFactory.init(trustStore); + sslContextBuilder = 
sslContextBuilder.trustManager(trustManagerFactory); + } + + final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH); + if (clientAuth == null) { + sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE); + } else { + sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth)); + } + sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder); + nettyServerBuilder.sslContext(sslContextBuilder.build()); + } + + server = nettyServerBuilder.build().start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + TestGRPCServer.this.stop(); + System.err.println("*** server shut down"); + } + }); + } + + /** + * Stop the server. + */ + void stop() { + if (server != null) { + server.shutdown(); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. 
+ */ + public void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestInvokeGRPC.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestInvokeGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestInvokeGRPC.java new file mode 100644 index 0000000..f709a19 --- /dev/null +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestInvokeGRPC.java @@ -0,0 +1,557 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.grpc; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.grpc.stub.StreamObserver; +import io.netty.handler.ssl.ClientAuth; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class TestInvokeGRPC { + // ids placed on flowfiles and used to dictate response codes in the DummyFlowFileService below + private static final long ERROR = 500; + private static final long SUCCESS = 501; + private static final long RETRY = 502; + + @Test + public void testSuccess() throws Exception { + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class); + + try { + final int port = TestGRPCServer.randomPort(); + server.start(port); + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + + final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS); + runner.enqueue(mockFlowFile); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + 
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success"); + response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + + final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ); + assertThat(successFiles.size(), equalTo(1)); + final MockFlowFile successFile = successFiles.get(0); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success"); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + } finally { + server.stop(); + } + } + + @Test + public void testSuccessWithFlowFileContent() throws Exception { + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class); + + try { + final int port = TestGRPCServer.randomPort(); + server.start(port); + + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + + runner.enqueue("content"); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + 
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "content"); + response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + + final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ); + assertThat(successFiles.size(), equalTo(1)); + final MockFlowFile successFile = successFiles.get(0); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "content"); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + } finally { + server.stop(); + } + } + + @Test + public void testSuccessAlwaysOutputResponse() throws Exception { + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class); + + try { + final int port = TestGRPCServer.randomPort(); + server.start(port); + + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, "true"); + + final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS); + runner.enqueue(mockFlowFile); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + + final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success"); + response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + + final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ); + assertThat(successFiles.size(), equalTo(1)); + final MockFlowFile successFile = successFiles.get(0); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success"); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + } finally { + server.stop(); + } + } + + @Test + public void testExceedMaxMessageSize() throws Exception { + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class); + + try { + final int port = TestGRPCServer.randomPort(); + server.start(port); + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + // set max message size to 1B to force error + runner.setProperty(InvokeGRPC.PROP_MAX_MESSAGE_SIZE, "1B"); + + final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS); + runner.enqueue(mockFlowFile); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + 
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 1); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_FAILURE); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + // an exception should be thrown indicating that the max message size was exceeded. + response.assertAttributeEquals(InvokeGRPC.EXCEPTION_CLASS, "io.grpc.StatusRuntimeException"); + } finally { + server.stop(); + } + } + + @Test + public void testRetry() throws Exception { + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class); + + try { + final int port = TestGRPCServer.randomPort(); + server.start(port); + + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + + final MockFlowFile mockFlowFile = new MockFlowFile(RETRY); + runner.enqueue(mockFlowFile); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 1); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + runner.assertPenalizeCount(1); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RETRY); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.RETRY)); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry"); + 
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + } finally { + server.stop(); + } + } + + @Test + public void testRetryAlwaysOutputResponse() throws Exception { + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class); + + try { + final int port = TestGRPCServer.randomPort(); + server.start(port); + + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, "true"); + + final MockFlowFile mockFlowFile = new MockFlowFile(RETRY); + runner.enqueue(mockFlowFile); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 1); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + runner.assertPenalizeCount(1); + + final List<MockFlowFile> retryFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RETRY); + assertThat(retryFiles.size(), equalTo(1)); + final MockFlowFile retry = retryFiles.get(0); + retry.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.RETRY)); + retry.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry"); + retry.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + retry.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.RETRY)); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry"); + response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + } finally { + server.stop(); + } + } + + @Test + public void testNoRetryOnError() throws Exception { + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class); + + try { + final int port = TestGRPCServer.randomPort(); + server.start(port); + + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + + final MockFlowFile mockFlowFile = new MockFlowFile(ERROR); + runner.enqueue(mockFlowFile); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_NO_RETRY); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.ERROR)); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error"); + response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + } finally { + server.stop(); + } + } + + @Test + public void testNoRetryOnErrorAlwaysOutputResponseAndPenalize() throws Exception { + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class); + + try { + final int 
port = TestGRPCServer.randomPort(); + server.start(port); + + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, "true"); + runner.setProperty(InvokeGRPC.PROP_PENALIZE_NO_RETRY, "true"); + + final MockFlowFile mockFlowFile = new MockFlowFile(ERROR); + runner.enqueue(mockFlowFile); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + runner.assertPenalizeCount(1); + + final List<MockFlowFile> noRetryFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_NO_RETRY); + assertThat(noRetryFiles.size(), equalTo(1)); + final MockFlowFile noRetry = noRetryFiles.get(0); + noRetry.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.ERROR)); + noRetry.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error"); + noRetry.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + noRetry.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.ERROR)); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error"); + response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + } finally { + server.stop(); + } + } + + @Test + public 
void testNoInput() throws Exception { + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class); + + try { + final int port = TestGRPCServer.randomPort(); + server.start(port); + + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + runner.assertPenalizeCount(0); + } finally { + server.stop(); + } + } + + @Test + public void testServerConnectionFail() throws Exception { + + final int port = TestGRPCServer.randomPort(); + + // should be no gRPC server running @ that port, so processor will fail + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, Integer.toString(port)); + + final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS); + runner.enqueue(mockFlowFile); + runner.run(); + + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 1); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_FAILURE); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
Integer.toString(port)); + response.assertAttributeEquals(InvokeGRPC.EXCEPTION_CLASS, "io.grpc.StatusRuntimeException"); + + } + + @Test + public void testSecureTwoWaySsl() throws Exception { + final Map<String, String> sslProperties = getKeystoreProperties(); + sslProperties.putAll(getTruststoreProperties()); + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class, sslProperties); + + try { + final int port = TestGRPCServer.randomPort(); + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + runner.setProperty(InvokeGRPC.PROP_USE_SECURE, "true"); + useSSLContextService(runner, sslProperties); + server.start(port); + + final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS); + runner.enqueue(mockFlowFile); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success"); + response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + + final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ); + assertThat(successFiles.size(), equalTo(1)); + final MockFlowFile successFile = 
successFiles.get(0); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success"); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + } finally { + server.stop(); + } + } + + @Test + public void testSecureOneWaySsl() throws Exception { + final Map<String, String> sslProperties = getKeystoreProperties(); + sslProperties.put(TestGRPCServer.NEED_CLIENT_AUTH, ClientAuth.NONE.name()); + final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class, sslProperties); + + try { + final int port = TestGRPCServer.randomPort(); + final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class); + runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST); + runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port)); + runner.setProperty(InvokeGRPC.PROP_USE_SECURE, "true"); + useSSLContextService(runner, getTruststoreProperties()); + server.start(port); + + final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS); + runner.enqueue(mockFlowFile); + runner.run(); + runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0); + + final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE); + assertThat(responseFiles.size(), equalTo(1)); + final MockFlowFile response = responseFiles.get(0); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success"); + 
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + + final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ); + assertThat(successFiles.size(), equalTo(1)); + final MockFlowFile successFile = successFiles.get(0); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS)); + successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success"); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST); + successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port)); + } finally { + server.stop(); + } + } + + private static Map<String, String> getTruststoreProperties() { + final Map<String, String> props = new HashMap<>(); + props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + return props; + } + + private static Map<String, String> getKeystoreProperties() { + final Map<String, String> properties = new HashMap<>(); + properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + return properties; + } + + private void useSSLContextService(final TestRunner controller, final Map<String, String> sslProperties) { + final SSLContextService service = new StandardSSLContextService(); + try { + controller.addControllerService("ssl-service", service, sslProperties); + controller.enableControllerService(service); + } catch (InitializationException ex) { + ex.printStackTrace(); + Assert.fail("Could not create SSL Context 
Service"); + } + + controller.setProperty(InvokeGRPC.PROP_SSL_CONTEXT_SERVICE, "ssl-service"); + } + + /** + * Dummy gRPC service whose responses are dictated by the IDs on the messages it receives + */ + private static class DummyFlowFileService extends FlowFileServiceGrpc.FlowFileServiceImplBase { + + public DummyFlowFileService() { + } + + @Override + public void send(FlowFileRequest request, StreamObserver<FlowFileReply> responseObserver) { + final FlowFileReply.Builder replyBuilder = FlowFileReply.newBuilder(); + // use the id to dictate response codes + final long id = request.getId(); + + if (id == ERROR) { + replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR) + .setBody("error"); + } else if (id == SUCCESS) { + replyBuilder.setResponseCode(FlowFileReply.ResponseCode.SUCCESS) + .setBody("success"); + } else if (id == RETRY) { + replyBuilder.setResponseCode(FlowFileReply.ResponseCode.RETRY) + .setBody("retry"); + // else, assume the request is to include the flowfile content in the response + } else { + replyBuilder.setResponseCode(FlowFileReply.ResponseCode.SUCCESS) + .setBody(request.getContent().toStringUtf8()); + + } + responseObserver.onNext(replyBuilder.build()); + responseObserver.onCompleted(); + } + } +}