[ https://issues.apache.org/jira/browse/SPARK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14260655#comment-14260655 ]

Saisai Shao edited comment on SPARK-4960 at 12/30/14 1:34 AM:
--------------------------------------------------------------

Hi Cody, please see the inline comments.

{quote}
You're saying for an implementation that currently extends e.g.
Receiver\[T\]
that a user can provide a function
T => Iterable\[M\]
But in the case of Kafka, T is currently fixed to (K, V). So for kafka we don't 
need a user provided function of type
(K, V) => Iterable\[M\]
We need a user provided function of type
MessageAndMetadata => Iterable\[M\]
In other words, a third type parameter.
{quote}

I understand your requirement for Kafka. The solution described in the doc is a 
general one; if it is acceptable, I will change KafkaReceiver accordingly.
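To make the type mismatch concrete, here is a minimal plain-Scala sketch with no Spark or Kafka dependency. BaseReceiver, PairReceiver, setInterceptor and KafkaRecord are hypothetical stand-ins, not Spark's Receiver or Kafka's MessageAndMetadata, and the interceptor is simplified to T => IterableOnce\[T\]. It assumes Scala 2.13 or later, where TraversableOnce is a deprecated alias of IterableOnce.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for Kafka's MessageAndMetadata.
final case class KafkaRecord(key: String, message: String, topic: String, offset: Long)

// Hypothetical base class: the interceptor is installed through a setter,
// so no existing constructor signature has to change. It only ever sees T.
class BaseReceiver[T] {
  val stored = ArrayBuffer.empty[T]
  private var interceptor: T => IterableOnce[T] = t => Iterator.single(t)
  def setInterceptor(f: T => IterableOnce[T]): Unit = interceptor = f
  def store(item: T): Unit = interceptor(item).iterator.foreach(stored += _)
}

// Kafka-like subclass with T fixed to (K, V): each record is reduced to a
// pair before store() runs, so topic and offset never reach the interceptor.
class PairReceiver extends BaseReceiver[(String, String)] {
  def onRecord(m: KafkaRecord): Unit = store((m.key, m.message))
}

val receiver = new PairReceiver
// Drop empty messages and trim the rest; only the (key, value) pair is visible.
receiver.setInterceptor { case (k, v) => if (v.isEmpty) Nil else List((k, v.trim)) }
receiver.onRecord(KafkaRecord("k1", " hello ", "events", 7L))
receiver.onRecord(KafkaRecord("k2", "", "events", 8L))
```

The setter leaves existing constructors untouched, but because the subclass has already reduced each record to a (K, V) pair before calling store(), the interceptor cannot use the topic or offset.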

{quote}
I'm also not clear on how your proposed solution deals with the 9 different 
overloads of store(), including the one that takes raw serialized bytes.

At that point I'm not sure that having an interceptor setter defined on a 
parent class makes a lot of sense, because it's the particular subclass that 
knows what its intermediate third type is (MessageAndMetadata in this case), as 
well as which store method(s) it cares about.
{quote}

Sorry, I overlooked the store() API that takes raw serialized bytes. I will 
revise the design doc to find a better approach :).
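A minimal sketch of why the raw-bytes overload is the awkward case, again in plain Scala 2.13+. SketchReceiver is hypothetical and only mirrors the shape of four store() variants (single item, ArrayBuffer, Iterator, ByteBuffer); it is not Spark's Receiver, which has more overloads than this.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical receiver mirroring the shape of a few store() overloads.
class SketchReceiver[T] {
  val items = ArrayBuffer.empty[T]
  val rawBlocks = ArrayBuffer.empty[ByteBuffer]
  var interceptor: T => IterableOnce[T] = t => Iterator.single(t)

  // Single interception point shared by every element-wise overload.
  private def intercept(item: T): Unit = interceptor(item).iterator.foreach(items += _)

  def store(item: T): Unit = intercept(item)
  def store(buffer: ArrayBuffer[T]): Unit = buffer.foreach(intercept)
  def store(it: Iterator[T]): Unit = it.foreach(intercept)

  // Already-serialized records: a T-typed interceptor cannot be applied here
  // without deserializing the block first, so the block is kept as-is.
  def store(bytes: ByteBuffer): Unit = rawBlocks += bytes
}

val r = new SketchReceiver[String]
// Drop empty strings and upper-case the rest.
r.interceptor = s => if (s.isEmpty) Nil else List(s.toUpperCase)
r.store("a")
r.store(ArrayBuffer("", "b"))
r.store(Iterator("c"))
r.store(ByteBuffer.wrap("raw".getBytes("UTF-8")))
```

The element-wise overloads can share one interception point, but a ByteBuffer holds records that are already serialized, so a T-typed interceptor would have to deserialize the block first or leave it alone.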

{quote}
That's part of why I think constructor arguments are actually a cleaner way to 
handle this - kafka can have an "interceptor" argument that defaults to a 
function MessageAndMetadata => Iterable\[(K,V)\], other implementations can 
have a type signature for the interceptor that makes sense for them.

As an aside, I think it should actually be TraversableOnce, not Iterable. All 
we care about is being able to call foreach on it once, and the classes that 
implement TraversableOnce are a superset of those that implement Iterable.
{quote}
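A minimal sketch of the constructor-argument alternative, assuming Scala 2.13+ (IterableOnce stands in for the TraversableOnce of older Scala versions). KafkaStyleReceiver, MsgAndMeta and passThrough are hypothetical names, not the real KafkaReceiver API. Because a pass-through default fixes the output type to (K, V), it is written here as a separate function rather than a Scala default argument.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for Kafka's MessageAndMetadata[K, V].
final case class MsgAndMeta[K, V](key: K, message: V, topic: String, offset: Long)

// Hypothetical Kafka-style receiver: the interceptor is a constructor argument
// typed on the full record, with a third type parameter M for the output.
class KafkaStyleReceiver[K, V, M](interceptor: MsgAndMeta[K, V] => IterableOnce[M]) {
  val stored = ArrayBuffer.empty[M]
  // The result is traversed exactly once, so IterableOnce is all that is required.
  def onRecord(m: MsgAndMeta[K, V]): Unit = interceptor(m).iterator.foreach(stored += _)
}

// Pass-through behaviour: emit the plain (K, V) pair for every record.
def passThrough[K, V]: MsgAndMeta[K, V] => IterableOnce[(K, V)] =
  m => Iterator.single((m.key, m.message))

val plain = new KafkaStyleReceiver(passThrough[String, String])
plain.onRecord(MsgAndMeta("k", "v", "events", 3L))

// A custom interceptor can read the metadata and return zero or one result.
val tagged = new KafkaStyleReceiver[String, String, String](m =>
  if (m.message.isEmpty) None else Some(s"${m.topic}@${m.offset}:${m.message}"))
tagged.onRecord(MsgAndMeta("k", "v", "events", 3L))
tagged.onRecord(MsgAndMeta("k", "", "events", 4L))
```

Since the receiver only traverses the interceptor's result once, any IterableOnce works: an Iterator qualifies even though it is not an Iterable, and in Scala 2.13 so does an Option.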

Putting the "interceptor" in the constructor arguments is indeed a cleaner way, 
but since every receiver-related API would need to add this "interceptor", 
changing the existing APIs would be a lot of work and would also break binary 
compatibility. A prerequisite of my design is not to change the current API.

My design goals are minimal modification of the current code and generality, 
though, as you mentioned, this design may not be as straightforward as adding 
the "interceptor" to the constructor arguments. I will refine the design and 
make sure nothing has been overlooked. Thanks a lot for your review and 
comments; I appreciate your time :).



> Interceptor pattern in receivers
> --------------------------------
>
>                 Key: SPARK-4960
>                 URL: https://issues.apache.org/jira/browse/SPARK-4960
>             Project: Spark
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Tathagata Das
>
> Sometimes it is good to intercept a message received through a receiver and 
> modify / do something with the message before it is stored into Spark. This 
> is often referred to as the interceptor pattern. There should be a general way 
> to specify an interceptor function that gets applied to all receivers. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
