This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 267b863 [FLINK-23971][tests] fix connector testing framework error
when compare records in different splits
267b863 is described below
commit 267b863683b23b8b3df29bee55ac58a25ca1fcd0
Author: Hang Ruan <[email protected]>
AuthorDate: Tue Aug 31 15:53:28 2021 +0800
[FLINK-23971][tests] fix connector testing framework error when compare
records in different splits
Add split index parameter to generate test data, make sure T.equals(object)
return false when records come from differernt splits.
---
.../testutils/KafkaSingleTopicExternalContext.java | 8 ++++----
.../pulsar/testutils/PulsarTestContext.java | 4 ++--
.../cases/MultipleTopicConsumingContext.java | 4 ++--
.../cases/SingleTopicConsumingContext.java | 4 ++--
.../test/common/external/ExternalContext.java | 6 +++++-
.../test/common/testsuites/SourceTestSuiteBase.java | 21 ++++++++++++---------
6 files changed, 27 insertions(+), 20 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
index 81240cf..ad5e31d 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
@@ -155,7 +155,7 @@ public class KafkaSingleTopicExternalContext implements
ExternalContext<String>
}
@Override
- public Collection<String> generateTestData(long seed) {
+ public Collection<String> generateTestData(int splitIndex, long seed) {
Random random = new Random(seed);
List<String> randomStringRecords = new ArrayList<>();
int recordNum =
@@ -163,15 +163,15 @@ public class KafkaSingleTopicExternalContext implements
ExternalContext<String>
+ NUM_RECORDS_LOWER_BOUND;
for (int i = 0; i < recordNum; i++) {
int stringLength = random.nextInt(50) + 1;
- randomStringRecords.add(generateRandomString(stringLength,
random));
+ randomStringRecords.add(generateRandomString(splitIndex,
stringLength, random));
}
return randomStringRecords;
}
- private String generateRandomString(int length, Random random) {
+ private String generateRandomString(int splitIndex, int length, Random
random) {
String alphaNumericString =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" +
"0123456789";
- StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder().append(splitIndex).append("-");
for (int i = 0; i < length; ++i) {
sb.append(alphaNumericString.charAt(random.nextInt(alphaNumericString.length())));
}
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index 6733439..a80d721 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -43,7 +43,7 @@ public abstract class PulsarTestContext<T> implements
ExternalContext<T> {
// Helper methods for generating data.
- protected List<String> generateStringTestData(long seed) {
+ protected List<String> generateStringTestData(int splitIndex, long seed) {
Random random = new Random(seed);
int recordNum =
random.nextInt(NUM_RECORDS_UPPER_BOUND -
NUM_RECORDS_LOWER_BOUND)
@@ -52,7 +52,7 @@ public abstract class PulsarTestContext<T> implements
ExternalContext<T> {
for (int i = 0; i < recordNum; i++) {
int stringLength = random.nextInt(50) + 1;
- records.add(randomAlphanumeric(stringLength));
+ records.add(splitIndex + "-" + randomAlphanumeric(stringLength));
}
return records;
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
index 60a0bfba..7ce676c 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
@@ -98,8 +98,8 @@ public class MultipleTopicConsumingContext extends
PulsarTestContext<String> {
}
@Override
- public Collection<String> generateTestData(long seed) {
- return generateStringTestData(seed);
+ public Collection<String> generateTestData(int splitIndex, long seed) {
+ return generateStringTestData(splitIndex, seed);
}
@Override
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
index 288ecf3..cb1b582 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
@@ -97,8 +97,8 @@ public class SingleTopicConsumingContext extends
PulsarTestContext<String> {
}
@Override
- public Collection<String> generateTestData(long seed) {
- return generateStringTestData(seed);
+ public Collection<String> generateTestData(int splitIndex, long seed) {
+ return generateStringTestData(splitIndex, seed);
}
@Override
diff --git
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
index dacc3c1..5ed06a9 100644
---
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
+++
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
@@ -59,10 +59,14 @@ public interface ExternalContext<T> extends Serializable,
AutoCloseable {
/**
* Generate test data.
*
+ * <p>Make sure that the {@link T#equals(Object)} returns false when the
records in different
+ * splits.
+ *
+ * @param splitIndex index of the split.
* @param seed Seed for generating random test data set.
* @return Collection of generated test data.
*/
- Collection<T> generateTestData(long seed);
+ Collection<T> generateTestData(int splitIndex, long seed);
/**
* Factory for {@link ExternalContext}.
diff --git
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
index 02e7a41..5aff6e6 100644
---
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
+++
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
@@ -41,7 +41,6 @@ import
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFacto
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestTemplate;
@@ -103,7 +102,7 @@ public abstract class SourceTestSuiteBase<T> {
throws Exception {
// Write test data to external system
- final Collection<T> testRecords =
generateAndWriteTestData(externalContext);
+ final Collection<T> testRecords = generateAndWriteTestData(0,
externalContext);
// Build and execute Flink job
StreamExecutionEnvironment execEnv =
testEnv.createExecutionEnvironment();
@@ -140,7 +139,7 @@ public abstract class SourceTestSuiteBase<T> {
final int splitNumber = 4;
final List<Collection<T>> testRecordCollections = new ArrayList<>();
for (int i = 0; i < splitNumber; i++) {
-
testRecordCollections.add(generateAndWriteTestData(externalContext));
+ testRecordCollections.add(generateAndWriteTestData(i,
externalContext));
}
LOG.debug("Build and execute Flink job");
@@ -174,14 +173,13 @@ public abstract class SourceTestSuiteBase<T> {
*/
@TestTemplate
@DisplayName("Test source with at least one idle parallelism")
- @Disabled
public void testIdleReader(TestEnvironment testEnv, ExternalContext<T>
externalContext)
throws Exception {
final int splitNumber = 4;
final List<Collection<T>> testRecordCollections = new ArrayList<>();
for (int i = 0; i < splitNumber; i++) {
-
testRecordCollections.add(generateAndWriteTestData(externalContext));
+ testRecordCollections.add(generateAndWriteTestData(i,
externalContext));
}
try (CloseableIterator<T> resultIterator =
@@ -216,9 +214,11 @@ public abstract class SourceTestSuiteBase<T> {
ExternalContext<T> externalContext,
ClusterControllable controller)
throws Exception {
+ int splitIndex = 0;
final Collection<T> testRecordsBeforeFailure =
-
externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
+ externalContext.generateTestData(
+ splitIndex, ThreadLocalRandom.current().nextLong());
final SourceSplitDataWriter<T> sourceSplitDataWriter =
externalContext.createSourceSplitDataWriter();
sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
@@ -267,7 +267,8 @@ public abstract class SourceTestSuiteBase<T> {
Deadline.fromNow(Duration.ofSeconds(30)));
final Collection<T> testRecordsAfterFailure =
-
externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
+ externalContext.generateTestData(
+ splitIndex, ThreadLocalRandom.current().nextLong());
sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
assertThat(
@@ -291,9 +292,11 @@ public abstract class SourceTestSuiteBase<T> {
* @param externalContext External context
* @return Collection of generated test records
*/
- protected Collection<T> generateAndWriteTestData(ExternalContext<T>
externalContext) {
+ protected Collection<T> generateAndWriteTestData(
+ int splitIndex, ExternalContext<T> externalContext) {
final Collection<T> testRecordCollection =
-
externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
+ externalContext.generateTestData(
+ splitIndex, ThreadLocalRandom.current().nextLong());
LOG.debug("Writing {} records to external system",
testRecordCollection.size());
externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection);
return testRecordCollection;