Airblader commented on a change in pull request #15098:
URL: https://github.com/apache/flink/pull/15098#discussion_r590393225



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of a {@link CatalogTable}. */
+@Internal
+class DefaultCatalogTable implements CatalogTable {
+
+    private final Schema schema;
+    private final @Nullable String comment;
+    private final List<String> partitionKeys;
+    private final Map<String, String> options;
+
+    DefaultCatalogTable(
+            Schema schema,
+            @Nullable String comment,
+            List<String> partitionKeys,
+            Map<String, String> options) {
+        this.schema = checkNotNull(schema, "Schema must not be null.");
+        this.comment = comment;
+        this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must not be null.");
+        this.options = checkNotNull(options, "Options must not be null.");
+
+        checkArgument(
+                options.entrySet().stream()
+                        .allMatch(e -> e.getKey() != null && e.getValue() != null),
+                "Options cannot have null keys or values.");
+    }
+
+    // TODO uncomment
+    // @Override
+    public Schema getUnresolvedSchema() {
+        return schema;
+    }
+
+    @Override
+    public TableSchema getSchema() {
+        // TODO move to upper class
+        return null;
+    }
+
+    @Override
+    public String getComment() {
+        return comment != null ? comment : "";
+    }
+
+    @Override
+    public boolean isPartitioned() {
+        return !partitionKeys.isEmpty();
+    }
+
+    @Override
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    @Override
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    @Override
+    public CatalogBaseTable copy() {
+        return this;

Review comment:
       I would expect `copy` to create a new instance, even if it is immutable.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of a {@link CatalogTable}. */
+@Internal
+class DefaultCatalogTable implements CatalogTable {
+
+    private final Schema schema;
+    private final @Nullable String comment;
+    private final List<String> partitionKeys;
+    private final Map<String, String> options;
+
+    DefaultCatalogTable(
+            Schema schema,
+            @Nullable String comment,
+            List<String> partitionKeys,
+            Map<String, String> options) {
+        this.schema = checkNotNull(schema, "Schema must not be null.");
+        this.comment = comment;
+        this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must not be null.");
+        this.options = checkNotNull(options, "Options must not be null.");
+
+        checkArgument(
+                options.entrySet().stream()
+                        .allMatch(e -> e.getKey() != null && e.getValue() != null),
+                "Options cannot have null keys or values.");
+    }
+
+    // TODO uncomment
+    // @Override
+    public Schema getUnresolvedSchema() {
+        return schema;
+    }
+
+    @Override
+    public TableSchema getSchema() {
+        // TODO move to upper class
+        return null;
+    }
+
+    @Override
+    public String getComment() {
+        return comment != null ? comment : "";
+    }
+
+    @Override
+    public boolean isPartitioned() {
+        return !partitionKeys.isEmpty();
+    }
+
+    @Override
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    @Override
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    @Override
+    public CatalogBaseTable copy() {
+        return this;
+    }
+
+    @Override
+    public CatalogTable copy(Map<String, String> options) {

Review comment:
       API-wise it strikes me as a bit odd to have another `copy` signature which allows overriding one specific argument.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
##########
@@ -40,6 +65,404 @@
      */
     public static final String FLINK_PROPERTY_PREFIX = "flink.";
 
+    /** Serializes the given {@link ResolvedCatalogTable} into a map of string properties. */
+    public static Map<String, String> serializeCatalogTable(ResolvedCatalogTable resolvedTable) {
+        try {
+            final Map<String, String> properties = new HashMap<>();
+
+            serializeResolvedSchema(properties, resolvedTable.getResolvedSchema());
+
+            properties.put(COMMENT, resolvedTable.getComment());
+
+            serializePartitionKeys(properties, resolvedTable.getPartitionKeys());
+
+            properties.putAll(resolvedTable.getOptions());
+
+            properties.remove(IS_GENERIC); // reserved option
+
+            return properties;
+        } catch (Exception e) {
+            throw new CatalogException("Error in serializing catalog table.", 
e);
+        }
+    }
+
+    /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */
+    public static CatalogTable deserializeCatalogTable(Map<String, String> properties) {
+        try {
+            final Schema schema = deserializeSchema(properties);
+
+            final @Nullable String comment = properties.get(COMMENT);
+
+            final List<String> partitionKeys = deserializePartitionKeys(properties);
+
+            final Map<String, String> options = deserializeOptions(properties);
+
+            return CatalogTable.of(schema, comment, partitionKeys, options);
+        } catch (Exception e) {
+            throw new CatalogException("Error in deserializing catalog 
table.", e);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Helper methods and constants
+    // --------------------------------------------------------------------------------------------
+
+    private static final String SCHEMA = "schema";
+
+    private static final String NAME = "name";
+
+    private static final String DATA_TYPE = "data-type";
+
+    private static final String EXPR = "expr";
+
+    private static final String METADATA = "metadata";
+
+    private static final String VIRTUAL = "virtual";
+
+    private static final String PARTITION_KEYS = "partition.keys";
+
+    private static final String WATERMARK = "watermark";
+
+    private static final String WATERMARK_ROWTIME = "rowtime";
+
+    private static final String WATERMARK_STRATEGY = "strategy";
+
+    private static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR;
+
+    private static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE;
+
+    private static final String PRIMARY_KEY_NAME = "primary-key.name";
+
+    private static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
+
+    private static final String COMMENT = "comment";
+
+    private static Map<String, String> deserializeOptions(Map<String, String> map) {
+        return map.entrySet().stream()
+                .filter(
+                        e -> {
+                            final String key = e.getKey();
+                            return !key.startsWith(SCHEMA + '.')
+                                    && !key.startsWith(PARTITION_KEYS + '.')
+                                    && !key.equals(COMMENT);
+                        })
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    private static List<String> deserializePartitionKeys(Map<String, String> map) {
+        final int partitionCount = getCount(map, PARTITION_KEYS, NAME);
+        final List<String> partitionKeys = new ArrayList<>();
+        for (int i = 0; i < partitionCount; i++) {
+            final String partitionNameKey = PARTITION_KEYS + '.' + i + '.' + NAME;
+            final String partitionName = getValue(map, partitionNameKey);
+            partitionKeys.add(partitionName);
+        }
+        return partitionKeys;
+    }
+
+    private static Schema deserializeSchema(Map<String, String> map) {
+        final Builder builder = Schema.newBuilder();
+
+        deserializeColumns(map, builder);
+
+        deserializeWatermark(map, builder);
+
+        deserializePrimaryKey(map, builder);
+
+        return builder.build();
+    }
+
+    private static void deserializePrimaryKey(Map<String, String> map, Builder builder) {
+        final String constraintNameKey = SCHEMA + '.' + PRIMARY_KEY_NAME;
+        final String columnsKey = SCHEMA + '.' + PRIMARY_KEY_COLUMNS;
+        if (map.containsKey(constraintNameKey)) {
+            final String constraintName = getValue(map, constraintNameKey);
+            final String[] columns = getValue(map, columnsKey, s -> s.split(","));
+            builder.primaryKeyNamed(constraintName, columns);
+        }
+    }
+
+    private static void deserializeWatermark(Map<String, String> map, Builder builder) {
+        final String watermarkKey = SCHEMA + '.' + WATERMARK;
+        final int watermarkCount = getCount(map, watermarkKey, WATERMARK_ROWTIME);
+        for (int i = 0; i < watermarkCount; i++) {
+            final String rowtimeKey = watermarkKey + '.' + i + '.' + WATERMARK_ROWTIME;
+            final String exprKey = watermarkKey + '.' + i + '.' + WATERMARK_STRATEGY_EXPR;
+
+            final String rowtime = getValue(map, rowtimeKey);
+            final String expr = getValue(map, exprKey);
+            builder.watermark(rowtime, expr);
+        }
+    }
+
+    private static void deserializeColumns(Map<String, String> map, Builder builder) {
+        final int fieldCount = getCount(map, SCHEMA, NAME);
+
+        for (int i = 0; i < fieldCount; i++) {
+            final String nameKey = SCHEMA + '.' + i + '.' + NAME;
+            final String dataTypeKey = SCHEMA + '.' + i + '.' + DATA_TYPE;
+            final String exprKey = SCHEMA + '.' + i + '.' + EXPR;
+            final String metadataKey = SCHEMA + '.' + i + '.' + METADATA;
+            final String virtualKey = SCHEMA + '.' + i + '.' + VIRTUAL;
+
+            final String name = getValue(map, nameKey);
+
+            // computed column
+            if (map.containsKey(exprKey)) {
+                final String expr = getValue(map, exprKey);
+                builder.columnByExpression(name, expr);
+            }
+            // metadata column
+            else if (map.containsKey(metadataKey)) {
+                final String metadata = getValue(map, metadataKey);
+                final String dataType = getValue(map, dataTypeKey);
+                final boolean isVirtual = getValue(map, virtualKey, Boolean::parseBoolean);
+                if (metadata.equals(name)) {
+                    builder.columnByMetadata(name, dataType, isVirtual);
+                } else {
+                    builder.columnByMetadata(name, dataType, metadata, isVirtual);
+                }
+            }
+            // physical column
+            else {
+                final String dataType = getValue(map, dataTypeKey);
+                builder.column(name, dataType);
+            }
+        }
+    }
+
+    private static void serializePartitionKeys(Map<String, String> map, List<String> keys) {
+        checkNotNull(keys);
+
+        putIndexedProperties(
+                map,
+                PARTITION_KEYS,
+                Collections.singletonList(NAME),
+                keys.stream().map(Collections::singletonList).collect(Collectors.toList()));
+    }
+
+    private static void serializeResolvedSchema(Map<String, String> map, ResolvedSchema schema) {
+        checkNotNull(schema);
+
+        serializeColumns(map, schema.getColumns());
+
+        serializeWatermarkSpecs(map, schema.getWatermarkSpecs());
+
+        schema.getPrimaryKey().ifPresent(pk -> serializePrimaryKey(map, pk));
+    }
+
+    private static void serializePrimaryKey(Map<String, String> map, UniqueConstraint constraint) {
+        map.put(SCHEMA + '.' + PRIMARY_KEY_NAME, constraint.getName());
+        map.put(SCHEMA + '.' + PRIMARY_KEY_COLUMNS, String.join(",", 
constraint.getColumns()));
+    }
+
+    private static void serializeWatermarkSpecs(
+            Map<String, String> map, List<WatermarkSpec> specs) {
+        if (!specs.isEmpty()) {
+            final List<List<String>> watermarkValues = new ArrayList<>();
+            for (WatermarkSpec spec : specs) {
+                watermarkValues.add(
+                        Arrays.asList(
+                                spec.getRowtimeAttribute(),
+                                serializeResolvedExpression(spec.getWatermarkExpression()),
+                                serializeDataType(
+                                        spec.getWatermarkExpression().getOutputDataType())));
+            }
+            putIndexedProperties(
+                    map,
+                    SCHEMA + '.' + WATERMARK,
+                    Arrays.asList(
+                            WATERMARK_ROWTIME,
+                            WATERMARK_STRATEGY_EXPR,
+                            WATERMARK_STRATEGY_DATA_TYPE),
+                    watermarkValues);
+        }
+    }
+
+    private static void serializeColumns(Map<String, String> map, List<Column> columns) {
+        final String[] names = serializeColumnNames(columns);
+        final String[] dataTypes = serializeColumnDataTypes(columns);
+        final String[] expressions = serializeColumnComputations(columns);
+        final String[] metadata = serializeColumnMetadataKeys(columns);
+        final String[] virtual = serializeColumnVirtuality(columns);
+
+        final List<List<String>> values = new ArrayList<>();
+        for (int i = 0; i < columns.size(); i++) {
+            values.add(
+                    Arrays.asList(names[i], dataTypes[i], expressions[i], metadata[i], virtual[i]));
+        }
+
+        putIndexedProperties(
+                map, SCHEMA, Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA, VIRTUAL), values);
+    }
+
+    private static String serializeResolvedExpression(ResolvedExpression resolvedExpression) {
+        try {
+            return resolvedExpression.asSerializableString();
+        } catch (Exception e) {

Review comment:
       Maybe this warrants a more specific exception given that we want to deduce a specific error message from it? The same question applies a few lines below.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
##########
@@ -16,18 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.config;
+package org.apache.flink.table.catalog;
 
-/** Config for catalog and catalog meta-objects. */
-public class CatalogConfig {
+import org.apache.flink.annotation.Internal;
 
-    /** Flag to distinguish if a meta-object is generic Flink object or not. */
+/** Utilities for de/serializing {@link Catalog} objects into a map of string properties. */
+@Internal
+public final class CatalogPropertiesUtil {
+
+    /**
+     * Flag to distinguish if a meta-object is a generic Flink object or not.
+     *
+     * <p>It is used to distinguish between Flink's generic connector discovery logic or specialized
+     * catalog connectors.
+     */
     public static final String IS_GENERIC = "is_generic";
 
-    // Globally reserved prefix for catalog properties.
-    // User defined properties should not with this prefix.
-    // Used to distinguish properties created by Hive and Flink,
-    // as Hive metastore has its own properties created upon table creation and migration between
-    // different versions of metastore.
+    /**
+     * Globally reserved prefix for catalog properties. User defined properties should not with this

Review comment:
       nit: this is pre-existing, but maybe we fix the missing verb in "should not with this"?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
##########
@@ -40,6 +65,404 @@
      */
     public static final String FLINK_PROPERTY_PREFIX = "flink.";
 
+    /** Serializes the given {@link ResolvedCatalogTable} into a map of string properties. */
+    public static Map<String, String> serializeCatalogTable(ResolvedCatalogTable resolvedTable) {
+        try {
+            final Map<String, String> properties = new HashMap<>();
+
+            serializeResolvedSchema(properties, resolvedTable.getResolvedSchema());
+
+            properties.put(COMMENT, resolvedTable.getComment());
+
+            serializePartitionKeys(properties, resolvedTable.getPartitionKeys());
+
+            properties.putAll(resolvedTable.getOptions());
+
+            properties.remove(IS_GENERIC); // reserved option
+
+            return properties;
+        } catch (Exception e) {
+            throw new CatalogException("Error in serializing catalog table.", 
e);
+        }
+    }
+
+    /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */
+    public static CatalogTable deserializeCatalogTable(Map<String, String> properties) {
+        try {
+            final Schema schema = deserializeSchema(properties);
+
+            final @Nullable String comment = properties.get(COMMENT);
+
+            final List<String> partitionKeys = deserializePartitionKeys(properties);
+
+            final Map<String, String> options = deserializeOptions(properties);
+
+            return CatalogTable.of(schema, comment, partitionKeys, options);
+        } catch (Exception e) {
+            throw new CatalogException("Error in deserializing catalog 
table.", e);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Helper methods and constants
+    // --------------------------------------------------------------------------------------------
+
+    private static final String SCHEMA = "schema";
+
+    private static final String NAME = "name";
+
+    private static final String DATA_TYPE = "data-type";
+
+    private static final String EXPR = "expr";
+
+    private static final String METADATA = "metadata";
+
+    private static final String VIRTUAL = "virtual";
+
+    private static final String PARTITION_KEYS = "partition.keys";
+
+    private static final String WATERMARK = "watermark";
+
+    private static final String WATERMARK_ROWTIME = "rowtime";
+
+    private static final String WATERMARK_STRATEGY = "strategy";
+
+    private static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR;
+
+    private static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE;
+
+    private static final String PRIMARY_KEY_NAME = "primary-key.name";
+
+    private static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
+
+    private static final String COMMENT = "comment";
+
+    private static Map<String, String> deserializeOptions(Map<String, String> map) {

Review comment:
       I would prefer serializing the options with a specific prefix as well, not so much to remove this condition on all prefixes here as to rule out collisions in the naming.
   
   Maybe it would even make sense to deviate from the `.` separator for this prefix and use something "stronger" to denote the type, like `$`, e.g. `schema$primary-key.name$foo`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

