[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534867#comment-16534867
 ] 

ASF GitHub Bot commented on FLINK-8558:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6264#discussion_r200597054
  
    --- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
 ---
    @@ -18,48 +18,99 @@
     
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.annotation.Internal;
     import org.apache.flink.api.common.serialization.DeserializationSchema;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
    +import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
     import org.apache.flink.table.api.TableSchema;
    +import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
     import org.apache.flink.table.sources.StreamTableSource;
     import org.apache.flink.types.Row;
     
    +import java.util.List;
    +import java.util.Map;
     import java.util.Properties;
     
     /**
      * Kafka {@link StreamTableSource} for Kafka 0.10.
      */
    -@PublicEvolving
    -public abstract class Kafka010TableSource extends KafkaTableSource {
    +@Internal
    +public class Kafka010TableSource extends KafkaTableSource {
     
    -   // The deserialization schema for the Kafka records
    -   private final DeserializationSchema<Row> deserializationSchema;
    +   /**
    +    * Creates a Kafka 0.10 {@link StreamTableSource}.
    +    *
    +    * @param schema                      Schema of the produced table.
    +    * @param proctimeAttribute           Field name of the processing time 
attribute, null if no
    +    *                                    processing time field is defined.
    +    * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
    +    * @param fieldMapping                Mapping for the fields of the 
table schema to
    +    *                                    fields of the physical returned 
type or null.
    +    * @param topic                       Kafka topic to consume.
    +    * @param properties                  Properties for the Kafka consumer.
    +    * @param deserializationSchema       Deserialization schema for 
decoding records from Kafka.
    +    * @param startupMode                 Startup mode for the contained 
consumer.
    +    * @param specificStartupOffsets      Specific startup offsets; only 
relevant when startup
    +    *                                    mode is {@link 
StartupMode#SPECIFIC_OFFSETS}.
    +    */
    +   public Kafka010TableSource(
    +                   TableSchema schema,
    +                   String proctimeAttribute,
    +                   List<RowtimeAttributeDescriptor> 
rowtimeAttributeDescriptors,
    +                   Map<String, String> fieldMapping,
    +                   String topic, Properties properties,
    +                   DeserializationSchema<Row> deserializationSchema,
    +                   StartupMode startupMode,
    +                   Map<KafkaTopicPartition, Long> specificStartupOffsets) {
    +
    +           super(
    +                   schema,
    +                   proctimeAttribute,
    +                   rowtimeAttributeDescriptors,
    +                   fieldMapping,
    +                   topic,
    +                   properties,
    +                   deserializationSchema,
    +                   startupMode,
    +                   specificStartupOffsets);
    +   }
    +
    +   /**
    +    * Creates a Kafka 0.10 {@link StreamTableSource}.
    +    *
    +    * @param schema                Schema of the produced table.
    +    * @param topic                 Kafka topic to consume.
    +    * @param properties            Properties for the Kafka consumer.
    +    * @param deserializationSchema Deserialization schema for decoding 
records from Kafka.
    +    */
    +   public Kafka010TableSource(
    +                   TableSchema schema,
    +                   String topic,
    +                   Properties properties,
    +                   DeserializationSchema<Row> deserializationSchema) {
    +
    +           super(schema, topic, properties, deserializationSchema);
    +   }
     
        /**
         * Creates a Kafka 0.10 {@link StreamTableSource}.
         *
         * @param topic                 Kafka topic to consume.
         * @param properties            Properties for the Kafka consumer.
         * @param deserializationSchema Deserialization schema to use for Kafka 
records.
    -    * @param typeInfo              Type information describing the result 
type. The field names are used
    -    *                              to parse the JSON file and so are the 
types.
    +    * @param typeInfo              Not relevant anymore.
    --- End diff --
    
    Why is it not relevant? Doesn't it break backward compatibility? If so, 
it would be safer to drop the constructor altogether.
    
    The only reason for keeping it would be if **ALL** old invocations 
still work the same as they used to, regardless of this value. However, in that 
case I would also be inclined to drop this constructor, since it's an easy change 
for the users and the class was `@PublicEvolving` and is now `@Internal`.


> Add unified format interfaces and format discovery
> --------------------------------------------------
>
>                 Key: FLINK-8558
>                 URL: https://issues.apache.org/jira/browse/FLINK-8558
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to the target record type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to