LadyForest commented on code in PR #23678:
URL: https://github.com/apache/flink/pull/23678#discussion_r1410315513
##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java:
##########
@@ -127,11 +132,53 @@ private DataGeneratorContainer createContainer(
String name, DataType type, String kind, ReadableConfig options) {
switch (kind) {
case DataGenConnectorOptionsUtil.RANDOM:
+ validateFieldOptions(name, type, options);
return type.getLogicalType().accept(new
RandomGeneratorVisitor(name, options));
case DataGenConnectorOptionsUtil.SEQUENCE:
return type.getLogicalType().accept(new
SequenceGeneratorVisitor(name, options));
default:
throw new ValidationException("Unsupported generator kind: " +
kind);
}
}
+
+ private void validateFieldOptions(String name, DataType type,
ReadableConfig options) {
+ ConfigOption<Integer> lenOption =
+ key(DataGenConnectorOptionsUtil.FIELDS
+ + "."
+ + name
+ + "."
+ + DataGenConnectorOptionsUtil.LENGTH)
+ .intType()
+ .noDefaultValue();
+ options.getOptional(lenOption)
+ .ifPresent(
+ option -> {
+ LogicalType logicalType = type.getLogicalType();
+ if (logicalType instanceof CharType
+ || logicalType instanceof BinaryType) {
+ throw new ValidationException(
+ String.format(
+ "User-defined length of the
fixed-length field %s is not supported.",
Review Comment:
Nit: how about "Custom length for fixed-length type field '%s' is not supported."?
##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java:
##########
@@ -127,11 +132,53 @@ private DataGeneratorContainer createContainer(
String name, DataType type, String kind, ReadableConfig options) {
switch (kind) {
case DataGenConnectorOptionsUtil.RANDOM:
+ validateFieldOptions(name, type, options);
return type.getLogicalType().accept(new
RandomGeneratorVisitor(name, options));
case DataGenConnectorOptionsUtil.SEQUENCE:
return type.getLogicalType().accept(new
SequenceGeneratorVisitor(name, options));
default:
throw new ValidationException("Unsupported generator kind: " +
kind);
}
}
+
+ private void validateFieldOptions(String name, DataType type,
ReadableConfig options) {
+ ConfigOption<Integer> lenOption =
+ key(DataGenConnectorOptionsUtil.FIELDS
+ + "."
+ + name
+ + "."
+ + DataGenConnectorOptionsUtil.LENGTH)
+ .intType()
+ .noDefaultValue();
+ options.getOptional(lenOption)
+ .ifPresent(
+ option -> {
+ LogicalType logicalType = type.getLogicalType();
+ if (logicalType instanceof CharType
+ || logicalType instanceof BinaryType) {
+ throw new ValidationException(
+ String.format(
+ "User-defined length of the
fixed-length field %s is not supported.",
+ name));
+ }
+ if (logicalType instanceof VarCharType) {
+ int length = ((VarCharType)
logicalType).getLength();
+ if (option > length) {
+ throw new ValidationException(
+ String.format(
+ "User-defined length of
the VARCHAR field %s should be shorter than the schema definition.",
+ name));
+ }
+ }
+ if (logicalType instanceof VarBinaryType) {
+ int length = ((VarBinaryType)
logicalType).getLength();
+ if (option > length) {
+ throw new ValidationException(
+ String.format(
+ "User-defined length of
the VARBINARY field %s should be shorter than the schema definition.",
+ name));
+ }
+ }
+ });
+ }
Review Comment:
Nit: I think we don't need to differentiate between `VARCHAR` and `VARBINARY`
here.
How about
```java
String.format("Custom length '%d' for variable-length type VARCHAR/VARBINARY
should be shorter than '%d' defined in the schema.", option, length);
```
##########
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##########
@@ -215,6 +216,100 @@ void testSource() throws Exception {
}
}
+ @Test
+ void testVariableLengthDataType() throws Exception {
+ DescriptorProperties descriptor = new DescriptorProperties();
Review Comment:
Nit: `DescriptorProperties` is already deprecated. If you're willing, you
can open a new subtask under FLINK-31596 to migrate its usages to
`CatalogPropertiesUtil`.
##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java:
##########
@@ -152,25 +142,20 @@ public DataGeneratorContainer visit(VarCharType
varCharType) {
+ "."
+ DataGenConnectorOptionsUtil.LENGTH)
.intType()
- .defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+ .defaultValue(varCharType.getLength());
Review Comment:
Nit: maybe we can extract a util method like
```java
private ConfigOption<Integer> getLengthOption(Supplier<Integer>
defaultLengthSupplier) {
return key(String.join(
".",
DataGenConnectorOptionsUtil.FIELDS,
name,
DataGenConnectorOptionsUtil.LENGTH))
.intType()
.defaultValue(defaultLengthSupplier.get());
}
```
```java
@Override
public DataGeneratorContainer visit(VarCharType varCharType) {
ConfigOption<Integer> lenOption =
getLengthOption(varCharType::getLength);
int length =
config.get(lenOption) == VarCharType.MAX_LENGTH
? RANDOM_STRING_LENGTH_DEFAULT
: config.get(lenOption);
ConfigOption<Float> nr =
nullRate.floatType().defaultValue(NULL_RATE_DEFAULT);
return DataGeneratorContainer.of(
getRandomStringGenerator(length).withNullRate(config.get(nr)), lenOption, nr);
}
@Override
public DataGeneratorContainer visit(VarBinaryType varBinaryType) {
ConfigOption<Integer> lenOption =
getLengthOption(varBinaryType::getLength);
int length =
config.get(lenOption) == VarBinaryType.MAX_LENGTH
? RANDOM_BYTES_LENGTH_DEFAULT
: config.get(lenOption);
return DataGeneratorContainer.of(getRandomBytesGenerator(length),
lenOption);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]