liyubin117 commented on code in PR #23678:
URL: https://github.com/apache/flink/pull/23678#discussion_r1411842995
##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java:
##########
@@ -127,11 +132,53 @@ private DataGeneratorContainer createContainer(
String name, DataType type, String kind, ReadableConfig options) {
switch (kind) {
case DataGenConnectorOptionsUtil.RANDOM:
+ validateFieldOptions(name, type, options);
return type.getLogicalType().accept(new
RandomGeneratorVisitor(name, options));
case DataGenConnectorOptionsUtil.SEQUENCE:
return type.getLogicalType().accept(new
SequenceGeneratorVisitor(name, options));
default:
throw new ValidationException("Unsupported generator kind: " +
kind);
}
}
+
+ private void validateFieldOptions(String name, DataType type,
ReadableConfig options) {
+ ConfigOption<Integer> lenOption =
+ key(DataGenConnectorOptionsUtil.FIELDS
+ + "."
+ + name
+ + "."
+ + DataGenConnectorOptionsUtil.LENGTH)
+ .intType()
+ .noDefaultValue();
+ options.getOptional(lenOption)
+ .ifPresent(
+ option -> {
+ LogicalType logicalType = type.getLogicalType();
+ if (logicalType instanceof CharType
+ || logicalType instanceof BinaryType) {
+ throw new ValidationException(
+ String.format(
+ "User-defined length of the
fixed-length field %s is not supported.",
+ name));
+ }
+ if (logicalType instanceof VarCharType) {
+ int length = ((VarCharType)
logicalType).getLength();
+ if (option > length) {
+ throw new ValidationException(
+ String.format(
+ "User-defined length of
the VARCHAR field %s should be shorter than the schema definition.",
+ name));
+ }
+ }
+ if (logicalType instanceof VarBinaryType) {
+ int length = ((VarBinaryType)
logicalType).getLength();
+ if (option > length) {
+ throw new ValidationException(
+ String.format(
+ "User-defined length of
the VARBINARY field %s should be shorter than the schema definition.",
+ name));
+ }
+ }
+ });
+ }
Review Comment:
done
##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java:
##########
@@ -127,11 +132,53 @@ private DataGeneratorContainer createContainer(
String name, DataType type, String kind, ReadableConfig options) {
switch (kind) {
case DataGenConnectorOptionsUtil.RANDOM:
+ validateFieldOptions(name, type, options);
return type.getLogicalType().accept(new
RandomGeneratorVisitor(name, options));
case DataGenConnectorOptionsUtil.SEQUENCE:
return type.getLogicalType().accept(new
SequenceGeneratorVisitor(name, options));
default:
throw new ValidationException("Unsupported generator kind: " +
kind);
}
}
+
+ private void validateFieldOptions(String name, DataType type,
ReadableConfig options) {
+ ConfigOption<Integer> lenOption =
+ key(DataGenConnectorOptionsUtil.FIELDS
+ + "."
+ + name
+ + "."
+ + DataGenConnectorOptionsUtil.LENGTH)
+ .intType()
+ .noDefaultValue();
+ options.getOptional(lenOption)
+ .ifPresent(
+ option -> {
+ LogicalType logicalType = type.getLogicalType();
+ if (logicalType instanceof CharType
+ || logicalType instanceof BinaryType) {
+ throw new ValidationException(
+ String.format(
+ "User-defined length of the
fixed-length field %s is not supported.",
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org