[
https://issues.apache.org/jira/browse/FLINK-5884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15997614#comment-15997614
]
ASF GitHub Bot commented on FLINK-5884:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3808#discussion_r114781827
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
---
@@ -577,70 +577,94 @@ abstract class TableEnvironment(val config:
TableConfig) {
/**
* Returns field names and field positions for a given
[[TypeInformation]] and [[Array]] of
- * [[Expression]].
+ * [[Expression]]. It does not handle time attributes but considers
them in indices, if
+ * ignore flag is not false.
*
* @param inputType The [[TypeInformation]] against which the
[[Expression]]s are evaluated.
* @param exprs The expressions that define the field names.
+ * @param ignoreTimeAttributes ignore time attributes and handle them
as regular expressions.
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and
corresponding field positions.
*/
protected[flink] def getFieldInfo[A](
- inputType: TypeInformation[A],
- exprs: Array[Expression]): (Array[String], Array[Int]) = {
+ inputType: TypeInformation[A],
+ exprs: Array[Expression],
+ ignoreTimeAttributes: Boolean)
+ : (Array[String], Array[Int]) = {
TableEnvironment.validateType(inputType)
+ val filteredExprs = if (ignoreTimeAttributes) {
+ exprs.map {
+ case ta: TimeAttribute => ta.expression
+ case e@_ => e
+ }
+ } else {
+ exprs
+ }
+
val indexedNames: Array[(Int, String)] = inputType match {
case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
throw new TableException(
"An input of GenericTypeInfo<Row> cannot be converted to Table.
" +
"Please specify the type of the input with a RowTypeInfo.")
case a: AtomicType[A] =>
- if (exprs.length != 1) {
- throw new TableException("Table of atomic type can only have a
single field.")
- }
- exprs.map {
- case UnresolvedFieldReference(name) => (0, name)
+ filteredExprs.zipWithIndex flatMap {
+ case (UnresolvedFieldReference(name), idx) =>
+ if (idx > 0) {
--- End diff --
Is this correct? Couldn't we have a `DataStream[String]` which gets mapped
to a `Table` with schema `'t.rowtime, 'name`? In this case the `idx` of `'name`
would be `1`.
> Integrate time indicators for Table API & SQL
> ---------------------------------------------
>
> Key: FLINK-5884
> URL: https://issues.apache.org/jira/browse/FLINK-5884
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Blocker
> Fix For: 1.3.0
>
>
> We already discussed the need for a proper integration of time indicators
> (event-time or processing-time) for both the Table API & SQL on the ML:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-SQL-indicators-for-event-and-processing-time-tp15927.html
> This issue will track the progress. I will work on a design document how we
> can solve this issue.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)