I am thinking of adding an input operator to Apex Malhar that allows gRPC-based
message streams to be consumed by an Apex application.

gRPC (http://www.grpc.io/posts/principles) is a recent open-source RPC
framework that originated at Google and is gaining popularity. It is typically
used with Protobuf (Protocol Buffers, a serialization framework also developed
at Google; see https://developers.google.com/protocol-buffers/docs/overview).

In this proposal I will create an AbstractGrpcInputOperator that behaves
somewhat like the HTTP input operator: it sends a request to the gRPC service,
parses the individual messages out of the response, and emits them as tuples.
The operator will support idempotency and exception handling. We will also try
to add support for partitioning and dynamic scalability, to the extent that
they apply to a gRPC input operator. Similarly, we will opportunistically add
support for client interceptors
(http://www.grpc.io/grpc-java/javadoc/io/grpc/ClientInterceptor.html) and other
gRPC usage models (e.g. unary vs. streaming RPCs).

A developer uses the “protoc” compiler (with the gRPC Java plugin) and an input
“.proto” file to generate Java classes that define the client “stubs” and the
message classes that correspond to the RPC definitions in the proto file. Hence
AbstractGrpcInputOperator is a generic class that takes the request and response
message types as type parameters:

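import com.google.protobuf.GeneratedMessageV3;

public abstract class AbstractGrpcInputOperator<RequestType extends GeneratedMessageV3,
    ResponseType extends GeneratedMessageV3>
{
  // stub, MethodDescriptor, request object and queue properties (described below)
}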

All Java message classes generated by Protobuf version 3 extend
com.google.protobuf.GeneratedMessageV3, which implements most of the Message
and Builder interfaces using Java reflection.

The operator also needs an instance of the client stub class generated by
“protoc”. io.grpc.stub.AbstractStub is the common base type for client stub
implementations. A stub wraps a Channel together with per-call options; the
channel encapsulates things such as the remote host and port, plaintext vs. TLS
transport, and the trust store when TLS is used.

The operator also needs a MethodDescriptor object, which encapsulates the name
of the operation to execute as well as the Marshaller instances used to
serialize and parse the request and response messages, and a RequestType object
that contains the RPC arguments.
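The Marshaller contract can be illustrated without any gRPC dependency. In this
JDK-only sketch, the nested Marshaller interface is a hypothetical stand-in that
mirrors the two methods of io.grpc.MethodDescriptor.Marshaller<T> (stream() and
parse()). The UTF-8 string marshaller is purely illustrative; for protobuf
messages the real marshallers come from the generated code (via grpc-protobuf).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class MarshallerSketch
{
  /** Hypothetical stand-in mirroring io.grpc.MethodDescriptor.Marshaller<T>. */
  interface Marshaller<T>
  {
    InputStream stream(T value);  // serialize a message for the wire

    T parse(InputStream stream);  // deserialize a message read from the wire
  }

  /** Illustrative marshaller for UTF-8 strings (not protobuf). */
  static final class Utf8Marshaller implements Marshaller<String>
  {
    @Override
    public InputStream stream(String value)
    {
      return new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String parse(InputStream stream)
    {
      try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = stream.read(buf)) != -1) {
          out.write(buf, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  /** Serializes then parses a value: what the two ends of a call do with each message. */
  static <T> T roundTrip(Marshaller<T> marshaller, T value)
  {
    return marshaller.parse(marshaller.stream(value));
  }

  public static void main(String[] args)
  {
    System.out.println(roundTrip(new Utf8Marshaller(), "hello gRPC"));  // prints: hello gRPC
  }
}
```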

The operator will create a separate thread that posts gRPC requests
asynchronously in an infinite loop; the same thread will process the received
response messages (ResponseType objects). These ResponseType objects will be
added to an ArrayBlockingQueue, and emitTuples() will drain this queue to emit
tuples (similar to the logic in Malhar's AbstractJMSInputOperator<T>).
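The producer/consumer hand-off between the request thread and emitTuples() can
be sketched with plain JDK classes. This is a minimal sketch, not the operator
itself: the class name QueueingSource and the parameters bufferSize and
maxTuplesPerCall are hypothetical, the fetchBatch supplier stands in for one
gRPC request and its responses, and emitTuples() hands tuples to a callback
instead of an Apex output port.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch: a background fetch thread feeds emitTuples() through a bounded queue. */
class QueueingSource<R>
{
  private final ArrayBlockingQueue<R> holdingBuffer;
  private final Supplier<List<R>> fetchBatch;  // stands in for one gRPC request/response exchange
  private final int maxTuplesPerCall;
  private volatile boolean running;
  private Thread fetcher;

  QueueingSource(int bufferSize, int maxTuplesPerCall, Supplier<List<R>> fetchBatch)
  {
    this.holdingBuffer = new ArrayBlockingQueue<>(bufferSize);
    this.maxTuplesPerCall = maxTuplesPerCall;
    this.fetchBatch = fetchBatch;
  }

  /** Starts the thread that issues requests in a loop and queues every response. */
  void start()
  {
    running = true;
    fetcher = new Thread(() -> {
      try {
        while (running) {
          for (R response : fetchBatch.get()) {
            holdingBuffer.put(response);  // blocks when full: back-pressure on the fetch loop
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "grpc-fetcher");
    fetcher.setDaemon(true);
    fetcher.start();
  }

  /** Drains up to maxTuplesPerCall queued responses without blocking the caller. */
  int emitTuples(Consumer<R> emit)
  {
    List<R> drained = new ArrayList<>();
    holdingBuffer.drainTo(drained, maxTuplesPerCall);
    drained.forEach(emit);
    return drained.size();
  }

  /** Stops the fetch loop and waits briefly for the thread to exit. */
  void stop()
  {
    running = false;
    fetcher.interrupt();
    try {
      fetcher.join(TimeUnit.SECONDS.toMillis(5));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Draining without blocking keeps emitTuples() responsive, which matters because
the engine calls it repeatedly on the operator thread, while the bounded queue
throttles the request loop when downstream operators fall behind.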

The class will go in the package org.apache.apex.malhar.lib.io.grpc. Users will
need to subclass it and provide the actual types for RequestType and
ResponseType, as well as the properties described above.
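A JDK-only sketch of the subclassing pattern follows. To keep it compilable
without protobuf on the classpath, the hypothetical class ProtoMessage stands in
for com.google.protobuf.GeneratedMessageV3, PriceRequest and PriceUpdate stand
in for protoc-generated classes, and the transport function stands in for the
stub plus MethodDescriptor. All of these names are illustrative assumptions,
not the proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for com.google.protobuf.GeneratedMessageV3. */
abstract class ProtoMessage
{
}

/** Hypothetical stand-in for a protoc-generated request message. */
final class PriceRequest extends ProtoMessage
{
  final String symbol;

  PriceRequest(String symbol)
  {
    this.symbol = symbol;
  }
}

/** Hypothetical stand-in for a protoc-generated response message. */
final class PriceUpdate extends ProtoMessage
{
  final String symbol;
  final double price;

  PriceUpdate(String symbol, double price)
  {
    this.symbol = symbol;
    this.price = price;
  }
}

/** Generic base class: both type parameters must be message types. */
abstract class AbstractGrpcInputOperatorSketch<RequestType extends ProtoMessage,
    ResponseType extends ProtoMessage>
{
  private RequestType request;                                   // RPC arguments (operator property)
  private Function<RequestType, List<ResponseType>> transport;   // stands in for stub + MethodDescriptor
  private final List<ResponseType> emitted = new ArrayList<>();  // stands in for an output port

  public void setRequest(RequestType request)
  {
    this.request = request;
  }

  public void setTransport(Function<RequestType, List<ResponseType>> transport)
  {
    this.transport = transport;
  }

  /** Issues one request and records every response as an emitted tuple. */
  public void pollOnce()
  {
    emitted.addAll(transport.apply(request));
  }

  public List<ResponseType> getEmitted()
  {
    return emitted;
  }
}

/** What a user writes: bind the concrete generated types; no casts are needed. */
class PriceInputOperator extends AbstractGrpcInputOperatorSketch<PriceRequest, PriceUpdate>
{
}
```

A declaration such as AbstractGrpcInputOperatorSketch<String, PriceUpdate> would
be rejected at compile time because String does not extend ProtoMessage; the
bound on GeneratedMessageV3 in the real class gives the same guarantee.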

Comments/feedback welcome.

Sanjay
