[
https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170660#comment-16170660
]
ASF GitHub Bot commented on FLINK-6563:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4638#discussion_r139534525
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
---
@@ -106,8 +240,191 @@
return deserializationSchema;
}
- @Override
- public String explainSource() {
- return "";
+ /**
+ * Assigns ingestion time timestamps and watermarks.
+ */
+ public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> {
+
+ private long curTime = Long.MIN_VALUE;
+
+ @Override
+ public long extractTimestamp(Row element, long previousElementTimestamp) {
+ long t = System.currentTimeMillis();
+ if (t > curTime) {
+ curTime = t;
+ }
+ return curTime;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(curTime - 1);
+ }
+ }
+
+ protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
+ return this.timestampAssigner;
+ }
+
+ /**
+ * Checks that the provided row time attribute is valid, determines its position in the schema,
+ * and adjusts the return type.
+ *
+ * @param rowtime The attribute to check.
+ */
+ private void configureRowTimeAttribute(String rowtime) {
+ Preconditions.checkNotNull(rowtime, "Row time attribute must not be null.");
+
+ if (this.ingestionTimeAttribute != null) {
+ throw new ValidationException(
+ "You can only specify a row time attribute OR an ingestion time attribute.");
+ }
+
+ if (this.rowTimeAttribute != null) {
+ throw new ValidationException(
+ "Row time attribute can only be specified once.");
+ }
+
+ // get current fields
+ String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames();
+ TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();
+
+ // check if the rowtime field exists and remember position
+ this.rowtimeFieldPos = -1;
--- End diff --
Thanks for the response!
I would not merge `DefinedProctimeAttribute` and `DefinedRowtimeAttribute`
into a single interface. They serve different types of timestamps (processing
time and event time).
The idea is to have two interfaces:
- `DefinedProctimeAttribute` adds a timestamp attribute for processing time
operations. The interface specifies the name of the new attribute. The source
will simply add a virtual attribute that fetches the current time when it is
accessed.
- `DefinedRowtimeAttribute` (or its successor) specifies an existing
attribute (via its name) to be an event time attribute and provides a watermark
strategy (ascending, bounded-ooo, custom). If a `TableSource` specifies the
interface, the scan will generate watermarks based on the existing field and
the watermark strategy. During registration, we check that the field exists and
has the right type. So instead of adding a field, we simply ensure that we have
watermarks for it and change its type to be a time indicator field.
In this design, `DefinedRowtimeAttribute` specifies the field and the
watermark strategy, as you proposed.
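To make the proposed split concrete, here is a minimal, self-contained Java sketch. It is not Flink's actual API: the names `ProctimeAttributeSketch`, `RowtimeAttributeSketch`, `WatermarkStrategyKind`, and `validateRowtimeField` are hypothetical, and the set of accepted field types (`Long` or `java.sql.Timestamp`) is an assumption made only for illustration. It shows one interface that names a new virtual processing-time attribute, one that points at an existing field and carries a watermark strategy, and a registration-time check that the field exists and has an acceptable type.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: every type and method name below is hypothetical,
// not Flink's actual API.
class TimeAttributeSketch {

    /** Declares the name of a virtual processing-time attribute that the source appends. */
    interface ProctimeAttributeSketch {
        String getProctimeAttribute();
    }

    /** The watermark strategies named in the review comment. */
    enum WatermarkStrategyKind { ASCENDING, BOUNDED_OUT_OF_ORDER, CUSTOM }

    /** Marks an existing field as the event-time attribute and supplies a watermark strategy. */
    interface RowtimeAttributeSketch {
        String getRowtimeAttribute();
        WatermarkStrategyKind getWatermarkStrategy();
    }

    /**
     * Registration-time check: the rowtime field must already exist in the schema
     * and have an acceptable type. Returns the field's position.
     * Assumption: Long (epoch millis) and java.sql.Timestamp are the accepted types.
     */
    static int validateRowtimeField(
            List<String> fieldNames, List<Class<?>> fieldTypes, String rowtime) {
        int pos = fieldNames.indexOf(rowtime);
        if (pos < 0) {
            throw new IllegalArgumentException(
                "Row time attribute '" + rowtime + "' not found in schema.");
        }
        Class<?> type = fieldTypes.get(pos);
        if (type != Long.class && type != java.sql.Timestamp.class) {
            throw new IllegalArgumentException(
                "Row time attribute '" + rowtime + "' has unsupported type " + type.getName());
        }
        return pos;
    }

    public static void main(String[] args) {
        // A source that declares the existing field "ts" as its event-time attribute.
        RowtimeAttributeSketch source = new RowtimeAttributeSketch() {
            @Override
            public String getRowtimeAttribute() {
                return "ts";
            }

            @Override
            public WatermarkStrategyKind getWatermarkStrategy() {
                return WatermarkStrategyKind.ASCENDING;
            }
        };

        List<String> names = Arrays.asList("user", "ts");
        List<Class<?>> types = Arrays.<Class<?>>asList(String.class, Long.class);

        int pos = validateRowtimeField(names, types, source.getRowtimeAttribute());
        System.out.println("rowtime field position: " + pos
            + ", strategy: " + source.getWatermarkStrategy());
    }
}
```

The point of the sketch is that the rowtime interface adds no field of its own. It only identifies an existing one, so validation can happen once at registration rather than inside each source.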
> Expose time indicator attributes in the KafkaTableSource
> --------------------------------------------------------
>
> Key: FLINK-6563
> URL: https://issues.apache.org/jira/browse/FLINK-6563
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Haohui Mai
> Assignee: Haohui Mai
> Priority: Critical
> Fix For: 1.4.0
>
>
> This is a follow-up to FLINK-5884.
> FLINK-5884 requires the {{TableSource}} interfaces to expose the
> processing time and the event time for the data stream. This jira proposes to
> expose these two attributes in the Kafka table source.