[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760250#comment-16760250 ]
Devin Thomson edited comment on FLINK-4582 at 2/4/19 9:45 PM:
--------------------------------------------------------------
[~yxu-lyft] circling back here: it looks like this got merged in, congrats!
I've been preemptively cutting over from my implementation to this one and
noticed one blocking bug:
{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params) vs org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params)
    at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
    at c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
    at c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
    at c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
    at c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
    at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
    at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
    at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
    ... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params...
{code}
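The message says the mapper found two one-argument setEventName methods on the same class and had no way to choose between them. The sketch below reproduces that kind of ambiguity with plain reflection (no Jackson involved). FakeRecord and OperationType are hypothetical stand-ins, assuming the shaded SDK Record overloads setEventName with a String and an enum parameter, as AWS SDK v1 model classes commonly do:

{code:java}
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SetterConflictDemo {

    // Hypothetical enum standing in for the SDK's event-name enum.
    public enum OperationType { INSERT, MODIFY, REMOVE }

    // Hypothetical stand-in for the shaded SDK Record class: one property,
    // two public one-argument setters with the same name.
    public static class FakeRecord {
        private String eventName;

        public void setEventName(String eventName) { this.eventName = eventName; }

        public void setEventName(OperationType eventName) { this.eventName = eventName.toString(); }

        public String getEventName() { return eventName; }
    }

    // Groups public, non-static, one-argument "setXxx" methods by property name
    // (roughly how a bean-style mapper discovers setters) and keeps only the
    // properties that end up with more than one candidate.
    public static Map<String, List<Method>> conflictingSetters(Class<?> type) {
        Map<String, List<Method>> byProperty = new TreeMap<>();
        for (Method m : type.getMethods()) {
            String name = m.getName();
            if (name.startsWith("set") && name.length() > 3
                    && m.getParameterCount() == 1
                    && !Modifier.isStatic(m.getModifiers())) {
                String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
                byProperty.computeIfAbsent(property, k -> new ArrayList<>()).add(m);
            }
        }
        byProperty.values().removeIf(candidates -> candidates.size() < 2);
        return byProperty;
    }

    public static void main(String[] args) {
        conflictingSetters(FakeRecord.class).forEach((property, setters) -> {
            System.out.println("Conflicting setters for \"" + property + "\":");
            for (Method m : setters) {
                System.out.println("  " + m.getName() + "(" + m.getParameterTypes()[0].getSimpleName() + ")");
            }
        });
    }
}
{code}

A plain mapper sees the same two candidates for "eventName" and, lacking any annotation that picks one, refuses to build a deserializer.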
It looks like the root cause is that DynamoDBStreamsSchema.java defines the
object mapper as follows:
{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}
It should instead use RecordObjectMapper from the DynamoDB Streams adapter
library, which registers the appropriate mix-ins:
{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}
This appears to resolve the issue; I tested it with my own deserializer
implementation. I'm not sure whether it makes sense to track this as a separate
issue, since this is still a 1.8-SNAPSHOT feature.
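The RecordObjectMapper fix presumably works because Jackson mix-ins let a mapper attach annotations to a class it cannot modify, here the shaded SDK Record. As a stdlib-only illustration of that idea (not RecordObjectMapper's actual code), the sketch below uses a hypothetical IgnoreSetter annotation declared on a hypothetical FakeRecordMixIn class to drop the enum overload, so exactly one setEventName candidate remains. FakeRecord and OperationType are the same kind of hypothetical stand-ins as before:

{code:java}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MixInDemo {

    // Hypothetical marker playing the role of Jackson's @JsonIgnore.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface IgnoreSetter {}

    public enum OperationType { INSERT, MODIFY, REMOVE }

    // Hypothetical stand-in for an SDK class we cannot annotate ourselves.
    public static class FakeRecord {
        private String eventName;
        public void setEventName(String eventName) { this.eventName = eventName; }
        public void setEventName(OperationType eventName) { this.eventName = eventName.toString(); }
        public String getEventName() { return eventName; }
    }

    // Mix-in: carries annotations for FakeRecord's methods without touching FakeRecord.
    public abstract static class FakeRecordMixIn {
        @IgnoreSetter
        abstract void setEventName(OperationType eventName);
    }

    // True if the mix-in declares a method with the same signature marked @IgnoreSetter.
    public static boolean ignoredByMixIn(Class<?> mixIn, Method m) {
        if (mixIn == null) return false;
        try {
            return mixIn.getDeclaredMethod(m.getName(), m.getParameterTypes())
                        .isAnnotationPresent(IgnoreSetter.class);
        } catch (NoSuchMethodException e) {
            return false; // the mix-in says nothing about this method
        }
    }

    // Picks the single setter for a property; fails if zero or several candidates remain.
    public static Method resolveSetter(Class<?> target, Class<?> mixIn, String property) {
        String setterName = "set" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
        List<Method> candidates = new ArrayList<>();
        for (Method m : target.getMethods()) {
            if (m.getName().equals(setterName) && m.getParameterCount() == 1 && !ignoredByMixIn(mixIn, m)) {
                candidates.add(m);
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalArgumentException("Conflicting or missing setter definitions for property \""
                    + property + "\": " + candidates.size() + " candidates");
        }
        return candidates.get(0);
    }

    // Populates a FakeRecord from string-valued fields, e.g. a parsed JSON object.
    public static FakeRecord read(Map<String, String> fields, Class<?> mixIn) {
        FakeRecord record = new FakeRecord();
        for (Map.Entry<String, String> e : fields.entrySet()) {
            Method setter = resolveSetter(FakeRecord.class, mixIn, e.getKey());
            try {
                setter.invoke(record, e.getValue());
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException(ex);
            }
        }
        return record;
    }

    public static void main(String[] args) {
        Map<String, String> json = Map.of("eventName", "INSERT");
        try {
            read(json, null); // no hints: both setters remain, as with a plain mapper
        } catch (IllegalArgumentException e) {
            System.out.println("Without mix-in: " + e.getMessage());
        }
        System.out.println("With mix-in: " + read(json, FakeRecordMixIn.class).getEventName());
    }
}
{code}

This isolates only the setter-disambiguation part; it is not a reimplementation of RecordObjectMapper.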
Let me know if you have any questions!
Devin
was (Author: tinder-dthomson): identical to the text above except for the sign-off "- Devin".
> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> ------------------------------------------------------------
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Ying Xu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data
> capture) feature called DynamoDB Streams
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
> which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with
> only a slight difference in resharding behaviours, so it is possible to build
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB
> Streams source.
> I propose an API along these lines:
> {code}
> DataStream dynamoItemsCdc =
> FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config);
> {code}
> The feature adds connectivity to another popular AWS service, and combining
> Flink's exactly-once semantics, out-of-core state backends, and queryable
> state with CDC enables very strong use cases. This feature should only need
> one extra dependency, the AWS Java SDK for DynamoDB, which is licensed under
> the Apache License 2.0.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)