dawillcox opened a new issue #6024: PulsarSpout emits only to default stream
URL: https://github.com/apache/pulsar/issues/6024

**Is your feature request related to a problem? Please describe.**

We have an application, currently using another streaming ingestion method, that we'd like to replace with either Kafka or Pulsar. Either would be a candidate, _except_ the spout needs to emit to different output streams (to be processed by different bolts) based on the content of individual messages.

The Kafka spout supports this. Topologies implement `RecordTranslator`, an interface that returns the output stream and the `Values` for the tuple. The equivalent in `PulsarSpout` is `MessageToValuesMapper`, but that returns only the `Values`. There's no way to specify an alternate stream; everything goes to "default."

**Describe the solution you'd like**

Simply put, a way for `MessageToValuesMapper` to specify the output stream for a tuple. (It already has a `declareOutputFields()` method, so it can declare the appropriate output streams.)

Two possible solutions, both of which would require a backwards-compatible extension of `MessageToValuesMapper`. The first I think is a bit cleaner, but both would work.

1. Change `PulsarSpout` to use a new method in `MessageToValuesMapper` that returns a `PulsarTuple` interface similar to Kafka's `KafkaTuple`. It would return the `Values` _and_ the output stream. A default implementation of the new method would just call the existing method (which probably should be deprecated) and return a `PulsarTuple` with stream "default."
1. Add a new method to `MessageToValuesMapper`: `String outputStream(Message msg, Values values)` to return the output stream given the original `Message` and extracted `Values`. The default implementation would just return "default."

**Describe alternatives you've considered**

All of these would require `collector` to be not `private`, and probably other things. None are very satisfying. All would require an application to extend `PulsarSpout`, which can cause compatibility issues with new versions.

1. Replace the existing `collector.emit()` calls with calls to a non-private `doEmit(Message msg, Values values)` method. Applications could override that and figure out the output stream as needed.
1. Make `mapToValueAndEmit()` at least protected so it can be overridden.
1. Override `nextTuple()` and replicate most of the logic except the `emit()` calls.

Or, just use Kafka, which is what we'd actually do.

**Additional context**

One application receives Avro-encoded events with a header that identifies the schema type and version. (Also some other information not relevant here.) The schema type determines the output stream.

Another application has a fixed Avro schema with two fields, an event type and a serialized protobuf. The event type determines the output stream.

Changing those formats isn't an option.
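
To make the two proposed solutions concrete, here is a rough sketch of how the backwards-compatible extension might look. It is only an illustration, not the actual Pulsar or Storm code: `Message`, `Values`, and `MessageToValuesMapper` below are simplified stand-ins for the real classes (the real mapper also has `declareOutputFields()`, omitted here), and `PulsarTuple`, `toTuple()`, `outputStream()`, and `SchemaTypeMapper` are hypothetical names taken from or invented for this proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;

public class StreamRoutingSketch {

    static final String DEFAULT_STREAM = "default";

    // Stand-in for the Pulsar Message; carries only what the sketch needs.
    static class Message {
        final String schemaType;
        final String payload;
        Message(String schemaType, String payload) {
            this.schemaType = schemaType;
            this.payload = payload;
        }
    }

    // Stand-in for Storm's Values (a list of tuple fields).
    static class Values extends ArrayList<Object> {
        Values(Object... fields) {
            super(Arrays.asList(fields));
        }
    }

    // Hypothetical PulsarTuple (solution 1): the values plus the target stream.
    static class PulsarTuple {
        final String stream;
        final Values values;
        PulsarTuple(String stream, Values values) {
            this.stream = stream;
            this.values = values;
        }
    }

    // Simplified stand-in for MessageToValuesMapper with both proposed extensions.
    interface MessageToValuesMapper {
        // Existing method: returns only the values.
        Values toValues(Message msg);

        // Solution 1 (hypothetical): existing mappers keep emitting to "default".
        default PulsarTuple toTuple(Message msg) {
            return new PulsarTuple(DEFAULT_STREAM, toValues(msg));
        }

        // Solution 2 (hypothetical): pick the stream separately from the values.
        default String outputStream(Message msg, Values values) {
            return DEFAULT_STREAM;
        }
    }

    // Example mapper that routes on the schema type, as in the first application.
    static class SchemaTypeMapper implements MessageToValuesMapper {
        @Override
        public Values toValues(Message msg) {
            return new Values(msg.payload);
        }

        @Override
        public PulsarTuple toTuple(Message msg) {
            return new PulsarTuple(msg.schemaType, toValues(msg));
        }
    }

    public static void main(String[] args) {
        MessageToValuesMapper mapper = new SchemaTypeMapper();
        PulsarTuple tuple = mapper.toTuple(new Message("orders", "payload-1"));
        // The spout would then call something like collector.emit(stream, values, id).
        System.out.println(tuple.stream + " -> " + tuple.values);
    }
}
```

With either default method in place, an existing mapper that implements only `toValues()` would keep emitting to "default" unchanged, which is the backwards compatibility both solutions rely on.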