This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8212230414f [FLINK-27196][table] Remove SerializationSchemaFactory, DeserializationSchemaFactory and TableFormatFactory
8212230414f is described below
commit 8212230414f0c49a9f5b636bfc5101417e921a66
Author: slinkydeveloper <[email protected]>
AuthorDate: Mon Apr 11 17:50:37 2022 +0200
[FLINK-27196][table] Remove SerializationSchemaFactory, DeserializationSchemaFactory and TableFormatFactory
Signed-off-by: slinkydeveloper <[email protected]>
---
.../flink/table/descriptors/SchemaValidator.java | 49 -----
.../factories/DeserializationSchemaFactory.java | 45 -----
.../factories/SerializationSchemaFactory.java | 45 -----
.../flink/table/factories/TableFactoryService.java | 23 +--
.../flink/table/factories/TableFormatFactory.java | 66 -------
.../table/factories/TableFormatFactoryBase.java | 200 ---------------------
.../factories/TableFormatFactoryBaseTest.java | 85 ---------
7 files changed, 2 insertions(+), 511 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
index 6326cca1167..d4c04ac272d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.factories.TableFormatFactory;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
@@ -41,20 +40,10 @@ import java.util.Optional;
import static java.lang.String.format;
import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
@@ -139,44 +128,6 @@ public class SchemaValidator implements DescriptorValidator {
}
}
- /**
- * Returns keys for a {@link TableFormatFactory#supportedProperties()} method that are accepted
- * for schema derivation using {@code deriveFormatFields(DescriptorProperties)}.
- */
- public static List<String> getSchemaDerivationKeys() {
- List<String> keys = new ArrayList<>();
-
- // schema
- keys.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
- keys.add(SCHEMA + ".#." + SCHEMA_TYPE);
- keys.add(SCHEMA + ".#." + SCHEMA_NAME);
- keys.add(SCHEMA + ".#." + SCHEMA_FROM);
- // computed column
- keys.add(SCHEMA + ".#." + EXPR);
-
- // time attributes
- keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
- keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
- keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
- keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
- keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
- keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
- keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
- keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
- keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
-
- // watermark
- keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
- keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
- keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
-
- // table constraint
- keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
- keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
-
- return keys;
- }
-
/** Finds the proctime attribute if defined. */
public static Optional<String> deriveProctimeAttribute(DescriptorProperties properties) {
Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationSchemaFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationSchemaFactory.java
deleted file mode 100644
index a2e7f739af0..00000000000
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationSchemaFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-
-import java.util.Map;
-
-/**
- * Factory for creating configured instances of {@link DeserializationSchema}.
- *
- * @param <T> record type that the format produces or consumes.
- * @deprecated This interface has been replaced by {@link DeserializationFormatFactory}, used in the
- * new sink/source stack. See FLIP-95 for more information.
- */
-@Deprecated
-@PublicEvolving
-public interface DeserializationSchemaFactory<T> extends TableFormatFactory<T> {
-
- /**
- * Creates and configures a {@link DeserializationSchema} using the given properties.
- *
- * @param properties normalized properties describing the format
- * @return the configured serialization schema or null if the factory cannot provide an instance
- * of this class
- */
- DeserializationSchema<T> createDeserializationSchema(Map<String, String> properties);
-}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationSchemaFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationSchemaFactory.java
deleted file mode 100644
index 05151e3e38c..00000000000
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationSchemaFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-
-import java.util.Map;
-
-/**
- * Factory for creating configured instances of {@link SerializationSchema}.
- *
- * @param <T> record type that the format produces or consumes.
- * @deprecated This interface has been replaced by {@link SerializationFormatFactory}, used in the
- * new sink/source stack. See FLIP-95 for more information.
- */
-@Deprecated
-@PublicEvolving
-public interface SerializationSchemaFactory<T> extends TableFormatFactory<T> {
-
- /**
- * Creates and configures a [[SerializationSchema]] using the given properties.
- *
- * @param properties normalized properties describing the format
- * @return the configured serialization schema or null if the factory cannot provide an instance
- * of this class
- */
- SerializationSchema<T> createSerializationSchema(Map<String, String> properties);
-}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
index 50ada7b4102..221c3d7cb6c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.descriptors.Descriptor;
-import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -389,12 +388,10 @@ public class TableFactoryService {
plainGivenKeys.stream()
.filter(p -> !requiredContextKeys.contains(p))
.collect(Collectors.toList());
- List<String> givenFilteredKeys =
- filterSupportedPropertiesFactorySpecific(factory, givenContextFreeKeys);
boolean allTrue = true;
List<String> unsupportedKeys = new ArrayList<>();
- for (String k : givenFilteredKeys) {
+ for (String k : givenContextFreeKeys) {
if (!(tuple2.f0.contains(k) || tuple2.f1.stream().anyMatch(k::startsWith))) {
allTrue = false;
unsupportedKeys.add(k);
@@ -462,22 +459,6 @@ public class TableFactoryService {
*/
private static List<String> filterSupportedPropertiesFactorySpecific(
TableFactory factory, List<String> keys) {
-
- if (factory instanceof TableFormatFactory) {
- boolean includeSchema = ((TableFormatFactory) factory).supportsSchemaDerivation();
- return keys.stream()
- .filter(
- k -> {
- if (includeSchema) {
- return k.startsWith(Schema.SCHEMA + ".")
- || k.startsWith(FORMAT + ".");
- } else {
- return k.startsWith(FORMAT + ".");
- }
- })
- .collect(Collectors.toList());
- } else {
- return keys;
- }
+ return keys;
}
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactory.java
deleted file mode 100644
index 99bbff64d47..00000000000
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.util.List;
-
-/**
- * A factory to create configured table format instances based on string-based properties. See also
- * {@link TableFactory} for more information.
- *
- * @see DeserializationSchemaFactory
- * @see SerializationSchemaFactory
- * @param <T> record type that the format produces or consumes.
- * @deprecated This interface has been replaced by {@link EncodingFormatFactory} and {@link
- * DecodingFormatFactory}, used in the new sink/source stack. See FLIP-95 for more information.
- */
-@Deprecated
-@PublicEvolving
-public interface TableFormatFactory<T> extends TableFactory {
-
- /**
- * Flag to indicate if the given format supports deriving information from a schema. If the
- * format can handle schema information, those properties must be added to the list of supported
- * properties.
- */
- boolean supportsSchemaDerivation();
-
- /**
- * List of format property keys that this factory can handle. This method will be used for
- * validation. If a property is passed that this factory cannot handle, an exception will be
- * thrown. The list must not contain the keys that are specified by the context.
- *
- * <p>Example format properties might be: - format.line-delimiter - format.ignore-parse-errors -
- * format.fields.#.type - format.fields.#.name
- *
- * <p>If schema derivation is enabled, the list must include schema properties: - schema.#.name
- * - schema.#.type
- *
- * <p>Note: All supported format properties must be prefixed with "format.". If schema
- * derivation is enabled, also properties with "schema." prefix can be used.
- *
- * <p>Use "#" to denote an array of values where "#" represents one or more digits. Property
- * versions like "format.property-version" must not be part of the supported properties.
- *
- * <p>See also {@link TableFactory#supportedProperties()} for more information.
- */
- List<String> supportedProperties();
-}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
deleted file mode 100644
index 8fe4d514f76..00000000000
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.types.DataType;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-
-/**
- * Base class for {@link TableFormatFactory}s.
- *
- * @param <T> record type that the format produces or consumes.
- * @deprecated This base class is not required anymore, implement either {@link
- * DynamicTableSourceFactory} or {@link DynamicTableSinkFactory} directly. See FLIP-95 for more
- * information.
- */
-@Deprecated
-@PublicEvolving
-public abstract class TableFormatFactoryBase<T> implements TableFormatFactory<T> {
-
- // Constants for schema derivation
- // TODO drop constants once SchemaValidator has been ported to flink-table-common
- private static final String SCHEMA = "schema";
- private static final String SCHEMA_NAME = "name";
- private static final String SCHEMA_DATA_TYPE = "data-type";
- /**
- * @deprecated {@link #SCHEMA_TYPE} will be removed in future version as it uses old type
- * system. Please use {@link #SCHEMA_DATA_TYPE} instead.
- */
- @Deprecated private static final String SCHEMA_TYPE = "type";
-
- private static final String SCHEMA_PROCTIME = "proctime";
- private static final String SCHEMA_FROM = "from";
- private static final String ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type";
- private static final String ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field";
- private static final String ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from";
- private static final String ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class";
- private static final String ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized";
- private static final String ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type";
- private static final String ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class";
- private static final String ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized";
- private static final String ROWTIME_WATERMARKS_DELAY = "rowtime.watermarks.delay";
-
- private String type;
-
- private String version;
-
- private boolean supportsSchemaDerivation;
-
- public TableFormatFactoryBase(String type, int version, boolean supportsSchemaDerivation) {
- this.type = type;
- this.version = Integer.toString(version);
- this.supportsSchemaDerivation = supportsSchemaDerivation;
- }
-
- @Override
- public final Map<String, String> requiredContext() {
- final Map<String, String> context = new HashMap<>();
- context.put(TableFactoryService.FORMAT_TYPE, type);
- context.put(TableFactoryService.FORMAT_PROPERTY_VERSION, version);
- context.putAll(requiredFormatContext());
- return context;
- }
-
- @Override
- public final boolean supportsSchemaDerivation() {
- return supportsSchemaDerivation;
- }
-
- @Override
- public final List<String> supportedProperties() {
- final List<String> properties = new ArrayList<>();
- if (supportsSchemaDerivation) {
- properties.add(TableFactoryService.FORMAT_DERIVE_SCHEMA);
- // schema
- properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
- properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
- properties.add(SCHEMA + ".#." + SCHEMA_NAME);
- properties.add(SCHEMA + ".#." + SCHEMA_FROM);
- // computed column
- properties.add(SCHEMA + ".#." + EXPR);
- // time attributes
- properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
- properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
- properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
- properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
- properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
- properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
- properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
- properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
- properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
- // watermark
- properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
- properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
- properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
- // table constraint
- properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
- properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
- }
- properties.addAll(supportedFormatProperties());
- return properties;
- }
-
- /**
- * Format specific context.
- *
- * <p>This method can be used if format type and a property version is not enough.
- */
- protected Map<String, String> requiredFormatContext() {
- return Collections.emptyMap();
- }
-
- /**
- * Format specific supported properties.
- *
- * <p>This method can be used if schema derivation is not enough.
- */
- protected List<String> supportedFormatProperties() {
- return Collections.emptyList();
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Finds the table schema that can be used for a format schema (without time attributes and
- * generated columns).
- */
- public static TableSchema deriveSchema(Map<String, String> properties) {
- final DescriptorProperties descriptorProperties = new DescriptorProperties();
- descriptorProperties.putProperties(properties);
-
- final TableSchema.Builder builder = TableSchema.builder();
-
- final TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
- for (int i = 0; i < tableSchema.getFieldCount(); i++) {
- final TableColumn tableColumn = tableSchema.getTableColumns().get(i);
- final String fieldName = tableColumn.getName();
- final DataType dataType = tableColumn.getType();
- if (!tableColumn.isPhysical()) {
- // skip non-physical columns
- continue;
- }
- final boolean isProctime =
- descriptorProperties
- .getOptionalBoolean(SCHEMA + '.' + i + '.' + SCHEMA_PROCTIME)
- .orElse(false);
- final String timestampKey = SCHEMA + '.' + i + '.' + ROWTIME_TIMESTAMPS_TYPE;
- final boolean isRowtime = descriptorProperties.containsKey(timestampKey);
- if (!isProctime && !isRowtime) {
- // check for aliasing
- final String aliasName =
- descriptorProperties
- .getOptionalString(SCHEMA + '.' + i + '.' + SCHEMA_FROM)
- .orElse(fieldName);
- builder.field(aliasName, dataType);
- }
- // only use the rowtime attribute if it references a field
- else if (isRowtime
- && descriptorProperties.isValue(
- timestampKey, ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)) {
- final String aliasName =
- descriptorProperties.getString(
- SCHEMA + '.' + i + '.' + ROWTIME_TIMESTAMPS_FROM);
- builder.field(aliasName, dataType);
- }
- }
-
- return builder.build();
- }
-}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TableFormatFactoryBaseTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TableFormatFactoryBaseTest.java
deleted file mode 100644
index 609b7e17941..00000000000
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TableFormatFactoryBaseTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.TableSchema;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link TableFormatFactoryBase}. */
-public class TableFormatFactoryBaseTest {
-
- @Test
- public void testSchemaDerivation() {
- final Map<String, String> properties = new HashMap<>();
- properties.put("schema.0.name", "otherField");
- properties.put("schema.0.type", "VARCHAR");
- properties.put("schema.0.from", "csvField");
- properties.put("schema.1.name", "abcField");
- properties.put("schema.1.type", "VARCHAR");
- properties.put("schema.2.name", "p");
- properties.put("schema.2.type", "TIMESTAMP");
- properties.put("schema.2.proctime", "true");
- properties.put("schema.3.name", "r");
- properties.put("schema.3.type", "TIMESTAMP");
- properties.put("schema.3.rowtime.timestamps.type", "from-source");
- properties.put("schema.3.rowtime.watermarks.type", "from-source");
-
- final TableSchema actualSchema = TableFormatFactoryBase.deriveSchema(properties);
- final TableSchema expectedSchema =
- TableSchema.builder()
- .field("csvField", Types.STRING) // aliased
- .field("abcField", Types.STRING)
- .build();
- assertThat(actualSchema).isEqualTo(expectedSchema);
- }
-
- @Test
- public void testSchemaDerivationWithRowtime() {
- final Map<String, String> properties = new HashMap<>();
- properties.put("schema.0.name", "otherField");
- properties.put("schema.0.type", "VARCHAR");
- properties.put("schema.0.from", "csvField");
- properties.put("schema.1.name", "abcField");
- properties.put("schema.1.type", "VARCHAR");
- properties.put("schema.2.name", "p");
- properties.put("schema.2.type", "TIMESTAMP");
- properties.put("schema.2.proctime", "true");
- properties.put("schema.3.name", "r");
- properties.put("schema.3.type", "TIMESTAMP");
- properties.put("schema.3.rowtime.timestamps.type", "from-field"); // from-field strategy
- properties.put("schema.3.rowtime.timestamps.from", "myTime");
- properties.put("schema.3.rowtime.watermarks.type", "from-source");
-
- final TableSchema actualSchema = TableFormatFactoryBase.deriveSchema(properties);
- final TableSchema expectedSchema =
- TableSchema.builder()
- .field("csvField", Types.STRING) // aliased
- .field("abcField", Types.STRING)
- .field("myTime", Types.SQL_TIMESTAMP)
- .build();
- assertThat(actualSchema).isEqualTo(expectedSchema);
- }
-}