[ https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902054#comment-15902054 ]
ASF GitHub Bot commented on FLINK-3850:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3040#discussion_r105029771

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.dataset.forwarding
    +
    +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.types.Row
    +
    +object FieldForwardingUtils {
    +
    +  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
    +
    +  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
    +    throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
    +  }
    +
    +  /**
    +    * Wrapper for {@link getForwardedFields}
    +    */
    +  def getForwardedInput(
    +      inputType: TypeInfo[_],
    +      outputType: TypeInfo[_],
    +      forwardIndices: Seq[Int],
    +      customWrapper: TypeInfo[_] =>
    +        Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
    +
    +    getForwardedFields(inputType,
    +      outputType,
    +      forwardIndices.zip(forwardIndices),
    +      customWrapper)
    +  }
    +
    +  /**
    +    * Wraps provided indices with proper names.
    +    * e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
    +    *
    +    * @param inputType      information of input data
    +    * @param outputType     information of output data
    +    * @param forwardIndices tuple of (input, output) indices of a forwarded field
    +    * @param customWrapper  used for figuring out proper type in specific cases,
    +    *                       e.g. {@see DataSetSingleRowJoin}
    +    * @return string with forwarded fields mapped from input to output
    +    */
    +  def getForwardedFields(
    +      inputType: TypeInfo[_],
    +      outputType: TypeInfo[_],
    +      forwardIndices: Seq[(Int, Int)],
    +      customWrapper: TypeInfo[_] =>
    --- End diff --

    I didn't notice that the `customWrapper` was used. Can it be removed?

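
For illustration only, and not the code from the pull request: a minimal sketch of the kind of string the Scaladoc for `getForwardedFields` describes, built from (input, output) index pairs. The object `ForwardedFieldsSketch`, its `forwardedFields` helper, and the plain name lists that stand in for Flink's `TypeInformation` are all hypothetical. The sketch assumes the `in->out` forwarded-field expression syntax of the DataSet API, with expressions joined by `;`.

```scala
object ForwardedFieldsSketch {

  // Hypothetical helper, NOT the implementation under review.
  // Resolves each (input index, output index) pair to field names and
  // renders it as a forwarded-field expression "inputName->outputName".
  // Plain Seq[String] name lists stand in for TypeInformation here.
  def forwardedFields(
      inputNames: Seq[String],
      outputNames: Seq[String],
      forwardIndices: Seq[(Int, Int)]): String = {
    forwardIndices
      .map { case (in, out) => s"${inputNames(in)}->${outputNames(out)}" }
      .mkString(";")
  }

  def main(args: Array[String]): Unit = {
    val rowNames = Seq("f0", "f1", "f2")   // Row-style field names
    val tupleNames = Seq("_1", "_2", "_3") // Scala tuple-style field names

    // Row to Row: field 0 stays in place, field 2 moves to position 1.
    println(forwardedFields(rowNames, rowNames, Seq((0, 0), (2, 1))))
    // Tuple to Row: the second tuple field becomes the first Row field.
    println(forwardedFields(tupleNames, rowNames, Seq((1, 0))))
  }
}
```

The first call yields `f0->f0;f2->f1` and the second `_2->f0`. The real utility would have to derive the names from the input and output type information, which is the part the `customWrapper` parameter appears to address.
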
> Add forward field annotations to DataSet operators generated by the Table API
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-3850
>                 URL: https://issues.apache.org/jira/browse/FLINK-3850
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which
> input fields an operator copies. This information is valuable for the
> optimizer because it can infer that certain physical properties such as
> partitioning or sorting are not destroyed by user functions and thus generate
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet
> programs and code for user-defined functions. Hence, it knows exactly which
> fields are modified and which not. We should use this information to
> automatically generate forward field annotations and attach them to the
> operators. This can help to significantly improve the performance of certain
> jobs.
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations

--
This message was sent by Atlassian JIRA (v6.3.15#6346)