[
https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171795#comment-16171795
]
ASF GitHub Bot commented on FLINK-6563:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4638#discussion_r139711376
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
---
@@ -106,8 +240,191 @@
return deserializationSchema;
}
- @Override
- public String explainSource() {
- return "";
+ /**
+ * Assigns ingestion time timestamps and watermarks.
+ */
+ public static class IngestionTimeWatermarkAssigner implements
AssignerWithPeriodicWatermarks<Row> {
+
+ private long curTime = Long.MIN_VALUE;
+
+ @Override
+ public long extractTimestamp(Row element, long
previousElementTimestamp) {
+ long t = System.currentTimeMillis();
+ if (t > curTime) {
+ curTime = t;
+ }
+ return curTime;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(curTime - 1);
+ }
+ }
+
+ protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
+ return this.timestampAssigner;
+ }
+
+ /**
+ * Checks that the provided row time attribute is valid, determines its
position in the schema,
+ * and adjusts the return type.
+ *
+ * @param rowtime The attribute to check.
+ */
+ private void configureRowTimeAttribute(String rowtime) {
+ Preconditions.checkNotNull(rowtime, "Row time attribute must
not be null.");
+
+ if (this.ingestionTimeAttribute != null) {
+ throw new ValidationException(
+ "You can only specify a row time attribute OR
an ingestion time attribute.");
+ }
+
+ if (this.rowTimeAttribute != null) {
+ throw new ValidationException(
+ "Row time attribute can only be specified
once.");
+ }
+
+ // get current fields
+ String[] fieldNames = ((RowTypeInfo)
this.getReturnType()).getFieldNames();
+ TypeInformation[] fieldTypes = ((RowTypeInfo)
this.getReturnType()).getFieldTypes();
+
+ // check if the rowtime field exists and remember position
+ this.rowtimeFieldPos = -1;
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (fieldNames[i].equals(rowtime)) {
+ if (fieldTypes[i] != Types.LONG) {
+ throw new
IllegalArgumentException("Specified rowtime field must be of type BIGINT. " +
+ "Available fields: " +
toSchemaString(fieldNames, fieldTypes));
+ }
+ this.rowtimeFieldPos = i;
+ break;
+ }
+ }
+ if (this.rowtimeFieldPos < 0) {
+ throw new IllegalArgumentException("Specified rowtime
field must be present in data. " +
+ "Available fields: " +
toSchemaString(fieldNames, fieldTypes));
+ }
+ this.rowTimeAttribute = rowtime;
+
+ // adjust result type by removing rowtime field (will be added
later)
+ String[] newNames = new String[fieldNames.length - 1];
+ TypeInformation[] newTypes = new
TypeInformation[fieldTypes.length - 1];
+ for (int i = 0; i < rowtimeFieldPos; i++) {
+ newNames[i] = fieldNames[i];
+ newTypes[i] = fieldTypes[i];
+ }
+ for (int i = rowtimeFieldPos + 1; i < fieldNames.length; i++) {
+ newNames[i - 1] = fieldNames[i];
+ newTypes[i - 1] = fieldTypes[i];
+ }
+ this.typeInfo = new RowTypeInfo(newTypes, newNames);
+ }
+
+ /**
+ * Util method to create a schema description.
+ *
+ * @param fieldNames The names of the fields.
+ * @param fieldTypes The types of the fields.
+ * @return A string describing the schema of the given field
information.
+ */
+ private String toSchemaString(String[] fieldNames, TypeInformation[]
fieldTypes) {
+ Preconditions.checkArgument(fieldNames.length ==
fieldTypes.length);
+
+ StringBuilder sb = new StringBuilder("[");
+ for (int i = 0; i < fieldNames.length - 1; i++) {
+ sb.append(fieldNames[i]);
+ sb.append(": ");
+ if (fieldTypes[i] == Types.BOOLEAN) {
+ sb.append("BOOLEAN");
+ } else if (fieldTypes[i] == Types.BYTE) {
+ sb.append("TINYINT");
+ } else if (fieldTypes[i] == Types.SHORT) {
+ sb.append("SMALLINT");
+ } else if (fieldTypes[i] == Types.INT) {
+ sb.append("INTEGER");
+ } else if (fieldTypes[i] == Types.LONG) {
+ sb.append("BIGINT");
+ } else if (fieldTypes[i] == Types.FLOAT) {
+ sb.append("FLOAT");
+ } else if (fieldTypes[i] == Types.DOUBLE) {
+ sb.append("DOUBLE");
+ } else if (fieldTypes[i] == Types.STRING) {
+ sb.append("VARCHAR");
+ } else if (fieldTypes[i] == Types.DECIMAL) {
+ sb.append("DECIMAL");
+ } else if (fieldTypes[i] == Types.SQL_DATE) {
+ sb.append("DATE");
+ } else if (fieldTypes[i] == Types.SQL_TIME) {
+ sb.append("TIME");
+ } else if (fieldTypes[i] == Types.SQL_TIMESTAMP) {
+ sb.append("TIMESTAMP");
+ } else {
+
sb.append(fieldTypes[i].getTypeClass().getSimpleName());
+ }
+ sb.append(", ");
+ }
+ sb.delete(sb.length() - 2, sb.length());
+ sb.append("]");
+ return sb.toString();
+ }
+
+ /**
+ * Assigns an existing field as timestamp and generates bounded
out-of-order watermarks.
+ */
+ public static class RowFieldWatermarkAssigner implements
AssignerWithPeriodicWatermarks<Row> {
+
+ private final int timeField;
+ private final long delayMs;
+ private long maxTime = Long.MIN_VALUE;
+
+ private RowFieldWatermarkAssigner(int timeFieldPos, long
delayMs) {
+ Preconditions.checkArgument(delayMs >= 0, "Watermark
delay must be positive.");
+ this.timeField = timeFieldPos;
+ this.delayMs = delayMs;
+ }
+
+ @Override
+ public long extractTimestamp(Row row, long
previousElementTimestamp) {
+ long t = (long) row.getField(timeField);
+ if (t > maxTime) {
+ maxTime = t;
+ }
+ return t;
+ }
+
+ @Nullable
+ @Override
+ public Watermark getCurrentWatermark() {
+ return new Watermark(maxTime - delayMs);
+ }
}
+
+ /**
+ * Removes a field from a Row.
+ */
+ public static class FieldRemover implements MapFunction<Row, Row> {
+
+ private int fieldToRemove;
+
+ private FieldRemover(int fieldToRemove) {
+ this.fieldToRemove = fieldToRemove;
+ }
+
+ @Override
+ public Row map(Row value) throws Exception {
+
+ Row out = new Row(value.getArity());
--- End diff --
Yes, you're right. Thanks!
> Expose time indicator attributes in the KafkaTableSource
> --------------------------------------------------------
>
> Key: FLINK-6563
> URL: https://issues.apache.org/jira/browse/FLINK-6563
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Haohui Mai
> Assignee: Haohui Mai
> Priority: Critical
> Fix For: 1.4.0
>
>
> This is a follow up for FLINK-5884.
> FLINK-5884 requires the {{TableSource}} interfaces to expose the
> processing time and the event time attributes of the data stream. This JIRA proposes to
> expose these two pieces of information in the Kafka table source.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)