This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new e3a0772  [FLINK-18207][FLINK-18185][table] Fix datagen connector exactly-once bug and validation message
e3a0772 is described below

commit e3a0772a48fa8a416e139a84e322b2510cfd49b6
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 9 20:19:07 2020 +0800

    [FLINK-18207][FLINK-18185][table] Fix datagen connector exactly-once bug and validation message
    
    
    This closes #12544
---
 .../api/functions/StatefulSequenceSourceTest.java  | 20 ++---
 .../source/datagen/DataGeneratorSourceTest.java    | 51 +++++++-----
 flink-table/flink-table-api-java-bridge/pom.xml    |  7 ++
 .../table/factories/DataGenTableSourceFactory.java | 20 ++++-
 .../factories/DataGenTableSourceFactoryTest.java   | 97 ++++++++++++++++++++--
 5 files changed, 152 insertions(+), 43 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
index a9d5443..5fcb356 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
@@ -79,7 +79,7 @@ public class StatefulSequenceSourceTest {
                        @Override
                        public void run() {
                                try {
-                                       source1.run(new 
BlockingSourceContext("1", latchToTrigger1, latchToWait1, outputCollector, 21));
+                                       source1.run(new 
BlockingSourceContext<>("1", latchToTrigger1, latchToWait1, outputCollector, 
21));
                                }
                                catch (Throwable t) {
                                        t.printStackTrace();
@@ -93,7 +93,7 @@ public class StatefulSequenceSourceTest {
                        @Override
                        public void run() {
                                try {
-                                       source2.run(new 
BlockingSourceContext("2", latchToTrigger2, latchToWait2, outputCollector, 32));
+                                       source2.run(new 
BlockingSourceContext<>("2", latchToTrigger2, latchToWait2, outputCollector, 
32));
                                }
                                catch (Throwable t) {
                                        t.printStackTrace();
@@ -139,7 +139,7 @@ public class StatefulSequenceSourceTest {
                        @Override
                        public void run() {
                                try {
-                                       source3.run(new 
BlockingSourceContext("3", latchToTrigger3, latchToWait3, outputCollector, 3));
+                                       source3.run(new 
BlockingSourceContext<>("3", latchToTrigger3, latchToWait3, outputCollector, 
3));
                                }
                                catch (Throwable t) {
                                        t.printStackTrace();
@@ -186,22 +186,22 @@ public class StatefulSequenceSourceTest {
        /**
         * Test SourceContext.
         */
-       public static class BlockingSourceContext implements 
SourceFunction.SourceContext<Long> {
+       public static class BlockingSourceContext<T> implements 
SourceFunction.SourceContext<T> {
 
                private final String name;
 
                private final Object lock;
                private final OneShotLatch latchToTrigger;
                private final OneShotLatch latchToWait;
-               private final ConcurrentHashMap<String, List<Long>> collector;
+               private final ConcurrentHashMap<String, List<T>> collector;
 
                private final int threshold;
                private int counter = 0;
 
-               private final List<Long> localOutput;
+               private final List<T> localOutput;
 
                public BlockingSourceContext(String name, OneShotLatch 
latchToTrigger, OneShotLatch latchToWait,
-                                                                       
ConcurrentHashMap<String, List<Long>> output, int elemToFire) {
+                                                                       
ConcurrentHashMap<String, List<T>> output, int elemToFire) {
                        this.name = name;
                        this.lock = new Object();
                        this.latchToTrigger = latchToTrigger;
@@ -210,19 +210,19 @@ public class StatefulSequenceSourceTest {
                        this.threshold = elemToFire;
 
                        this.localOutput = new ArrayList<>();
-                       List<Long> prev = collector.put(name, localOutput);
+                       List<T> prev = collector.put(name, localOutput);
                        if (prev != null) {
                                Assert.fail();
                        }
                }
 
                @Override
-               public void collectWithTimestamp(Long element, long timestamp) {
+               public void collectWithTimestamp(T element, long timestamp) {
                        collect(element);
                }
 
                @Override
-               public void collect(Long element) {
+               public void collect(T element) {
                        localOutput.add(element);
                        if (++counter == threshold) {
                                latchToTrigger.trigger();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSourceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSourceTest.java
index a59bb4a..b11fec0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSourceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSourceTest.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 
 /**
  * Tests for {@link DataGeneratorSource}.
@@ -99,39 +100,44 @@ public class DataGeneratorSourceTest {
        public void testSequenceCheckpointRestore() throws Exception {
                final int initElement = 0;
                final int maxElement = 100;
-               final int maxParallelsim = 2;
-
                final Set<Long> expectedOutput = new HashSet<>();
                for (long i = initElement; i <= maxElement; i++) {
                        expectedOutput.add(i);
                }
+               DataGeneratorSourceTest.innerTestDataGenCheckpointRestore(
+                               () -> new DataGeneratorSource<>(
+                                               
SequenceGenerator.longGenerator(initElement, maxElement)),
+                               expectedOutput);
+       }
 
-               final ConcurrentHashMap<String, List<Long>> outputCollector = 
new ConcurrentHashMap<>();
+       public static <T> void innerTestDataGenCheckpointRestore(
+                       Supplier<DataGeneratorSource<T>> supplier,
+                       Set<T> expectedOutput) throws Exception {
+               final int maxParallelsim = 2;
+               final ConcurrentHashMap<String, List<T>> outputCollector = new 
ConcurrentHashMap<>();
                final OneShotLatch latchToTrigger1 = new OneShotLatch();
                final OneShotLatch latchToWait1 = new OneShotLatch();
                final OneShotLatch latchToTrigger2 = new OneShotLatch();
                final OneShotLatch latchToWait2 = new OneShotLatch();
 
-               final DataGeneratorSource<Long> source1 = new 
DataGeneratorSource<>(
-                               SequenceGenerator.longGenerator(initElement, 
maxElement));
-               StreamSource<Long, DataGeneratorSource<Long>> src1 = new 
StreamSource<>(source1);
+               final DataGeneratorSource<T> source1 = supplier.get();
+               StreamSource<T, DataGeneratorSource<T>> src1 = new 
StreamSource<>(source1);
 
-               final AbstractStreamOperatorTestHarness<Long> testHarness1 =
+               final AbstractStreamOperatorTestHarness<T> testHarness1 =
                        new AbstractStreamOperatorTestHarness<>(src1, 
maxParallelsim, 2, 0);
                testHarness1.open();
 
-               final DataGeneratorSource<Long> source2 = new 
DataGeneratorSource<>(
-                               SequenceGenerator.longGenerator(initElement, 
maxElement));
-               StreamSource<Long, DataGeneratorSource<Long>> src2 = new 
StreamSource<>(source2);
+               final DataGeneratorSource<T> source2 = supplier.get();
+               StreamSource<T, DataGeneratorSource<T>> src2 = new 
StreamSource<>(source2);
 
-               final AbstractStreamOperatorTestHarness<Long> testHarness2 =
+               final AbstractStreamOperatorTestHarness<T> testHarness2 =
                        new AbstractStreamOperatorTestHarness<>(src2, 
maxParallelsim, 2, 1);
                testHarness2.open();
 
                // run the source asynchronously
                Thread runner1 = new Thread(() -> {
                        try {
-                               source1.run(new BlockingSourceContext(
+                               source1.run(new BlockingSourceContext<>(
                                                "1", latchToTrigger1, 
latchToWait1, outputCollector, 21));
                        } catch (Throwable t) {
                                t.printStackTrace();
@@ -141,7 +147,7 @@ public class DataGeneratorSourceTest {
                // run the source asynchronously
                Thread runner2 = new Thread(() -> {
                        try {
-                               source2.run(new BlockingSourceContext(
+                               source2.run(new BlockingSourceContext<>(
                                                "2", latchToTrigger2, 
latchToWait2, outputCollector, 32));
                        }
                        catch (Throwable t) {
@@ -165,15 +171,14 @@ public class DataGeneratorSourceTest {
                        testHarness2.snapshot(0L, 0L)
                );
 
-               final DataGeneratorSource<Long> source3 = new 
DataGeneratorSource<>(
-                               SequenceGenerator.longGenerator(initElement, 
maxElement));
-               StreamSource<Long, DataGeneratorSource<Long>> src3 = new 
StreamSource<>(source3);
+               final DataGeneratorSource<T> source3 = supplier.get();
+               StreamSource<T, DataGeneratorSource<T>> src3 = new 
StreamSource<>(source3);
 
                final OperatorSubtaskState initState =
                                
AbstractStreamOperatorTestHarness.repartitionOperatorState(
                                                snapshot, maxParallelsim, 2, 1, 
0);
 
-               final AbstractStreamOperatorTestHarness<Long> testHarness3 =
+               final AbstractStreamOperatorTestHarness<T> testHarness3 =
                        new AbstractStreamOperatorTestHarness<>(src3, 
maxParallelsim, 1, 0);
                testHarness3.setup();
                testHarness3.initializeState(initState);
@@ -186,7 +191,7 @@ public class DataGeneratorSourceTest {
                // run the source asynchronously
                Thread runner3 = new Thread(() -> {
                        try {
-                               source3.run(new BlockingSourceContext(
+                               source3.run(new BlockingSourceContext<>(
                                                "3", latchToTrigger3, 
latchToWait3, outputCollector, 3));
                        }
                        catch (Throwable t) {
@@ -199,15 +204,15 @@ public class DataGeneratorSourceTest {
                Assert.assertEquals(3, outputCollector.size()); // we have 3 
tasks.
 
                // test for at-most-once
-               Set<Long> dedupRes = new HashSet<>(Math.abs(maxElement - 
initElement) + 1);
-               for (Map.Entry<String, List<Long>> elementsPerTask: 
outputCollector.entrySet()) {
+               Set<T> dedupRes = new HashSet<>(expectedOutput.size());
+               for (Map.Entry<String, List<T>> elementsPerTask: 
outputCollector.entrySet()) {
                        String key = elementsPerTask.getKey();
-                       List<Long> elements = outputCollector.get(key);
+                       List<T> elements = outputCollector.get(key);
 
                        // this tests the correctness of the latches in the test
                        Assert.assertTrue(elements.size() > 0);
 
-                       for (Long elem : elements) {
+                       for (T elem : elements) {
                                if (!dedupRes.add(elem)) {
                                        Assert.fail("Duplicate entry: " + elem);
                                }
@@ -219,7 +224,7 @@ public class DataGeneratorSourceTest {
                }
 
                // test for exactly-once
-               Assert.assertEquals(Math.abs(initElement - maxElement) + 1, 
dedupRes.size());
+               Assert.assertEquals(expectedOutput.size(), dedupRes.size());
 
                latchToWait1.trigger();
                latchToWait2.trigger();
diff --git a/flink-table/flink-table-api-java-bridge/pom.xml 
b/flink-table/flink-table-api-java-bridge/pom.xml
index 3d79cf2..f8f4302 100644
--- a/flink-table/flink-table-api-java-bridge/pom.xml
+++ b/flink-table/flink-table-api-java-bridge/pom.xml
@@ -53,6 +53,8 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <!-- test dependencies -->
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-api-java</artifactId>
@@ -80,5 +82,10 @@ under the License.
                        <scope>test</scope>
                        <type>test-jar</type>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+               </dependency>
        </dependencies>
 </project>
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 27a517b..b2c3c37 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.configuration.ConfigOptions.OptionBuilder;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import 
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
@@ -172,13 +173,17 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
        }
 
        private DataGenerator createSequenceGenerator(String name, DataType 
type, ReadableConfig options) {
-               OptionBuilder startKey = key(FIELDS + "." + name + "." + START);
-               OptionBuilder endKey = key(FIELDS + "." + name + "." + END);
+               String startKeyStr = FIELDS + "." + name + "." + START;
+               String endKeyStr = FIELDS + "." + name + "." + END;
+               OptionBuilder startKey = key(startKeyStr);
+               OptionBuilder endKey = key(endKeyStr);
 
                
options.getOptional(startKey.stringType().noDefaultValue()).orElseThrow(
-                               () -> new ValidationException("Could not find 
required property '" + startKey + "'."));
+                               () -> new ValidationException(
+                                               "Could not find required 
property '" + startKeyStr + "' for sequence generator."));
                
options.getOptional(endKey.stringType().noDefaultValue()).orElseThrow(
-                               () -> new ValidationException("Could not find 
required property '" + endKey + "'."));
+                               () -> new ValidationException(
+                                               "Could not find required 
property '" + endKeyStr + "' for sequence generator."));
 
                switch (type.getLogicalType().getTypeRoot()) {
                        case CHAR:
@@ -291,6 +296,13 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
                }
 
                @Override
+               public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
+                       for (DataGenerator generator : fieldGenerators) {
+                               generator.snapshotState(context);
+                       }
+               }
+
+               @Override
                public boolean hasNext() {
                        for (DataGenerator generator : fieldGenerators) {
                                if (!generator.hasNext()) {
diff --git 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 827e698..db911e3 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -21,14 +21,17 @@ package org.apache.flink.table.factories;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import 
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSourceTest;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import 
org.apache.flink.table.factories.DataGenTableSourceFactory.DataGenTableSource;
@@ -37,8 +40,12 @@ import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
 import static 
org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
@@ -80,12 +87,8 @@ public class DataGenTableSourceFactoryTest {
                descriptor.putLong(FIELDS + ".f2." + START, 50);
                descriptor.putLong(FIELDS + ".f2." + END, 60);
 
-               DynamicTableSource source = FactoryUtil.createTableSource(
-                               null,
-                               ObjectIdentifier.of("", "", ""),
-                               new CatalogTableImpl(TEST_SCHEMA, 
descriptor.asMap(), ""),
-                               new Configuration(),
-                               Thread.currentThread().getContextClassLoader());
+               DynamicTableSource source = createSource(
+                               TEST_SCHEMA, descriptor.asMap());
 
                assertTrue(source instanceof DataGenTableSource);
 
@@ -142,4 +145,86 @@ public class DataGenTableSourceFactoryTest {
                        Assert.assertEquals(i + 50, row.getLong(2));
                }
        }
+
+       @Test
+       public void testSequenceCheckpointRestore() throws Exception {
+               DescriptorProperties descriptor = new DescriptorProperties();
+               descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+               descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
+               descriptor.putLong(FIELDS + ".f0." + START, 0);
+               descriptor.putLong(FIELDS + ".f0." + END, 100);
+
+               DynamicTableSource dynamicTableSource = createSource(
+                               TableSchema.builder().field("f0", 
DataTypes.BIGINT()).build(),
+                               descriptor.asMap());
+
+               DataGenTableSource dataGenTableSource = (DataGenTableSource) 
dynamicTableSource;
+               DataGeneratorSource<RowData> source = 
dataGenTableSource.createSource();
+
+               final int initElement = 0;
+               final int maxElement = 100;
+               final Set<RowData> expectedOutput = new HashSet<>();
+               for (long i = initElement; i <= maxElement; i++) {
+                       expectedOutput.add(GenericRowData.of(i));
+               }
+               DataGeneratorSourceTest.innerTestDataGenCheckpointRestore(
+                               () -> {
+                                       try {
+                                               return 
InstantiationUtil.clone(source);
+                                       } catch (IOException | 
ClassNotFoundException e) {
+                                               throw new RuntimeException(e);
+                                       }
+                               }, expectedOutput);
+       }
+
+       @Test
+       public void testLackStartForSequence() {
+               try {
+                       DescriptorProperties descriptor = new 
DescriptorProperties();
+                       descriptor.putString(FactoryUtil.CONNECTOR.key(), 
"datagen");
+                       descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
+                       descriptor.putLong(FIELDS + ".f0." + END, 100);
+
+                       createSource(
+                                       TableSchema.builder().field("f0", 
DataTypes.BIGINT()).build(),
+                                       descriptor.asMap());
+               } catch (ValidationException e) {
+                       Throwable cause = e.getCause();
+                       Assert.assertTrue(cause instanceof ValidationException);
+                       Assert.assertTrue(cause.getMessage().contains(
+                                       "Could not find required property 
'fields.f0.start' for sequence generator."));
+                       return;
+               }
+               Assert.fail("Should fail by ValidationException.");
+       }
+
+       @Test
+       public void testLackEndForSequence() {
+               try {
+                       DescriptorProperties descriptor = new 
DescriptorProperties();
+                       descriptor.putString(FactoryUtil.CONNECTOR.key(), 
"datagen");
+                       descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
+                       descriptor.putLong(FIELDS + ".f0." + START, 0);
+
+                       createSource(
+                                       TableSchema.builder().field("f0", 
DataTypes.BIGINT()).build(),
+                                       descriptor.asMap());
+               } catch (ValidationException e) {
+                       Throwable cause = e.getCause();
+                       Assert.assertTrue(cause instanceof ValidationException);
+                       Assert.assertTrue(cause.getMessage().contains(
+                                       "Could not find required property 
'fields.f0.end' for sequence generator."));
+                       return;
+               }
+               Assert.fail("Should fail by ValidationException.");
+       }
+
+       private static DynamicTableSource createSource(TableSchema schema, 
Map<String, String> options) {
+               return FactoryUtil.createTableSource(
+                               null,
+                               ObjectIdentifier.of("", "", ""),
+                               new CatalogTableImpl(schema, options, ""),
+                               new Configuration(),
+                               Thread.currentThread().getContextClassLoader());
+       }
 }

Reply via email to