[ 
https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171704#comment-16171704
 ] 

ASF GitHub Bot commented on FLINK-6563:
---------------------------------------

Github user uybhatti commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4638#discussion_r139692943
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
    @@ -62,10 +88,107 @@
                        DeserializationSchema<Row> deserializationSchema,
                        TypeInformation<Row> typeInfo) {
     
    -           this.topic = Preconditions.checkNotNull(topic, "Topic");
    -           this.properties = Preconditions.checkNotNull(properties, 
"Properties");
    -           this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
    -           this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type 
information");
    +           this.topic = Preconditions.checkNotNull(topic, "Topic must not 
be null.");
    +           this.properties = Preconditions.checkNotNull(properties, 
"Properties must not be null.");
    +           this.deserializationSchema = 
Preconditions.checkNotNull(deserializationSchema, "Deserialization schema must 
not be null.");
    +           this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type 
information must not be null.");
    +   }
    +
    +   /**
    +    * Adds processing time attribute to the table. The attribute is 
appended to each row.
    +    *
    +    * @param proctime The name of the added processing time attribute.
    +    */
    +   public void addProcTimeAttribute(String proctime) {
    +           Preconditions.checkNotNull(proctime, "Processing time attribute 
must not be null.");
    +           this.procTimeAttribute = proctime;
    +   }
    +
    +   /**
    +    * Adds an ingestion time attribute to the table. The attribute is 
append at the end of each row.
    +    *
    +    * <p>For each row, the ingestion time attribute is initialized with 
the current time when the row
    +    * is read from Kafka. From there on, it behaves as an event time 
attribute.
    +    *
    +    * @param ingestionTime The name of the added ingestion time attribute.
    +    */
    +   public void addIngestionTimeAttribute(String ingestionTime) {
    +           Preconditions.checkNotNull(ingestionTime, "Ingestion time 
attribute must not be null.");
    +           if (this.rowTimeAttribute != null) {
    +                   throw new ValidationException(
    +                           "You can only specify an ingestion time 
attribute OR a row time attribute.");
    +           }
    +           this.rowTimeAttribute = ingestionTime;
    --- End diff --
    
    it should be `this.ingestionTimeAttribute = ingestionTime;` 
    Otherwise no need of `ingestionTimeAttribute` variable


> Expose time indicator attributes in the KafkaTableSource
> --------------------------------------------------------
>
>                 Key: FLINK-6563
>                 URL: https://issues.apache.org/jira/browse/FLINK-6563
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>            Priority: Critical
>             Fix For: 1.4.0
>
>
> This is a follow up for FLINK-5884.
> After FLINK-5884 requires the {{TableSource}} interfaces to expose the 
> processing time and the event time for the data stream. This jira proposes to 
> expose these two information in the Kafka table source.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to