This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new e3a0772 [FLINK-18207][FLINK-18185][table] Fix datagen connector
exactly-once bug and validation message
e3a0772 is described below
commit e3a0772a48fa8a416e139a84e322b2510cfd49b6
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 9 20:19:07 2020 +0800
[FLINK-18207][FLINK-18185][table] Fix datagen connector exactly-once bug
and validation message
This closes #12544
---
.../api/functions/StatefulSequenceSourceTest.java | 20 ++---
.../source/datagen/DataGeneratorSourceTest.java | 51 +++++++-----
flink-table/flink-table-api-java-bridge/pom.xml | 7 ++
.../table/factories/DataGenTableSourceFactory.java | 20 ++++-
.../factories/DataGenTableSourceFactoryTest.java | 97 ++++++++++++++++++++--
5 files changed, 152 insertions(+), 43 deletions(-)
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
index a9d5443..5fcb356 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
@@ -79,7 +79,7 @@ public class StatefulSequenceSourceTest {
@Override
public void run() {
try {
- source1.run(new
BlockingSourceContext("1", latchToTrigger1, latchToWait1, outputCollector, 21));
+ source1.run(new
BlockingSourceContext<>("1", latchToTrigger1, latchToWait1, outputCollector,
21));
}
catch (Throwable t) {
t.printStackTrace();
@@ -93,7 +93,7 @@ public class StatefulSequenceSourceTest {
@Override
public void run() {
try {
- source2.run(new
BlockingSourceContext("2", latchToTrigger2, latchToWait2, outputCollector, 32));
+ source2.run(new
BlockingSourceContext<>("2", latchToTrigger2, latchToWait2, outputCollector,
32));
}
catch (Throwable t) {
t.printStackTrace();
@@ -139,7 +139,7 @@ public class StatefulSequenceSourceTest {
@Override
public void run() {
try {
- source3.run(new
BlockingSourceContext("3", latchToTrigger3, latchToWait3, outputCollector, 3));
+ source3.run(new
BlockingSourceContext<>("3", latchToTrigger3, latchToWait3, outputCollector,
3));
}
catch (Throwable t) {
t.printStackTrace();
@@ -186,22 +186,22 @@ public class StatefulSequenceSourceTest {
/**
* Test SourceContext.
*/
- public static class BlockingSourceContext implements
SourceFunction.SourceContext<Long> {
+ public static class BlockingSourceContext<T> implements
SourceFunction.SourceContext<T> {
private final String name;
private final Object lock;
private final OneShotLatch latchToTrigger;
private final OneShotLatch latchToWait;
- private final ConcurrentHashMap<String, List<Long>> collector;
+ private final ConcurrentHashMap<String, List<T>> collector;
private final int threshold;
private int counter = 0;
- private final List<Long> localOutput;
+ private final List<T> localOutput;
public BlockingSourceContext(String name, OneShotLatch
latchToTrigger, OneShotLatch latchToWait,
-
ConcurrentHashMap<String, List<Long>> output, int elemToFire) {
+
ConcurrentHashMap<String, List<T>> output, int elemToFire) {
this.name = name;
this.lock = new Object();
this.latchToTrigger = latchToTrigger;
@@ -210,19 +210,19 @@ public class StatefulSequenceSourceTest {
this.threshold = elemToFire;
this.localOutput = new ArrayList<>();
- List<Long> prev = collector.put(name, localOutput);
+ List<T> prev = collector.put(name, localOutput);
if (prev != null) {
Assert.fail();
}
}
@Override
- public void collectWithTimestamp(Long element, long timestamp) {
+ public void collectWithTimestamp(T element, long timestamp) {
collect(element);
}
@Override
- public void collect(Long element) {
+ public void collect(T element) {
localOutput.add(element);
if (++counter == threshold) {
latchToTrigger.trigger();
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSourceTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSourceTest.java
index a59bb4a..b11fec0 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSourceTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSourceTest.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
/**
* Tests for {@link DataGeneratorSource}.
@@ -99,39 +100,44 @@ public class DataGeneratorSourceTest {
public void testSequenceCheckpointRestore() throws Exception {
final int initElement = 0;
final int maxElement = 100;
- final int maxParallelsim = 2;
-
final Set<Long> expectedOutput = new HashSet<>();
for (long i = initElement; i <= maxElement; i++) {
expectedOutput.add(i);
}
+ DataGeneratorSourceTest.innerTestDataGenCheckpointRestore(
+ () -> new DataGeneratorSource<>(
+
SequenceGenerator.longGenerator(initElement, maxElement)),
+ expectedOutput);
+ }
- final ConcurrentHashMap<String, List<Long>> outputCollector =
new ConcurrentHashMap<>();
+ public static <T> void innerTestDataGenCheckpointRestore(
+ Supplier<DataGeneratorSource<T>> supplier,
+ Set<T> expectedOutput) throws Exception {
+ final int maxParallelsim = 2;
+ final ConcurrentHashMap<String, List<T>> outputCollector = new
ConcurrentHashMap<>();
final OneShotLatch latchToTrigger1 = new OneShotLatch();
final OneShotLatch latchToWait1 = new OneShotLatch();
final OneShotLatch latchToTrigger2 = new OneShotLatch();
final OneShotLatch latchToWait2 = new OneShotLatch();
- final DataGeneratorSource<Long> source1 = new
DataGeneratorSource<>(
- SequenceGenerator.longGenerator(initElement,
maxElement));
- StreamSource<Long, DataGeneratorSource<Long>> src1 = new
StreamSource<>(source1);
+ final DataGeneratorSource<T> source1 = supplier.get();
+ StreamSource<T, DataGeneratorSource<T>> src1 = new
StreamSource<>(source1);
- final AbstractStreamOperatorTestHarness<Long> testHarness1 =
+ final AbstractStreamOperatorTestHarness<T> testHarness1 =
new AbstractStreamOperatorTestHarness<>(src1,
maxParallelsim, 2, 0);
testHarness1.open();
- final DataGeneratorSource<Long> source2 = new
DataGeneratorSource<>(
- SequenceGenerator.longGenerator(initElement,
maxElement));
- StreamSource<Long, DataGeneratorSource<Long>> src2 = new
StreamSource<>(source2);
+ final DataGeneratorSource<T> source2 = supplier.get();
+ StreamSource<T, DataGeneratorSource<T>> src2 = new
StreamSource<>(source2);
- final AbstractStreamOperatorTestHarness<Long> testHarness2 =
+ final AbstractStreamOperatorTestHarness<T> testHarness2 =
new AbstractStreamOperatorTestHarness<>(src2,
maxParallelsim, 2, 1);
testHarness2.open();
// run the source asynchronously
Thread runner1 = new Thread(() -> {
try {
- source1.run(new BlockingSourceContext(
+ source1.run(new BlockingSourceContext<>(
"1", latchToTrigger1,
latchToWait1, outputCollector, 21));
} catch (Throwable t) {
t.printStackTrace();
@@ -141,7 +147,7 @@ public class DataGeneratorSourceTest {
// run the source asynchronously
Thread runner2 = new Thread(() -> {
try {
- source2.run(new BlockingSourceContext(
+ source2.run(new BlockingSourceContext<>(
"2", latchToTrigger2,
latchToWait2, outputCollector, 32));
}
catch (Throwable t) {
@@ -165,15 +171,14 @@ public class DataGeneratorSourceTest {
testHarness2.snapshot(0L, 0L)
);
- final DataGeneratorSource<Long> source3 = new
DataGeneratorSource<>(
- SequenceGenerator.longGenerator(initElement,
maxElement));
- StreamSource<Long, DataGeneratorSource<Long>> src3 = new
StreamSource<>(source3);
+ final DataGeneratorSource<T> source3 = supplier.get();
+ StreamSource<T, DataGeneratorSource<T>> src3 = new
StreamSource<>(source3);
final OperatorSubtaskState initState =
AbstractStreamOperatorTestHarness.repartitionOperatorState(
snapshot, maxParallelsim, 2, 1,
0);
- final AbstractStreamOperatorTestHarness<Long> testHarness3 =
+ final AbstractStreamOperatorTestHarness<T> testHarness3 =
new AbstractStreamOperatorTestHarness<>(src3,
maxParallelsim, 1, 0);
testHarness3.setup();
testHarness3.initializeState(initState);
@@ -186,7 +191,7 @@ public class DataGeneratorSourceTest {
// run the source asynchronously
Thread runner3 = new Thread(() -> {
try {
- source3.run(new BlockingSourceContext(
+ source3.run(new BlockingSourceContext<>(
"3", latchToTrigger3,
latchToWait3, outputCollector, 3));
}
catch (Throwable t) {
@@ -199,15 +204,15 @@ public class DataGeneratorSourceTest {
Assert.assertEquals(3, outputCollector.size()); // we have 3
tasks.
// test for at-most-once
- Set<Long> dedupRes = new HashSet<>(Math.abs(maxElement -
initElement) + 1);
- for (Map.Entry<String, List<Long>> elementsPerTask:
outputCollector.entrySet()) {
+ Set<T> dedupRes = new HashSet<>(expectedOutput.size());
+ for (Map.Entry<String, List<T>> elementsPerTask:
outputCollector.entrySet()) {
String key = elementsPerTask.getKey();
- List<Long> elements = outputCollector.get(key);
+ List<T> elements = outputCollector.get(key);
// this tests the correctness of the latches in the test
Assert.assertTrue(elements.size() > 0);
- for (Long elem : elements) {
+ for (T elem : elements) {
if (!dedupRes.add(elem)) {
Assert.fail("Duplicate entry: " + elem);
}
@@ -219,7 +224,7 @@ public class DataGeneratorSourceTest {
}
// test for exactly-once
- Assert.assertEquals(Math.abs(initElement - maxElement) + 1,
dedupRes.size());
+ Assert.assertEquals(expectedOutput.size(), dedupRes.size());
latchToWait1.trigger();
latchToWait2.trigger();
diff --git a/flink-table/flink-table-api-java-bridge/pom.xml
b/flink-table/flink-table-api-java-bridge/pom.xml
index 3d79cf2..f8f4302 100644
--- a/flink-table/flink-table-api-java-bridge/pom.xml
+++ b/flink-table/flink-table-api-java-bridge/pom.xml
@@ -53,6 +53,8 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <!-- test dependencies -->
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
@@ -80,5 +82,10 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 27a517b..b2c3c37 100644
---
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -26,6 +26,7 @@ import
org.apache.flink.configuration.ConfigOptions.OptionBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
@@ -172,13 +173,17 @@ public class DataGenTableSourceFactory implements
DynamicTableSourceFactory {
}
private DataGenerator createSequenceGenerator(String name, DataType
type, ReadableConfig options) {
- OptionBuilder startKey = key(FIELDS + "." + name + "." + START);
- OptionBuilder endKey = key(FIELDS + "." + name + "." + END);
+ String startKeyStr = FIELDS + "." + name + "." + START;
+ String endKeyStr = FIELDS + "." + name + "." + END;
+ OptionBuilder startKey = key(startKeyStr);
+ OptionBuilder endKey = key(endKeyStr);
options.getOptional(startKey.stringType().noDefaultValue()).orElseThrow(
- () -> new ValidationException("Could not find
required property '" + startKey + "'."));
+ () -> new ValidationException(
+ "Could not find required
property '" + startKeyStr + "' for sequence generator."));
options.getOptional(endKey.stringType().noDefaultValue()).orElseThrow(
- () -> new ValidationException("Could not find
required property '" + endKey + "'."));
+ () -> new ValidationException(
+ "Could not find required
property '" + endKeyStr + "' for sequence generator."));
switch (type.getLogicalType().getTypeRoot()) {
case CHAR:
@@ -291,6 +296,13 @@ public class DataGenTableSourceFactory implements
DynamicTableSourceFactory {
}
@Override
+ public void snapshotState(FunctionSnapshotContext context)
throws Exception {
+ for (DataGenerator generator : fieldGenerators) {
+ generator.snapshotState(context);
+ }
+ }
+
+ @Override
public boolean hasNext() {
for (DataGenerator generator : fieldGenerators) {
if (!generator.hasNext()) {
diff --git
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 827e698..db911e3 100644
---
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -21,14 +21,17 @@ package org.apache.flink.table.factories;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSourceTest;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import
org.apache.flink.table.factories.DataGenTableSourceFactory.DataGenTableSource;
@@ -37,8 +40,12 @@ import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
import static
org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
@@ -80,12 +87,8 @@ public class DataGenTableSourceFactoryTest {
descriptor.putLong(FIELDS + ".f2." + START, 50);
descriptor.putLong(FIELDS + ".f2." + END, 60);
- DynamicTableSource source = FactoryUtil.createTableSource(
- null,
- ObjectIdentifier.of("", "", ""),
- new CatalogTableImpl(TEST_SCHEMA,
descriptor.asMap(), ""),
- new Configuration(),
- Thread.currentThread().getContextClassLoader());
+ DynamicTableSource source = createSource(
+ TEST_SCHEMA, descriptor.asMap());
assertTrue(source instanceof DataGenTableSource);
@@ -142,4 +145,86 @@ public class DataGenTableSourceFactoryTest {
Assert.assertEquals(i + 50, row.getLong(2));
}
}
+
+ @Test
+ public void testSequenceCheckpointRestore() throws Exception {
+ DescriptorProperties descriptor = new DescriptorProperties();
+ descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+ descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
+ descriptor.putLong(FIELDS + ".f0." + START, 0);
+ descriptor.putLong(FIELDS + ".f0." + END, 100);
+
+ DynamicTableSource dynamicTableSource = createSource(
+ TableSchema.builder().field("f0",
DataTypes.BIGINT()).build(),
+ descriptor.asMap());
+
+ DataGenTableSource dataGenTableSource = (DataGenTableSource)
dynamicTableSource;
+ DataGeneratorSource<RowData> source =
dataGenTableSource.createSource();
+
+ final int initElement = 0;
+ final int maxElement = 100;
+ final Set<RowData> expectedOutput = new HashSet<>();
+ for (long i = initElement; i <= maxElement; i++) {
+ expectedOutput.add(GenericRowData.of(i));
+ }
+ DataGeneratorSourceTest.innerTestDataGenCheckpointRestore(
+ () -> {
+ try {
+ return
InstantiationUtil.clone(source);
+ } catch (IOException |
ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }, expectedOutput);
+ }
+
+ @Test
+ public void testLackStartForSequence() {
+ try {
+ DescriptorProperties descriptor = new
DescriptorProperties();
+ descriptor.putString(FactoryUtil.CONNECTOR.key(),
"datagen");
+ descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
+ descriptor.putLong(FIELDS + ".f0." + END, 100);
+
+ createSource(
+ TableSchema.builder().field("f0",
DataTypes.BIGINT()).build(),
+ descriptor.asMap());
+ } catch (ValidationException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue(cause instanceof ValidationException);
+ Assert.assertTrue(cause.getMessage().contains(
+ "Could not find required property
'fields.f0.start' for sequence generator."));
+ return;
+ }
+ Assert.fail("Should fail by ValidationException.");
+ }
+
+ @Test
+ public void testLackEndForSequence() {
+ try {
+ DescriptorProperties descriptor = new
DescriptorProperties();
+ descriptor.putString(FactoryUtil.CONNECTOR.key(),
"datagen");
+ descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
+ descriptor.putLong(FIELDS + ".f0." + START, 0);
+
+ createSource(
+ TableSchema.builder().field("f0",
DataTypes.BIGINT()).build(),
+ descriptor.asMap());
+ } catch (ValidationException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue(cause instanceof ValidationException);
+ Assert.assertTrue(cause.getMessage().contains(
+ "Could not find required property
'fields.f0.end' for sequence generator."));
+ return;
+ }
+ Assert.fail("Should fail by ValidationException.");
+ }
+
+ private static DynamicTableSource createSource(TableSchema schema,
Map<String, String> options) {
+ return FactoryUtil.createTableSource(
+ null,
+ ObjectIdentifier.of("", "", ""),
+ new CatalogTableImpl(schema, options, ""),
+ new Configuration(),
+ Thread.currentThread().getContextClassLoader());
+ }
}