http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java
 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java
new file mode 100644
index 0000000..6765928
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.grpc;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.KeyStore;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.SslContextBuilder;
+
+@SupportsBatching
+@Tags({"grpc", "rpc", "client"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Sends FlowFiles, optionally with content, to a 
configurable remote gRPC service endpoint. The remote gRPC service must abide 
by the service IDL defined in NiFi. " +
+        " gRPC isn't intended to carry large payloads,  so this processor 
should be used only when FlowFile" +
+        " sizes are on the order of megabytes. The default maximum message 
size is 4MB.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "invokegrpc.response.code", description = 
"The response code that is returned (0 = ERROR, 1 = SUCCESS, 2 = RETRY)"),
+        @WritesAttribute(attribute = "invokegrpc.response.body", description = 
"The response message that is returned"),
+        @WritesAttribute(attribute = "invokegrpc.service.host", description = 
"The remote gRPC service hostname"),
+        @WritesAttribute(attribute = "invokegrpc.service.port", description = 
"The remote gRPC service port"),
+        @WritesAttribute(attribute = "invokegrpc.java.exception.class", 
description = "The Java exception class raised when the processor fails"),
+        @WritesAttribute(attribute = "invokegrpc.java.exception.message", 
description = "The Java exception message raised when the processor fails"),
+})
+public class InvokeGRPC extends AbstractProcessor {
+    public static final String RESPONSE_CODE = "invokegrpc.response.code";
+    public static final String RESPONSE_BODY = "invokegrpc.response.body";
+    public static final String SERVICE_HOST = "invokegrpc.service.host";
+    public static final String SERVICE_PORT = "invokegrpc.service.port";
+    public static final String EXCEPTION_CLASS = 
"invokegrpc.java.exception.class";
+    public static final String EXCEPTION_MESSAGE = 
"invokegrpc.java.exception.message";
+
+    // properties
+    public static final PropertyDescriptor PROP_SERVICE_HOST = new 
PropertyDescriptor.Builder()
+            .name("Remote gRPC service hostname")
+            .description("Remote host which will be connected to")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PROP_SERVICE_PORT = new 
PropertyDescriptor.Builder()
+            .name("Remote gRPC service port")
+            .description("Remote port which will be connected to")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PROP_MAX_MESSAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Message Size")
+            .description("The maximum size of FlowFiles that this processor 
will allow to be received." +
+                    " The default is 4MB. If FlowFiles exceed this size, you 
should consider using another transport mechanism" +
+                    " as gRPC isn't designed for heavy payloads.")
+            .defaultValue("4MB")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PROP_USE_SECURE = new 
PropertyDescriptor.Builder()
+            .name("Use SSL/TLS")
+            .description("Whether or not to use SSL/TLS to send the contents 
of the gRPC messages.")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL (https) connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+    public static final PropertyDescriptor PROP_SEND_CONTENT = new 
PropertyDescriptor.Builder()
+            .name("Send FlowFile Content")
+            .description("Whether or not to include the FlowFile content in 
the FlowFileRequest to the gRPC service.")
+            .required(false)
+            .defaultValue("true")
+            .allowableValues("true", "false")
+            .build();
+    public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new 
PropertyDescriptor.Builder()
+            .name("Penalize on \"No Retry\"")
+            .description("Enabling this property will penalize FlowFiles that 
are routed to the \"No Retry\" relationship.")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+    public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = 
new PropertyDescriptor.Builder()
+            .name("Always Output Response")
+            .description("Will force a response FlowFile to be generated and 
routed to the 'Response' relationship regardless of what the server status code 
received is "
+                    + "or if the processor is configured to put the server 
response body in the request attribute. In the later configuration a request 
FlowFile with the "
+                    + "response body in the attribute and a typical response 
FlowFile will be emitted to their respective relationships.")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+    public static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            PROP_SERVICE_HOST,
+            PROP_SERVICE_PORT,
+            PROP_MAX_MESSAGE_SIZE,
+            PROP_USE_SECURE,
+            PROP_SSL_CONTEXT_SERVICE,
+            PROP_SEND_CONTENT,
+            PROP_OUTPUT_RESPONSE_REGARDLESS,
+            PROP_PENALIZE_NO_RETRY
+    ));
+
+    // relationships
+    public static final Relationship REL_SUCCESS_REQ = new 
Relationship.Builder()
+            .name("Original")
+            .description("The original FlowFile will be routed upon success. 
It will have new attributes detailing the "
+                    + "success of the request.")
+            .build();
+    public static final Relationship REL_RESPONSE = new Relationship.Builder()
+            .name("Response")
+            .description("A Response FlowFile will be routed upon success. If 
the 'Output Response Regardless' property "
+                    + "is true then the response will be sent to this 
relationship regardless of the status code received.")
+            .build();
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("Retry")
+            .description("The original FlowFile will be routed on any status 
code that can be retried. It will have new "
+                    + "attributes detailing the request.")
+            .build();
+    public static final Relationship REL_NO_RETRY = new Relationship.Builder()
+            .name("No Retry")
+            .description("The original FlowFile will be routed on any status 
code that should NOT be retried.  "
+                    + "It will have new attributes detailing the request.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("Failure")
+            .description("The original FlowFile will be routed on any type of 
connection failure, timeout or general exception. "
+                    + "It will have new attributes detailing the request.")
+            .build();
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS_REQ,
+            REL_NO_RETRY,
+            REL_RESPONSE,
+            REL_RETRY,
+            REL_FAILURE
+    )));
+
+    private static final String USER_AGENT_PREFIX = "NiFi_invokeGRPC";
+    // NOTE: you may need to add the sources generated after running `maven 
clean compile` to your IDE
+    // configured source directories. Otherwise, the classes generated when 
the proto is compiled won't
+    // be accessible from here. For IntelliJ, open this module's settings and 
mark the following as source directories:
+    //
+    // * target/generated-sources/protobuf/grpc-java
+    // * target/generated-sources/protobuf/java
+    private final 
AtomicReference<FlowFileServiceGrpc.FlowFileServiceBlockingStub> 
blockingStubReference = new AtomicReference<>();
+    private final AtomicReference<ManagedChannel> channelReference = new 
AtomicReference<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    /**
+     * Whenever this processor is triggered, we need to construct a client in 
order to communicate
+     * with the configured gRPC service.
+     *
+     * @param context the processor context
+     */
+    @OnScheduled
+    public void initializeClient(final ProcessContext context) throws 
Exception {
+
+        channelReference.set(null);
+        blockingStubReference.set(null);
+        final ComponentLog logger = getLogger();
+
+        final String host = context.getProperty(PROP_SERVICE_HOST).getValue();
+        final int port = context.getProperty(PROP_SERVICE_PORT).asInteger();
+        final Integer maxMessageSize = 
context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
+        String userAgent = USER_AGENT_PREFIX;
+        try {
+            userAgent += "_" + InetAddress.getLocalHost().getHostName();
+        } catch (final UnknownHostException e) {
+            logger.warn("Unable to determine local hostname. Defaulting gRPC 
user agent to {}.", new Object[]{USER_AGENT_PREFIX}, e);
+        }
+
+        final NettyChannelBuilder nettyChannelBuilder = 
NettyChannelBuilder.forAddress(host, port)
+                // supports both gzip and plaintext, but will compress by 
default.
+                .compressorRegistry(CompressorRegistry.getDefaultInstance())
+                
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
+                .maxInboundMessageSize(maxMessageSize)
+                .userAgent(userAgent);
+
+        // configure whether or not we're using secure comms
+        final boolean useSecure = 
context.getProperty(PROP_USE_SECURE).asBoolean();
+        final SSLContextService sslContextService = 
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final SSLContext sslContext = sslContextService == null ? null : 
sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
+
+        if (useSecure && sslContext != null) {
+            SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
+            if(StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) {
+                final KeyManagerFactory keyManagerFactory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(),
+                        sslContext.getProvider());
+                final KeyStore keyStore = 
KeyStore.getInstance(sslContextService.getKeyStoreType());
+                try (final InputStream is = new 
FileInputStream(sslContextService.getKeyStoreFile())) {
+                    keyStore.load(is, 
sslContextService.getKeyStorePassword().toCharArray());
+                }
+                keyManagerFactory.init(keyStore, 
sslContextService.getKeyStorePassword().toCharArray());
+                sslContextBuilder.keyManager(keyManagerFactory);
+            }
+
+            if(StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
+                final TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(),
+                        sslContext.getProvider());
+                final KeyStore trustStore = 
KeyStore.getInstance(sslContextService.getTrustStoreType());
+                try (final InputStream is = new 
FileInputStream(sslContextService.getTrustStoreFile())) {
+                    trustStore.load(is, 
sslContextService.getTrustStorePassword().toCharArray());
+                }
+                trustManagerFactory.init(trustStore);
+                sslContextBuilder.trustManager(trustManagerFactory);
+            }
+            nettyChannelBuilder.sslContext(sslContextBuilder.build());
+
+        } else {
+            nettyChannelBuilder.usePlaintext(true);
+        }
+
+        final ManagedChannel channel = nettyChannelBuilder.build();
+        final FlowFileServiceGrpc.FlowFileServiceBlockingStub blockingStub = 
FlowFileServiceGrpc.newBlockingStub(channel);
+        channelReference.set(channel);
+        blockingStubReference.set(blockingStub);
+    }
+
+    /**
+     * Perform cleanup prior to JVM shutdown
+     *
+     * @param context the processor context
+     * @throws InterruptedException if there's an issue cleaning up
+     */
+    @OnShutdown
+    public void shutdown(final ProcessContext context) throws 
InterruptedException {
+        // close the channel
+        final ManagedChannel channel = channelReference.get();
+        if (channel != null) {
+            channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile fileToProcess = null;
+        if (context.hasIncomingConnection()) {
+            fileToProcess = session.get();
+
+            // If we have no FlowFile, and all incoming connections are 
self-loops then we can continue on.
+            // However, if we have no FlowFile and we have connections coming 
from other Processors, then
+            // we know that we should run only if we have a FlowFile.
+            if (fileToProcess == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        final ComponentLog logger = getLogger();
+        final FlowFileServiceGrpc.FlowFileServiceBlockingStub blockingStub = 
blockingStubReference.get();
+        final String host = context.getProperty(PROP_SERVICE_HOST).getValue();
+        final String port = context.getProperty(PROP_SERVICE_PORT).getValue();
+        fileToProcess = session.putAttribute(fileToProcess, SERVICE_HOST, 
host);
+        fileToProcess = session.putAttribute(fileToProcess, SERVICE_PORT, 
port);
+        FlowFile responseFlowFile = null;
+        try {
+            final FlowFileRequest.Builder requestBuilder = 
FlowFileRequest.newBuilder()
+                    .setId(fileToProcess.getId())
+                    .putAllAttributes(fileToProcess.getAttributes());
+
+            // if the processor is configured to send the content, turn the 
content into bytes
+            // and add it to the request.
+            final boolean sendContent = 
context.getProperty(PROP_SEND_CONTENT).asBoolean();
+            if (sendContent) {
+                try (final InputStream contents = session.read(fileToProcess)) 
{
+                    requestBuilder.setContent(ByteString.readFrom(contents));
+                }
+                // emit provenance event
+                session.getProvenanceReporter().send(fileToProcess, 
getRemote(host, port), true);
+            }
+            final FlowFileRequest flowFileRequest = requestBuilder.build();
+            logRequest(logger, host, port, flowFileRequest);
+
+            final FlowFileReply flowFileReply = 
blockingStub.send(flowFileRequest);
+            logReply(logger, host, port, flowFileReply);
+
+            final FlowFileReply.ResponseCode responseCode = 
flowFileReply.getResponseCode();
+            final String body = flowFileReply.getBody();
+
+            fileToProcess = session.putAttribute(fileToProcess, RESPONSE_CODE, 
String.valueOf(responseCode));
+            fileToProcess = session.putAttribute(fileToProcess, RESPONSE_BODY, 
body);
+
+            responseFlowFile = session.create(fileToProcess);
+            route(fileToProcess, responseFlowFile, session, context, 
responseCode);
+
+        } catch (final Exception e) {
+            // penalize or yield
+            if (fileToProcess != null) {
+                logger.error("Routing to {} due to exception: {}", new 
Object[]{REL_FAILURE.getName(), e}, e);
+                fileToProcess = session.penalize(fileToProcess);
+                fileToProcess = session.putAttribute(fileToProcess, 
EXCEPTION_CLASS, e.getClass().getName());
+                fileToProcess = session.putAttribute(fileToProcess, 
EXCEPTION_MESSAGE, e.getMessage());
+                // transfer original to failure
+                session.transfer(fileToProcess, REL_FAILURE);
+            } else {
+                logger.error("Yielding processor due to exception encountered 
as a source processor: {}", e);
+                context.yield();
+            }
+
+            // cleanup
+            try {
+                if (responseFlowFile != null) {
+                    session.remove(responseFlowFile);
+                }
+            } catch (final Exception e1) {
+                logger.error("Could not cleanup response flowfile due to 
exception: {}", new Object[]{e1}, e1);
+            }
+        }
+    }
+
+    /**
+     * Route the {@link FlowFile} request and response appropriately, 
depending on the gRPC service
+     * response code.
+     *
+     * @param request      the flowfile request
+     * @param response     the flowfile response
+     * @param session      the processor session
+     * @param context      the processor context
+     * @param responseCode the gRPC service response code
+     */
+    private void route(FlowFile request, FlowFile response, final 
ProcessSession session, final ProcessContext context, final 
FlowFileReply.ResponseCode responseCode) {
+        boolean responseSent = false;
+        if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) {
+            session.transfer(response, REL_RESPONSE);
+            responseSent = true;
+        }
+
+        switch (responseCode) {
+            // if the rpc failed, transfer flowfile to no retry relationship 
and penalize flowfile
+
+
+            // if the rpc succeeded, transfer the request and response 
flowfiles
+            case SUCCESS:
+                session.transfer(request, REL_SUCCESS_REQ);
+                if (!responseSent) {
+                    session.transfer(response, REL_RESPONSE);
+                }
+                break;
+
+            // if the gRPC service responded requesting a retry, then penalize 
the request and
+            // transfer it to the retry relationship. The flowfile contains 
attributes detailing this
+            // rpc request.
+            case RETRY:
+                request = session.penalize(request);
+                session.transfer(request, REL_RETRY);
+                // if we haven't sent the response by this point, clean it up.
+                if (!responseSent) {
+                    session.remove(response);
+                }
+                break;
+
+            case ERROR:
+            case UNRECOGNIZED: // unrecognized response code returned from 
gRPC service
+            default:
+                final boolean penalize = 
context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean();
+                if (penalize) {
+                    request = session.penalize(request);
+                }
+                session.transfer(request, REL_NO_RETRY);
+                // if we haven't sent the response by this point, clean it up.
+                if (!responseSent) {
+                    session.remove(response);
+                }
+                break;
+        }
+    }
+
+    private String getRemote(final String host, final String port) {
+        return host + ":" + port;
+    }
+
+    private void logRequest(final ComponentLog logger, final String host, 
final String port, final FlowFileRequest flowFileRequest) {
+        logger.debug("\nRequest to remote service:\n\t{}\n{}",
+                new Object[]{getRemote(host, port), 
flowFileRequest.toString()});
+    }
+
+    private void logReply(final ComponentLog logger, final String host, final 
String port, final FlowFileReply flowFileReply) {
+        logger.debug("\nResponse from remote service:\n\t{}\n{}",
+                new Object[]{getRemote(host, port), flowFileReply.toString()});
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java
 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java
new file mode 100644
index 0000000..64405af
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.grpc;
+
+import com.google.common.collect.Sets;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
+import io.grpc.Server;
+import io.grpc.ServerInterceptors;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyServerBuilder;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContextBuilder;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Starts a gRPC server and listens on the given port to 
transform the incoming messages into FlowFiles." +
+        " The message format is defined by the standard gRPC protobuf IDL 
provided by NiFi. gRPC isn't intended to carry large payloads," +
+        " so this processor should be used only when FlowFile sizes are on the 
order of megabytes. The default maximum message size is 4MB.")
+@Tags({"ingest", "grpc", "rpc", "listen"})
+@WritesAttributes({
+        @WritesAttribute(attribute = "listengrpc.remote.user.dn", description 
= "The DN of the user who sent the FlowFile to this NiFi"),
+        @WritesAttribute(attribute = "listengrpc.remote.host", description = 
"The IP of the client who sent the FlowFile to this NiFi")
+})
+public class ListenGRPC extends AbstractSessionFactoryProcessor {
+    public static final String REMOTE_USER_DN = "listengrpc.remote.user.dn";
+    public static final String REMOTE_HOST = "listengrpc.remote.host";
+
+    // properties
+    public static final PropertyDescriptor PROP_SERVICE_PORT = new 
PropertyDescriptor.Builder()
+            .name("Local gRPC service port")
+            .description("The local port that the gRPC service will listen 
on.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PROP_USE_SECURE = new 
PropertyDescriptor.Builder()
+            .name("Use SSL/TLS")
+            .description("Whether or not to use SSL/TLS to send the contents 
of the gRPC messages.")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL (https) connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+    public static final PropertyDescriptor PROP_FLOW_CONTROL_WINDOW = new 
PropertyDescriptor.Builder()
+            .name("Flow Control Window")
+            .description("The initial HTTP/2 flow control window for both new 
streams and overall connection." +
+                    " Flow-control schemes ensure that streams on the same 
connection do not destructively interfere with each other." +
+                    " The default is 1MB.")
+            .defaultValue("1MB")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PROP_MAX_MESSAGE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Message Size")
+            .description("The maximum size of FlowFiles that this processor 
will allow to be received." +
+                    " The default is 4MB. If FlowFiles exceed this size, you 
should consider using another transport mechanism" +
+                    " as gRPC isn't designed for heavy payloads.")
+            .defaultValue("4MB")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PROP_AUTHORIZED_DN_PATTERN = new 
PropertyDescriptor.Builder()
+            .name("Authorized DN Pattern")
+            .description("A Regular Expression to apply against the 
Distinguished Name of incoming connections. If the Pattern does not match the 
DN, the connection will be refused.")
+            .required(true)
+            .defaultValue(".*")
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+
+    public static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            PROP_SERVICE_PORT,
+            PROP_USE_SECURE,
+            PROP_SSL_CONTEXT_SERVICE,
+            PROP_FLOW_CONTROL_WINDOW,
+            PROP_AUTHORIZED_DN_PATTERN,
+            PROP_MAX_MESSAGE_SIZE
+    ));
+
+    // relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("Success")
+            .description("The FlowFile was received successfully.")
+            .build();
+
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(Sets.newHashSet(Arrays.asList(
+            REL_SUCCESS
+    )));
+    private final AtomicReference<ProcessSessionFactory> 
sessionFactoryReference = new AtomicReference<>();
+    private volatile Server server = null;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+
+    @OnScheduled
+    public void startServer(final ProcessContext context) throws 
NoSuchAlgorithmException, IOException, KeyStoreException, CertificateException, 
UnrecoverableKeyException {
+        final ComponentLog logger = getLogger();
+        // gather configured properties
+        final Integer port = 
context.getProperty(PROP_SERVICE_PORT).asInteger();
+        final Boolean useSecure = 
context.getProperty(PROP_USE_SECURE).asBoolean();
+        final Integer flowControlWindow = 
context.getProperty(PROP_FLOW_CONTROL_WINDOW).asDataSize(DataUnit.B).intValue();
+        final Integer maxMessageSize = 
context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
+        final SSLContextService sslContextService = 
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final SSLContext sslContext = sslContextService == null ? null : 
sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
+        final Pattern authorizedDnPattern = 
Pattern.compile(context.getProperty(PROP_AUTHORIZED_DN_PATTERN).getValue());
+        final FlowFileIngestServiceInterceptor callInterceptor = new 
FlowFileIngestServiceInterceptor(getLogger());
+        callInterceptor.enforceDNPattern(authorizedDnPattern);
+
+        final FlowFileIngestService flowFileIngestService = new 
FlowFileIngestService(getLogger(),
+                sessionFactoryReference,
+                context);
+        NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port)
+                
.addService(ServerInterceptors.intercept(flowFileIngestService, 
callInterceptor))
+                // default (de)compressor registries handle both plaintext and 
gzip compressed messages
+                .compressorRegistry(CompressorRegistry.getDefaultInstance())
+                
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
+                .flowControlWindow(flowControlWindow)
+                .maxMessageSize(maxMessageSize);
+
+        if (useSecure && sslContext != null) {
+            // construct key manager
+            if (StringUtils.isBlank(sslContextService.getKeyStoreFile())) {
+                throw new IllegalStateException("SSL is enabled, but no 
keystore has been configured. You must configure a keystore.");
+            }
+
+            final KeyManagerFactory keyManagerFactory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(),
+                    sslContext.getProvider());
+            final KeyStore keyStore = 
KeyStore.getInstance(sslContextService.getKeyStoreType());
+            try (final InputStream is = new 
FileInputStream(sslContextService.getKeyStoreFile())) {
+                keyStore.load(is, 
sslContextService.getKeyStorePassword().toCharArray());
+            }
+            keyManagerFactory.init(keyStore, 
sslContextService.getKeyStorePassword().toCharArray());
+
+            SslContextBuilder sslContextBuilder = 
SslContextBuilder.forServer(keyManagerFactory);
+
+            // if the trust store is configured, then client auth is required.
+            if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) 
{
+                final TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(),
+                        sslContext.getProvider());
+                final KeyStore trustStore = 
KeyStore.getInstance(sslContextService.getTrustStoreType());
+                try (final InputStream is = new 
FileInputStream(sslContextService.getTrustStoreFile())) {
+                    trustStore.load(is, 
sslContextService.getTrustStorePassword().toCharArray());
+                }
+                trustManagerFactory.init(trustStore);
+                sslContextBuilder = 
sslContextBuilder.trustManager(trustManagerFactory);
+                sslContextBuilder = 
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+            } else {
+                sslContextBuilder = 
sslContextBuilder.clientAuth(ClientAuth.NONE);
+            }
+            sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder);
+            serverBuilder = 
serverBuilder.sslContext(sslContextBuilder.build());
+        }
+        logger.info("Starting gRPC server on port: {}", new 
Object[]{port.toString()});
+        this.server = serverBuilder.build().start();
+    }
+
+    @OnStopped
+    public void stopServer(final ProcessContext context) {
+        if (this.server != null) {
+            try {
+                this.server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                getLogger().warn("Unable to cleanly shutdown embedded gRPC 
server due to {}", new Object[]{e});
+                this.server = null;
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
+        sessionFactoryReference.compareAndSet(null, sessionFactory);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..10609a7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.grpc.InvokeGRPC
+org.apache.nifi.processors.grpc.ListenGRPC

http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto
 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto
new file mode 100644
index 0000000..1eb18a5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/resources/proto/flowfile_service.proto
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// NOTE: you may need to add the sources generated when running `maven clean 
compile` to your IDE
+// configured source directories. Otherwise, the classes generated when the 
proto is compiled won't
+// be accessible. For IntelliJ, open this module's settings and mark the 
following as source directories:
+//
+// * target/generated-sources/protobuf/grpc-java
+// * target/generated-sources/protobuf/java
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.nifi.processors.grpc";
+option java_outer_classname = "FFSProto";
+option objc_class_prefix = "FFS";
+
+package org.apache.nifi.processors.grpc;
+
+// The FlowFile service definition.
+service FlowFileService {
+    // Sends a FlowFile (blocking rpc)
+    rpc Send (FlowFileRequest) returns (FlowFileReply) {}
+}
+
+// The request message
+message FlowFileRequest {
+    // tags 1-15 require one byte to encode and should be left for commonly 
occurring tags.
+    // For that reason, tags 3-14 are left available.
+    //
+    // source: 
https://developers.google.com/protocol-buffers/docs/proto3#assigning-tags
+    int64 id = 1;
+    map<string, string> attributes = 2;
+    bytes content = 15;
+}
+
+// the reply message
+message FlowFileReply {
+    enum ResponseCode {
+        ERROR = 0;
+        SUCCESS = 1;
+        RETRY = 2;
+    }
+
+    ResponseCode responseCode = 1;
+    string body = 2;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCClient.java
 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCClient.java
new file mode 100644
index 0000000..1dccbb9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCClient.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.grpc;
+
+import org.apache.nifi.ssl.StandardSSLContextService;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContextBuilder;
+
+import static org.apache.nifi.processors.grpc.TestGRPCServer.NEED_CLIENT_AUTH;
+
+/**
+ * Generic gRPC channel builder for use in unit testing. Consumers should use 
the channel built here
+ * to construct the desired stubs to communicate with a gRPC service.
+ */
+public class TestGRPCClient {
+    // Used to represent the ephemeral port range.
+    private static final int PORT_START = 49152;
+    private static final int PORT_END = 65535;
+
+    /**
+     * Can be used by clients to grab a random port in a range of ports
+     *
+     * @return a port to use for client/server comms
+     */
+    public static int randomPort() {
+        // add 1 because upper bound is exclusive
+        return ThreadLocalRandom.current().nextInt(PORT_START, PORT_END + 1);
+    }
+
+    /**
+     * Build a channel with the given host and port and optional ssl 
properties.
+     *
+     * @param host the host to establish a connection with
+     * @param port the port on which to communicate with the host
+     * @return a constructed channel
+     */
+    public static ManagedChannel buildChannel(final String host, final int 
port)
+            throws NoSuchAlgorithmException, KeyStoreException, IOException, 
CertificateException, UnrecoverableKeyException {
+        return buildChannel(host, port, null);
+    }
+
+    /**
+     * Build a channel with the given host and port and optional ssl 
properties.
+     *
+     * @param host          the host to establish a connection with
+     * @param port          the port on which to communicate with the host
+     * @param sslProperties the properties by which to establish an ssl 
connection
+     * @return a constructed channel
+     */
+    public static ManagedChannel buildChannel(final String host, final int 
port, final Map<String, String> sslProperties)
+            throws NoSuchAlgorithmException, KeyStoreException, IOException, 
CertificateException, UnrecoverableKeyException {
+        NettyChannelBuilder channelBuilder = 
NettyChannelBuilder.forAddress(host, port)
+                .directExecutor()
+                .compressorRegistry(CompressorRegistry.getDefaultInstance())
+                
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
+                .userAgent("testAgent");
+
+        if (sslProperties != null) {
+            SslContextBuilder sslContextBuilder = 
SslContextBuilder.forClient();
+
+            if(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) 
!= null) {
+                final KeyManagerFactory keyManager = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+                final KeyStore keyStore = 
KeyStore.getInstance(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
+                final String keyStoreFile = 
sslProperties.get(StandardSSLContextService.KEYSTORE.getName());
+                final String keyStorePassword = 
sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName());
+                try (final InputStream is = new FileInputStream(keyStoreFile)) 
{
+                    keyStore.load(is, keyStorePassword.toCharArray());
+                }
+                keyManager.init(keyStore, keyStorePassword.toCharArray());
+                sslContextBuilder = sslContextBuilder.keyManager(keyManager);
+            }
+
+            if 
(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
+                final TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+                final KeyStore trustStore = 
KeyStore.getInstance(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
+                final String trustStoreFile = 
sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName());
+                final String trustStorePassword = 
sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName());
+                try (final InputStream is = new 
FileInputStream(trustStoreFile)) {
+                    trustStore.load(is, trustStorePassword.toCharArray());
+                }
+                trustManagerFactory.init(trustStore);
+                sslContextBuilder = 
sslContextBuilder.trustManager(trustManagerFactory);
+            }
+
+            final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
+            if (clientAuth == null) {
+                sslContextBuilder = 
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+            } else {
+                sslContextBuilder = 
sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth));
+            }
+            sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder);
+            channelBuilder = 
channelBuilder.sslContext(sslContextBuilder.build());
+        } else {
+            channelBuilder.usePlaintext(true);
+        }
+        return channelBuilder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java
 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java
new file mode 100644
index 0000000..7aeced8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.grpc;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.security.KeyStore;
+import java.util.Map;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.nifi.ssl.StandardSSLContextService;
+
+import io.grpc.BindableService;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
+import io.grpc.Server;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyServerBuilder;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContextBuilder;
+
+/**
+ * Generic gRPC test server to assist with unit tests that require a server to 
be present.
+ *
+ * @param <T> the gRPC service implementation
+ */
+public class TestGRPCServer<T extends BindableService> {
+    public static final String HOST = "localhost";
+    public static final String NEED_CLIENT_AUTH = "needClientAuth";
+    private final Class<T> clazz;
+    private Server server;
+    private Map<String, String> sslProperties;
+
+    /**
+     * Create a gRPC server
+     *
+     * @param clazz the gRPC service implementation
+     */
+    public TestGRPCServer(final Class<T> clazz) {
+        this(clazz, null);
+    }
+
+
+    /**
+     * Create a gRPC server
+     *
+     * @param clazz         the gRPC service implementation
+     * @param sslProperties the keystore and truststore properties for SSL 
communications
+     */
+    public TestGRPCServer(final Class<T> clazz, final Map<String, String> 
sslProperties) {
+        this.clazz = clazz;
+        this.sslProperties = sslProperties;
+    }
+
+    /**
+     * Can be used by clients to grab a random port in a range of ports
+     *
+     * @return a port to use for client/server comms
+     */
+    public static int randomPort() throws IOException {
+        ServerSocket socket = new ServerSocket(0);
+        socket.setReuseAddress(true);
+        final int port = socket.getLocalPort();
+        socket.close();
+        return port;
+    }
+
+    /**
+     * Starts the gRPC server @localhost:port.
+     */
+    public void start(final int port) throws Exception {
+        final NettyServerBuilder nettyServerBuilder = NettyServerBuilder
+                .forPort(port)
+                .directExecutor()
+                .addService(clazz.newInstance())
+                .compressorRegistry(CompressorRegistry.getDefaultInstance())
+                
.decompressorRegistry(DecompressorRegistry.getDefaultInstance());
+
+        if (this.sslProperties != null) {
+            if 
(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) == null) {
+                throw new RuntimeException("You must configure a keystore in 
order to use SSL with gRPC.");
+            }
+
+            final KeyManagerFactory keyManager = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+            final KeyStore keyStore = 
KeyStore.getInstance(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
+            final String keyStoreFile = 
sslProperties.get(StandardSSLContextService.KEYSTORE.getName());
+            final String keyStorePassword = 
sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName());
+            try (final InputStream is = new FileInputStream(keyStoreFile)) {
+                keyStore.load(is, keyStorePassword.toCharArray());
+            }
+            keyManager.init(keyStore, keyStorePassword.toCharArray());
+            SslContextBuilder sslContextBuilder = 
SslContextBuilder.forServer(keyManager);
+
+            if 
(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
+                final TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+                final KeyStore trustStore = 
KeyStore.getInstance(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
+                final String trustStoreFile = 
sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName());
+                final String trustStorePassword = 
sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName());
+                try (final InputStream is = new 
FileInputStream(trustStoreFile)) {
+                    trustStore.load(is, trustStorePassword.toCharArray());
+                }
+                trustManagerFactory.init(trustStore);
+                sslContextBuilder = 
sslContextBuilder.trustManager(trustManagerFactory);
+            }
+
+            final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
+            if (clientAuth == null) {
+                sslContextBuilder = 
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+            } else {
+                sslContextBuilder = 
sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth));
+            }
+            sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder);
+            nettyServerBuilder.sslContext(sslContextBuilder.build());
+        }
+
+        server = nettyServerBuilder.build().start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                // Use stderr here since the logger may have been reset by its 
JVM shutdown hook.
+                System.err.println("*** shutting down gRPC server since JVM is 
shutting down");
+                TestGRPCServer.this.stop();
+                System.err.println("*** server shut down");
+            }
+        });
+    }
+
+    /**
+     * Stop the server.
+     */
+    void stop() {
+        if (server != null) {
+            server.shutdown();
+        }
+    }
+
+    /**
+     * Await termination on the main thread since the grpc library uses daemon 
threads.
+     */
+    public void blockUntilShutdown() throws InterruptedException {
+        if (server != null) {
+            server.awaitTermination();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/58a623df/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestInvokeGRPC.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestInvokeGRPC.java
 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestInvokeGRPC.java
new file mode 100644
index 0000000..f709a19
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestInvokeGRPC.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.grpc;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.grpc.stub.StreamObserver;
+import io.netty.handler.ssl.ClientAuth;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class TestInvokeGRPC {
+    // ids placed on flowfiles and used to dictate response codes in the 
DummyFlowFileService below
+    private static final long ERROR = 500;
+    private static final long SUCCESS = 501;
+    private static final long RETRY = 502;
+
+    @Test
+    public void testSuccess() throws Exception {
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            server.start(port);
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+
+            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
+            runner.enqueue(mockFlowFile);
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"success");
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+
+            final List<MockFlowFile> successFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
+            assertThat(successFiles.size(), equalTo(1));
+            final MockFlowFile successFile = successFiles.get(0);
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"success");
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testSuccessWithFlowFileContent() throws Exception {
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            server.start(port);
+
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+
+            runner.enqueue("content");
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"content");
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+
+            final List<MockFlowFile> successFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
+            assertThat(successFiles.size(), equalTo(1));
+            final MockFlowFile successFile = successFiles.get(0);
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"content");
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testSuccessAlwaysOutputResponse() throws Exception {
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            server.start(port);
+
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+            runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, 
"true");
+
+            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
+            runner.enqueue(mockFlowFile);
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"success");
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+
+            final List<MockFlowFile> successFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
+            assertThat(successFiles.size(), equalTo(1));
+            final MockFlowFile successFile = successFiles.get(0);
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"success");
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testExceedMaxMessageSize() throws Exception {
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            server.start(port);
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+            // set max message size to 1B to force error
+            runner.setProperty(InvokeGRPC.PROP_MAX_MESSAGE_SIZE, "1B");
+
+            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
+            runner.enqueue(mockFlowFile);
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 1);
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_FAILURE);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+            // an exception should be thrown indicating that the max message 
size was exceeded.
+            response.assertAttributeEquals(InvokeGRPC.EXCEPTION_CLASS, 
"io.grpc.StatusRuntimeException");
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testRetry() throws Exception {
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            server.start(port);
+
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+
+            final MockFlowFile mockFlowFile = new MockFlowFile(RETRY);
+            runner.enqueue(mockFlowFile);
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+            runner.assertPenalizeCount(1);
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RETRY);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.RETRY));
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry");
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testRetryAlwaysOutputResponse() throws Exception {
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            server.start(port);
+
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+            runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, 
"true");
+
+            final MockFlowFile mockFlowFile = new MockFlowFile(RETRY);
+            runner.enqueue(mockFlowFile);
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+            runner.assertPenalizeCount(1);
+
+            final List<MockFlowFile> retryFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RETRY);
+            assertThat(retryFiles.size(), equalTo(1));
+            final MockFlowFile retry = retryFiles.get(0);
+            retry.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.RETRY));
+            retry.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry");
+            retry.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            retry.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.RETRY));
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry");
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testNoRetryOnError() throws Exception {
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            server.start(port);
+
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+
+            final MockFlowFile mockFlowFile = new MockFlowFile(ERROR);
+            runner.enqueue(mockFlowFile);
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_NO_RETRY);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.ERROR));
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error");
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testNoRetryOnErrorAlwaysOutputResponseAndPenalize() throws 
Exception {
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            server.start(port);
+
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+            runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, 
"true");
+            runner.setProperty(InvokeGRPC.PROP_PENALIZE_NO_RETRY, "true");
+
+            final MockFlowFile mockFlowFile = new MockFlowFile(ERROR);
+            runner.enqueue(mockFlowFile);
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+            runner.assertPenalizeCount(1);
+
+            final List<MockFlowFile> noRetryFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_NO_RETRY);
+            assertThat(noRetryFiles.size(), equalTo(1));
+            final MockFlowFile noRetry = noRetryFiles.get(0);
+            noRetry.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.ERROR));
+            noRetry.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error");
+            noRetry.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            noRetry.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.ERROR));
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error");
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testNoInput() throws Exception {
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            server.start(port);
+
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+            runner.assertPenalizeCount(0);
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testServerConnectionFail() throws Exception {
+
+        final int port = TestGRPCServer.randomPort();
+
+        // should be no gRPC server running @ that port, so processor will fail
+        final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
+        runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
+        runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
Integer.toString(port));
+
+        final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
+        runner.enqueue(mockFlowFile);
+        runner.run();
+
+        runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
+        runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
+        runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+        runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+        runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 1);
+
+        final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_FAILURE);
+        assertThat(responseFiles.size(), equalTo(1));
+        final MockFlowFile response = responseFiles.get(0);
+        response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+        response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
Integer.toString(port));
+        response.assertAttributeEquals(InvokeGRPC.EXCEPTION_CLASS, 
"io.grpc.StatusRuntimeException");
+
+    }
+
+    @Test
+    public void testSecureTwoWaySsl() throws Exception {
+        final Map<String, String> sslProperties = getKeystoreProperties();
+        sslProperties.putAll(getTruststoreProperties());
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class, sslProperties);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+            runner.setProperty(InvokeGRPC.PROP_USE_SECURE, "true");
+            useSSLContextService(runner, sslProperties);
+            server.start(port);
+
+            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
+            runner.enqueue(mockFlowFile);
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"success");
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+
+            final List<MockFlowFile> successFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
+            assertThat(successFiles.size(), equalTo(1));
+            final MockFlowFile successFile = successFiles.get(0);
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"success");
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+        } finally {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testSecureOneWaySsl() throws Exception {
+        final Map<String, String> sslProperties = getKeystoreProperties();
+        sslProperties.put(TestGRPCServer.NEED_CLIENT_AUTH, 
ClientAuth.NONE.name());
+        final TestGRPCServer<DummyFlowFileService> server = new 
TestGRPCServer<>(DummyFlowFileService.class, sslProperties);
+
+        try {
+            final int port = TestGRPCServer.randomPort();
+            final TestRunner runner = 
TestRunners.newTestRunner(InvokeGRPC.class);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, 
TestGRPCServer.HOST);
+            runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, 
String.valueOf(port));
+            runner.setProperty(InvokeGRPC.PROP_USE_SECURE, "true");
+            useSSLContextService(runner, getTruststoreProperties());
+            server.start(port);
+
+            final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
+            runner.enqueue(mockFlowFile);
+            runner.run();
+            runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
+            runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
+            runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
+
+            final List<MockFlowFile> responseFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
+            assertThat(responseFiles.size(), equalTo(1));
+            final MockFlowFile response = responseFiles.get(0);
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"success");
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+
+            final List<MockFlowFile> successFiles = 
runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
+            assertThat(successFiles.size(), equalTo(1));
+            final MockFlowFile successFile = successFiles.get(0);
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, 
String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
+            successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, 
"success");
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, 
TestGRPCServer.HOST);
+            successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, 
String.valueOf(port));
+        } finally {
+            server.stop();
+        }
+    }
+
+    private static Map<String, String> getTruststoreProperties() {
+        final Map<String, String> props = new HashMap<>();
+        props.put(StandardSSLContextService.TRUSTSTORE.getName(), 
"src/test/resources/localhost-ts.jks");
+        props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), 
"localtest");
+        props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+        return props;
+    }
+
+    private static Map<String, String> getKeystoreProperties() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(StandardSSLContextService.KEYSTORE.getName(), 
"src/test/resources/localhost-ks.jks");
+        properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), 
"localtest");
+        properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), 
"JKS");
+        return properties;
+    }
+
+    private void useSSLContextService(final TestRunner controller, final 
Map<String, String> sslProperties) {
+        final SSLContextService service = new StandardSSLContextService();
+        try {
+            controller.addControllerService("ssl-service", service, 
sslProperties);
+            controller.enableControllerService(service);
+        } catch (InitializationException ex) {
+            ex.printStackTrace();
+            Assert.fail("Could not create SSL Context Service");
+        }
+
+        controller.setProperty(InvokeGRPC.PROP_SSL_CONTEXT_SERVICE, 
"ssl-service");
+    }
+
+    /**
+     * Dummy gRPC service whose responses are dictated by the IDs on the 
messages it receives
+     */
+    private static class DummyFlowFileService extends 
FlowFileServiceGrpc.FlowFileServiceImplBase {
+
+        public DummyFlowFileService() {
+        }
+
+        @Override
+        public void send(FlowFileRequest request, 
StreamObserver<FlowFileReply> responseObserver) {
+            final FlowFileReply.Builder replyBuilder = 
FlowFileReply.newBuilder();
+            // use the id to dictate response codes
+            final long id = request.getId();
+
+            if (id == ERROR) {
+                replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR)
+                        .setBody("error");
+            } else if (id == SUCCESS) {
+                
replyBuilder.setResponseCode(FlowFileReply.ResponseCode.SUCCESS)
+                        .setBody("success");
+            } else if (id == RETRY) {
+                replyBuilder.setResponseCode(FlowFileReply.ResponseCode.RETRY)
+                        .setBody("retry");
+                // else, assume the request is to include the flowfile content 
in the response
+            } else {
+                
replyBuilder.setResponseCode(FlowFileReply.ResponseCode.SUCCESS)
+                        .setBody(request.getContent().toStringUtf8());
+
+            }
+            responseObserver.onNext(replyBuilder.build());
+            responseObserver.onCompleted();
+        }
+    }
+}

Reply via email to