Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3808#discussion_r114791208
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
---
@@ -77,23 +83,75 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
extends JavaTypeFactoryImp
}
/**
+ * Creates a indicator type for processing-time, but with similar properties as SQL timestamp.
+ */
+ def createProctimeIndicatorType(): RelDataType = {
+ val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ canonize(
+ new TimeIndicatorRelDataType(
+ getTypeSystem,
+ originalType.asInstanceOf[BasicSqlType],
+ isEventTime = false)
+ )
+ }
+
+ /**
+ * Creates a indicator type for event-time, but with similar properties as SQL timestamp.
+ */
+ def createRowtimeIndicatorType(): RelDataType = {
+ val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ canonize(
+ new TimeIndicatorRelDataType(
+ getTypeSystem,
+ originalType.asInstanceOf[BasicSqlType],
+ isEventTime = true)
+ )
+ }
+
+ /**
* Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
*
* @param fieldNames field names
* @param fieldTypes field types, every element is Flink's [[TypeInformation]]
- * @return a struct type with the input fieldNames and input fieldTypes
+ * @param rowtime optional system field to indicate event-time; the index determines the index
+ * in the final record and might replace an existing field
--- End diff --
Existing fields are shifted, not replaced, correct?
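To make the distinction concrete, here is a minimal sketch of the two behaviors on a plain Scala sequence of field names. It is not the FlinkTypeFactory code from this PR; the object and method names are hypothetical and only illustrate what "shifted" versus "replaced" would mean for the field at the given index.

```scala
// Hypothetical illustration only; none of these names exist in FlinkTypeFactory.
object FieldIndexSketch {

  // Shifting: the new field is inserted at `index`, and every field from
  // `index` onward moves one position to the right. The result is one longer.
  def insertShifting(fields: Seq[String], index: Int, name: String): Seq[String] =
    fields.patch(index, Seq(name), 0)

  // Replacing: the field at `index` is overwritten. The length is unchanged
  // and the original field at that position is lost.
  def insertReplacing(fields: Seq[String], index: Int, name: String): Seq[String] =
    fields.updated(index, name)

  def main(args: Array[String]): Unit = {
    val fields = Seq("a", "b", "c")
    println(insertShifting(fields, 1, "rowtime"))  // List(a, rowtime, b, c)
    println(insertReplacing(fields, 1, "rowtime")) // List(a, rowtime, c)
  }
}
```

If the implementation behaves like `insertShifting`, the Scaladoc phrase "might replace an existing field" would be misleading and should say that existing fields are shifted.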
---