[ 
https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14613233#comment-14613233
 ] 

ASF GitHub Bot commented on FLINK-1967:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/879#discussion_r33868988
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java ---
    @@ -17,87 +17,104 @@
     
     package org.apache.flink.streaming.runtime.streamrecord;
     
    -import java.io.Serializable;
    -
    -import org.apache.flink.api.java.functions.KeySelector;
    -import org.apache.flink.api.java.tuple.Tuple;
    +import org.joda.time.Instant;
     
     /**
    - * Object for wrapping a tuple or other object with ID used for sending records
    - * between streaming task in Apache Flink stream processing.
    + * One value in a data stream. This stores the value and the associated timestamp.
      */
    -public class StreamRecord<T> implements Serializable {
    -   private static final long serialVersionUID = 1L;
    +public class StreamRecord<T> {
     
    -   private T streamObject;
    -   public boolean isTuple;
    +   // We store it as Object so that we can reuse a StreamElement for emitting
    +   // elements of a different type while still reusing the timestamp.
    +   private Object value;
    +   private Instant timestamp;
     
        /**
    -    * Creates an empty StreamRecord
    +    * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
    +    * result of {@code new Instant(0)}.
         */
    -   public StreamRecord() {
    +   public StreamRecord(T value) {
    +           this(value, new Instant(0));
        }
     
        /**
    -    * Gets the wrapped object from the StreamRecord
    -    * 
    -    * @return The object wrapped
    +    * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
    +    * given timestamp.
         */
    -   public T getObject() {
    -           return streamObject;
    +   public StreamRecord(T value, Instant timestamp) {
    +           this.value = value;
    +           this.timestamp = timestamp;
        }
     
    +
        /**
    -    * Gets the field of the contained object at the given position. If a tuple
    -    * is wrapped then the getField method is invoked. If the StreamRecord
    -    * contains and object of Basic types only position 0 could be returned.
    -    * 
    -    * @param pos
    -    *            Position of the field to get.
    -    * @return Returns the object contained in the position.
    +    * Returns the value wrapped in this stream value.
         */
    -   public Object getField(int pos) {
    -           if (isTuple) {
    -                   return ((Tuple) streamObject).getField(pos);
    -           } else {
    -                   if (pos == 0) {
    -                           return streamObject;
    -                   } else {
    -                           throw new IndexOutOfBoundsException();
    -                   }
    -           }
    +   @SuppressWarnings("unchecked")
    +   public T getValue() {
    +           return (T) value;
        }
     
        /**
    -    * Extracts key for the stored object using the keySelector provided.
    -    * 
    -    * @param keySelector
    -    *            KeySelector for extracting the key
    -    * @return The extracted key
    +    * Returns the timestamp associated with this stream value/
         */
    -   public <R> R getKey(KeySelector<T, R> keySelector) {
    -           try {
    -                   return keySelector.getKey(streamObject);
    -           } catch (Exception e) {
    -                   throw new RuntimeException("Failed to extract key: " + e.getMessage());
    -           }
    +   public Instant getTimestamp() {
    +           return timestamp;
    +   }
    +
    +   /**
    +    * Replace the currently stored value by the given new value. This returns a StreamElement
    +    * with the generic type parameter that matches the new value while keeping the old
    +    * timestamp.
    +    *
    +    * @param element Element to set in this stream value
    +    * @return Returns the StreamElement with replaced value
    +    */
    +   @SuppressWarnings("unchecked")
    +   public <X> StreamRecord<X> replace(X element) {
    +           this.value = element;
    +           return (StreamRecord<X>) this;
        }
     
        /**
    -    * Sets the object stored
    -    * 
    -    * @param object
    -    *            Object to set
    -    * @return Returns the StreamRecord object
    +    * Replace the currently stored value by the given new value and the currently stored
    +    * timestamp with the new timestamp. This returns a StreamElement with the generic type
    +    * parameter that matches the new value.
    +    *
    +    * @param element Element The new value
    +    * @param timestamp The new timestamp
    +    * @return Returns the StreamElement with replaced value
         */
    -   public StreamRecord<T> setObject(T object) {
    -           this.streamObject = object;
    -           return this;
    +   @SuppressWarnings("unchecked")
    +   public <X> StreamRecord<X> replace(X element, Instant timestamp) {
    --- End diff --
    
    If you look at `StreamMap` you will see what I mean. A map transforms `IN` to `OUT`, so the value type changes, but the same stream element can be reused to emit the mapped value.
    
    A `ClassCastException` should occur when `getValue()` is called and the runtime type of the internal value does not match the type the calling code expects.
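
A minimal sketch of the reuse pattern described above, under stated assumptions: `SimpleRecord` is a hypothetical, simplified stand-in for the `StreamRecord` in the diff (a `long` replaces the Joda `Instant` to keep the example dependency-free), and the static `map` helper only imitates what an operator like `StreamMap` might do; it is not the actual Flink code.

```java
import java.util.function.Function;

public class RecordReuseSketch {

    /** Hypothetical stand-in for StreamRecord; the value is stored as Object so the record can be retyped. */
    static final class SimpleRecord<T> {
        private Object value;
        private final long timestamp; // assumption: a plain long instead of org.joda.time.Instant

        SimpleRecord(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @SuppressWarnings("unchecked")
        T getValue() {
            // Unchecked cast: erased at runtime, so any type check happens at the call site.
            return (T) value;
        }

        long getTimestamp() {
            return timestamp;
        }

        @SuppressWarnings("unchecked")
        <X> SimpleRecord<X> replace(X element) {
            // Swap the value in place and hand back the same object under a new type parameter.
            this.value = element;
            return (SimpleRecord<X>) this;
        }
    }

    /** Hypothetical map step from IN to OUT that reuses the incoming record instead of allocating a new one. */
    static <IN, OUT> SimpleRecord<OUT> map(SimpleRecord<IN> in, Function<IN, OUT> fn) {
        return in.replace(fn.apply(in.getValue()));
    }

    public static void main(String[] args) {
        SimpleRecord<String> in = new SimpleRecord<>("hello", 42L);
        SimpleRecord<Integer> out = map(in, String::length);

        // The mapped value is carried by the same object, and the timestamp is preserved.
        System.out.println(out.getValue() + " @ " + out.getTimestamp());
        System.out.println((Object) in == (Object) out);

        // The stale reference still claims String, but the stored value is now an Integer.
        try {
            String stale = in.getValue();
            System.out.println(stale);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException when reading through the stale reference");
        }
    }
}
```

In this sketch the exception is raised where the caller assigns the result of `getValue()` to a `String`, because the cast inside `getValue()` is erased at runtime. Code that only uses the reference returned by `replace` never sees the mismatch.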


> Introduce (Event)time in Streaming
> ----------------------------------
>
>                 Key: FLINK-1967
>                 URL: https://issues.apache.org/jira/browse/FLINK-1967
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> This requires introducing a timestamp in streaming record and a change in the 
> sources to add timestamps to records. This will also introduce punctuations 
> (or low watermarks) to allow windows to work correctly on unordered, 
> timestamped input data. In the process of this, the windowing subsystem also 
> needs to be adapted to use the punctuations. Furthermore, all operators need 
> to be made aware of punctuations and correctly forward them. Then, a new 
> operator must be introduced to allow modification of timestamps.



