[
https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170804#comment-16170804
]
ASF GitHub Bot commented on FLINK-6563:
---------------------------------------
Github user haohui commented on a diff in the pull request:
https://github.com/apache/flink/pull/4638#discussion_r139553991
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -106,8 +240,191 @@
return deserializationSchema;
}
- @Override
- public String explainSource() {
- return "";
+ /**
+ * Assigns ingestion time timestamps and watermarks.
+ */
+ public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> {
+
+ private long curTime = Long.MIN_VALUE;
+
+ @Override
+ public long extractTimestamp(Row element, long previousElementTimestamp) {
+ long t = System.currentTimeMillis();
+ if (t > curTime) {
+ curTime = t;
+ }
+ return curTime;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(curTime - 1);
+ }
+ }
+
+ protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
+ return this.timestampAssigner;
+ }
+
+ /**
+ * Checks that the provided row time attribute is valid, determines its position in the schema,
+ * and adjusts the return type.
+ *
+ * @param rowtime The attribute to check.
+ */
+ private void configureRowTimeAttribute(String rowtime) {
+ Preconditions.checkNotNull(rowtime, "Row time attribute must not be null.");
+
+ if (this.ingestionTimeAttribute != null) {
+ throw new ValidationException(
+ "You can only specify a row time attribute OR an ingestion time attribute.");
+ }
+
+ if (this.rowTimeAttribute != null) {
+ throw new ValidationException(
+ "Row time attribute can only be specified once.");
+ }
+
+ // get current fields
+ String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames();
+ TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();
+
+ // check if the rowtime field exists and remember position
+ this.rowtimeFieldPos = -1;
--- End diff --
Sorry for not stating it more clearly. I was proposing a tighter
connection between `TimestampAssigner` and `DefinedProctimeAttribute`/
`DefinedRowtimeAttribute`, not merging `DefinedProctimeAttribute`/
`DefinedRowtimeAttribute`.
In our use cases, a trivial implementation of `TimestampAssigner` is not
enough to extract the rowtime: we need to convert the corresponding field
from `double` to `bigint`. It would be great if we could do this a little
more cleanly.
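To make the conversion concrete, here is a minimal sketch of the kind of extraction logic meant above. It is plain Java and deliberately does not use Flink's interfaces: `RowtimeSketch`, `toEpochMillis`, and `rowtimeExtractor` are hypothetical names, the row is modeled as an `Object[]`, and the `double` field is assumed to hold fractional epoch seconds (the actual unit is not stated here).

```java
import java.util.function.ToLongFunction;

// Hypothetical stand-in for illustration only; not Flink's TimestampAssigner API.
final class RowtimeSketch {

    // Assumption: the source field holds fractional epoch seconds as a double.
    // Returns epoch milliseconds as a long, i.e. a value that fits a BIGINT column.
    static long toEpochMillis(double epochSeconds) {
        if (Double.isNaN(epochSeconds) || Double.isInfinite(epochSeconds)) {
            throw new IllegalArgumentException("rowtime must be finite: " + epochSeconds);
        }
        return Math.round(epochSeconds * 1000.0);
    }

    // Builds an extractor that reads the double at rowtimePos and converts it.
    // Assumption: the row is an Object[] whose rowtime field is a java.lang.Double.
    static ToLongFunction<Object[]> rowtimeExtractor(int rowtimePos) {
        return row -> toEpochMillis((Double) row[rowtimePos]);
    }

    public static void main(String[] args) {
        Object[] row = {"user-1", 1505779200.25};
        long timestamp = rowtimeExtractor(1).applyAsLong(row);
        System.out.println(timestamp);
    }
}
```

The point of the sketch is that the extractor needs to know both the field position and the conversion, which is why a tighter link between the assigner and the declared rowtime attribute would help.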
> Expose time indicator attributes in the KafkaTableSource
> --------------------------------------------------------
>
> Key: FLINK-6563
> URL: https://issues.apache.org/jira/browse/FLINK-6563
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Haohui Mai
> Assignee: Haohui Mai
> Priority: Critical
> Fix For: 1.4.0
>
>
> This is a follow-up to FLINK-5884.
> FLINK-5884 requires the {{TableSource}} interfaces to expose the
> processing time and the event time for the data stream. This issue proposes
> exposing both pieces of information in the Kafka table source.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)