Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159876208
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
---
@@ -446,52 +443,60 @@ abstract class StreamTableEnvironment(
* Checks for at most one rowtime and proctime attribute.
* Returns the time attributes.
*
- * @param isReferenceByPosition schema mode see
[[isReferenceByPosition()]]
- *
* @return rowtime attribute and proctime attribute
*/
private def validateAndExtractTimeAttributes(
- isReferenceByPosition: Boolean,
streamType: TypeInformation[_],
exprs: Array[Expression])
: (Option[(Int, String)], Option[(Int, String)]) = {
- val fieldTypes: Array[TypeInformation[_]] = streamType match {
- case c: CompositeType[_] => (0 until c.getArity).map(i =>
c.getTypeAt(i)).toArray
- case t: TypeInformation[_] => Array(t)
+ val (isRefByPos, fieldTypes) = streamType match {
+ case c: CompositeType[_] =>
+ // determine schema definition mode (by position or by name)
+ (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i =>
c.getTypeAt(i)).toArray)
+ case t: TypeInformation[_] =>
+ (false, Array(t))
}
var fieldNames: List[String] = Nil
var rowtime: Option[(Int, String)] = None
var proctime: Option[(Int, String)] = None
+ def checkRowtimeType(t: TypeInformation[_]): Unit = {
+ if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+ throw new TableException(
+ s"The rowtime attribute can only replace a field with a valid
time type, " +
+ s"such as Timestamp or Long. But was: $t")
+ }
+ }
+
def extractRowtime(idx: Int, name: String, origName: Option[String]):
Unit = {
if (rowtime.isDefined) {
throw new TableException(
"The rowtime attribute can only be defined once in a table
schema.")
} else {
// if the fields are referenced by position,
// it is possible to replace an existing field or append the time
attribute at the end
- if (isReferenceByPosition) {
-
- val mappedIdx = streamType match {
- case pti: PojoTypeInfo[_] =>
- pti.getFieldIndex(origName.getOrElse(name))
- case _ => idx;
- }
-
+ if (isRefByPos) {
// check type of field that is replaced
- if (mappedIdx < 0) {
+ if (idx < 0) {
throw new TableException(
s"The rowtime attribute can only replace a valid field. " +
s"${origName.getOrElse(name)} is not a field of type
$streamType.")
}
- else if (mappedIdx < fieldTypes.length &&
- !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
- TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
- throw new TableException(
- s"The rowtime attribute can only replace a field with a
valid time type, " +
- s"such as Timestamp or Long. But was:
${fieldTypes(mappedIdx)}")
+ else if (idx < fieldTypes.length) {
+ checkRowtimeType(fieldTypes(idx))
+ }
+ }
+ // check for valid alias if referenced by name
+ else if (origName.isDefined) {
+ // check for valid alias
+ streamType match {
+ case ct: CompositeType[_] if ct.hasField(origName.get) =>
+ val t = ct.getTypeAt(ct.getFieldIndex(origName.get))
+ checkRowtimeType(t)
+ case _ =>
+ throw new TableException("An alias must always reference an
existing field.")
--- End diff --
Add `origName` (and maybe also the names of the existing fields?) to the error message, so that it shows which alias could not be resolved.
---