[
https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942302#comment-15942302
]
ASF GitHub Bot commented on BEAM-1573:
--------------------------------------
GitHub user peay opened a pull request:
https://github.com/apache/beam/pull/2330
[BEAM-1573] Use Kafka serializers instead of coders in KafkaIO
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
- [x] Make sure the PR title is formatted like:
`[BEAM-<Jira issue #>] Description of pull request`
- [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
Travis-CI on your fork and ensure the whole test matrix passes).
- [x] Replace `<Jira issue #>` in the title with the actual Jira issue
number, if there is one.
- [ ] If this contribution is large, please file an Apache
[Individual Contributor License
Agreement](https://www.apache.org/licenses/icla.txt).
---
**This is a work in progress, do not merge**.
Modified:
- Use Kafka serializers and deserializers in KafkaIO
- Added helper methods `fromAvro` and `toAvro`, to use serialization based
on `AvroCoder`. This is consistent with other IOs such as HDFS.
- Moved `CoderBasedKafkaSerializer` out, and added
`CoderBasedKafkaDeserializer`. These are used for `toAvro`/`fromAvro`, and can
be useful to port existing code that relies on coders.
- Added `InstantSerializer` and `InstantDeserializer`, as `Instant` is used
in some of the tests.
The writer lets Kafka handle serialization itself. The reader uses Kafka byte
deserializers, and calls the user-provided Kafka deserializer from `advance`.
Note that Kafka serializers and deserializers are not themselves
`Serializable`. Hence, I've used a `Class<..>` in the `spec` both for read and
write.
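To illustrate the `Class<..>` approach, here is a minimal sketch rather than the code in this PR. `SketchDeserializer` is a hypothetical stand-in for Kafka's `Deserializer` interface so that the example compiles without the Kafka client jar, and `ReadSpecSketch`, `Utf8SketchDeserializer` and `SpecSketchDemo` are made-up names. The spec stores only the class token, which is `Serializable`, and instantiates the deserializer lazily after the spec has been shipped to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Deserializer, reduced to a single method.
interface SketchDeserializer<T> {
  T deserialize(String topic, byte[] data);
}

// Deliberately NOT Serializable, like typical Kafka deserializers.
class Utf8SketchDeserializer implements SketchDeserializer<String> {
  @Override
  public String deserialize(String topic, byte[] data) {
    return new String(data, StandardCharsets.UTF_8);
  }
}

// Hypothetical spec: only the Class token travels with the serialized spec.
class ReadSpecSketch<T> implements Serializable {
  private static final long serialVersionUID = 1L;

  private final Class<? extends SketchDeserializer<T>> deserializerClass;
  // Transient: rebuilt on first use after deserialization of the spec.
  private transient SketchDeserializer<T> instance;

  ReadSpecSketch(Class<? extends SketchDeserializer<T>> deserializerClass) {
    this.deserializerClass = deserializerClass;
  }

  SketchDeserializer<T> deserializer() {
    if (instance == null) {
      try {
        // Requires an accessible no-argument constructor.
        instance = deserializerClass.getDeclaredConstructor().newInstance();
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException("Cannot instantiate " + deserializerClass, e);
      }
    }
    return instance;
  }
}

class SpecSketchDemo {
  // Java-serializes and deserializes a value, as a runner might when shipping a spec.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ReadSpecSketch<String> spec = new ReadSpecSketch<>(Utf8SketchDeserializer.class);
    ReadSpecSketch<String> shipped = roundTrip(spec);
    byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
    System.out.println(shipped.deserializer().deserialize("some-topic", raw));
  }
}
```

One consequence of this design is that the deserializer class needs a no-argument constructor, so any per-instance settings would have to be supplied after instantiation rather than through the constructor.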
There is still an issue, though. `Read` still takes **both a deserializer
and a coder**. This is because the source must implement
`getDefaultOutputCoder`, and I am not sure how to infer the coder in this
context. Having to provide both is cumbersome. Any thoughts?
cc @rangadi @jkff
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/peay/beam BEAM-1573
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/beam/pull/2330.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2330
----
commit 511a1301190b08b05573e3025d7ade2746d61e5f
Author: peay <[email protected]>
Date: 2017-03-26T14:51:59Z
[BEAM-1573] Use Kafka serializers instead of coders in KafkaIO
----
> KafkaIO does not allow using Kafka serializers and deserializers
> ----------------------------------------------------------------
>
> Key: BEAM-1573
> URL: https://issues.apache.org/jira/browse/BEAM-1573
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-extensions
> Affects Versions: 0.4.0, 0.5.0
> Reporter: peay
> Assignee: Raghu Angadi
> Priority: Minor
> Fix For: Not applicable
>
>
> KafkaIO does not allow overriding the serializer and deserializer settings
> of the Kafka consumers and producers it uses internally. Instead, it lets you
> set a `Coder`, and has a simple Kafka serializer/deserializer wrapper class
> that calls the coder.
> I appreciate that supporting Beam coders is good and consistent with the
> rest of the system. However, is there a reason to completely disallow
> custom Kafka serializers?
> This is a limitation when working with an Avro schema registry for instance,
> which requires custom serializers. One can write a `Coder` that wraps a
> custom Kafka serializer, but that means two levels of unnecessary wrapping.
> In addition, the `Coder` abstraction is not equivalent to Kafka's
> `Serializer` which gets the topic name as input. Using a `Coder` wrapper
> would require duplicating the output topic setting in the argument to
> `KafkaIO` and when building the wrapper, which is inelegant and error-prone.
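
The mismatch between the two abstractions can be sketched as follows. This is an illustration only: `TopicAwareSerializer` and `TopicBlindCoder` are hypothetical stand-ins for Kafka's `Serializer` and Beam's `Coder`, reduced to one method each, and the "subject" prefix merely mimics how a registry-backed serializer might depend on the topic name. The wrapper has to be handed the topic a second time, and nothing ties it to the topic actually configured on the IO.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a Kafka serializer: it receives the topic name.
interface TopicAwareSerializer<T> {
  byte[] serialize(String topic, T value);
}

// Hypothetical stand-in for a Beam coder: it only sees the value.
interface TopicBlindCoder<T> {
  byte[] encode(T value);
}

// Toy serializer whose output depends on the topic, as a registry-backed one might.
class SubjectPrefixingSerializer implements TopicAwareSerializer<String> {
  @Override
  public byte[] serialize(String topic, String value) {
    return (topic + "-value:" + value).getBytes(StandardCharsets.UTF_8);
  }
}

// Coder-style wrapper: the topic must be duplicated into the constructor.
class SerializerBackedCoder<T> implements TopicBlindCoder<T> {
  private final TopicAwareSerializer<T> delegate;
  private final String topic; // second copy of a setting the IO already has

  SerializerBackedCoder(TopicAwareSerializer<T> delegate, String topic) {
    this.delegate = delegate;
    this.topic = topic;
  }

  @Override
  public byte[] encode(T value) {
    return delegate.serialize(topic, value);
  }
}

class TopicMismatchDemo {
  public static void main(String[] args) {
    TopicAwareSerializer<String> serializer = new SubjectPrefixingSerializer();
    // The IO would write to "orders", but the wrapper was built with "payments".
    TopicBlindCoder<String> coder = new SerializerBackedCoder<>(serializer, "payments");
    System.out.println(new String(serializer.serialize("orders", "v"), StandardCharsets.UTF_8));
    System.out.println(new String(coder.encode("v"), StandardCharsets.UTF_8));
  }
}
```

In this sketch the two printed payloads differ, which is the kind of silent inconsistency the duplicated topic setting makes possible.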