This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 73210cc  [FLINK-18487][table] Datagen and Blackhole factory omits unrecognized properties silently
73210cc is described below

commit 73210cc0f712158ec939ef3ad7dec52a921aad7c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jul 24 10:07:17 2020 +0800

    [FLINK-18487][table] Datagen and Blackhole factory omits unrecognized properties silently
    
    This closes #12864
---
 .../table/factories/BlackHoleTableSinkFactory.java |   3 +
 .../table/factories/DataGenTableSourceFactory.java | 238 ++++++++++++++-------
 .../table/factories/BlackHoleSinkFactoryTest.java  |  27 ++-
 .../factories/DataGenTableSourceFactoryTest.java   |  92 +++++++-
 .../apache/flink/table/factories/FactoryUtil.java  |  81 ++++---
 5 files changed, 326 insertions(+), 115 deletions(-)

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java
index e70e73e..fbd33d9 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BlackHoleTableSinkFactory.java
@@ -29,6 +29,8 @@ import org.apache.flink.types.RowKind;
 import java.util.HashSet;
 import java.util.Set;
 
+import static 
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+
 /**
  * Black hole table sink factory swallowing all input records. It is designed 
for:
  * - high performance testing.
@@ -57,6 +59,7 @@ public class BlackHoleTableSinkFactory implements 
DynamicTableSinkFactory {
 
        @Override
        public DynamicTableSink createDynamicTableSink(Context context) {
+               createTableFactoryHelper(this, context).validate();
                return new BlackHoleSink();
        }
 
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 8f80946..4a6c3a8 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -45,10 +45,12 @@ import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 
 /**
  * Factory for creating configured instances of {@link DataGenTableSource} in 
a stream environment.
@@ -58,6 +60,7 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
 
        public static final String IDENTIFIER = "datagen";
        public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
+       public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;
 
        public static final ConfigOption<Long> ROWS_PER_SECOND = 
key("rows-per-second")
                        .longType()
@@ -97,125 +100,171 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
                Configuration options = new Configuration();
                
context.getCatalogTable().getOptions().forEach(options::setString);
 
-               TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+               TableSchema schema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+               DataGenerator[] fieldGenerators = new 
DataGenerator[schema.getFieldCount()];
+               Set<ConfigOption<?>> optionalOptions = new HashSet<>();
 
-               DataGenerator[] fieldGenerators = new 
DataGenerator[tableSchema.getFieldCount()];
                for (int i = 0; i < fieldGenerators.length; i++) {
-                       fieldGenerators[i] = createDataGenerator(
-                                       tableSchema.getFieldName(i).get(),
-                                       tableSchema.getFieldDataType(i).get(),
-                                       options);
+                       String name = schema.getFieldNames()[i];
+                       DataType type = schema.getFieldDataTypes()[i];
+
+                       ConfigOption<String> kind = key(FIELDS + "." + name + 
"." + KIND)
+                                       .stringType().defaultValue(RANDOM);
+                       DataGeneratorContainer container = 
createContainer(name, type, options.get(kind), options);
+                       fieldGenerators[i] = container.generator;
+
+                       optionalOptions.add(kind);
+                       optionalOptions.addAll(container.options);
                }
 
-               return new DataGenTableSource(fieldGenerators, tableSchema, 
options.get(ROWS_PER_SECOND));
+               FactoryUtil.validateFactoryOptions(requiredOptions(), 
optionalOptions, options);
+
+               Set<String> consumedOptionKeys = new HashSet<>();
+               consumedOptionKeys.add(CONNECTOR.key());
+               consumedOptionKeys.add(ROWS_PER_SECOND.key());
+               
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
+               FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), 
options.keySet(), consumedOptionKeys);
+
+               return new DataGenTableSource(fieldGenerators, schema, 
options.get(ROWS_PER_SECOND));
        }
 
-       private DataGenerator createDataGenerator(String name, DataType type, 
ReadableConfig options) {
-               String genType = options.get(
-                               key(FIELDS + "." + name + "." + 
KIND).stringType().defaultValue(RANDOM));
-               switch (genType) {
+       private DataGeneratorContainer createContainer(
+                       String name, DataType type, String kind, ReadableConfig 
options) {
+               switch (kind) {
                        case RANDOM:
-                               return createRandomGenerator(name, type, 
options);
+                               return createRandomContainer(name, type, 
options);
                        case SEQUENCE:
-                               return createSequenceGenerator(name, type, 
options);
+                               return createSequenceContainer(name, type, 
options);
                        default:
-                               throw new ValidationException("Unsupported 
generator type: " + genType);
+                               throw new ValidationException("Unsupported 
generator kind: " + kind);
                }
        }
 
-       private DataGenerator createRandomGenerator(String name, DataType type, 
ReadableConfig options) {
-               ConfigOption<Integer> lenKey = key(FIELDS + "." + name + "." + 
LENGTH)
-                               .intType().defaultValue(100);
+       private DataGeneratorContainer createRandomContainer(String name, 
DataType type, ReadableConfig config) {
                OptionBuilder minKey = key(FIELDS + "." + name + "." + MIN);
                OptionBuilder maxKey = key(FIELDS + "." + name + "." + MAX);
                switch (type.getLogicalType().getTypeRoot()) {
-                       case BOOLEAN:
-                               return RandomGenerator.booleanGenerator();
+                       case BOOLEAN: {
+                               return 
DataGeneratorContainer.of(RandomGenerator.booleanGenerator());
+                       }
                        case CHAR:
-                       case VARCHAR:
-                               int length = options.get(lenKey);
-                               return getRandomStringGenerator(length);
-                       case TINYINT:
-                               return RandomGenerator.byteGenerator(
-                                               
options.get(minKey.intType().defaultValue((int) Byte.MIN_VALUE)).byteValue(),
-                                               
options.get(maxKey.intType().defaultValue((int) Byte.MAX_VALUE)).byteValue());
-                       case SMALLINT:
-                               return RandomGenerator.shortGenerator(
-                                               
options.get(minKey.intType().defaultValue((int) Short.MIN_VALUE)).shortValue(),
-                                               
options.get(maxKey.intType().defaultValue((int) Short.MAX_VALUE)).shortValue());
-                       case INTEGER:
-                               return RandomGenerator.intGenerator(
-                                               
options.get(minKey.intType().defaultValue(Integer.MIN_VALUE)),
-                                               
options.get(maxKey.intType().defaultValue(Integer.MAX_VALUE)));
-                       case BIGINT:
-                               return RandomGenerator.longGenerator(
-                                               
options.get(minKey.longType().defaultValue(Long.MIN_VALUE)),
-                                               
options.get(maxKey.longType().defaultValue(Long.MAX_VALUE)));
-                       case FLOAT:
-                               return RandomGenerator.floatGenerator(
-                                               
options.get(minKey.floatType().defaultValue(Float.MIN_VALUE)),
-                                               
options.get(maxKey.floatType().defaultValue(Float.MAX_VALUE)));
-                       case DOUBLE:
-                               return RandomGenerator.doubleGenerator(
-                                               
options.get(minKey.doubleType().defaultValue(Double.MIN_VALUE)),
-                                               
options.get(maxKey.doubleType().defaultValue(Double.MAX_VALUE)));
+                       case VARCHAR: {
+                               ConfigOption<Integer> lenOption = key(FIELDS + 
"." + name + "." + LENGTH)
+                                               .intType()
+                                               
.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+                               return 
DataGeneratorContainer.of(getRandomStringGenerator(config.get(lenOption)), 
lenOption);
+                       }
+                       case TINYINT: {
+                               ConfigOption<Integer> min = 
minKey.intType().defaultValue((int) Byte.MIN_VALUE);
+                               ConfigOption<Integer> max = 
maxKey.intType().defaultValue((int) Byte.MAX_VALUE);
+                               return DataGeneratorContainer.of(
+                                               RandomGenerator.byteGenerator(
+                                                               
config.get(min).byteValue(), config.get(max).byteValue()),
+                                               min, max);
+                       }
+                       case SMALLINT: {
+                               ConfigOption<Integer> min = 
minKey.intType().defaultValue((int) Short.MIN_VALUE);
+                               ConfigOption<Integer> max = 
maxKey.intType().defaultValue((int) Short.MAX_VALUE);
+                               return DataGeneratorContainer.of(
+                                               RandomGenerator.shortGenerator(
+                                                               
config.get(min).shortValue(),
+                                                               
config.get(max).shortValue()),
+                                               min, max);
+                       }
+                       case INTEGER: {
+                               ConfigOption<Integer> min = 
minKey.intType().defaultValue(Integer.MIN_VALUE);
+                               ConfigOption<Integer> max = 
maxKey.intType().defaultValue(Integer.MAX_VALUE);
+                               return DataGeneratorContainer.of(
+                                               RandomGenerator.intGenerator(
+                                                               
config.get(min), config.get(max)),
+                                               min, max);
+                       }
+                       case BIGINT: {
+                               ConfigOption<Long> min = 
minKey.longType().defaultValue(Long.MIN_VALUE);
+                               ConfigOption<Long> max = 
maxKey.longType().defaultValue(Long.MAX_VALUE);
+                               return DataGeneratorContainer.of(
+                                               RandomGenerator.longGenerator(
+                                                               
config.get(min), config.get(max)),
+                                               min, max);
+                       }
+                       case FLOAT: {
+                               ConfigOption<Float> min = 
minKey.floatType().defaultValue(Float.MIN_VALUE);
+                               ConfigOption<Float> max = 
maxKey.floatType().defaultValue(Float.MAX_VALUE);
+                               return DataGeneratorContainer.of(
+                                               RandomGenerator.floatGenerator(
+                                                               
config.get(min), config.get(max)),
+                                               min, max);
+                       }
+                       case DOUBLE: {
+                               ConfigOption<Double> min = 
minKey.doubleType().defaultValue(Double.MIN_VALUE);
+                               ConfigOption<Double> max = 
maxKey.doubleType().defaultValue(Double.MAX_VALUE);
+                               return DataGeneratorContainer.of(
+                                               RandomGenerator.doubleGenerator(
+                                                               
config.get(min), config.get(max)),
+                                               min, max);
+                       }
                        default:
                                throw new ValidationException("Unsupported 
type: " + type);
                }
        }
 
-       private static RandomGenerator<StringData> getRandomStringGenerator(int 
length) {
-               return new RandomGenerator<StringData>() {
-                       @Override
-                       public StringData next() {
-                               return 
StringData.fromString(random.nextHexString(length));
-                       }
-               };
-       }
-
-       private DataGenerator createSequenceGenerator(String name, DataType 
type, ReadableConfig options) {
+       private DataGeneratorContainer createSequenceContainer(String name, 
DataType type, ReadableConfig config) {
                String startKeyStr = FIELDS + "." + name + "." + START;
                String endKeyStr = FIELDS + "." + name + "." + END;
                OptionBuilder startKey = key(startKeyStr);
                OptionBuilder endKey = key(endKeyStr);
 
-               
options.getOptional(startKey.stringType().noDefaultValue()).orElseThrow(
+               
config.getOptional(startKey.stringType().noDefaultValue()).orElseThrow(
                                () -> new ValidationException(
                                                "Could not find required 
property '" + startKeyStr + "' for sequence generator."));
-               
options.getOptional(endKey.stringType().noDefaultValue()).orElseThrow(
+               
config.getOptional(endKey.stringType().noDefaultValue()).orElseThrow(
                                () -> new ValidationException(
                                                "Could not find required 
property '" + endKeyStr + "' for sequence generator."));
 
+               ConfigOption<Integer> intStart = 
startKey.intType().noDefaultValue();
+               ConfigOption<Integer> intEnd = 
endKey.intType().noDefaultValue();
+               ConfigOption<Long> longStart = 
startKey.longType().noDefaultValue();
+               ConfigOption<Long> longEnd = endKey.longType().noDefaultValue();
                switch (type.getLogicalType().getTypeRoot()) {
                        case CHAR:
                        case VARCHAR:
-                       return getSequenceStringGenerator(
-                                       
options.get(startKey.longType().noDefaultValue()),
-                                       
options.get(endKey.longType().noDefaultValue()));
+                               return DataGeneratorContainer.of(
+                                               getSequenceStringGenerator(
+                                                               
config.get(longStart), config.get(longEnd)),
+                                               longStart, longEnd);
                        case TINYINT:
-                               return SequenceGenerator.byteGenerator(
-                                               
options.get(startKey.intType().noDefaultValue()).byteValue(),
-                                               
options.get(endKey.intType().noDefaultValue()).byteValue());
+                               return DataGeneratorContainer.of(
+                                               SequenceGenerator.byteGenerator(
+                                                               
config.get(intStart).byteValue(),
+                                                               
config.get(intEnd).byteValue()),
+                                               intStart, intEnd);
                        case SMALLINT:
-                               return SequenceGenerator.shortGenerator(
-                                               
options.get(startKey.intType().noDefaultValue()).shortValue(),
-                                               
options.get(endKey.intType().noDefaultValue()).shortValue());
+                               return DataGeneratorContainer.of(
+                                               
SequenceGenerator.shortGenerator(
+                                                               
config.get(intStart).shortValue(),
+                                                               
config.get(intEnd).shortValue()),
+                                               intStart, intEnd);
                        case INTEGER:
-                               return SequenceGenerator.intGenerator(
-                                               
options.get(startKey.intType().noDefaultValue()),
-                                               
options.get(endKey.intType().noDefaultValue()));
+                               return DataGeneratorContainer.of(
+                                               SequenceGenerator.intGenerator(
+                                                               
config.get(intStart), config.get(intEnd)),
+                                               intStart, intEnd);
                        case BIGINT:
-                               return SequenceGenerator.longGenerator(
-                                               
options.get(startKey.longType().noDefaultValue()),
-                                               
options.get(endKey.longType().noDefaultValue()));
+                               return DataGeneratorContainer.of(
+                                               SequenceGenerator.longGenerator(
+                                                               
config.get(longStart), config.get(longEnd)),
+                                               longStart, longEnd);
                        case FLOAT:
-                               return SequenceGenerator.floatGenerator(
-                                               
options.get(startKey.intType().noDefaultValue()).shortValue(),
-                                               
options.get(endKey.intType().noDefaultValue()).shortValue());
+                               return DataGeneratorContainer.of(
+                                               
SequenceGenerator.floatGenerator(
+                                                               
config.get(intStart).shortValue(),
+                                                               
config.get(intEnd).shortValue()),
+                                               intStart, intEnd);
                        case DOUBLE:
-                               return SequenceGenerator.doubleGenerator(
-                                               
options.get(startKey.intType().noDefaultValue()),
-                                               
options.get(endKey.intType().noDefaultValue()));
+                               return DataGeneratorContainer.of(
+                                               
SequenceGenerator.doubleGenerator(
+                                                               
config.get(intStart), config.get(intEnd)),
+                                               intStart, intEnd);
                        default:
                                throw new ValidationException("Unsupported 
type: " + type);
                }
@@ -230,6 +279,37 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
                };
        }
 
+       private static RandomGenerator<StringData> getRandomStringGenerator(int 
length) {
+               return new RandomGenerator<StringData>() {
+                       @Override
+                       public StringData next() {
+                               return 
StringData.fromString(random.nextHexString(length));
+                       }
+               };
+       }
+
+       // -------------------------------- Help Classes 
-------------------------------
+
+       private static class DataGeneratorContainer {
+
+               private DataGenerator generator;
+
+               /**
+                * Generator config options, for validation.
+                */
+               private Set<ConfigOption<?>> options;
+
+               private DataGeneratorContainer(DataGenerator generator, 
Set<ConfigOption<?>> options) {
+                       this.generator = generator;
+                       this.options = options;
+               }
+
+               private static DataGeneratorContainer of(
+                               DataGenerator generator, ConfigOption<?>... 
options) {
+                       return new DataGeneratorContainer(generator, new 
HashSet<>(Arrays.asList(options)));
+               }
+       }
+
        /**
         * A {@link StreamTableSource} that emits each number from a given 
interval exactly once,
         * possibly in parallel. See {@link StatefulSequenceSource}.
diff --git 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/BlackHoleSinkFactoryTest.java
 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/BlackHoleSinkFactoryTest.java
index 431e60e..7dc7a35 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/BlackHoleSinkFactoryTest.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/BlackHoleSinkFactoryTest.java
@@ -21,10 +21,12 @@ package org.apache.flink.table.factories;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -48,13 +50,34 @@ public class BlackHoleSinkFactoryTest {
                Map<String, String> properties = new HashMap<>();
                properties.put("connector", "blackhole");
 
-               DynamicTableSink sink = FactoryUtil.createTableSink(
+               DynamicTableSink sink = createSink(properties);
+
+               assertEquals("BlackHole", sink.asSummaryString());
+       }
+
+       private DynamicTableSink createSink(Map<String, String> properties) {
+               return FactoryUtil.createTableSink(
                                null,
                                ObjectIdentifier.of("", "", ""),
                                new CatalogTableImpl(TEST_SCHEMA, properties, 
""),
                                new Configuration(),
                                Thread.currentThread().getContextClassLoader());
+       }
 
-               assertEquals("BlackHole", sink.asSummaryString());
+       @Test
+       public void testWrongKey() {
+               try {
+                       Map<String, String> properties = new HashMap<>();
+                       properties.put("connector", "blackhole");
+                       properties.put("unknown-key", "1");
+                       createSink(properties);
+               } catch (ValidationException e) {
+                       Throwable cause = e.getCause();
+                       Assert.assertTrue(cause.toString(), cause instanceof 
ValidationException);
+                       Assert.assertTrue(cause.getMessage(), 
cause.getMessage().contains(
+                                       "Unsupported options:\n\nunknown-key"));
+                       return;
+               }
+               Assert.fail("Should fail by ValidationException.");
        }
 }
diff --git 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index db911e3..23e96f5 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -190,8 +190,8 @@ public class DataGenTableSourceFactoryTest {
                                        descriptor.asMap());
                } catch (ValidationException e) {
                        Throwable cause = e.getCause();
-                       Assert.assertTrue(cause instanceof ValidationException);
-                       Assert.assertTrue(cause.getMessage().contains(
+                       Assert.assertTrue(cause.toString(), cause instanceof 
ValidationException);
+                       Assert.assertTrue(cause.getMessage(), 
cause.getMessage().contains(
                                        "Could not find required property 
'fields.f0.start' for sequence generator."));
                        return;
                }
@@ -211,14 +211,98 @@ public class DataGenTableSourceFactoryTest {
                                        descriptor.asMap());
                } catch (ValidationException e) {
                        Throwable cause = e.getCause();
-                       Assert.assertTrue(cause instanceof ValidationException);
-                       Assert.assertTrue(cause.getMessage().contains(
+                       Assert.assertTrue(cause.toString(), cause instanceof 
ValidationException);
+                       Assert.assertTrue(cause.getMessage(), 
cause.getMessage().contains(
                                        "Could not find required property 
'fields.f0.end' for sequence generator."));
                        return;
                }
                Assert.fail("Should fail by ValidationException.");
        }
 
+       @Test
+       public void testWrongKey() {
+               try {
+                       DescriptorProperties descriptor = new 
DescriptorProperties();
+                       descriptor.putString(FactoryUtil.CONNECTOR.key(), 
"datagen");
+                       descriptor.putLong("wrong-rows-per-second", 1);
+
+                       createSource(
+                                       TableSchema.builder().field("f0", 
DataTypes.BIGINT()).build(),
+                                       descriptor.asMap());
+               } catch (ValidationException e) {
+                       Throwable cause = e.getCause();
+                       Assert.assertTrue(cause.toString(), cause instanceof 
ValidationException);
+                       Assert.assertTrue(cause.getMessage(), 
cause.getMessage().contains(
+                                       "Unsupported 
options:\n\nwrong-rows-per-second"));
+                       return;
+               }
+               Assert.fail("Should fail by ValidationException.");
+       }
+
+       @Test
+       public void testWrongStartInRandom() {
+               try {
+                       DescriptorProperties descriptor = new 
DescriptorProperties();
+                       descriptor.putString(FactoryUtil.CONNECTOR.key(), 
"datagen");
+                       descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
+                       descriptor.putLong(FIELDS + ".f0." + START, 0);
+
+                       createSource(
+                                       TableSchema.builder().field("f0", 
DataTypes.BIGINT()).build(),
+                                       descriptor.asMap());
+               } catch (ValidationException e) {
+                       Throwable cause = e.getCause();
+                       Assert.assertTrue(cause.toString(), cause instanceof 
ValidationException);
+                       Assert.assertTrue(cause.getMessage(), 
cause.getMessage().contains(
+                                       "Unsupported 
options:\n\nfields.f0.start"));
+                       return;
+               }
+               Assert.fail("Should fail by ValidationException.");
+       }
+
+       @Test
+       public void testWrongLenInRandomLong() {
+               try {
+                       DescriptorProperties descriptor = new 
DescriptorProperties();
+                       descriptor.putString(FactoryUtil.CONNECTOR.key(), 
"datagen");
+                       descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
+                       descriptor.putInt(FIELDS + ".f0." + LENGTH, 100);
+
+                       createSource(
+                                       TableSchema.builder().field("f0", 
DataTypes.BIGINT()).build(),
+                                       descriptor.asMap());
+               } catch (ValidationException e) {
+                       Throwable cause = e.getCause();
+                       Assert.assertTrue(cause.toString(), cause instanceof 
ValidationException);
+                       Assert.assertTrue(cause.getMessage(), 
cause.getMessage().contains(
+                                       "Unsupported 
options:\n\nfields.f0.length"));
+                       return;
+               }
+               Assert.fail("Should fail by ValidationException.");
+       }
+
+       @Test
+       public void testWrongTypes() {
+               try {
+                       DescriptorProperties descriptor = new 
DescriptorProperties();
+                       descriptor.putString(FactoryUtil.CONNECTOR.key(), 
"datagen");
+                       descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
+                       descriptor.putString(FIELDS + ".f0." + START, "Wrong");
+                       descriptor.putString(FIELDS + ".f0." + END, "Wrong");
+
+                       createSource(
+                                       TableSchema.builder().field("f0", 
DataTypes.BIGINT()).build(),
+                                       descriptor.asMap());
+               } catch (ValidationException e) {
+                       Throwable cause = e.getCause();
+                       Assert.assertTrue(cause.toString(), cause instanceof 
IllegalArgumentException);
+                       Assert.assertTrue(cause.getMessage(), 
cause.getMessage().contains(
+                                       "Could not parse value 'Wrong' for key 
'fields.f0.start'"));
+                       return;
+               }
+               Assert.fail("Should fail by ValidationException.");
+       }
+
        private static DynamicTableSource createSource(TableSchema schema, 
Map<String, String> options) {
                return FactoryUtil.createTableSource(
                                null,
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index e3783c2..95f163d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -271,26 +271,64 @@ public final class FactoryUtil {
         * <p>Note: It does not check for left-over options.
         */
        public static void validateFactoryOptions(Factory factory, 
ReadableConfig options) {
+               validateFactoryOptions(factory.requiredOptions(), 
factory.optionalOptions(), options);
+       }
+
+       /**
+        * Validates the required options and optional options.
+        *
+        * <p>Note: It does not check for left-over options.
+        */
+       public static void validateFactoryOptions(
+                       Set<ConfigOption<?>> requiredOptions,
+                       Set<ConfigOption<?>> optionalOptions,
+                       ReadableConfig options) {
                // currently Flink's options have no validation feature which 
is why we access them eagerly
                // to provoke a parsing error
 
-               final List<String> missingRequiredOptions = 
factory.requiredOptions().stream()
-                       .filter(option -> readOption(options, option) == null)
-                       .map(ConfigOption::key)
-                       .sorted()
-                       .collect(Collectors.toList());
+               final List<String> missingRequiredOptions = 
requiredOptions.stream()
+                               .filter(option -> readOption(options, option) 
== null)
+                               .map(ConfigOption::key)
+                               .sorted()
+                               .collect(Collectors.toList());
 
                if (!missingRequiredOptions.isEmpty()) {
                        throw new ValidationException(
-                               String.format(
-                                       "One or more required options are 
missing.\n\n" +
-                                       "Missing required options are:\n\n" +
-                                       "%s",
-                                       String.join("\n", 
missingRequiredOptions)));
+                                       String.format(
+                                                       "One or more required 
options are missing.\n\n" +
+                                                                       
"Missing required options are:\n\n" +
+                                                                       "%s",
+                                                       String.join("\n", 
missingRequiredOptions)));
                }
 
-               factory.optionalOptions()
-                       .forEach(option -> readOption(options, option));
+               optionalOptions.forEach(option -> readOption(options, option));
+       }
+
+       /**
+        * Validates unconsumed option keys.
+        */
+       public static void validateUnconsumedKeys(
+                       String factoryIdentifier,
+                       Set<String> allOptionKeys,
+                       Set<String> consumedOptionKeys) {
+               final Set<String> remainingOptionKeys = new 
HashSet<>(allOptionKeys);
+               remainingOptionKeys.removeAll(consumedOptionKeys);
+               if (remainingOptionKeys.size() > 0) {
+                       throw new ValidationException(
+                                       String.format(
+                                                       "Unsupported options 
found for connector '%s'.\n\n" +
+                                                                       
"Unsupported options:\n\n" +
+                                                                       
"%s\n\n" +
+                                                                       
"Supported options:\n\n" +
+                                                                       "%s",
+                                                       factoryIdentifier,
+                                                       
remainingOptionKeys.stream()
+                                                                       
.sorted()
+                                                                       
.collect(Collectors.joining("\n")),
+                                                       
consumedOptionKeys.stream()
+                                                                       
.sorted()
+                                                                       
.collect(Collectors.joining("\n"))));
+               }
        }
 
        // --------------------------------------------------------------------------------------------
@@ -480,24 +518,7 @@ public final class FactoryUtil {
                 */
                public void validate() {
                        validateFactoryOptions(tableFactory, allOptions);
-                       final Set<String> remainingOptionKeys = new 
HashSet<>(allOptions.keySet());
-                       remainingOptionKeys.removeAll(consumedOptionKeys);
-                       if (remainingOptionKeys.size() > 0) {
-                               throw new ValidationException(
-                                       String.format(
-                                               "Unsupported options found for 
connector '%s'.\n\n" +
-                                               "Unsupported options:\n\n" +
-                                               "%s\n\n" +
-                                               "Supported options:\n\n" +
-                                               "%s",
-                                               
tableFactory.factoryIdentifier(),
-                                               remainingOptionKeys.stream()
-                                                       .sorted()
-                                                       
.collect(Collectors.joining("\n")),
-                                               consumedOptionKeys.stream()
-                                                       .sorted()
-                                                       
.collect(Collectors.joining("\n"))));
-                       }
+                       validateUnconsumedKeys(tableFactory.factoryIdentifier(), allOptions.keySet(), consumedOptionKeys);
                }
 
                /**

Reply via email to