twalthr commented on a change in pull request #6958: [FLINK-10687] [table] Make flink-formats Scala-free URL: https://github.com/apache/flink/pull/6958#discussion_r228906097
########## File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * A table schema that represents a table's structure with field names and types. + */ +@PublicEvolving +public class TableSchema { + + private static final String ATOMIC_TYPE_FIELD_NAME = "f0"; + + private final String[] fieldNames; + + private final TypeInformation<?>[] fieldTypes; + + private final Map<String, Integer> fieldNameToIndex; + + public TableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + this.fieldNames = Preconditions.checkNotNull(fieldNames); + this.fieldTypes = Preconditions.checkNotNull(fieldTypes); + + if (fieldNames.length != fieldTypes.length) { + throw new TableException( + "Number of field names and field types must be equal.\n" + + "Number of names is " + fieldNames.length + ", number of types is " + fieldTypes.length + ".\n" + + "List of field names: " + Arrays.toString(fieldNames) + "\n" + + "List of field types: " + Arrays.toString(fieldTypes)); + } + + // validate and create name to index mapping + fieldNameToIndex = new HashMap<>(); + final Set<String> duplicateNames = new HashSet<>(); + final Set<String> uniqueNames = new HashSet<>(); + for (int i = 0; i < fieldNames.length; i++) { + // check for null + Preconditions.checkNotNull(fieldTypes[i]); + final String fieldName = Preconditions.checkNotNull(fieldNames[i]); + + // collect indices + fieldNameToIndex.put(fieldName, i); + + // check uniqueness of field names + if (uniqueNames.contains(fieldName)) { + duplicateNames.add(fieldName); + } else { + uniqueNames.add(fieldName); + } + } + if (!duplicateNames.isEmpty()) { + throw new TableException( + "Field names must be unique.\n" + + "List of duplicate fields: " + duplicateNames.toString() + "\n" + + "List of all fields: " + Arrays.toString(fieldNames)); + } + } + + /** + * Returns a deep copy of the table schema. + */ + public TableSchema copy() { + return new TableSchema(fieldNames.clone(), fieldTypes.clone()); Review comment: I don't think that we needed it. `TypeInformation` should be immutable. The current Scala implementation is also like this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
