dawillcox opened a new issue #6024: PulsarSpout emits only to default stream
URL: https://github.com/apache/pulsar/issues/6024
 
 
   **Is your feature request related to a problem? Please describe.**
   We have an application, currently using another streaming ingestion method, 
that we'd like to replace with either Kafka or Pulsar. Both would be 
candidates, _except_ that the spout must emit to different output streams 
(each processed by different bolts) depending on the content of individual 
messages, which `PulsarSpout` currently can't do.
   
   The Kafka spout supports this. Topologies implement `RecordTranslator`, an 
interface that returns the output stream and the `Values` for the tuple.
   
   The equivalent in `PulsarSpout` is `MessageToValuesMapper`, but that returns 
only the `Values`. There's no way to specify an alternate stream; everything 
goes to "default."
   
   **Describe the solution you'd like**
   Simply put, a way for `MessageToValuesMapper` to specify the output stream 
for each tuple. (It already has a `declareOutputFields()` method, so it can 
declare the appropriate output streams.)
   
   There are two possible solutions, both requiring a backwards-compatible 
extension of `MessageToValuesMapper`. I think the first is a bit cleaner, but 
both would work.
   
   1. Change `PulsarSpout` to use a new method in `MessageToValuesMapper` that 
returns a `PulsarTuple` interface, similar to Kafka's `KafkaTuple`, carrying 
both the `Values` _and_ the output stream. A default implementation of the new 
method would just call the existing method (which should probably be 
deprecated) and return a `PulsarTuple` with stream "default."
   1. Add a new method to `MessageToValuesMapper`: `String outputStream(Message 
msg, Values values)` to return the output stream given the original `Message` 
and extracted `Values`. The default implementation would just return "default."
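
   A minimal Java sketch of how the two options could fit together, assuming simplified stand-ins for Pulsar's `Message` and Storm's `Values` so it compiles on its own. `PulsarTuple`, `toTuple()`, `outputStream()` and `TypeRoutingMapper` are hypothetical names for this proposal, not existing Pulsar API, and the `toValues()` signature is simplified. In this sketch the option-1 default delegates to the option-2 hook, so a legacy mapper that implements only `toValues()` still lands on "default".

```java
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical stand-in for Pulsar's Message: only the payload is modeled.
final class Message {
    private final byte[] data;
    Message(byte[] data) { this.data = data; }
    byte[] getData() { return data; }
}

// Hypothetical stand-in for Storm's Values (a list of tuple fields).
class Values extends ArrayList<Object> {
    Values(Object... vals) { super(Arrays.asList(vals)); }
}

// Hypothetical PulsarTuple (option 1): the tuple's values plus the stream they go to.
final class PulsarTuple {
    static final String DEFAULT_STREAM = "default";
    private final String stream;
    private final Values values;
    PulsarTuple(String stream, Values values) { this.stream = stream; this.values = values; }
    String getStream() { return stream; }
    Values getValues() { return values; }
}

interface MessageToValuesMapper {
    // Existing-style method: returns only the tuple values (null = nothing to emit).
    Values toValues(Message msg);

    // Option 1 (hypothetical): wraps the legacy result, so old mappers keep working.
    // Null is propagated so the caller can skip the message.
    default PulsarTuple toTuple(Message msg) {
        Values values = toValues(msg);
        return values == null ? null : new PulsarTuple(outputStream(msg, values), values);
    }

    // Option 2 (hypothetical): by default everything stays on the default stream.
    default String outputStream(Message msg, Values values) {
        return PulsarTuple.DEFAULT_STREAM;
    }
}

// Example mapper routing on the first payload byte (a made-up message-type tag).
class TypeRoutingMapper implements MessageToValuesMapper {
    @Override
    public Values toValues(Message msg) {
        return new Values(msg.getData());
    }

    @Override
    public String outputStream(Message msg, Values values) {
        byte[] data = msg.getData();
        return data.length > 0 && data[0] == 1 ? "orders" : PulsarTuple.DEFAULT_STREAM;
    }
}
```

Because both new methods are `default` methods, existing mappers (including lambdas) compile unchanged, which is the backwards-compatibility property both options rely on.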
   
   **Describe alternatives you've considered**
   All of these would require `collector` to be non-`private`, and probably 
other changes as well. None is very satisfying. All would require an application to 
extend `PulsarSpout`, which can cause compatibility issues with new versions.
   1. Replace the existing `collector.emit()` calls with a call to a non-private 
`doEmit(Message msg, Values values)` method. Applications could override it and 
determine the output stream as needed.
   1. Make `mapToValueAndEmit()` at least protected so it can be overridden.
   1. Override `nextTuple()` and replicate most of its logic, changing only the 
`emit()` calls.
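
   Alternative 1 could look roughly like the following heavily simplified Java sketch. `HookedSpout`, `RoutingSpout` and `RecordingCollector` are hypothetical stand-ins (not `PulsarSpout` or Storm's `SpoutOutputCollector`), and the hook takes a message ID and the raw payload instead of a Pulsar `Message`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a spout output collector: records each (stream, tuple) emitted.
class RecordingCollector {
    final List<String> streams = new ArrayList<>();
    final List<List<Object>> tuples = new ArrayList<>();

    void emit(String streamId, List<Object> tuple, Object messageId) {
        streams.add(streamId);
        tuples.add(tuple);
    }
}

// Hypothetical, simplified spout: every emit goes through the overridable doEmit() hook
// instead of calling the collector directly.
class HookedSpout {
    static final String DEFAULT_STREAM = "default";
    protected final RecordingCollector collector;

    HookedSpout(RecordingCollector collector) { this.collector = collector; }

    // Stands in for the spout's internal "map to values and emit" step.
    final void mapAndEmit(Object messageId, byte[] payload) {
        List<Object> values = new ArrayList<>();
        values.add(payload); // placeholder mapping: the payload is the only field
        doEmit(messageId, payload, values);
    }

    // Default hook preserves today's behavior: everything goes to "default".
    protected void doEmit(Object messageId, byte[] payload, List<Object> values) {
        collector.emit(DEFAULT_STREAM, values, messageId);
    }
}

// Application subclass choosing the stream from the message content.
class RoutingSpout extends HookedSpout {
    RoutingSpout(RecordingCollector collector) { super(collector); }

    @Override
    protected void doEmit(Object messageId, byte[] payload, List<Object> values) {
        String stream = payload.length > 0 && payload[0] == 1 ? "orders" : DEFAULT_STREAM;
        collector.emit(stream, values, messageId);
    }
}
```

The cost is visible even here: the routing logic lives in a spout subclass and depends on its internals, which is the compatibility concern raised above.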
   
   Or, just use Kafka, which is what we'd actually do.
   
   **Additional context**
   One application receives Avro-encoded events with a header that identifies 
the schema type and version (plus some other information not relevant here). 
The schema type determines the output stream.
   
   Another application has a fixed Avro schema with two fields, an event type 
and a serialized protobuf. The event type determines the output stream.
   
   Changing those formats isn't an option.
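
   In both applications the routing decision reduces to a lookup from a message type to a stream ID. The Java sketch below assumes the type string has already been extracted (from the Avro header or the event-type field); `StreamRouter`, the type names, and the stream IDs are made up for illustration. A mapper using option 2 could call `streamFor()` from its hypothetical `outputStream()` method and use `streamsToDeclare()` in `declareOutputFields()`, since Storm expects every stream a component emits to be declared there.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical routing table: message type -> Storm stream ID.
final class StreamRouter {
    static final String DEFAULT_STREAM = "default";
    private final Map<String, String> streamsByType;

    StreamRouter(Map<String, String> streamsByType) {
        // Defensive, read-only copy so routing can't change after construction.
        this.streamsByType = Collections.unmodifiableMap(new LinkedHashMap<>(streamsByType));
    }

    // Stream for one message type; unknown types fall back to the default stream.
    String streamFor(String messageType) {
        return streamsByType.getOrDefault(messageType, DEFAULT_STREAM);
    }

    // Every stream this router can return, i.e. what declareOutputFields() would need
    // to declare (one declareStream(...) call per entry in a real mapper).
    Set<String> streamsToDeclare() {
        Set<String> streams = new LinkedHashSet<>(streamsByType.values());
        streams.add(DEFAULT_STREAM);
        return streams;
    }
}
```

Driving both the per-message routing and the stream declarations from one table keeps them from drifting apart as new event types are added.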
