sijie commented on issue #4093: [issue#4042] improve java functions API
URL: https://github.com/apache/pulsar/pull/4093#issuecomment-485254074
 
 
   > is that really less error prone than:
   
   The `context` is designed for immutability. Hence the function implementor 
should not change any kind of state of context. At the time that you introduce 
`context.setOutputMessageConf(messageBuilderConf)`, you are turning context 
from immutability to mutability. The problem arises, because the context is 
shared by all the context.publish calls. Hence you have to take care of the 
sequence `setOutputMessageConf` and calling `context.publish`, if you don't 
take care of the sequence, race conditions can arise and unpredictability also 
arise. A lot of unknown bugs will be introduced. And that's what I called error 
prone.
   
   In the approach I proposed, context.newOutputMessageBuilder() returns a 
brand new builder for each calls. The function implementation can attach 
different properties to different builders and they are isolated. you don't 
need to worry about the sequence of setting properties between two publish 
calls.
   
   Just think about the following implementation. 
   
   ```
   MessageBuilder<T> msgBuilder1 = context.newOutputMessage("topic");
   msgBuilder1.key("key1");
   MessageBuilder<T> msgBuilder2 = context.newOutputMessage("topic");
   msgBuilder2.key("key2");
   MessageBuilder<T> msgBuilder3 = context.newOutputMessage("topic");
   
   msgBuilder3.value("value3");
   msgBuilder2.value("value2");
   msgBuilder1.value(“value1");
   
   msgBuilder1.sendAsync();
   msgBuilder2.sendAsync();
   msgBuilder3.sendAsync();
   ```
   
   > adding another support for MessageBuilder<O> process(I input, Context 
context); will likely involve adding another function interface that users can 
implement and adding support for it in the functions framework . This will be a 
bigger change and perhaps require broader discussion.
   
   
   @jerrypeng I am not proposing adding a new interface. it is still the same 
function interface. but the generic type of the function is 
`MessageBuilder<T>`. 
   
   ```
   public class MessageBuilderFunction<I, O> implements Function<I, 
TypedMessageBuilder<O>> {
       @Override
       public TypedMessageBuilder<O> process(I input, Context context) throws 
Exception {
   
           TypedMessageBuilder<O> msgBuilder = context.newOutputMessage();
           
           msgBuilder.value(process(input));
           msgBuilder.key(...);
   
           return msgBuilder;
       }
   }
   ```
   
    The only change is in PulsarSink#write(Record<T> record), check if 
`record.getValue` is a TypedMessageBuilder, if so use the builder. It is a 
simple and straightforward change without changing any interfaces.
   
   Regarding your concerns about my proposal, I believe it comes from your 
misunderstanding on my proposal. but anyway I am replying one by one as below.
   
   > User's will have to implement another interface to take advantage of being 
able to set these message confs for returned output. If a user has already 
written a function using our current function interface, he or she will have to 
change it. This forces users to make a considerable amount of changes instead 
of just calling an additional method.
   
   If users already have functions, they probably don't need to attach 
properties to the output message. If users really want to change old functions 
to attach propertites, what he needs to do is change the generic type from `O` 
to `TypedMessageBuilder<O>`, and change `return O;` to `return 
context.newOutputMessage().value(O);`. I don't see how this can be a 
considerable amount changes than calling additional method.
   
   > Additional interface we will have to support in the function runtime. 
While this is not a deal breaker for me, but I rather not support any more 
interfaces than necessary.
   
   This is not a new interface. As I described above, this only requires a few 
lines change in PulsarSink. I don't see why this is a breaker.
   
   > My vision for functions is that we keep a certain distance from a producer 
and consumer API to distinguish functions a way to do lightweight compute and 
not just some generic consume/produce application. So I would rather not have 
any more APIs that resemble consumer/producer semantics. 
   
   why it is called lightweight computing? It is because users don't have to 
worry about error handling, exactly-once, code distribution, failover and many 
things that they have to worry about using a pure producer or consumer. but it 
doesn't mean they shouldn't use the well-defined pulsar api around message 
builder. 
   
   Pulsar functions is part of Pulsar. we should use whatever is already 
defined as an API in pulsar, rather than spinning off any kind of new 
interfaces.
   
   Hence I don't see how `newOutputMessage()` would introduce any complexity 
into functions. It is actually much simpler, more flexible and easier to 
maintain than current messy `context.publish` interfaces.
   
    

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to