This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8212230414f [FLINK-27196][table] Remove SerializationSchemaFactory, DeserializationSchemaFactory and TableFormatFactory
8212230414f is described below

commit 8212230414f0c49a9f5b636bfc5101417e921a66
Author: slinkydeveloper <[email protected]>
AuthorDate: Mon Apr 11 17:50:37 2022 +0200

    [FLINK-27196][table] Remove SerializationSchemaFactory, DeserializationSchemaFactory and TableFormatFactory
    
    Signed-off-by: slinkydeveloper <[email protected]>
---
 .../flink/table/descriptors/SchemaValidator.java   |  49 -----
 .../factories/DeserializationSchemaFactory.java    |  45 -----
 .../factories/SerializationSchemaFactory.java      |  45 -----
 .../flink/table/factories/TableFactoryService.java |  23 +--
 .../flink/table/factories/TableFormatFactory.java  |  66 -------
 .../table/factories/TableFormatFactoryBase.java    | 200 ---------------------
 .../factories/TableFormatFactoryBaseTest.java      |  85 ---------
 7 files changed, 2 insertions(+), 511 deletions(-)

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
index 6326cca1167..d4c04ac272d 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.factories.TableFormatFactory;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
 import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
@@ -41,20 +40,10 @@ import java.util.Optional;
 
 import static java.lang.String.format;
 import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
 import static org.apache.flink.table.descriptors.Rowtime.ROWTIME;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
 import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
 import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
 import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
@@ -139,44 +128,6 @@ public class SchemaValidator implements 
DescriptorValidator {
         }
     }
 
-    /**
-     * Returns keys for a {@link TableFormatFactory#supportedProperties()} 
method that are accepted
-     * for schema derivation using {@code 
deriveFormatFields(DescriptorProperties)}.
-     */
-    public static List<String> getSchemaDerivationKeys() {
-        List<String> keys = new ArrayList<>();
-
-        // schema
-        keys.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
-        keys.add(SCHEMA + ".#." + SCHEMA_TYPE);
-        keys.add(SCHEMA + ".#." + SCHEMA_NAME);
-        keys.add(SCHEMA + ".#." + SCHEMA_FROM);
-        // computed column
-        keys.add(SCHEMA + ".#." + EXPR);
-
-        // time attributes
-        keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
-        keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
-        keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
-        keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
-        keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
-        keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
-        keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
-        keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
-        keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
-
-        // watermark
-        keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
-        keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
-        keys.add(SCHEMA + "." + WATERMARK + ".#." + 
WATERMARK_STRATEGY_DATA_TYPE);
-
-        // table constraint
-        keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
-        keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
-
-        return keys;
-    }
-
     /** Finds the proctime attribute if defined. */
     public static Optional<String> 
deriveProctimeAttribute(DescriptorProperties properties) {
         Map<String, String> names = properties.getIndexedProperty(SCHEMA, 
SCHEMA_NAME);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationSchemaFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationSchemaFactory.java
deleted file mode 100644
index a2e7f739af0..00000000000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationSchemaFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-
-import java.util.Map;
-
-/**
- * Factory for creating configured instances of {@link DeserializationSchema}.
- *
- * @param <T> record type that the format produces or consumes.
- * @deprecated This interface has been replaced by {@link 
DeserializationFormatFactory}, used in the
- *     new sink/source stack. See FLIP-95 for more information.
- */
-@Deprecated
-@PublicEvolving
-public interface DeserializationSchemaFactory<T> extends TableFormatFactory<T> 
{
-
-    /**
-     * Creates and configures a {@link DeserializationSchema} using the given 
properties.
-     *
-     * @param properties normalized properties describing the format
-     * @return the configured serialization schema or null if the factory 
cannot provide an instance
-     *     of this class
-     */
-    DeserializationSchema<T> createDeserializationSchema(Map<String, String> 
properties);
-}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationSchemaFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationSchemaFactory.java
deleted file mode 100644
index 05151e3e38c..00000000000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationSchemaFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-
-import java.util.Map;
-
-/**
- * Factory for creating configured instances of {@link SerializationSchema}.
- *
- * @param <T> record type that the format produces or consumes.
- * @deprecated This interface has been replaced by {@link 
SerializationFormatFactory}, used in the
- *     new sink/source stack. See FLIP-95 for more information.
- */
-@Deprecated
-@PublicEvolving
-public interface SerializationSchemaFactory<T> extends TableFormatFactory<T> {
-
-    /**
-     * Creates and configures a [[SerializationSchema]] using the given 
properties.
-     *
-     * @param properties normalized properties describing the format
-     * @return the configured serialization schema or null if the factory 
cannot provide an instance
-     *     of this class
-     */
-    SerializationSchema<T> createSerializationSchema(Map<String, String> 
properties);
-}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
index 50ada7b4102..221c3d7cb6c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.descriptors.Descriptor;
-import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -389,12 +388,10 @@ public class TableFactoryService {
                     plainGivenKeys.stream()
                             .filter(p -> !requiredContextKeys.contains(p))
                             .collect(Collectors.toList());
-            List<String> givenFilteredKeys =
-                    filterSupportedPropertiesFactorySpecific(factory, 
givenContextFreeKeys);
 
             boolean allTrue = true;
             List<String> unsupportedKeys = new ArrayList<>();
-            for (String k : givenFilteredKeys) {
+            for (String k : givenContextFreeKeys) {
                 if (!(tuple2.f0.contains(k) || 
tuple2.f1.stream().anyMatch(k::startsWith))) {
                     allTrue = false;
                     unsupportedKeys.add(k);
@@ -462,22 +459,6 @@ public class TableFactoryService {
      */
     private static List<String> filterSupportedPropertiesFactorySpecific(
             TableFactory factory, List<String> keys) {
-
-        if (factory instanceof TableFormatFactory) {
-            boolean includeSchema = ((TableFormatFactory) 
factory).supportsSchemaDerivation();
-            return keys.stream()
-                    .filter(
-                            k -> {
-                                if (includeSchema) {
-                                    return k.startsWith(Schema.SCHEMA + ".")
-                                            || k.startsWith(FORMAT + ".");
-                                } else {
-                                    return k.startsWith(FORMAT + ".");
-                                }
-                            })
-                    .collect(Collectors.toList());
-        } else {
-            return keys;
-        }
+        return keys;
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactory.java
deleted file mode 100644
index 99bbff64d47..00000000000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.util.List;
-
-/**
- * A factory to create configured table format instances based on string-based 
properties. See also
- * {@link TableFactory} for more information.
- *
- * @see DeserializationSchemaFactory
- * @see SerializationSchemaFactory
- * @param <T> record type that the format produces or consumes.
- * @deprecated This interface has been replaced by {@link 
EncodingFormatFactory} and {@link
- *     DecodingFormatFactory}, used in the new sink/source stack. See FLIP-95 
for more information.
- */
-@Deprecated
-@PublicEvolving
-public interface TableFormatFactory<T> extends TableFactory {
-
-    /**
-     * Flag to indicate if the given format supports deriving information from 
a schema. If the
-     * format can handle schema information, those properties must be added to 
the list of supported
-     * properties.
-     */
-    boolean supportsSchemaDerivation();
-
-    /**
-     * List of format property keys that this factory can handle. This method 
will be used for
-     * validation. If a property is passed that this factory cannot handle, an 
exception will be
-     * thrown. The list must not contain the keys that are specified by the 
context.
-     *
-     * <p>Example format properties might be: - format.line-delimiter - 
format.ignore-parse-errors -
-     * format.fields.#.type - format.fields.#.name
-     *
-     * <p>If schema derivation is enabled, the list must include schema 
properties: - schema.#.name
-     * - schema.#.type
-     *
-     * <p>Note: All supported format properties must be prefixed with 
"format.". If schema
-     * derivation is enabled, also properties with "schema." prefix can be 
used.
-     *
-     * <p>Use "#" to denote an array of values where "#" represents one or 
more digits. Property
-     * versions like "format.property-version" must not be part of the 
supported properties.
-     *
-     * <p>See also {@link TableFactory#supportedProperties()} for more 
information.
-     */
-    List<String> supportedProperties();
-}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
deleted file mode 100644
index 8fe4d514f76..00000000000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.types.DataType;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-
-/**
- * Base class for {@link TableFormatFactory}s.
- *
- * @param <T> record type that the format produces or consumes.
- * @deprecated This base class is not required anymore, implement either {@link
- *     DynamicTableSourceFactory} or {@link DynamicTableSinkFactory} directly. 
See FLIP-95 for more
- *     information.
- */
-@Deprecated
-@PublicEvolving
-public abstract class TableFormatFactoryBase<T> implements 
TableFormatFactory<T> {
-
-    // Constants for schema derivation
-    // TODO drop constants once SchemaValidator has been ported to 
flink-table-common
-    private static final String SCHEMA = "schema";
-    private static final String SCHEMA_NAME = "name";
-    private static final String SCHEMA_DATA_TYPE = "data-type";
-    /**
-     * @deprecated {@link #SCHEMA_TYPE} will be removed in future version as 
it uses old type
-     *     system. Please use {@link #SCHEMA_DATA_TYPE} instead.
-     */
-    @Deprecated private static final String SCHEMA_TYPE = "type";
-
-    private static final String SCHEMA_PROCTIME = "proctime";
-    private static final String SCHEMA_FROM = "from";
-    private static final String ROWTIME_TIMESTAMPS_TYPE = 
"rowtime.timestamps.type";
-    private static final String ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = 
"from-field";
-    private static final String ROWTIME_TIMESTAMPS_FROM = 
"rowtime.timestamps.from";
-    private static final String ROWTIME_TIMESTAMPS_CLASS = 
"rowtime.timestamps.class";
-    private static final String ROWTIME_TIMESTAMPS_SERIALIZED = 
"rowtime.timestamps.serialized";
-    private static final String ROWTIME_WATERMARKS_TYPE = 
"rowtime.watermarks.type";
-    private static final String ROWTIME_WATERMARKS_CLASS = 
"rowtime.watermarks.class";
-    private static final String ROWTIME_WATERMARKS_SERIALIZED = 
"rowtime.watermarks.serialized";
-    private static final String ROWTIME_WATERMARKS_DELAY = 
"rowtime.watermarks.delay";
-
-    private String type;
-
-    private String version;
-
-    private boolean supportsSchemaDerivation;
-
-    public TableFormatFactoryBase(String type, int version, boolean 
supportsSchemaDerivation) {
-        this.type = type;
-        this.version = Integer.toString(version);
-        this.supportsSchemaDerivation = supportsSchemaDerivation;
-    }
-
-    @Override
-    public final Map<String, String> requiredContext() {
-        final Map<String, String> context = new HashMap<>();
-        context.put(TableFactoryService.FORMAT_TYPE, type);
-        context.put(TableFactoryService.FORMAT_PROPERTY_VERSION, version);
-        context.putAll(requiredFormatContext());
-        return context;
-    }
-
-    @Override
-    public final boolean supportsSchemaDerivation() {
-        return supportsSchemaDerivation;
-    }
-
-    @Override
-    public final List<String> supportedProperties() {
-        final List<String> properties = new ArrayList<>();
-        if (supportsSchemaDerivation) {
-            properties.add(TableFactoryService.FORMAT_DERIVE_SCHEMA);
-            // schema
-            properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
-            properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
-            properties.add(SCHEMA + ".#." + SCHEMA_NAME);
-            properties.add(SCHEMA + ".#." + SCHEMA_FROM);
-            // computed column
-            properties.add(SCHEMA + ".#." + EXPR);
-            // time attributes
-            properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
-            properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
-            properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
-            properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
-            properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
-            properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
-            properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
-            properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
-            properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
-            // watermark
-            properties.add(SCHEMA + "." + WATERMARK + ".#." + 
WATERMARK_ROWTIME);
-            properties.add(SCHEMA + "." + WATERMARK + ".#." + 
WATERMARK_STRATEGY_EXPR);
-            properties.add(SCHEMA + "." + WATERMARK + ".#." + 
WATERMARK_STRATEGY_DATA_TYPE);
-            // table constraint
-            properties.add(SCHEMA + "." + 
DescriptorProperties.PRIMARY_KEY_NAME);
-            properties.add(SCHEMA + "." + 
DescriptorProperties.PRIMARY_KEY_COLUMNS);
-        }
-        properties.addAll(supportedFormatProperties());
-        return properties;
-    }
-
-    /**
-     * Format specific context.
-     *
-     * <p>This method can be used if format type and a property version is not 
enough.
-     */
-    protected Map<String, String> requiredFormatContext() {
-        return Collections.emptyMap();
-    }
-
-    /**
-     * Format specific supported properties.
-     *
-     * <p>This method can be used if schema derivation is not enough.
-     */
-    protected List<String> supportedFormatProperties() {
-        return Collections.emptyList();
-    }
-
-    // 
--------------------------------------------------------------------------------------------
-
-    /**
-     * Finds the table schema that can be used for a format schema (without 
time attributes and
-     * generated columns).
-     */
-    public static TableSchema deriveSchema(Map<String, String> properties) {
-        final DescriptorProperties descriptorProperties = new 
DescriptorProperties();
-        descriptorProperties.putProperties(properties);
-
-        final TableSchema.Builder builder = TableSchema.builder();
-
-        final TableSchema tableSchema = 
descriptorProperties.getTableSchema(SCHEMA);
-        for (int i = 0; i < tableSchema.getFieldCount(); i++) {
-            final TableColumn tableColumn = 
tableSchema.getTableColumns().get(i);
-            final String fieldName = tableColumn.getName();
-            final DataType dataType = tableColumn.getType();
-            if (!tableColumn.isPhysical()) {
-                // skip non-physical columns
-                continue;
-            }
-            final boolean isProctime =
-                    descriptorProperties
-                            .getOptionalBoolean(SCHEMA + '.' + i + '.' + 
SCHEMA_PROCTIME)
-                            .orElse(false);
-            final String timestampKey = SCHEMA + '.' + i + '.' + 
ROWTIME_TIMESTAMPS_TYPE;
-            final boolean isRowtime = 
descriptorProperties.containsKey(timestampKey);
-            if (!isProctime && !isRowtime) {
-                // check for aliasing
-                final String aliasName =
-                        descriptorProperties
-                                .getOptionalString(SCHEMA + '.' + i + '.' + 
SCHEMA_FROM)
-                                .orElse(fieldName);
-                builder.field(aliasName, dataType);
-            }
-            // only use the rowtime attribute if it references a field
-            else if (isRowtime
-                    && descriptorProperties.isValue(
-                            timestampKey, 
ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)) {
-                final String aliasName =
-                        descriptorProperties.getString(
-                                SCHEMA + '.' + i + '.' + 
ROWTIME_TIMESTAMPS_FROM);
-                builder.field(aliasName, dataType);
-            }
-        }
-
-        return builder.build();
-    }
-}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TableFormatFactoryBaseTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TableFormatFactoryBaseTest.java
deleted file mode 100644
index 609b7e17941..00000000000
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TableFormatFactoryBaseTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.TableSchema;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link TableFormatFactoryBase}. */
-public class TableFormatFactoryBaseTest {
-
-    @Test
-    public void testSchemaDerivation() {
-        final Map<String, String> properties = new HashMap<>();
-        properties.put("schema.0.name", "otherField");
-        properties.put("schema.0.type", "VARCHAR");
-        properties.put("schema.0.from", "csvField");
-        properties.put("schema.1.name", "abcField");
-        properties.put("schema.1.type", "VARCHAR");
-        properties.put("schema.2.name", "p");
-        properties.put("schema.2.type", "TIMESTAMP");
-        properties.put("schema.2.proctime", "true");
-        properties.put("schema.3.name", "r");
-        properties.put("schema.3.type", "TIMESTAMP");
-        properties.put("schema.3.rowtime.timestamps.type", "from-source");
-        properties.put("schema.3.rowtime.watermarks.type", "from-source");
-
-        final TableSchema actualSchema = 
TableFormatFactoryBase.deriveSchema(properties);
-        final TableSchema expectedSchema =
-                TableSchema.builder()
-                        .field("csvField", Types.STRING) // aliased
-                        .field("abcField", Types.STRING)
-                        .build();
-        assertThat(actualSchema).isEqualTo(expectedSchema);
-    }
-
-    @Test
-    public void testSchemaDerivationWithRowtime() {
-        final Map<String, String> properties = new HashMap<>();
-        properties.put("schema.0.name", "otherField");
-        properties.put("schema.0.type", "VARCHAR");
-        properties.put("schema.0.from", "csvField");
-        properties.put("schema.1.name", "abcField");
-        properties.put("schema.1.type", "VARCHAR");
-        properties.put("schema.2.name", "p");
-        properties.put("schema.2.type", "TIMESTAMP");
-        properties.put("schema.2.proctime", "true");
-        properties.put("schema.3.name", "r");
-        properties.put("schema.3.type", "TIMESTAMP");
-        properties.put("schema.3.rowtime.timestamps.type", "from-field"); // 
from-field strategy
-        properties.put("schema.3.rowtime.timestamps.from", "myTime");
-        properties.put("schema.3.rowtime.watermarks.type", "from-source");
-
-        final TableSchema actualSchema = 
TableFormatFactoryBase.deriveSchema(properties);
-        final TableSchema expectedSchema =
-                TableSchema.builder()
-                        .field("csvField", Types.STRING) // aliased
-                        .field("abcField", Types.STRING)
-                        .field("myTime", Types.SQL_TIMESTAMP)
-                        .build();
-        assertThat(actualSchema).isEqualTo(expectedSchema);
-    }
-}

Reply via email to