Airblader commented on a change in pull request #15098:
URL: https://github.com/apache/flink/pull/15098#discussion_r590393225
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of a {@link CatalogTable}. */
+@Internal
+class DefaultCatalogTable implements CatalogTable {
+
+    private final Schema schema;
+    private final @Nullable String comment;
+    private final List<String> partitionKeys;
+    private final Map<String, String> options;
+
+    DefaultCatalogTable(
+            Schema schema,
+            @Nullable String comment,
+            List<String> partitionKeys,
+            Map<String, String> options) {
+        this.schema = checkNotNull(schema, "Schema must not be null.");
+        this.comment = comment;
+        this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must not be null.");
+        this.options = checkNotNull(options, "Options must not be null.");
+
+        checkArgument(
+                options.entrySet().stream()
+                        .allMatch(e -> e.getKey() != null && e.getValue() != null),
+                "Options cannot have null keys or values.");
+    }
+
+    // TODO uncomment
+    // @Override
+    public Schema getUnresolvedSchema() {
+        return schema;
+    }
+
+    @Override
+    public TableSchema getSchema() {
+        // TODO move to upper class
+        return null;
+    }
+
+    @Override
+    public String getComment() {
+        return comment != null ? comment : "";
+    }
+
+    @Override
+    public boolean isPartitioned() {
+        return !partitionKeys.isEmpty();
+    }
+
+    @Override
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    @Override
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    @Override
+    public CatalogBaseTable copy() {
+        return this;

Review comment:
       I would expect `copy` to create a new instance, even if it is immutable.
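For illustration, a fresh-instance `copy` along the lines the reviewer asks for could look like this sketch (not the PR's code; it reuses the fields and constructor of `DefaultCatalogTable` shown above):

```java
@Override
public CatalogBaseTable copy() {
    // Return a new instance rather than `this`, so callers always get a
    // distinct object even though the table itself is immutable.
    return new DefaultCatalogTable(schema, comment, partitionKeys, options);
}
```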
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
##########
@@ -0,0 +1,137 @@
+    @Override
+    public CatalogBaseTable copy() {
+        return this;
+    }
+
+    @Override
+    public CatalogTable copy(Map<String, String> options) {

Review comment:
       API-wise it strikes me a bit odd to have another `copy` signature which allows overriding one specific argument.
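If the overload is kept, a consistent immutable implementation would swap only the options and keep everything else, as in this hedged sketch (again assuming the constructor shown above; the PR itself may resolve the API question differently):

```java
@Override
public CatalogTable copy(Map<String, String> options) {
    // Same schema, comment, and partition keys; only the options are replaced.
    return new DefaultCatalogTable(schema, comment, partitionKeys, options);
}
```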
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
##########
@@ -40,6 +65,404 @@
      */
     public static final String FLINK_PROPERTY_PREFIX = "flink.";
 
+    /** Serializes the given {@link ResolvedCatalogTable} into a map of string properties. */
+    public static Map<String, String> serializeCatalogTable(ResolvedCatalogTable resolvedTable) {
+        try {
+            final Map<String, String> properties = new HashMap<>();
+
+            serializeResolvedSchema(properties, resolvedTable.getResolvedSchema());
+
+            properties.put(COMMENT, resolvedTable.getComment());
+
+            serializePartitionKeys(properties, resolvedTable.getPartitionKeys());
+
+            properties.putAll(resolvedTable.getOptions());
+
+            properties.remove(IS_GENERIC); // reserved option
+
+            return properties;
+        } catch (Exception e) {
+            throw new CatalogException("Error in serializing catalog table.", e);
+        }
+    }
+
+    /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */
+    public static CatalogTable deserializeCatalogTable(Map<String, String> properties) {
+        try {
+            final Schema schema = deserializeSchema(properties);
+
+            final @Nullable String comment = properties.get(COMMENT);
+
+            final List<String> partitionKeys = deserializePartitionKeys(properties);
+
+            final Map<String, String> options = deserializeOptions(properties);
+
+            return CatalogTable.of(schema, comment, partitionKeys, options);
+        } catch (Exception e) {
+            throw new CatalogException("Error in deserializing catalog table.", e);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Helper methods and constants
+    // --------------------------------------------------------------------------------------------
+
+    private static final String SCHEMA = "schema";
+
+    private static final String NAME = "name";
+
+    private static final String DATA_TYPE = "data-type";
+
+    private static final String EXPR = "expr";
+
+    private static final String METADATA = "metadata";
+
+    private static final String VIRTUAL = "virtual";
+
+    private static final String PARTITION_KEYS = "partition.keys";
+
+    private static final String WATERMARK = "watermark";
+
+    private static final String WATERMARK_ROWTIME = "rowtime";
+
+    private static final String WATERMARK_STRATEGY = "strategy";
+
+    private static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR;
+
+    private static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE;
+
+    private static final String PRIMARY_KEY_NAME = "primary-key.name";
+
+    private static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
+
+    private static final String COMMENT = "comment";
+
+    private static Map<String, String> deserializeOptions(Map<String, String> map) {
+        return map.entrySet().stream()
+                .filter(
+                        e -> {
+                            final String key = e.getKey();
+                            return !key.startsWith(SCHEMA + '.')
+                                    && !key.startsWith(PARTITION_KEYS + '.')
+                                    && !key.equals(COMMENT);
+                        })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    private static List<String> deserializePartitionKeys(Map<String, String> map) {
+        final int partitionCount = getCount(map, PARTITION_KEYS, NAME);
+        final List<String> partitionKeys = new ArrayList<>();
+        for (int i = 0; i < partitionCount; i++) {
+            final String partitionNameKey = PARTITION_KEYS + '.' + i + '.' + NAME;
+            final String partitionName = getValue(map, partitionNameKey);
+            partitionKeys.add(partitionName);
+        }
+        return partitionKeys;
+    }
+
+    private static Schema deserializeSchema(Map<String, String> map) {
+        final Builder builder = Schema.newBuilder();
+
+        deserializeColumns(map, builder);
+
+        deserializeWatermark(map, builder);
+
+        deserializePrimaryKey(map, builder);
+
+        return builder.build();
+    }
+
+    private static void deserializePrimaryKey(Map<String, String> map, Builder builder) {
+        final String constraintNameKey = SCHEMA + '.' + PRIMARY_KEY_NAME;
+        final String columnsKey = SCHEMA + '.' + PRIMARY_KEY_COLUMNS;
+        if (map.containsKey(constraintNameKey)) {
+            final String constraintName = getValue(map, constraintNameKey);
+            final String[] columns = getValue(map, columnsKey, s -> s.split(","));
+            builder.primaryKeyNamed(constraintName, columns);
+        }
+    }
+
+    private static void deserializeWatermark(Map<String, String> map, Builder builder) {
+        final String watermarkKey = SCHEMA + '.' + WATERMARK;
+        final int watermarkCount = getCount(map, watermarkKey, WATERMARK_ROWTIME);
+        for (int i = 0; i < watermarkCount; i++) {
+            final String rowtimeKey = watermarkKey + '.' + i + '.' + WATERMARK_ROWTIME;
+            final String exprKey = watermarkKey + '.' + i + '.' + WATERMARK_STRATEGY_EXPR;
+
+            final String rowtime = getValue(map, rowtimeKey);
+            final String expr = getValue(map, exprKey);
+            builder.watermark(rowtime, expr);
+        }
+    }
+
+    private static void deserializeColumns(Map<String, String> map, Builder builder) {
+        final int fieldCount = getCount(map, SCHEMA, NAME);
+
+        for (int i = 0; i < fieldCount; i++) {
+            final String nameKey = SCHEMA + '.' + i + '.' + NAME;
+            final String dataTypeKey = SCHEMA + '.' + i + '.' + DATA_TYPE;
+            final String exprKey = SCHEMA + '.' + i + '.' + EXPR;
+            final String metadataKey = SCHEMA + '.' + i + '.' + METADATA;
+            final String virtualKey = SCHEMA + '.' + i + '.' + VIRTUAL;
+
+            final String name = getValue(map, nameKey);
+
+            // computed column
+            if (map.containsKey(exprKey)) {
+                final String expr = getValue(map, exprKey);
+                builder.columnByExpression(name, expr);
+            }
+            // metadata column
+            else if (map.containsKey(metadataKey)) {
+                final String metadata = getValue(map, metadataKey);
+                final String dataType = getValue(map, dataTypeKey);
+                final boolean isVirtual = getValue(map, virtualKey, Boolean::parseBoolean);
+                if (metadata.equals(name)) {
+                    builder.columnByMetadata(name, dataType, isVirtual);
+                } else {
+                    builder.columnByMetadata(name, dataType, metadata, isVirtual);
+                }
+            }
+            // physical column
+            else {
+                final String dataType = getValue(map, dataTypeKey);
+                builder.column(name, dataType);
+            }
+        }
+    }
+
+    private static void serializePartitionKeys(Map<String, String> map, List<String> keys) {
+        checkNotNull(keys);
+
+        putIndexedProperties(
+                map,
+                PARTITION_KEYS,
+                Collections.singletonList(NAME),
+                keys.stream().map(Collections::singletonList).collect(Collectors.toList()));
+    }
+
+    private static void serializeResolvedSchema(Map<String, String> map, ResolvedSchema schema) {
+        checkNotNull(schema);
+
+        serializeColumns(map, schema.getColumns());
+
+        serializeWatermarkSpecs(map, schema.getWatermarkSpecs());
+
+        schema.getPrimaryKey().ifPresent(pk -> serializePrimaryKey(map, pk));
+    }
+
+    private static void serializePrimaryKey(Map<String, String> map, UniqueConstraint constraint) {
+        map.put(SCHEMA + '.' + PRIMARY_KEY_NAME, constraint.getName());
+        map.put(SCHEMA + '.' + PRIMARY_KEY_COLUMNS, String.join(",", constraint.getColumns()));
+    }
+
+    private static void serializeWatermarkSpecs(
+            Map<String, String> map, List<WatermarkSpec> specs) {
+        if (!specs.isEmpty()) {
+            final List<List<String>> watermarkValues = new ArrayList<>();
+            for (WatermarkSpec spec : specs) {
+                watermarkValues.add(
+                        Arrays.asList(
+                                spec.getRowtimeAttribute(),
+                                serializeResolvedExpression(spec.getWatermarkExpression()),
+                                serializeDataType(
+                                        spec.getWatermarkExpression().getOutputDataType())));
+            }
+            putIndexedProperties(
+                    map,
+                    SCHEMA + '.' + WATERMARK,
+                    Arrays.asList(
+                            WATERMARK_ROWTIME,
+                            WATERMARK_STRATEGY_EXPR,
+                            WATERMARK_STRATEGY_DATA_TYPE),
+                    watermarkValues);
+        }
+    }
+
+    private static void serializeColumns(Map<String, String> map, List<Column> columns) {
+        final String[] names = serializeColumnNames(columns);
+        final String[] dataTypes = serializeColumnDataTypes(columns);
+        final String[] expressions = serializeColumnComputations(columns);
+        final String[] metadata = serializeColumnMetadataKeys(columns);
+        final String[] virtual = serializeColumnVirtuality(columns);
+
+        final List<List<String>> values = new ArrayList<>();
+        for (int i = 0; i < columns.size(); i++) {
+            values.add(
+                    Arrays.asList(names[i], dataTypes[i], expressions[i], metadata[i], virtual[i]));
+        }
+
+        putIndexedProperties(
+                map, SCHEMA, Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA, VIRTUAL), values);
+    }
+
+    private static String serializeResolvedExpression(ResolvedExpression resolvedExpression) {
+        try {
+            return resolvedExpression.asSerializableString();
+        } catch (Exception e) {

Review comment:
       Maybe this warrants a more specific exception given that we want to deduce a specific error message from it? Same question applies a few lines below.
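One possible shape for the reviewer's suggestion, sketched under the assumption that `asSerializableString` signals failure with a `TableException` (the PR may choose a different exception type):

```java
private static String serializeResolvedExpression(ResolvedExpression resolvedExpression) {
    try {
        return resolvedExpression.asSerializableString();
    } catch (TableException e) {
        // Catching a narrower type lets the caller attach a precise message
        // instead of wrapping any unrelated failure the same way.
        throw new CatalogException(
                "Expression '"
                        + resolvedExpression.asSummaryString()
                        + "' cannot be serialized into a string.",
                e);
    }
}
```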
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
##########
@@ -16,18 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.config;
+package org.apache.flink.table.catalog;
 
-/** Config for catalog and catalog meta-objects. */
-public class CatalogConfig {
+import org.apache.flink.annotation.Internal;
 
-    /** Flag to distinguish if a meta-object is generic Flink object or not. */
+/** Utilities for de/serializing {@link Catalog} objects into a map of string properties. */
+@Internal
+public final class CatalogPropertiesUtil {
+
+    /**
+     * Flag to distinguish if a meta-object is a generic Flink object or not.
+     *
+     * <p>It is used to distinguish between Flink's generic connector discovery logic or specialized
+     * catalog connectors.
+     */
     public static final String IS_GENERIC = "is_generic";
 
-    // Globally reserved prefix for catalog properties.
-    // User defined properties should not with this prefix.
-    // Used to distinguish properties created by Hive and Flink,
-    // as Hive metastore has its own properties created upon table creation and migration between
-    // different versions of metastore.
+    /**
+     * Globally reserved prefix for catalog properties. User defined properties should not with this

Review comment:
       nit: this is pre-existing, but maybe we fix the missing verb in "should not with this"?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
##########
@@ -40,6 +65,404 @@
      */
     public static final String FLINK_PROPERTY_PREFIX = "flink.";
 
+    private static final String COMMENT = "comment";
+
+    private static Map<String, String> deserializeOptions(Map<String, String> map) {

Review comment:
       I would prefer serializing the options with a specific prefix as well, not so much to remove this condition on all prefixes here as to rule out collisions in the naming. Maybe it would even make sense to deviate from the `.` separator for this prefix and use something "stronger" to denote the type like `$`, i.e. `schema$primary-key.name$foo`?
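To make the suggestion concrete, here is a sketch of option serialization with a dedicated prefix and a "stronger" separator; the `options` prefix, the `$` separator, and the `serializeOptions` helper are hypothetical, following the comment above rather than the PR:

```java
// Hypothetical key layout:
//   schema$0.name            -> "id"
//   schema$primary-key.name  -> "pk"
//   options$connector        -> "kafka"
private static final char TYPE_SEPARATOR = '$';
private static final String OPTIONS = "options";

private static void serializeOptions(Map<String, String> map, Map<String, String> options) {
    // A reserved prefix rules out collisions between user options and
    // schema/partition keys, and deserialization can select options by
    // prefix instead of filtering out every known schema prefix.
    options.forEach((k, v) -> map.put(OPTIONS + TYPE_SEPARATOR + k, v));
}
```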
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]