Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4638#discussion_r139534525
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
---
@@ -106,8 +240,191 @@
return deserializationSchema;
}
- @Override
- public String explainSource() {
- return "";
+ /**
+ * Assigns ingestion time timestamps and watermarks.
+ */
+ public static class IngestionTimeWatermarkAssigner implements
AssignerWithPeriodicWatermarks<Row> {
+
+ private long curTime = Long.MIN_VALUE;
+
+ @Override
+ public long extractTimestamp(Row element, long
previousElementTimestamp) {
+ long t = System.currentTimeMillis();
+ if (t > curTime) {
+ curTime = t;
+ }
+ return curTime;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(curTime - 1);
+ }
+ }
+
+ protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
+ return this.timestampAssigner;
+ }
+
+ /**
+ * Checks that the provided row time attribute is valid, determines its
position in the schema,
+ * and adjusts the return type.
+ *
+ * @param rowtime The attribute to check.
+ */
+ private void configureRowTimeAttribute(String rowtime) {
+ Preconditions.checkNotNull(rowtime, "Row time attribute must
not be null.");
+
+ if (this.ingestionTimeAttribute != null) {
+ throw new ValidationException(
+ "You can only specify a row time attribute OR
an ingestion time attribute.");
+ }
+
+ if (this.rowTimeAttribute != null) {
+ throw new ValidationException(
+ "Row time attribute can only be specified
once.");
+ }
+
+ // get current fields
+ String[] fieldNames = ((RowTypeInfo)
this.getReturnType()).getFieldNames();
+ TypeInformation[] fieldTypes = ((RowTypeInfo)
this.getReturnType()).getFieldTypes();
+
+ // check if the rowtime field exists and remember position
+ this.rowtimeFieldPos = -1;
--- End diff --
Thanks for the response!
I would not merge `DefinedProctimeAttribute` and `DefinedProctimeAttribute`
into a single interface. Both are for different types of timestamps (processing
time and event time):
The idea is to have two interfaces:
- `DefinedProctimeAttribute` adds a timestamp attribute for processing time
operations. The interface specifies the name of the new attribute. The source
will simply add a virtual attribute, that fetches the current time when it is
accessed.
- `DefinedRowtimeAttribute` (or its successor) specifies an existing
attribute (via its name) to be an event time attribute and provides a watermark
strategy (ascending, bounded-ooo, custom). If a `TableSource` specifies the
interface, the scan will generate watermarks based on the existing field and
the watermark strategy. During registration, we check that the field exists and
has the right type. So instead of adding a field, we simply ensure that we have
watermarks for it and change its type to be a time indicator field.
In this design, we `DefinedRowtimeAttribute` specifies the field and the
watermark strategy as you proposed.
---