[
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15775055#comment-15775055
]
ASF GitHub Bot commented on FLINK-5280:
---------------------------------------
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/3039#discussion_r93818027
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
---
@@ -535,4 +510,77 @@ object TableEnvironment {
new ScalaStreamTableEnv(executionEnvironment, tableConfig)
}
+
+ /**
+ * Returns field names and field positions for a given
[[TypeInformation]].
+ *
+ * Field names are automatically extracted for
+ * [[org.apache.flink.api.common.typeutils.CompositeType]].
+ * The method fails if inputType is not a
+ * [[org.apache.flink.api.common.typeutils.CompositeType]].
+ *
+ * @param inputType The TypeInformation extract the field names and
positions from.
+ * @tparam A The type of the TypeInformation.
+ * @return A tuple of two arrays holding the field names and
corresponding field positions.
+ */
+ def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
+ validateType(inputType)
+
+ val fieldNames: Array[String] = inputType match {
+ case t: CompositeType[_] => t.getFieldNames
+ case a: AtomicType[_] => Array("f0")
+ case tpe =>
+ throw new TableException(s"Currently only CompositeType and
AtomicType are supported. " +
+ s"Type $tpe lacks explicit field naming")
+ }
+
+ if (fieldNames.contains("*")) {
+ throw new TableException("Field name can not be '*'.")
+ }
+
+ fieldNames
+ }
+
+ /**
+ * Validate if class represented by the typeInfo is static and globally
accessible
+ * @param typeInfo type to check
+ * @throws TableException if type does not meet these criteria
+ */
+ def validateType(typeInfo: TypeInformation[_]): Unit = {
+ val clazz = typeInfo.getTypeClass
+ if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+ !Modifier.isPublic(clazz.getModifiers) ||
+ clazz.getCanonicalName == null) {
+ throw TableException(s"Class '$clazz' described in type information
'$typeInfo' must be " +
+ s"static and globally accessible.")
+ }
+ }
+
+ def getFieldIndexes(inputType: TypeInformation[_]): Array[Int] = {
+ getFieldNames(inputType).indices.toArray
+ }
+
+ /**
+ * Returns field types for a given [[TypeInformation]].
+ *
+ * Field types are automatically extracted for
+ * [[org.apache.flink.api.common.typeutils.CompositeType]].
+ * The method fails if inputType is not a
+ * [[org.apache.flink.api.common.typeutils.CompositeType]].
+ *
+ * @param inputType The TypeInformation to extract field types from.
+ * @return an holding the field types.
--- End diff --
An array holding the field types.
> Extend TableSource to support nested data
> -----------------------------------------
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of
> flat rows.
> However, there are several storage formats for nested data that should be
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in
> Calcite's schema need to be extended to support nested data.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)