This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new fdec46d [FLINK-17790][kafka] Fix JDK 11 compile error
fdec46d is described below
commit fdec46d267ca938522f170a38fd8eb268941aa03
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon May 18 15:23:05 2020 +0200
[FLINK-17790][kafka] Fix JDK 11 compile error
---
.../flink/streaming/connectors/kafka/table/KafkaOptions.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
index 3e326fd..337fe76 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
@@ -263,7 +263,6 @@ public class KafkaOptions {
/**
* The partitioner can be either "fixed", "round-robin" or a customized partitioner full class name.
*/
- @SuppressWarnings({"unchecked", "rawtypes"})
public static Optional<FlinkKafkaPartitioner<RowData>>
getFlinkKafkaPartitioner(
ReadableConfig tableOptions,
ClassLoader classLoader) {
@@ -340,8 +339,7 @@ public class KafkaOptions {
/**
* Returns a class value with the given class name.
*/
- @SuppressWarnings("rawtypes")
- private static FlinkKafkaPartitioner initializePartitioner(String name,
ClassLoader classLoader) {
+ private static <T> FlinkKafkaPartitioner<T>
initializePartitioner(String name, ClassLoader classLoader) {
try {
Class<?> clazz = Class.forName(name, true, classLoader);
if
(!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
@@ -350,7 +348,10 @@ public class KafkaOptions {
name,
FlinkKafkaPartitioner.class.getName()));
}
- return InstantiationUtil.instantiate(name,
FlinkKafkaPartitioner.class, classLoader);
+ @SuppressWarnings("unchecked")
+ final FlinkKafkaPartitioner<T> kafkaPartitioner =
InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
+
+ return kafkaPartitioner;
} catch (ClassNotFoundException | FlinkException e) {
throw new ValidationException(
String.format("Could not find and
instantiate partitioner class '%s'", name), e);