This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new fdec46d  [FLINK-17790][kafka] Fix JDK 11 compile error
fdec46d is described below

commit fdec46d267ca938522f170a38fd8eb268941aa03
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon May 18 15:23:05 2020 +0200

    [FLINK-17790][kafka] Fix JDK 11 compile error
---
 .../flink/streaming/connectors/kafka/table/KafkaOptions.java     | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
index 3e326fd..337fe76 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
@@ -263,7 +263,6 @@ public class KafkaOptions {
        /**
         * The partitioner can be either "fixed", "round-robin" or a customized 
partitioner full class name.
         */
-       @SuppressWarnings({"unchecked", "rawtypes"})
        public static Optional<FlinkKafkaPartitioner<RowData>> 
getFlinkKafkaPartitioner(
                        ReadableConfig tableOptions,
                        ClassLoader classLoader) {
@@ -340,8 +339,7 @@ public class KafkaOptions {
        /**
         * Returns a class value with the given class name.
         */
-       @SuppressWarnings("rawtypes")
-       private static FlinkKafkaPartitioner initializePartitioner(String name, 
ClassLoader classLoader) {
+       private static <T> FlinkKafkaPartitioner<T> 
initializePartitioner(String name, ClassLoader classLoader) {
                try {
                        Class<?> clazz = Class.forName(name, true, classLoader);
                        if 
(!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
@@ -350,7 +348,10 @@ public class KafkaOptions {
                                                                name,
                                                                
FlinkKafkaPartitioner.class.getName()));
                        }
-                       return InstantiationUtil.instantiate(name, 
FlinkKafkaPartitioner.class, classLoader);
+                       @SuppressWarnings("unchecked")
+                       final FlinkKafkaPartitioner<T> kafkaPartitioner = 
InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
+
+                       return kafkaPartitioner;
                } catch (ClassNotFoundException | FlinkException e) {
                        throw new ValidationException(
                                        String.format("Could not find and 
instantiate partitioner class '%s'", name), e);

Reply via email to