[
https://issues.apache.org/jira/browse/FLINK-10316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637252#comment-16637252
]
ASF GitHub Bot commented on FLINK-10316:
----------------------------------------
diego-carvallo-tx opened a new pull request #6789: [FLINK-10316][kinesis] bug
was preventing FlinkKinesisProducer to connect to Kinesalite
URL: https://github.com/apache/flink/pull/6789
## What is the purpose of the change
- Some time ago
[FLINK-4197](https://issues.apache.org/jira/browse/FLINK-4197) added the
ability to connect to a local Kinesis endpoint but also introduced some bugs
- Some time ago
[FLINK-9402](https://issues.apache.org/jira/browse/FLINK-9402) added some fixes
for those bugs but for FlinkKinesisConsumer only
- This PR addresses
[FLINK-10316](https://issues.apache.org/jira/browse/FLINK-10316) to fix the
remaining bugs to allow FlinkKinesisProducer also to connect to a local Kinesis
endpoint
## Brief change log
- The method `KinesisConfigUtil.validateAwsConfiguration(Properties
config)` is used by both `FlinkKinesisConsumer` and `FlinkKinesisProducer` but
AWS_REGION/AWS_ENDPOINT validation was performed only for Consumer who needs
only one of them to be set. On the other side Producer requires AWS_REGION to
be set even if AWS_ENDPOINT is defined to connect to local Kinesis. So the
change in this PR is to change the method signature to have a boolean to
identify if it is a Consumer or Producer so the validation can be done properly
for each case:
`KinesisConfigUtil.validateAwsConfiguration(Properties config, boolean
isProducer)`
- Changed the Unit Tests accordingly
## Jira Note
There are 2 other tickets that could be closed along with this one since
they all refer to the same issue:
[FLINK-9618](https://issues.apache.org/jira/browse/FLINK-9618) and
[FLINK-8936](https://issues.apache.org/jira/browse/FLINK-8936)
## Verifying this change
This change is already covered by existing tests, such as *(please describe
tests)*.
- KinesisConfigUtilTest.testMissingAwsRegionInConfig
- KinesisConfigUtilTest.testAwsRegionOrEndpointInConfig
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Kinesalite bug fix (FLINK-9402) doesn't fully fix the problem.
> --------------------------------------------------------------
>
> Key: FLINK-10316
> URL: https://issues.apache.org/jira/browse/FLINK-10316
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Affects Versions: 1.5.0, 1.6.0, 1.7.0
> Reporter: Joseph Sims
> Priority: Minor
> Labels: pull-request-available
>
> Related to: https://issues.apache.org/jira/browse/FLINK-9402
> A fix (Flink-9402) was introduced to force the configurations to have
> *exactly* one of "aws.region" or "aws.endpoint", but the underlying problem
> wasn't entirely fixed.
> If "aws.endpoint" is set and "aws.region" is unset, a different exception is
> thrown, instead.
> In *KinesisConfigUtil.java*, *getValidatedProducerConfiguration()* calls:
> {code:java}
> validateAwsConfiguration(config)
> {code}
> Where the original fix was introduced ("aws.region" XOR "aws.endpoint")
> But a couple of lines later, *getValidatedProducerConfiguration()* then
> calls:
> {code:java}
> kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));
> {code}
> Which explicitly checks for the existence of "aws.region", and throws an
> exception if it is not set.
> Thus, this class needs to be fixed such that "aws.region" is only required
> if "aws.endpoint" is unset, as the original ticket indicated.
> Also, *validateAwsConfiguration()* has a minor bug where is labels the
> "aws.endpoint" incorrectly.
> Current:
> {code:java}
> if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^
> config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
> // per validation in AwsClientBuilder
> throw new IllegalArgumentException(String.format("Either AWS region ('%s')
> or AWS endpoint ('%s') must be set in the config.",
> AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION));
> }
> {code}
> But should be:
> {code:java}
> if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^
> config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
> // per validation in AwsClientBuilder
> throw new IllegalArgumentException(String.format("Either AWS region ('%s')
> or AWS endpoint ('%s') must be set in the config.",
> AWSConfigConstants.AWS_REGION, ConsumerConfigConstants.AWS_ENDPOINT));
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)