[
https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16157798#comment-16157798
]
ASF GitHub Bot commented on FLINK-6563:
---------------------------------------
Github user haohui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4638#discussion_r137671151
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
---
@@ -106,8 +240,191 @@
return deserializationSchema;
}
- @Override
- public String explainSource() {
- return "";
+ /**
+ * Assigns ingestion time timestamps and watermarks.
+ */
+ public static class IngestionTimeWatermarkAssigner implements
AssignerWithPeriodicWatermarks<Row> {
+
+ private long curTime = Long.MIN_VALUE;
+
+ @Override
+ public long extractTimestamp(Row element, long
previousElementTimestamp) {
+ long t = System.currentTimeMillis();
+ if (t > curTime) {
+ curTime = t;
+ }
+ return curTime;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(curTime - 1);
+ }
+ }
+
+ protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
+ return this.timestampAssigner;
+ }
+
+ /**
+ * Checks that the provided row time attribute is valid, determines its
position in the schema,
+ * and adjusts the return type.
+ *
+ * @param rowtime The attribute to check.
+ */
+ private void configureRowTimeAttribute(String rowtime) {
+ Preconditions.checkNotNull(rowtime, "Row time attribute must
not be null.");
+
+ if (this.ingestionTimeAttribute != null) {
+ throw new ValidationException(
+ "You can only specify a row time attribute OR
an ingestion time attribute.");
+ }
+
+ if (this.rowTimeAttribute != null) {
+ throw new ValidationException(
+ "Row time attribute can only be specified
once.");
+ }
+
+ // get current fields
+ String[] fieldNames = ((RowTypeInfo)
this.getReturnType()).getFieldNames();
+ TypeInformation[] fieldTypes = ((RowTypeInfo)
this.getReturnType()).getFieldTypes();
+
+ // check if the rowtime field exists and remember position
+ this.rowtimeFieldPos = -1;
--- End diff --
I'm wondering why we need to remove the field here and add it back later
on. Changing the orders of the fields seems problematic and can potentially
break serialization (in very hacky cases).
Another question is that to which extent a customized timestamp assigner
can reuse the code here? Is it possible to implement it as a decorator of the
table source? That way it opens up the possibilities to reuse the code for
other table sources.
> Expose time indicator attributes in the KafkaTableSource
> --------------------------------------------------------
>
> Key: FLINK-6563
> URL: https://issues.apache.org/jira/browse/FLINK-6563
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Haohui Mai
> Assignee: Haohui Mai
> Priority: Critical
> Fix For: 1.4.0
>
>
> This is a follow up for FLINK-5884.
> After FLINK-5884 requires the {{TableSource}} interfaces to expose the
> processing time and the event time for the data stream. This jira proposes to
> expose these two information in the Kafka table source.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)