This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new cd888c2f9e2 [FLINK-37649][datagen] Datagen connector cannot set length for collection type
cd888c2f9e2 is described below
commit cd888c2f9e227097c35de3059e080b7e562fd033
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Apr 14 11:25:27 2025 +0800
[FLINK-37649][datagen] Datagen connector cannot set length for collection type
---
.../datagen/table/RandomGeneratorVisitor.java | 5 ++-
.../factories/DataGenTableSourceFactoryTest.java | 43 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
index 254278471a7..4c3a399c11b 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
@@ -74,7 +74,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
public static final int RANDOM_BYTES_LENGTH_DEFAULT = 100;
- private static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
+ public static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
private static final float NULL_RATE_DEFAULT = 0f;
@@ -352,6 +352,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
RandomGenerator.arrayGenerator(container.getGenerator(),
config.get(lenOption));
Set<ConfigOption<?>> options = container.getOptions();
options.add(nr);
+ options.add(lenOption);
return DataGeneratorContainer.of(
new DataGeneratorMapper<>(generator, (GenericArrayData::new),
config.get(nr)),
options.toArray(new ConfigOption<?>[0]));
@@ -374,6 +375,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
Set<ConfigOption<?>> options = container.getOptions();
ConfigOption<Float> nr =
nullRate.floatType().defaultValue(NULL_RATE_DEFAULT);
options.add(nr);
+ options.add(lenOption);
return DataGeneratorContainer.of(
new DataGeneratorMapper<>(mapGenerator, GenericMapData::new,
config.get(nr)),
@@ -397,6 +399,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
Set<ConfigOption<?>> options = keyContainer.getOptions();
options.addAll(valContainer.getOptions());
options.add(nr);
+ options.add(lenOption);
DataGenerator<Map<Object, Object>> mapGenerator =
RandomGenerator.mapGenerator(
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 687d479839c..886d883fa1f 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -56,6 +56,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.RANDOM_COLLECTION_LENGTH_DEFAULT;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThat;
@@ -81,6 +82,11 @@ class DataGenTableSourceFactoryTest {
Column.physical("f2", DataTypes.VARCHAR(30)),
Column.physical("f3", DataTypes.VARBINARY(20)),
Column.physical("f4", DataTypes.STRING()));
+    private static final ResolvedSchema COLLECTION_SCHEMA = // ARRAY, MAP and MULTISET columns
+            ResolvedSchema.of(
+                    Column.physical("f0", DataTypes.ARRAY(DataTypes.STRING())),
+                    Column.physical("f1", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())),
+                    Column.physical("f2", DataTypes.MULTISET(DataTypes.INT())));
@Test
void testDataTypeCoverage() throws Exception {
@@ -347,6 +353,43 @@ class DataGenTableSourceFactoryTest {
"Custom length '21' for variable-length type
(VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined
in the schema.");
}
+    /** Checks default and {@code fields.#.length} sizes of ARRAY/MAP/MULTISET columns. */
+    @Test
+    void testLengthForCollectionType() throws Exception {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        final int rowsNumber = 200;
+        final int collectionSize = 10;
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber);
+        // Without an explicit length, every collection gets the default size.
+        List<RowData> results = runGenerator(COLLECTION_SCHEMA, descriptor);
+        assertThat(results).hasSize(rowsNumber);
+        for (RowData row : results) {
+            assertThat(row.getArray(0).size()).isEqualTo(RANDOM_COLLECTION_LENGTH_DEFAULT);
+            assertThat(row.getMap(1).size()).isEqualTo(RANDOM_COLLECTION_LENGTH_DEFAULT);
+            // MULTISET values are read back through RowData#getMap.
+            assertThat(row.getMap(2).size()).isEqualTo(RANDOM_COLLECTION_LENGTH_DEFAULT);
+        }
+
+        // An explicit per-field length overrides the default for every collection type.
+        descriptor.putInt(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.LENGTH,
+                collectionSize);
+        descriptor.putInt(
+                DataGenConnectorOptionsUtil.FIELDS + ".f1." + DataGenConnectorOptionsUtil.LENGTH,
+                collectionSize);
+        descriptor.putInt(
+                DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.LENGTH,
+                collectionSize);
+        results = runGenerator(COLLECTION_SCHEMA, descriptor);
+        assertThat(results).hasSize(rowsNumber);
+        for (RowData row : results) {
+            assertThat(row.getArray(0).size()).isEqualTo(collectionSize);
+            assertThat(row.getMap(1).size()).isEqualTo(collectionSize);
+            assertThat(row.getMap(2).size()).isEqualTo(collectionSize);
+        }
+    }
+
@Test
void testFixedLengthDataType() throws Exception {
DescriptorProperties descriptor = new DescriptorProperties();