Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159885776
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
---
@@ -450,37 +450,54 @@ abstract class StreamTableEnvironment(
exprs: Array[Expression])
: (Option[(Int, String)], Option[(Int, String)]) = {
- val fieldTypes: Array[TypeInformation[_]] = streamType match {
- case c: CompositeType[_] => (0 until c.getArity).map(i =>
c.getTypeAt(i)).toArray
- case a: AtomicType[_] => Array(a)
+ val (isRefByPos, fieldTypes) = streamType match {
+ case c: CompositeType[_] =>
+ // determine schema definition mode (by position or by name)
+ (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i =>
c.getTypeAt(i)).toArray)
+ case t: TypeInformation[_] =>
+ (false, Array(t))
}
var fieldNames: List[String] = Nil
var rowtime: Option[(Int, String)] = None
var proctime: Option[(Int, String)] = None
+ def checkRowtimeType(t: TypeInformation[_]): Unit = {
+ if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+ throw new TableException(
+ s"The rowtime attribute can only replace a field with a valid
time type, " +
+ s"such as Timestamp or Long. But was: $t")
+ }
+ }
+
def extractRowtime(idx: Int, name: String, origName: Option[String]):
Unit = {
if (rowtime.isDefined) {
throw new TableException(
"The rowtime attribute can only be defined once in a table
schema.")
} else {
- val mappedIdx = streamType match {
- case pti: PojoTypeInfo[_] =>
- pti.getFieldIndex(origName.getOrElse(name))
- case _ => idx;
+ // if the fields are referenced by position,
+ // it is possible to replace an existing field or append the time
attribute at the end
+ if (isRefByPos) {
+ // check type of field that is replaced
+ if (idx < 0) {
--- End diff --
I think this condition (`idx < 0`) will never be `true`, because the method
is always called with `idx` being an index into `exprs`.
---