This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new cd888c2f9e2 [FLINK-37649][datagen] Datagen connector cannot set length for collection type
cd888c2f9e2 is described below

commit cd888c2f9e227097c35de3059e080b7e562fd033
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Apr 14 11:25:27 2025 +0800

    [FLINK-37649][datagen] Datagen connector cannot set length for collection type
---
 .../datagen/table/RandomGeneratorVisitor.java      |  5 ++-
 .../factories/DataGenTableSourceFactoryTest.java   | 43 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
index 254278471a7..4c3a399c11b 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
@@ -74,7 +74,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 
     public static final int RANDOM_BYTES_LENGTH_DEFAULT = 100;
 
-    private static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
+    public static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
 
     private static final float NULL_RATE_DEFAULT = 0f;
 
@@ -352,6 +352,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
                 RandomGenerator.arrayGenerator(container.getGenerator(), 
config.get(lenOption));
         Set<ConfigOption<?>> options = container.getOptions();
         options.add(nr);
+        options.add(lenOption);
         return DataGeneratorContainer.of(
                 new DataGeneratorMapper<>(generator, (GenericArrayData::new), 
config.get(nr)),
                 options.toArray(new ConfigOption<?>[0]));
@@ -374,6 +375,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
         Set<ConfigOption<?>> options = container.getOptions();
         ConfigOption<Float> nr = 
nullRate.floatType().defaultValue(NULL_RATE_DEFAULT);
         options.add(nr);
+        options.add(lenOption);
 
         return DataGeneratorContainer.of(
                 new DataGeneratorMapper<>(mapGenerator, GenericMapData::new, 
config.get(nr)),
@@ -397,6 +399,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
         Set<ConfigOption<?>> options = keyContainer.getOptions();
         options.addAll(valContainer.getOptions());
         options.add(nr);
+        options.add(lenOption);
 
         DataGenerator<Map<Object, Object>> mapGenerator =
                 RandomGenerator.mapGenerator(
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 687d479839c..886d883fa1f 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.RANDOM_COLLECTION_LENGTH_DEFAULT;
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -81,6 +82,11 @@ class DataGenTableSourceFactoryTest {
                     Column.physical("f2", DataTypes.VARCHAR(30)),
                     Column.physical("f3", DataTypes.VARBINARY(20)),
                     Column.physical("f4", DataTypes.STRING()));
+    private static final ResolvedSchema COLLECTION_SCHEMA =
+            ResolvedSchema.of(
+                    Column.physical("f0", DataTypes.ARRAY(DataTypes.STRING())),
+                    Column.physical("f1", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT())),
+                    Column.physical("f2", 
DataTypes.MULTISET(DataTypes.INT())));
 
     @Test
     void testDataTypeCoverage() throws Exception {
@@ -347,6 +353,43 @@ class DataGenTableSourceFactoryTest {
                 "Custom length '21' for variable-length type 
(VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined 
in the schema.");
     }
 
+    @Test
+    void testLengthForCollectionType() throws Exception {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        final int rowsNumber = 200;
+        final int collectionSize = 10;
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), 
rowsNumber);
+        // test for default length.
+        List<RowData> results = runGenerator(COLLECTION_SCHEMA, descriptor);
+        assertThat(results).hasSize(rowsNumber);
+        for (RowData row : results) {
+            
assertThat(row.getArray(0).size()).isEqualTo(RANDOM_COLLECTION_LENGTH_DEFAULT);
+            assertThat(row.getMap(1).size())
+                    
.isEqualTo(RandomGeneratorVisitor.RANDOM_COLLECTION_LENGTH_DEFAULT);
+            assertThat(row.getMap(2).size())
+                    
.isEqualTo(RandomGeneratorVisitor.RANDOM_COLLECTION_LENGTH_DEFAULT);
+        }
+
+        // test for provided length.
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.LENGTH,
+                collectionSize);
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f1." + 
DataGenConnectorOptionsUtil.LENGTH,
+                collectionSize);
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f2." + 
DataGenConnectorOptionsUtil.LENGTH,
+                collectionSize);
+        results = runGenerator(COLLECTION_SCHEMA, descriptor);
+        assertThat(results).hasSize(rowsNumber);
+        for (RowData row : results) {
+            assertThat(row.getArray(0).size()).isEqualTo(collectionSize);
+            assertThat(row.getMap(1).size()).isEqualTo(collectionSize);
+            assertThat(row.getMap(2).size()).isEqualTo(collectionSize);
+        }
+    }
+
     @Test
     void testFixedLengthDataType() throws Exception {
         DescriptorProperties descriptor = new DescriptorProperties();

Reply via email to