hequn8128 commented on a change in pull request #9653: [FLINK-14014][python] Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution URL: https://github.com/apache/flink/pull/9653#discussion_r323096196
########## File path: flink-python/src/main/java/org/apache/flink/table/functions/python/BeamTypeUtils.java ########## @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.python; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.fnexecution.v1.FlinkFnApi; +import org.apache.flink.table.functions.python.coders.BaseRowCoder; +import org.apache.flink.table.functions.python.coders.BigIntCoder; +import org.apache.flink.table.functions.python.coders.RowCoder; +import org.apache.flink.table.types.logical.AnyType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.SymbolType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; +import org.apache.flink.table.types.logical.ZonedTimestampType; + +import org.apache.beam.sdk.coders.Coder; + +/** + * Utilities for converting Flink data types to Beam data types. + */ +@Internal +public final class BeamTypeUtils { + + private static final String EMPTY_STRING = ""; + + public static Coder toCoder(LogicalType logicalType) { + return logicalType.accept(new LogicalTypeToCoderConverter()); + } + + public static Coder toBlinkCoder(LogicalType logicalType) { + return logicalType.accept(new LogicalTypeToBlinkCoderConverter()); + } + + public static FlinkFnApi.Schema.FieldType toProtoType(LogicalType logicalType) { + return logicalType.accept(new LogicalTypeToProtoTypeConverter()); + } + + private static class LogicalTypeToCoderConverter implements LogicalTypeVisitor<Coder> { + + @Override + public Coder visit(CharType charType) { + return null; + } + + @Override + public Coder visit(VarCharType varCharType) { + return null; + } + + @Override + public Coder visit(BooleanType booleanType) { + return null; + } + + @Override + public Coder visit(BinaryType binaryType) { + return null; + } + + @Override + public Coder visit(VarBinaryType varBinaryType) { + return null; + } + + @Override + public Coder visit(DecimalType decimalType) { + return null; + } + + @Override + public Coder visit(TinyIntType tinyIntType) { + return null; + } + + @Override + public Coder visit(SmallIntType smallIntType) { + return null; + } + + @Override + public Coder visit(IntType intType) { + return null; + } + + @Override + public Coder visit(BigIntType bigIntType) { + return BigIntCoder.INSTANCE; + } + + @Override + public Coder visit(FloatType floatType) { + return null; + } + + @Override + public Coder visit(DoubleType doubleType) { + return null; + } + + @Override + public Coder visit(DateType dateType) { + return null; + } + + @Override + public Coder visit(TimeType timeType) { + return null; + } + + @Override + public Coder visit(TimestampType timestampType) { + return null; + } + + @Override + public Coder visit(ZonedTimestampType zonedTimestampType) { + return null; + } + + @Override + public Coder visit(LocalZonedTimestampType localZonedTimestampType) { + return null; + } + + @Override + public Coder visit(YearMonthIntervalType yearMonthIntervalType) { + return null; + } + + @Override + public Coder visit(DayTimeIntervalType dayTimeIntervalType) { + return null; + } + + @Override + public Coder visit(ArrayType arrayType) { + return null; + } + + @Override + public Coder visit(MultisetType multisetType) { + return null; + } + + @Override + public Coder visit(MapType mapType) { + return null; + } + + @Override + public Coder visit(RowType rowType) { + final Coder[] fieldCoders = rowType.getFields() + .stream() + .map(f -> f.getType().accept(this)) + .toArray(Coder[]::new); + return new RowCoder(fieldCoders); + } + + @Override + public Coder visit(DistinctType distinctType) { + return null; + } + + @Override + public Coder visit(StructuredType structuredType) { + return null; + } + + @Override + public Coder visit(NullType nullType) { + return null; + } + + @Override + public Coder visit(AnyType<?> anyType) { + return null; + } + + @Override + public Coder visit(SymbolType<?> symbolType) { + return null; + } + + @Override + public Coder visit(LogicalType other) { + return null; + } + } + + private static class LogicalTypeToBlinkCoderConverter implements LogicalTypeVisitor<Coder> { Review comment: Make this class extends from the `LogicalTypeToCoderConverter`? Most parts of the two classes are same. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
