[
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313211#comment-16313211
]
ASF GitHub Bot commented on FLINK-8203:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159887168
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -450,37 +450,54 @@ abstract class StreamTableEnvironment(
exprs: Array[Expression])
: (Option[(Int, String)], Option[(Int, String)]) = {
- val fieldTypes: Array[TypeInformation[_]] = streamType match {
- case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
- case a: AtomicType[_] => Array(a)
+ val (isRefByPos, fieldTypes) = streamType match {
+ case c: CompositeType[_] =>
+ // determine schema definition mode (by position or by name)
+ (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => c.getTypeAt(i)).toArray)
+ case t: TypeInformation[_] =>
+ (false, Array(t))
}
var fieldNames: List[String] = Nil
var rowtime: Option[(Int, String)] = None
var proctime: Option[(Int, String)] = None
+ def checkRowtimeType(t: TypeInformation[_]): Unit = {
+ if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+ throw new TableException(
+ s"The rowtime attribute can only replace a field with a valid time type, " +
+ s"such as Timestamp or Long. But was: $t")
+ }
+ }
+
def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
if (rowtime.isDefined) {
throw new TableException(
"The rowtime attribute can only be defined once in a table schema.")
} else {
- val mappedIdx = streamType match {
- case pti: PojoTypeInfo[_] =>
- pti.getFieldIndex(origName.getOrElse(name))
- case _ => idx;
+ // if the fields are referenced by position,
+ // it is possible to replace an existing field or append the time attribute at the end
+ if (isRefByPos) {
+ // check type of field that is replaced
+ if (idx < 0) {
+ throw new TableException(
+ s"The rowtime attribute can only replace a valid field. " +
+ s"${origName.getOrElse(name)} is not a field of type $streamType.")
+ }
+ else if (idx < fieldTypes.length) {
+ checkRowtimeType(fieldTypes(idx))
+ }
}
- // check type of field that is replaced
- if (mappedIdx < 0) {
- throw new TableException(
- s"The rowtime attribute can only replace a valid field. " +
- s"${origName.getOrElse(name)} is not a field of type $streamType.")
- }
- else if (mappedIdx < fieldTypes.length &&
- !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
- TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
- throw new TableException(
- s"The rowtime attribute can only replace a field with a valid time type, " +
- s"such as Timestamp or Long. But was: ${fieldTypes(mappedIdx)}")
+ // check for valid alias if referenced by name
+ else if (origName.isDefined) {
--- End diff --
We should also check the field type in the ref-by-name case if no alias is used
and the name references a field that exists in the input, e.g., `('f1,
'f0.rowtime, 'f2)` for a `Tuple3`.
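
To illustrate the suggested check, here is a minimal sketch. It does not use Flink's classes: `FieldType`, `RowtimeCheckSketch`, `isValidRowtimeType`, and `checkRowtimeByName` are hypothetical names introduced only for this example, and `isValidRowtimeType` merely stands in for the `TypeCheckUtils.isLong` / `TypeCheckUtils.isTimePoint` test in the diff above.

```scala
// Simplified, hypothetical model of field types; not Flink's TypeInformation.
sealed trait FieldType
case object LongType extends FieldType
case object TimestampType extends FieldType
case object StringType extends FieldType

object RowtimeCheckSketch {
  // Stand-in for TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t).
  def isValidRowtimeType(t: FieldType): Boolean = t match {
    case LongType | TimestampType => true
    case _ => false
  }

  // Ref-by-name without alias: if `name` refers to an existing input field,
  // that field is replaced by the rowtime attribute, so its type must be a
  // valid time type. Returns an error message, or None if the use is fine.
  def checkRowtimeByName(
      inputNames: Seq[String],
      inputTypes: Seq[FieldType],
      name: String): Option[String] = {
    val idx = inputNames.indexOf(name)
    if (idx >= 0 && !isValidRowtimeType(inputTypes(idx))) {
      Some("The rowtime attribute can only replace a field with a valid time type, " +
        s"such as Timestamp or Long. But was: ${inputTypes(idx)}")
    } else {
      None // either a new field name or an existing field of a valid type
    }
  }
}
```

Under these assumptions, for a `Tuple3` whose `f0` is a `String`, the schema `('f1, 'f0.rowtime, 'f2)` would be rejected, while `'f1.rowtime` on a `Long` field would pass.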
> Make schema definition of DataStream/DataSet to Table conversion more flexible
> ------------------------------------------------------------------------------
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.0, 1.5.0
> Reporter: Fabian Hueske
> Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}},
> the schema of the table can be defined (by default it is extracted from the
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename
> fields, or define time attributes. Right now, there are several limitations
> on how the fields can be defined, which also depend on the type of the
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g.,
> tuples, case classes, Row) require schema definition based on the position of
> fields. POJO types, which have no fixed order of fields, require fields to be
> referenced by name. Moreover, there are several restrictions on how time
> attributes can be defined, e.g., an event-time attribute must replace an
> existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are
> referenced by name (and possibly renamed using an alias ({{as}})). In this
> mode, fields can be reordered and projected out. Moreover, we can define
> proctime and event-time attributes at arbitrary positions using arbitrary
> names (except those that already exist in the result schema). This mode can
> be used for any input type, including POJOs. This mode is used if all field
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to
> existing fields in the input type. In this mode, fields are simply renamed.
> Event-time attributes can replace the field at their position in the input
> data (if it is of the correct type) or be appended at the end. Proctime
> attributes must be appended at the end. This mode can only be used if the
> input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check all combinations of input types and
> schema definition modes.
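
The choice between the two modes described above could be sketched roughly as follows. This is only an illustration of the rule stated in the description (by name if all references exist in the input, otherwise by position if the input has a defined field order). `SchemaMode`, `SchemaModeSketch`, and `chooseMode` are hypothetical names, aliases and time attributes are ignored, and the actual `isReferenceByPosition` in the pull request may work differently.

```scala
// Hypothetical model of the two schema definition modes.
sealed trait SchemaMode
case object ByName extends SchemaMode
case object ByPosition extends SchemaMode

object SchemaModeSketch {
  // inputFields:   field names of the input type
  // referenced:    field names used in the schema definition
  // hasFieldOrder: true for tuples, case classes, and Row; false for POJOs
  def chooseMode(
      inputFields: Seq[String],
      referenced: Seq[String],
      hasFieldOrder: Boolean): Either[String, SchemaMode] = {
    if (referenced.forall(r => inputFields.contains(r))) {
      // every reference exists in the input: reorder and project by name
      Right(ByName)
    } else if (hasFieldOrder) {
      // unknown names on an ordered type: fields are renamed by position
      Right(ByPosition)
    } else {
      Left("Unknown field references require an input type with a defined field order.")
    }
  }
}
```

For example, `chooseMode(Seq("f0", "f1"), Seq("f1", "f0"), hasFieldOrder = true)` yields `Right(ByName)`, whereas `chooseMode(Seq("f0", "f1"), Seq("a", "b"), hasFieldOrder = true)` yields `Right(ByPosition)`.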