[
https://issues.apache.org/jira/browse/FLINK-32400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
KianChen updated FLINK-32400:
-----------------------------
Description:
Since Flink 1.14, FlinkKafkaConsumer is deprecated in favor of KafkaSource.
As the summary says, properties can be set on KafkaSourceBuilder through the
setProperties(Properties props) method, and the values in props can be arbitrary
objects (for example, "enable.auto.commit" can be a Boolean). The #maybeOverride
method reads the value with Properties#getProperty and checks whether it is
null, but Properties#getProperty returns null whenever the stored value is not
a String.
KafkaSourceBuilder#build calls #parseAndSetRequiredProperties, which checks the
"enable.auto.commit" property. If the value is the Boolean true, it is treated
as unset and overridden with "false", even though kafka-clients accepts Boolean
values. The value is only used for the null check and for logging, so I think
it is better to read it as an object with Properties#get.
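The lookup difference described above can be reproduced outside Flink with plain java.util.Properties. This is only a minimal sketch: the class name PropertiesLookupDemo and its helper method are made up for illustration and are not part of Flink or kafka-clients.

```java
import java.util.Properties;

public class PropertiesLookupDemo {

    /** Builds a Properties object whose "enable.auto.commit" value is a Boolean, not a String. */
    static Properties propsWithBooleanValue() {
        Properties props = new Properties();
        // Properties extends Hashtable<Object, Object>, so put() accepts any object.
        props.put("enable.auto.commit", true);
        return props;
    }

    public static void main(String[] args) {
        Properties props = propsWithBooleanValue();
        // getProperty() returns null because the stored value is not a String.
        System.out.println("getProperty: " + props.getProperty("enable.auto.commit"));
        // get() returns the stored object unchanged.
        System.out.println("get: " + props.get("enable.auto.commit"));
    }
}
```

Running it should print "getProperty: null" followed by "get: true", which is why a null check based on getProperty cannot see a Boolean value supplied by the user.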
The current source code is as follows:
{code:java}
private boolean maybeOverride(String key, String value, boolean override) {
    boolean overridden = false;
    String userValue = this.props.getProperty(key);
    if (userValue != null) {
        if (override) {
            LOG.warn(
                    String.format(
                            "Property %s is provided but will be overridden from %s to %s",
                            key, userValue, value));
            this.props.setProperty(key, value);
            overridden = true;
        }
    } else {
        this.props.setProperty(key, value);
    }
    return overridden;
}
{code}
The proposed improvement is as follows:
{code:java}
private boolean maybeOverride(String key, String value, boolean override) {
    boolean overridden = false;
    Object userValue = this.props.get(key);
    if (userValue != null) {
        if (override) {
            LOG.warn(
                    String.format(
                            "Property %s is provided but will be overridden from %s to %s",
                            key, userValue, value));
            props.setProperty(key, value);
            overridden = true;
        }
    } else {
        props.setProperty(key, value);
    }
    return overridden;
}
{code}
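To compare the two variants side by side, the following standalone sketch copies both method bodies into a hypothetical class, MaybeOverrideSketch, and drops the logging so that it runs without Flink on the classpath. It assumes the property is defaulted with override set to false. That argument choice is an assumption made for the example, not something taken from the issue text.

```java
import java.util.Properties;

public class MaybeOverrideSketch {
    private final Properties props;

    MaybeOverrideSketch(Properties props) {
        this.props = props;
    }

    // Variant that looks the key up with getProperty(), as in the current source.
    boolean maybeOverrideWithGetProperty(String key, String value, boolean override) {
        boolean overridden = false;
        String userValue = props.getProperty(key);
        if (userValue != null) {
            if (override) {
                props.setProperty(key, value);
                overridden = true;
            }
        } else {
            // A non-String user value ends up here and is replaced by the default.
            props.setProperty(key, value);
        }
        return overridden;
    }

    // Variant that looks the key up with get(), as in the proposed improvement.
    boolean maybeOverrideWithGet(String key, String value, boolean override) {
        boolean overridden = false;
        Object userValue = props.get(key);
        if (userValue != null) {
            if (override) {
                props.setProperty(key, value);
                overridden = true;
            }
        } else {
            props.setProperty(key, value);
        }
        return overridden;
    }

    /** Stores Boolean true, applies the default "false" without override, returns the final value. */
    static Object valueAfterDefaulting(boolean useGet) {
        Properties props = new Properties();
        props.put("enable.auto.commit", true);
        MaybeOverrideSketch sketch = new MaybeOverrideSketch(props);
        if (useGet) {
            sketch.maybeOverrideWithGet("enable.auto.commit", "false", false);
        } else {
            sketch.maybeOverrideWithGetProperty("enable.auto.commit", "false", false);
        }
        return props.get("enable.auto.commit");
    }

    public static void main(String[] args) {
        System.out.println("with getProperty: " + valueAfterDefaulting(false));
        System.out.println("with get: " + valueAfterDefaulting(true));
    }
}
```

Under these assumptions, the getProperty variant leaves the String "false" in the properties, while the get variant keeps the user's Boolean true.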
> KafkaSourceBuilder#maybeOverride get property as a string to checking if has
> value may not be the best way
> ----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-32400
> URL: https://issues.apache.org/jira/browse/FLINK-32400
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Reporter: KianChen
> Priority: Major
> Fix For: 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0
--
This message was sent by Atlassian Jira
(v8.20.10#820010)