Hi Thomas,

Thanks for your PRs!

I understand and fully agree with both points that you pointed out.

What I'm still a bit torn with is the current proposed solutions for these
issues (and other similar connector issues).

This might actually call for a good opportunity to bring some thoughts up
about connector contributions. My arguments would be the following:

The solutions actually break some fundamental designs of the connector code.
For example, in recent PRs for the Kinesis connector we've been proposing
to relax access of the `KinesisProxy` constructor.
AFAIK, this fix was triggered by an inefficiency in the
`KinesisProxy#getShardsOfStream` method which influences shard discovery
First of all, such a change breaks the fact that the class is an internal
class (it is marked as @Internal). It was made private as it handles
critical paths such as record fetching and shard listing, and is not
intended to be modified at all.
Second of all, the fix in the end did not fix the inefficiency at all -
only for advanced users who perhaps have saw the corresponding JIRA and
would bother to do the same and override the inefficient implementations by
If there is a fix that would have benefited all users of the connector in
general, I would definitely be more in favor of that.
This goes the same for https://github.com/apache/flink/pull/5803 - I'm not
sure that allowing overrides on the retry logic is ideal. For example, we
previously introduced in the Elasticsearch connector a RetryHandler
user-facing API to allow such customizations.

On one hand, I do understand that solving these connector issues properly
would perhaps require a more thorough design-wise ground-work and could be
more time-consuming.
On the other hand, I also understand that we need to find a good balance to
allow production users of these connectors to be able to quickly iterate
what issues the current code has and unblock encountered problems.

My main concern is that our current approach to fixing these issues, IMO,
actually do not encourage good fixes to be contributed back to the
connector code, and they would therefore remain problematic as they are.

What do you think? I may also be missing thing in a bigger picture here, so
feedback would be highly appreciated.


On Tue, Apr 3, 2018 at 1:25 PM, Thomas Weise <t...@apache.org> wrote:

> PR to provide the hooks:  https://github.com/apache/flink/pull/5803
> On Mon, Apr 2, 2018 at 6:14 PM, Thomas Weise <t...@apache.org> wrote:
> > Hi,
> >
> > I’m working on implementing retry for getRecords in FlinkKinesisConsumer.
> > We occasionally get transient socket read timeouts. Instead of bubbling
> up
> > the exception and forcing a topology reset to checkpoint, we want to
> retry
> > getRecords. We also want to work with a lower socket read timeout than
> the
> > 50s default.
> >
> > Looking at the current KinesisProxy implementation, I’m aiming to remove
> > some baked in assumptions that get into the way of customizing this:
> >
> > 1) AWSUtil.createKinesisClient - statically wired to use default
> > ClientConfiguration. The user should be able to control all settings that
> > the SDK exposes instead.
> >
> > 2) Retry in KinesisProxy.getRecords limited to AmazonServiceException.
> > Perhaps it is OK as default, but the user should be able to retry on
> other
> > exceptions if desired.
> >
> > For 1) a generic option could be to set properties on ClientConfiguration
> > using reflection (the class isn’t serializable but follows the Java Bean
> > conventions). Something like BeanUtils would make it straightforward to
> > process user supplied properties with a specific prefix. Is there any
> other
> > place in the Flink codebase where this style of configuration approach is
> > used and a preferred alternative to BeanUtils?
> >
> > Thanks
> >
> >

Reply via email to