twalthr commented on a change in pull request #15098:
URL: https://github.com/apache/flink/pull/15098#discussion_r591304471
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
##########
@@ -40,6 +65,404 @@
*/
public static final String FLINK_PROPERTY_PREFIX = "flink.";
+ /** Serializes the given {@link ResolvedCatalogTable} into a map of string properties. */
+ public static Map<String, String> serializeCatalogTable(ResolvedCatalogTable resolvedTable) {
+ try {
+ final Map<String, String> properties = new HashMap<>();
+
+ serializeResolvedSchema(properties, resolvedTable.getResolvedSchema());
+
+ properties.put(COMMENT, resolvedTable.getComment());
+
+ serializePartitionKeys(properties, resolvedTable.getPartitionKeys());
+
+ properties.putAll(resolvedTable.getOptions());
+
+ properties.remove(IS_GENERIC); // reserved option
+
+ return properties;
+ } catch (Exception e) {
+ throw new CatalogException("Error in serializing catalog table.", e);
+ }
+ }
+
+ /** Deserializes the given map of string properties into an unresolved {@link CatalogTable}. */
+ public static CatalogTable deserializeCatalogTable(Map<String, String> properties) {
+ try {
+ final Schema schema = deserializeSchema(properties);
+
+ final @Nullable String comment = properties.get(COMMENT);
+
+ final List<String> partitionKeys = deserializePartitionKeys(properties);
+
+ final Map<String, String> options = deserializeOptions(properties);
+
+ return CatalogTable.of(schema, comment, partitionKeys, options);
+ } catch (Exception e) {
+ throw new CatalogException("Error in deserializing catalog table.", e);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helper methods and constants
+ // --------------------------------------------------------------------------------------------
+
+ private static final String SCHEMA = "schema";
+
+ private static final String NAME = "name";
+
+ private static final String DATA_TYPE = "data-type";
+
+ private static final String EXPR = "expr";
+
+ private static final String METADATA = "metadata";
+
+ private static final String VIRTUAL = "virtual";
+
+ private static final String PARTITION_KEYS = "partition.keys";
+
+ private static final String WATERMARK = "watermark";
+
+ private static final String WATERMARK_ROWTIME = "rowtime";
+
+ private static final String WATERMARK_STRATEGY = "strategy";
+
+ private static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR;
+
+ private static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE;
+
+ private static final String PRIMARY_KEY_NAME = "primary-key.name";
+
+ private static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
+
+ private static final String COMMENT = "comment";
+
+ private static Map<String, String> deserializeOptions(Map<String, String> map) {
Review comment:
I totally agree. I don't like the current design because it allows key
collisions between table options and the other serialized properties. But
since these properties are potentially already stored in existing catalogs,
I didn't want to change the format in this PR. This can be a separate issue.
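
To make the collision concern concrete, here is a minimal sketch. It is not
the actual Flink code: the class `PropertyCollisionSketch` and its
`serialize` method are hypothetical stand-ins that only mirror the order of
operations in the hunk above (write the reserved `comment` key first, then
`putAll` the table options into the same flat map).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the flat-map serialization pattern.
public class PropertyCollisionSketch {

    static final String COMMENT = "comment";

    // Reserved keys and user-defined options share one flat namespace.
    static Map<String, String> serialize(String comment, Map<String, String> options) {
        final Map<String, String> properties = new HashMap<>();
        properties.put(COMMENT, comment);
        // An option that happens to be named "comment" silently overwrites
        // the table comment written on the previous line.
        properties.putAll(options);
        return properties;
    }

    public static void main(String[] args) {
        final Map<String, String> options = new HashMap<>();
        options.put("connector", "kafka");
        options.put("comment", "user option value");

        final Map<String, String> properties = serialize("table comment", options);
        // The original table comment is no longer recoverable from the map.
        System.out.println(properties.get(COMMENT));
    }
}
```

A dedicated prefix for options would avoid this, but it would also change the
keys of properties that existing catalogs may already have stored, which is
why it seems better handled separately.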