[
https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14613221#comment-14613221
]
ASF GitHub Bot commented on FLINK-1967:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/879#discussion_r33868015
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java ---
@@ -17,87 +17,104 @@
package org.apache.flink.streaming.runtime.streamrecord;
-import java.io.Serializable;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
+import org.joda.time.Instant;
/**
- * Object for wrapping a tuple or other object with ID used for sending records
- * between streaming task in Apache Flink stream processing.
+ * One value in a data stream. This stores the value and the associated timestamp.
*/
-public class StreamRecord<T> implements Serializable {
- private static final long serialVersionUID = 1L;
+public class StreamRecord<T> {
- private T streamObject;
- public boolean isTuple;
+ // We store it as Object so that we can reuse a StreamElement for emitting
+ // elements of a different type while still reusing the timestamp.
+ private Object value;
+ private Instant timestamp;
/**
- * Creates an empty StreamRecord
+ * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
+ * result of {@code new Instant(0)}.
*/
- public StreamRecord() {
+ public StreamRecord(T value) {
+ this(value, new Instant(0));
}
/**
- * Gets the wrapped object from the StreamRecord
- *
- * @return The object wrapped
+ * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
+ * given timestamp.
*/
- public T getObject() {
- return streamObject;
+ public StreamRecord(T value, Instant timestamp) {
+ this.value = value;
+ this.timestamp = timestamp;
}
+
/**
- * Gets the field of the contained object at the given position. If a tuple
- * is wrapped then the getField method is invoked. If the StreamRecord
- * contains and object of Basic types only position 0 could be returned.
- *
- * @param pos
- * Position of the field to get.
- * @return Returns the object contained in the position.
+ * Returns the value wrapped in this stream value.
*/
- public Object getField(int pos) {
- if (isTuple) {
- return ((Tuple) streamObject).getField(pos);
- } else {
- if (pos == 0) {
- return streamObject;
- } else {
- throw new IndexOutOfBoundsException();
- }
- }
+ @SuppressWarnings("unchecked")
+ public T getValue() {
+ return (T) value;
}
/**
- * Extracts key for the stored object using the keySelector provided.
- *
- * @param keySelector
- * KeySelector for extracting the key
- * @return The extracted key
+ * Returns the timestamp associated with this stream value/
*/
- public <R> R getKey(KeySelector<T, R> keySelector) {
- try {
- return keySelector.getKey(streamObject);
- } catch (Exception e) {
- throw new RuntimeException("Failed to extract key: " + e.getMessage());
- }
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Replace the currently stored value by the given new value. This returns a StreamElement
+ * with the generic type parameter that matches the new value while keeping the old
+ * timestamp.
+ *
+ * @param element Element to set in this stream value
+ * @return Returns the StreamElement with replaced value
+ */
+ @SuppressWarnings("unchecked")
+ public <X> StreamRecord<X> replace(X element) {
+ this.value = element;
+ return (StreamRecord<X>) this;
}
/**
- * Sets the object stored
- *
- * @param object
- * Object to set
- * @return Returns the StreamRecord object
+ * Replace the currently stored value by the given new value and the currently stored
+ * timestamp with the new timestamp. This returns a StreamElement with the generic type
+ * parameter that matches the new value.
+ *
+ * @param element Element The new value
+ * @param timestamp The new timestamp
+ * @return Returns the StreamElement with replaced value
*/
- public StreamRecord<T> setObject(T object) {
- this.streamObject = object;
- return this;
+ @SuppressWarnings("unchecked")
+ public <X> StreamRecord<X> replace(X element, Instant timestamp) {
--- End diff --
Yes, this is true, but I think it is OK in this case since the user never
sees the StreamRecord. If we did not reuse the StreamRecord, we would pay the
overhead of creating a new StreamRecord for every element that is processed.
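
To make the reuse argument concrete, here is a minimal sketch of the pattern.
It is not the code from the pull request: `RecordReuseSketch`, `Record`, and
`mapInPlace` are hypothetical names, and a plain `long` stands in for the Joda
`Instant` so that the example has no external dependencies.

```java
import java.util.function.Function;

public class RecordReuseSketch {

    /** Hypothetical stand-in for the StreamRecord in the diff; a long replaces the Joda Instant. */
    static final class Record<T> {
        // Stored as Object so that one instance can carry values of different types.
        private Object value;
        private long timestamp;

        Record(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @SuppressWarnings("unchecked")
        T getValue() {
            return (T) value;
        }

        long getTimestamp() {
            return timestamp;
        }

        /** Swaps the value in place and keeps the timestamp; no new Record is allocated. */
        @SuppressWarnings("unchecked")
        <X> Record<X> replace(X element) {
            this.value = element;
            return (Record<X>) this;
        }
    }

    /** Hypothetical map step: applies fn and emits the result in the same Record instance. */
    static <I, O> Record<O> mapInPlace(Record<I> in, Function<I, O> fn) {
        return in.replace(fn.apply(in.getValue()));
    }

    public static void main(String[] args) {
        Function<String, Integer> parse = Integer::parseInt;

        Record<String> in = new Record<>("42", 1000L);
        Record<Integer> out = mapInPlace(in, parse);

        // The value changed type, the timestamp was carried over.
        System.out.println(out.getValue() + " @ " + out.getTimestamp());
        // Both references point at the same object, so nothing was allocated for the output.
        System.out.println((Object) in == (Object) out);
    }
}
```

Note that after `mapInPlace` the old reference `in` still has the static type
`Record<String>` but now holds an `Integer`, so reading it as a `String` would
fail with a `ClassCastException`. In this sketch that is only acceptable
because the record never leaves the code that performs the replacement.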
> Introduce (Event)time in Streaming
> ----------------------------------
>
> Key: FLINK-1967
> URL: https://issues.apache.org/jira/browse/FLINK-1967
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> This requires introducing a timestamp in the streaming record and a change in the
> sources to add timestamps to records. This will also introduce punctuations
> (or low watermarks) to allow windows to work correctly on unordered,
> timestamped input data. In the process of this, the windowing subsystem also
> needs to be adapted to use the punctuations. Furthermore, all operators need
> to be made aware of punctuations and correctly forward them. Then, a new
> operator must be introduced to allow modification of timestamps.