JingsongLi commented on a change in pull request #12864:
URL: https://github.com/apache/flink/pull/12864#discussion_r459326425
##########
File path:
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
##########
@@ -94,128 +97,176 @@ public String factoryIdentifier() {
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
+ createTableFactoryHelper(this, context).validateExcept(FIELDS);
+
Configuration options = new Configuration();
context.getCatalogTable().getOptions().forEach(options::setString);
- TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ DataGenerator[] fieldGenerators = new DataGenerator[schema.getFieldCount()];
+ Set<ConfigOption<?>> optionalOptions = new HashSet<>();
- DataGenerator[] fieldGenerators = new DataGenerator[tableSchema.getFieldCount()];
for (int i = 0; i < fieldGenerators.length; i++) {
- fieldGenerators[i] = createDataGenerator(
- tableSchema.getFieldName(i).get(),
- tableSchema.getFieldDataType(i).get(),
- options);
+ String name = schema.getFieldNames()[i];
+ DataType type = schema.getFieldDataTypes()[i];
+
+ ConfigOption<String> kind = key(FIELDS + "." + name + "." + KIND)
+ .stringType().defaultValue(RANDOM);
+ DataGeneratorContainer container = createContainer(name, type, options.get(kind), options);
+ fieldGenerators[i] = container.generator;
+
+ optionalOptions.add(kind);
+ optionalOptions.addAll(container.options);
}
- return new DataGenTableSource(fieldGenerators, tableSchema, options.get(ROWS_PER_SECOND));
+ FactoryUtil.validateFactoryOptions(new HashSet<>(), optionalOptions, options);
+
+ Set<String> consumedOptionKeys = new HashSet<>();
+ consumedOptionKeys.add(CONNECTOR.key());
+ consumedOptionKeys.add(ROWS_PER_SECOND.key());
Review comment:
No need. We don't have another version.