This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 73210cc [FLINK-18487][table] Datagen and Blackhole factory omits unrecognized properties silently
73210cc is described below
commit 73210cc0f712158ec939ef3ad7dec52a921aad7c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jul 24 10:07:17 2020 +0800
[FLINK-18487][table] Datagen and Blackhole factory omits unrecognized properties silently
This closes #12864
---
.../table/factories/BlackHoleTableSinkFactory.java | 3 +
.../table/factories/DataGenTableSourceFactory.java | 238 ++++++++++++++-------
.../table/factories/BlackHoleSinkFactoryTest.java | 27 ++-
.../factories/DataGenTableSourceFactoryTest.java | 92 +++++++-
.../apache/flink/table/factories/FactoryUtil.java | 81 ++++---
5 files changed, 326 insertions(+), 115 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java
index e70e73e..fbd33d9 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java
@@ -29,6 +29,8 @@ import org.apache.flink.types.RowKind;
import java.util.HashSet;
import java.util.Set;
+import static
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+
/**
* Black hole table sink factory swallowing all input records. It is designed
for:
* - high performance testing.
@@ -57,6 +59,7 @@ public class BlackHoleTableSinkFactory implements
DynamicTableSinkFactory {
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
+ createTableFactoryHelper(this, context).validate();
return new BlackHoleSink();
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 8f80946..4a6c3a8 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -45,10 +45,12 @@ import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
/**
* Factory for creating configured instances of {@link DataGenTableSource} in
a stream environment.
@@ -58,6 +60,7 @@ public class DataGenTableSourceFactory implements
DynamicTableSourceFactory {
public static final String IDENTIFIER = "datagen";
public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
+ public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;
public static final ConfigOption<Long> ROWS_PER_SECOND =
key("rows-per-second")
.longType()
@@ -97,125 +100,171 @@ public class DataGenTableSourceFactory implements
DynamicTableSourceFactory {
Configuration options = new Configuration();
context.getCatalogTable().getOptions().forEach(options::setString);
- TableSchema tableSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ TableSchema schema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ DataGenerator[] fieldGenerators = new
DataGenerator[schema.getFieldCount()];
+ Set<ConfigOption<?>> optionalOptions = new HashSet<>();
- DataGenerator[] fieldGenerators = new
DataGenerator[tableSchema.getFieldCount()];
for (int i = 0; i < fieldGenerators.length; i++) {
- fieldGenerators[i] = createDataGenerator(
- tableSchema.getFieldName(i).get(),
- tableSchema.getFieldDataType(i).get(),
- options);
+ String name = schema.getFieldNames()[i];
+ DataType type = schema.getFieldDataTypes()[i];
+
+ ConfigOption<String> kind = key(FIELDS + "." + name +
"." + KIND)
+ .stringType().defaultValue(RANDOM);
+ DataGeneratorContainer container =
createContainer(name, type, options.get(kind), options);
+ fieldGenerators[i] = container.generator;
+
+ optionalOptions.add(kind);
+ optionalOptions.addAll(container.options);
}
- return new DataGenTableSource(fieldGenerators, tableSchema,
options.get(ROWS_PER_SECOND));
+ FactoryUtil.validateFactoryOptions(requiredOptions(),
optionalOptions, options);
+
+ Set<String> consumedOptionKeys = new HashSet<>();
+ consumedOptionKeys.add(CONNECTOR.key());
+ consumedOptionKeys.add(ROWS_PER_SECOND.key());
+
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
+ FactoryUtil.validateUnconsumedKeys(factoryIdentifier(),
options.keySet(), consumedOptionKeys);
+
+ return new DataGenTableSource(fieldGenerators, schema,
options.get(ROWS_PER_SECOND));
}
- private DataGenerator createDataGenerator(String name, DataType type,
ReadableConfig options) {
- String genType = options.get(
- key(FIELDS + "." + name + "." +
KIND).stringType().defaultValue(RANDOM));
- switch (genType) {
+ private DataGeneratorContainer createContainer(
+ String name, DataType type, String kind, ReadableConfig
options) {
+ switch (kind) {
case RANDOM:
- return createRandomGenerator(name, type,
options);
+ return createRandomContainer(name, type,
options);
case SEQUENCE:
- return createSequenceGenerator(name, type,
options);
+ return createSequenceContainer(name, type,
options);
default:
- throw new ValidationException("Unsupported
generator type: " + genType);
+ throw new ValidationException("Unsupported
generator kind: " + kind);
}
}
- private DataGenerator createRandomGenerator(String name, DataType type,
ReadableConfig options) {
- ConfigOption<Integer> lenKey = key(FIELDS + "." + name + "." +
LENGTH)
- .intType().defaultValue(100);
+ private DataGeneratorContainer createRandomContainer(String name,
DataType type, ReadableConfig config) {
OptionBuilder minKey = key(FIELDS + "." + name + "." + MIN);
OptionBuilder maxKey = key(FIELDS + "." + name + "." + MAX);
switch (type.getLogicalType().getTypeRoot()) {
- case BOOLEAN:
- return RandomGenerator.booleanGenerator();
+ case BOOLEAN: {
+ return
DataGeneratorContainer.of(RandomGenerator.booleanGenerator());
+ }
case CHAR:
- case VARCHAR:
- int length = options.get(lenKey);
- return getRandomStringGenerator(length);
- case TINYINT:
- return RandomGenerator.byteGenerator(
-
options.get(minKey.intType().defaultValue((int) Byte.MIN_VALUE)).byteValue(),
-
options.get(maxKey.intType().defaultValue((int) Byte.MAX_VALUE)).byteValue());
- case SMALLINT:
- return RandomGenerator.shortGenerator(
-
options.get(minKey.intType().defaultValue((int) Short.MIN_VALUE)).shortValue(),
-
options.get(maxKey.intType().defaultValue((int) Short.MAX_VALUE)).shortValue());
- case INTEGER:
- return RandomGenerator.intGenerator(
-
options.get(minKey.intType().defaultValue(Integer.MIN_VALUE)),
-
options.get(maxKey.intType().defaultValue(Integer.MAX_VALUE)));
- case BIGINT:
- return RandomGenerator.longGenerator(
-
options.get(minKey.longType().defaultValue(Long.MIN_VALUE)),
-
options.get(maxKey.longType().defaultValue(Long.MAX_VALUE)));
- case FLOAT:
- return RandomGenerator.floatGenerator(
-
options.get(minKey.floatType().defaultValue(Float.MIN_VALUE)),
-
options.get(maxKey.floatType().defaultValue(Float.MAX_VALUE)));
- case DOUBLE:
- return RandomGenerator.doubleGenerator(
-
options.get(minKey.doubleType().defaultValue(Double.MIN_VALUE)),
-
options.get(maxKey.doubleType().defaultValue(Double.MAX_VALUE)));
+ case VARCHAR: {
+ ConfigOption<Integer> lenOption = key(FIELDS +
"." + name + "." + LENGTH)
+ .intType()
+
.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+ return
DataGeneratorContainer.of(getRandomStringGenerator(config.get(lenOption)),
lenOption);
+ }
+ case TINYINT: {
+ ConfigOption<Integer> min =
minKey.intType().defaultValue((int) Byte.MIN_VALUE);
+ ConfigOption<Integer> max =
maxKey.intType().defaultValue((int) Byte.MAX_VALUE);
+ return DataGeneratorContainer.of(
+ RandomGenerator.byteGenerator(
+
config.get(min).byteValue(), config.get(max).byteValue()),
+ min, max);
+ }
+ case SMALLINT: {
+ ConfigOption<Integer> min =
minKey.intType().defaultValue((int) Short.MIN_VALUE);
+ ConfigOption<Integer> max =
maxKey.intType().defaultValue((int) Short.MAX_VALUE);
+ return DataGeneratorContainer.of(
+ RandomGenerator.shortGenerator(
+
config.get(min).shortValue(),
+
config.get(max).shortValue()),
+ min, max);
+ }
+ case INTEGER: {
+ ConfigOption<Integer> min =
minKey.intType().defaultValue(Integer.MIN_VALUE);
+ ConfigOption<Integer> max =
maxKey.intType().defaultValue(Integer.MAX_VALUE);
+ return DataGeneratorContainer.of(
+ RandomGenerator.intGenerator(
+
config.get(min), config.get(max)),
+ min, max);
+ }
+ case BIGINT: {
+ ConfigOption<Long> min =
minKey.longType().defaultValue(Long.MIN_VALUE);
+ ConfigOption<Long> max =
maxKey.longType().defaultValue(Long.MAX_VALUE);
+ return DataGeneratorContainer.of(
+ RandomGenerator.longGenerator(
+
config.get(min), config.get(max)),
+ min, max);
+ }
+ case FLOAT: {
+ ConfigOption<Float> min =
minKey.floatType().defaultValue(Float.MIN_VALUE);
+ ConfigOption<Float> max =
maxKey.floatType().defaultValue(Float.MAX_VALUE);
+ return DataGeneratorContainer.of(
+ RandomGenerator.floatGenerator(
+
config.get(min), config.get(max)),
+ min, max);
+ }
+ case DOUBLE: {
+ ConfigOption<Double> min =
minKey.doubleType().defaultValue(Double.MIN_VALUE);
+ ConfigOption<Double> max =
maxKey.doubleType().defaultValue(Double.MAX_VALUE);
+ return DataGeneratorContainer.of(
+ RandomGenerator.doubleGenerator(
+
config.get(min), config.get(max)),
+ min, max);
+ }
default:
throw new ValidationException("Unsupported
type: " + type);
}
}
- private static RandomGenerator<StringData> getRandomStringGenerator(int
length) {
- return new RandomGenerator<StringData>() {
- @Override
- public StringData next() {
- return
StringData.fromString(random.nextHexString(length));
- }
- };
- }
-
- private DataGenerator createSequenceGenerator(String name, DataType
type, ReadableConfig options) {
+ private DataGeneratorContainer createSequenceContainer(String name,
DataType type, ReadableConfig config) {
String startKeyStr = FIELDS + "." + name + "." + START;
String endKeyStr = FIELDS + "." + name + "." + END;
OptionBuilder startKey = key(startKeyStr);
OptionBuilder endKey = key(endKeyStr);
-
options.getOptional(startKey.stringType().noDefaultValue()).orElseThrow(
+
config.getOptional(startKey.stringType().noDefaultValue()).orElseThrow(
() -> new ValidationException(
"Could not find required
property '" + startKeyStr + "' for sequence generator."));
-
options.getOptional(endKey.stringType().noDefaultValue()).orElseThrow(
+
config.getOptional(endKey.stringType().noDefaultValue()).orElseThrow(
() -> new ValidationException(
"Could not find required
property '" + endKeyStr + "' for sequence generator."));
+ ConfigOption<Integer> intStart =
startKey.intType().noDefaultValue();
+ ConfigOption<Integer> intEnd =
endKey.intType().noDefaultValue();
+ ConfigOption<Long> longStart =
startKey.longType().noDefaultValue();
+ ConfigOption<Long> longEnd = endKey.longType().noDefaultValue();
switch (type.getLogicalType().getTypeRoot()) {
case CHAR:
case VARCHAR:
- return getSequenceStringGenerator(
-
options.get(startKey.longType().noDefaultValue()),
-
options.get(endKey.longType().noDefaultValue()));
+ return DataGeneratorContainer.of(
+ getSequenceStringGenerator(
+
config.get(longStart), config.get(longEnd)),
+ longStart, longEnd);
case TINYINT:
- return SequenceGenerator.byteGenerator(
-
options.get(startKey.intType().noDefaultValue()).byteValue(),
-
options.get(endKey.intType().noDefaultValue()).byteValue());
+ return DataGeneratorContainer.of(
+ SequenceGenerator.byteGenerator(
+
config.get(intStart).byteValue(),
+
config.get(intEnd).byteValue()),
+ intStart, intEnd);
case SMALLINT:
- return SequenceGenerator.shortGenerator(
-
options.get(startKey.intType().noDefaultValue()).shortValue(),
-
options.get(endKey.intType().noDefaultValue()).shortValue());
+ return DataGeneratorContainer.of(
+
SequenceGenerator.shortGenerator(
+
config.get(intStart).shortValue(),
+
config.get(intEnd).shortValue()),
+ intStart, intEnd);
case INTEGER:
- return SequenceGenerator.intGenerator(
-
options.get(startKey.intType().noDefaultValue()),
-
options.get(endKey.intType().noDefaultValue()));
+ return DataGeneratorContainer.of(
+ SequenceGenerator.intGenerator(
+
config.get(intStart), config.get(intEnd)),
+ intStart, intEnd);
case BIGINT:
- return SequenceGenerator.longGenerator(
-
options.get(startKey.longType().noDefaultValue()),
-
options.get(endKey.longType().noDefaultValue()));
+ return DataGeneratorContainer.of(
+ SequenceGenerator.longGenerator(
+
config.get(longStart), config.get(longEnd)),
+ longStart, longEnd);
case FLOAT:
- return SequenceGenerator.floatGenerator(
-
options.get(startKey.intType().noDefaultValue()).shortValue(),
-
options.get(endKey.intType().noDefaultValue()).shortValue());
+ return DataGeneratorContainer.of(
+
SequenceGenerator.floatGenerator(
+
config.get(intStart).shortValue(),
+
config.get(intEnd).shortValue()),
+ intStart, intEnd);
case DOUBLE:
- return SequenceGenerator.doubleGenerator(
-
options.get(startKey.intType().noDefaultValue()),
-
options.get(endKey.intType().noDefaultValue()));
+ return DataGeneratorContainer.of(
+
SequenceGenerator.doubleGenerator(
+
config.get(intStart), config.get(intEnd)),
+ intStart, intEnd);
default:
throw new ValidationException("Unsupported
type: " + type);
}
@@ -230,6 +279,37 @@ public class DataGenTableSourceFactory implements
DynamicTableSourceFactory {
};
}
+ private static RandomGenerator<StringData> getRandomStringGenerator(int
length) {
+ return new RandomGenerator<StringData>() {
+ @Override
+ public StringData next() {
+ return
StringData.fromString(random.nextHexString(length));
+ }
+ };
+ }
+
+ // -------------------------------- Help Classes -------------------------------
+
+ private static class DataGeneratorContainer {
+
+ private DataGenerator generator;
+
+ /**
+ * Generator config options, for validation.
+ */
+ private Set<ConfigOption<?>> options;
+
+ private DataGeneratorContainer(DataGenerator generator,
Set<ConfigOption<?>> options) {
+ this.generator = generator;
+ this.options = options;
+ }
+
+ private static DataGeneratorContainer of(
+ DataGenerator generator, ConfigOption<?>...
options) {
+ return new DataGeneratorContainer(generator, new
HashSet<>(Arrays.asList(options)));
+ }
+ }
+
/**
* A {@link StreamTableSource} that emits each number from a given
interval exactly once,
* possibly in parallel. See {@link StatefulSequenceSource}.
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/BlackHoleSinkFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/BlackHoleSinkFactoryTest.java
index 431e60e..7dc7a35 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/BlackHoleSinkFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/BlackHoleSinkFactoryTest.java
@@ -21,10 +21,12 @@ package org.apache.flink.table.factories;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
@@ -48,13 +50,34 @@ public class BlackHoleSinkFactoryTest {
Map<String, String> properties = new HashMap<>();
properties.put("connector", "blackhole");
- DynamicTableSink sink = FactoryUtil.createTableSink(
+ DynamicTableSink sink = createSink(properties);
+
+ assertEquals("BlackHole", sink.asSummaryString());
+ }
+
+ private DynamicTableSink createSink(Map<String, String> properties) {
+ return FactoryUtil.createTableSink(
null,
ObjectIdentifier.of("", "", ""),
new CatalogTableImpl(TEST_SCHEMA, properties,
""),
new Configuration(),
Thread.currentThread().getContextClassLoader());
+ }
- assertEquals("BlackHole", sink.asSummaryString());
+ @Test
+ public void testWrongKey() {
+ try {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("connector", "blackhole");
+ properties.put("unknown-key", "1");
+ createSink(properties);
+ } catch (ValidationException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue(cause.toString(), cause instanceof
ValidationException);
+ Assert.assertTrue(cause.getMessage(),
cause.getMessage().contains(
+ "Unsupported options:\n\nunknown-key"));
+ return;
+ }
+ Assert.fail("Should fail by ValidationException.");
}
}
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index db911e3..23e96f5 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -190,8 +190,8 @@ public class DataGenTableSourceFactoryTest {
descriptor.asMap());
} catch (ValidationException e) {
Throwable cause = e.getCause();
- Assert.assertTrue(cause instanceof ValidationException);
- Assert.assertTrue(cause.getMessage().contains(
+ Assert.assertTrue(cause.toString(), cause instanceof
ValidationException);
+ Assert.assertTrue(cause.getMessage(),
cause.getMessage().contains(
"Could not find required property
'fields.f0.start' for sequence generator."));
return;
}
@@ -211,14 +211,98 @@ public class DataGenTableSourceFactoryTest {
descriptor.asMap());
} catch (ValidationException e) {
Throwable cause = e.getCause();
- Assert.assertTrue(cause instanceof ValidationException);
- Assert.assertTrue(cause.getMessage().contains(
+ Assert.assertTrue(cause.toString(), cause instanceof
ValidationException);
+ Assert.assertTrue(cause.getMessage(),
cause.getMessage().contains(
"Could not find required property
'fields.f0.end' for sequence generator."));
return;
}
Assert.fail("Should fail by ValidationException.");
}
+ @Test
+ public void testWrongKey() {
+ try {
+ DescriptorProperties descriptor = new
DescriptorProperties();
+ descriptor.putString(FactoryUtil.CONNECTOR.key(),
"datagen");
+ descriptor.putLong("wrong-rows-per-second", 1);
+
+ createSource(
+ TableSchema.builder().field("f0",
DataTypes.BIGINT()).build(),
+ descriptor.asMap());
+ } catch (ValidationException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue(cause.toString(), cause instanceof
ValidationException);
+ Assert.assertTrue(cause.getMessage(),
cause.getMessage().contains(
+ "Unsupported
options:\n\nwrong-rows-per-second"));
+ return;
+ }
+ Assert.fail("Should fail by ValidationException.");
+ }
+
+ @Test
+ public void testWrongStartInRandom() {
+ try {
+ DescriptorProperties descriptor = new
DescriptorProperties();
+ descriptor.putString(FactoryUtil.CONNECTOR.key(),
"datagen");
+ descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
+ descriptor.putLong(FIELDS + ".f0." + START, 0);
+
+ createSource(
+ TableSchema.builder().field("f0",
DataTypes.BIGINT()).build(),
+ descriptor.asMap());
+ } catch (ValidationException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue(cause.toString(), cause instanceof
ValidationException);
+ Assert.assertTrue(cause.getMessage(),
cause.getMessage().contains(
+ "Unsupported
options:\n\nfields.f0.start"));
+ return;
+ }
+ Assert.fail("Should fail by ValidationException.");
+ }
+
+ @Test
+ public void testWrongLenInRandomLong() {
+ try {
+ DescriptorProperties descriptor = new
DescriptorProperties();
+ descriptor.putString(FactoryUtil.CONNECTOR.key(),
"datagen");
+ descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
+ descriptor.putInt(FIELDS + ".f0." + LENGTH, 100);
+
+ createSource(
+ TableSchema.builder().field("f0",
DataTypes.BIGINT()).build(),
+ descriptor.asMap());
+ } catch (ValidationException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue(cause.toString(), cause instanceof
ValidationException);
+ Assert.assertTrue(cause.getMessage(),
cause.getMessage().contains(
+ "Unsupported
options:\n\nfields.f0.length"));
+ return;
+ }
+ Assert.fail("Should fail by ValidationException.");
+ }
+
+ @Test
+ public void testWrongTypes() {
+ try {
+ DescriptorProperties descriptor = new
DescriptorProperties();
+ descriptor.putString(FactoryUtil.CONNECTOR.key(),
"datagen");
+ descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
+ descriptor.putString(FIELDS + ".f0." + START, "Wrong");
+ descriptor.putString(FIELDS + ".f0." + END, "Wrong");
+
+ createSource(
+ TableSchema.builder().field("f0",
DataTypes.BIGINT()).build(),
+ descriptor.asMap());
+ } catch (ValidationException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue(cause.toString(), cause instanceof
IllegalArgumentException);
+ Assert.assertTrue(cause.getMessage(),
cause.getMessage().contains(
+ "Could not parse value 'Wrong' for key
'fields.f0.start'"));
+ return;
+ }
+ Assert.fail("Should fail by ValidationException.");
+ }
+
private static DynamicTableSource createSource(TableSchema schema,
Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index e3783c2..95f163d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -271,26 +271,64 @@ public final class FactoryUtil {
* <p>Note: It does not check for left-over options.
*/
public static void validateFactoryOptions(Factory factory,
ReadableConfig options) {
+ validateFactoryOptions(factory.requiredOptions(),
factory.optionalOptions(), options);
+ }
+
+ /**
+ * Validates the required options and optional options.
+ *
+ * <p>Note: It does not check for left-over options.
+ */
+ public static void validateFactoryOptions(
+ Set<ConfigOption<?>> requiredOptions,
+ Set<ConfigOption<?>> optionalOptions,
+ ReadableConfig options) {
// currently Flink's options have no validation feature which
is why we access them eagerly
// to provoke a parsing error
- final List<String> missingRequiredOptions =
factory.requiredOptions().stream()
- .filter(option -> readOption(options, option) == null)
- .map(ConfigOption::key)
- .sorted()
- .collect(Collectors.toList());
+ final List<String> missingRequiredOptions =
requiredOptions.stream()
+ .filter(option -> readOption(options, option)
== null)
+ .map(ConfigOption::key)
+ .sorted()
+ .collect(Collectors.toList());
if (!missingRequiredOptions.isEmpty()) {
throw new ValidationException(
- String.format(
- "One or more required options are
missing.\n\n" +
- "Missing required options are:\n\n" +
- "%s",
- String.join("\n",
missingRequiredOptions)));
+ String.format(
+ "One or more required
options are missing.\n\n" +
+
"Missing required options are:\n\n" +
+ "%s",
+ String.join("\n",
missingRequiredOptions)));
}
- factory.optionalOptions()
- .forEach(option -> readOption(options, option));
+ optionalOptions.forEach(option -> readOption(options, option));
+ }
+
+ /**
+ * Validates unconsumed option keys.
+ */
+ public static void validateUnconsumedKeys(
+ String factoryIdentifier,
+ Set<String> allOptionKeys,
+ Set<String> consumedOptionKeys) {
+ final Set<String> remainingOptionKeys = new
HashSet<>(allOptionKeys);
+ remainingOptionKeys.removeAll(consumedOptionKeys);
+ if (remainingOptionKeys.size() > 0) {
+ throw new ValidationException(
+ String.format(
+ "Unsupported options
found for connector '%s'.\n\n" +
+
"Unsupported options:\n\n" +
+
"%s\n\n" +
+
"Supported options:\n\n" +
+ "%s",
+ factoryIdentifier,
+
remainingOptionKeys.stream()
+
.sorted()
+
.collect(Collectors.joining("\n")),
+
consumedOptionKeys.stream()
+
.sorted()
+
.collect(Collectors.joining("\n"))));
+ }
}
//
--------------------------------------------------------------------------------------------
@@ -480,24 +518,7 @@ public final class FactoryUtil {
*/
public void validate() {
validateFactoryOptions(tableFactory, allOptions);
- final Set<String> remainingOptionKeys = new
HashSet<>(allOptions.keySet());
- remainingOptionKeys.removeAll(consumedOptionKeys);
- if (remainingOptionKeys.size() > 0) {
- throw new ValidationException(
- String.format(
- "Unsupported options found for
connector '%s'.\n\n" +
- "Unsupported options:\n\n" +
- "%s\n\n" +
- "Supported options:\n\n" +
- "%s",
-
tableFactory.factoryIdentifier(),
- remainingOptionKeys.stream()
- .sorted()
-
.collect(Collectors.joining("\n")),
- consumedOptionKeys.stream()
- .sorted()
-
.collect(Collectors.joining("\n"))));
- }
+
validateUnconsumedKeys(tableFactory.factoryIdentifier(), allOptions.keySet(),
consumedOptionKeys);
}
/**