Hi Gordon,

This is indeed a discussion that we need to have!

The purpose of the previous PRs wasn't to provide solutions to the issues
originally identified, but rather to enable solving them through
customization. What those customizations would be was also communicated,
along with the intent to contribute them subsequently, if they are deemed
broadly applicable enough and we find a reasonable contribution path.

So far we have implemented the following in custom code:

* use ListShards for discovery (we also considered limiting discovery to a
single subtask and sharing the results between subtasks, which is almost
certainly not something I would propose to add to Flink due to the
additional deployment dependencies).

* override emitRecords in the fetcher to provide source watermarking with
idle shard handling. Related discussions for the Kafka consumer show that
it isn't straightforward to arrive at a solution that will satisfy
everyone. I am still open to contributing those changes as well, but have
not seen a response to that. Nevertheless, it is key to allow users to
implement what they need for their use case.

* retry certain exceptions in getRecords based on our production learnings.
Whether or not those are applicable to everyone, and whether the Flink
implementation should be changed to retry by default, is a future
discussion that I intend to start. But in any case, we need to be able to
make the changes that we need on our end.

* ability to configure the AWS HTTP client when the defaults turn out to be
unsuitable for the use case. This is a very basic requirement and it is
rather surprising that the Flink Kinesis consumer wasn't written to provide
access to the settings that the AWS SDK provides.
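
As a rough illustration of the retry item above, here is a minimal sketch of
what such a hook could look like. It is not the actual KinesisProxy code: the
class names RetryingCaller and SocketTimeoutRetryingCaller, the isRecoverable
method, and the linear backoff are all hypothetical and only show the shape
of a base implementation whose retry decision a user can override.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a proxy class that exposes a retry hook. */
class RetryingCaller {
    private final int maxAttempts;
    private final long baseBackoffMillis;

    RetryingCaller(int maxAttempts, long baseBackoffMillis) {
        this.maxAttempts = maxAttempts;
        this.baseBackoffMillis = baseBackoffMillis;
    }

    /** Hook: subclasses decide which exceptions are worth retrying. */
    protected boolean isRecoverable(Exception e) {
        return false; // assumed default for this sketch: retry nothing
    }

    /** Runs the request, retrying while the hook says the failure is recoverable. */
    <T> T call(Callable<T> request) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return request.call();
            } catch (Exception e) {
                attempt++;
                if (attempt >= maxAttempts || !isRecoverable(e)) {
                    throw e; // give up and let the caller handle the failure
                }
                Thread.sleep(baseBackoffMillis * attempt); // simple linear backoff
            }
        }
    }
}

/** User-side customization: also retry transient socket read timeouts. */
class SocketTimeoutRetryingCaller extends RetryingCaller {
    SocketTimeoutRetryingCaller(int maxAttempts, long baseBackoffMillis) {
        super(maxAttempts, baseBackoffMillis);
    }

    @Override
    protected boolean isRecoverable(Exception e) {
        return e instanceof SocketTimeoutException;
    }
}
```

With a hook of this kind the default behavior stays unchanged for everyone
else, while a user who sees transient timeouts in production can opt into
retrying them without forking the class.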

I hope the above examples make clear that it is necessary to leave room for
users to augment a base implementation. There is no such thing as a perfect
connector and there will always be new discoveries by users that require
improvements or changes. Use case specific considerations may require
augmenting even the best default behavior; what works for one user may not
work for another.

If I don't have the hooks that the referenced PRs enable, then the
alternative is to fork the code. That will further reduce the likelihood of
changes making their way back to Flink.

I think we agree on the ultimate goal of improving the default
implementation of the connector. There are more fundamental issues with the
Kinesis connector (and other connectors) that I believe require deeper
design work and a rewrite, which go beyond what we discuss here.

Finally, I'm also curious how much appetite there is for contributions in
the connector area. I see that we have now accumulated 340 open PRs, and
review bandwidth seems hard to come by.


On Sun, Apr 15, 2018 at 8:56 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Thomas,
> Thanks for your PRs!
> I understand and fully agree with both points that you pointed out.
> What I'm still a bit torn with is the current proposed solutions for these
> issues (and other similar connector issues).
> This might actually call for a good opportunity to bring some thoughts up
> about connector contributions. My arguments would be the following:
> The solutions actually break some fundamental designs of the connector
> code.
> For example, in recent PRs for the Kinesis connector we've been proposing
> to relax access of the `KinesisProxy` constructor.
> AFAIK, this fix was triggered by an inefficiency in the
> `KinesisProxy#getShardsOfStream` method which influences shard discovery
> performance.
> First of all, such a change breaks the fact that the class is an internal
> class (it is marked as @Internal). It was made private as it handles
> critical paths such as record fetching and shard listing, and is not
> intended to be modified at all.
> Second of all, the fix in the end did not fix the inefficiency at all -
> only for advanced users who perhaps have seen the corresponding JIRA and
> would bother to do the same and override the inefficient implementations by
> themselves.
> If there is a fix that would have benefited all users of the connector in
> general, I would definitely be more in favor of that.
> This goes the same for https://github.com/apache/flink/pull/5803 - I'm not
> sure that allowing overrides on the retry logic is ideal. For example, we
> previously introduced in the Elasticsearch connector a RetryHandler
> user-facing API to allow such customizations.
> On one hand, I do understand that solving these connector issues properly
> would perhaps require a more thorough design-wise ground-work and could be
> more time-consuming.
> On the other hand, I also understand that we need to find a good balance to
> allow production users of these connectors to be able to quickly iterate
> what issues the current code has and unblock encountered problems.
> My main concern is that our current approach to fixing these issues, IMO,
> actually does not encourage good fixes to be contributed back to the
> connector code, and they would therefore remain problematic as they are.
> What do you think? I may also be missing things in the bigger picture here, so
> feedback would be highly appreciated.
> Cheers,
> Gordon
> On Tue, Apr 3, 2018 at 1:25 PM, Thomas Weise <t...@apache.org> wrote:
> > PR to provide the hooks:  https://github.com/apache/flink/pull/5803
> >
> >
> > On Mon, Apr 2, 2018 at 6:14 PM, Thomas Weise <t...@apache.org> wrote:
> >
> > > Hi,
> > >
> > > I’m working on implementing retry for getRecords in
> > > FlinkKinesisConsumer. We occasionally get transient socket read
> > > timeouts. Instead of bubbling up the exception and forcing a topology
> > > reset to checkpoint, we want to retry getRecords. We also want to work
> > > with a lower socket read timeout than the 50s default.
> > >
> > > Looking at the current KinesisProxy implementation, I’m aiming to
> > > remove some baked in assumptions that get into the way of customizing
> > > this:
> > >
> > > 1) AWSUtil.createKinesisClient - statically wired to use default
> > > ClientConfiguration. The user should be able to control all settings
> > > that the SDK exposes instead.
> > >
> > > 2) Retry in KinesisProxy.getRecords limited to AmazonServiceException.
> > > Perhaps it is OK as default, but the user should be able to retry on
> > > other exceptions if desired.
> > >
> > > For 1) a generic option could be to set properties on
> > > ClientConfiguration using reflection (the class isn’t serializable but
> > > follows the Java Bean conventions). Something like BeanUtils would make
> > > it straightforward to process user supplied properties with a specific
> > > prefix. Is there any other place in the Flink codebase where this style
> > > of configuration approach is used and a preferred alternative to
> > > BeanUtils?
> > >
> > > Thanks
> > >
> > >
> >

