This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new a51000822 Add blog article: Howto test a batch source with the new 
Source framework (#643)
a51000822 is described below

commit a510008228d41c0185b5b96e0ab872fe79850569
Author: Etienne Chauchot <[email protected]>
AuthorDate: Mon May 22 11:21:43 2023 +0200

    Add blog article: Howto test a batch source with the new Source framework 
(#643)
---
 .../posts/2023-05-12-howto-test-batch-source.md    | 214 +++++++++++++++++++++
 1 file changed, 214 insertions(+)

diff --git a/docs/content/posts/2023-05-12-howto-test-batch-source.md 
b/docs/content/posts/2023-05-12-howto-test-batch-source.md
new file mode 100644
index 000000000..e3ad09181
--- /dev/null
+++ b/docs/content/posts/2023-05-12-howto-test-batch-source.md
@@ -0,0 +1,214 @@
+---
+title:  "Howto test a batch source with the new Source framework"
+date: "2023-05-12T08:00:00.000Z"
+authors:
+- echauchot:
+  name: "Etienne Chauchot"
+  twitter: "echauchot"
+
+---
+
+## Introduction
+
+The Flink community has
+designed [a new Source 
framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/)
+based
+on 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+lately. This article is the
+continuation of
+the [howto create a batch source with the new Source framework 
article](https://flink.apache.org/2023/04/14/howto-create-batch-source/)
+. Now it is
+time to test the created source ! As the previous article, this one was built 
while implementing the
+[Flink batch 
source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca)
+for [Cassandra](https://cassandra.apache.org/_/index.html).
+
+## Unit testing the source
+
+### Testing the serializers
+
+[Example Cassandra 
SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java)
+and 
[SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java)
+
+In the previous article, we
+created 
[serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers)
+for Split and SplitEnumeratorState. We should now test them in unit tests. To 
test serde
+we create an object, serialize it using the serializer and then deserialize it 
using the same
+serializer and finally assert on the equality of the two objects. Thus, 
hascode() and equals() need
+to be implemented for the serialized objects.
+
+### Other unit tests
+
+Of course, we also need to unit test low level processing such as query 
building for example or any
+processing that does not require a running backend.
+
+## Integration testing the source
+
+For tests that require a running backend, Flink provides a JUnit5 source test 
framework. It is composed of different parts gathered in a test suite:
+
+* [The Flink environment](#flink-environment)
+* [The backend environment](#backend-environment)
+* [The checkpointing semantics](#checkpointing-semantics)
+* [The test context](#test-context)
+
+[Example Cassandra SourceITCase
+](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java)
+
+For the test to be integrated to Flink CI, the test class must be called 
*ITCAse. But it can be called
+differently if the test belongs to somewhere else.
+The class extends 
[SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html)
+. This test suite provides all
+the necessary tests already (single split, multiple splits, idle reader, 
etc...). It is targeted for
+batch and streaming sources, so for our batch source case here, the tests 
below need to be disabled
+as they are targeted for streaming sources. They can be disabled by overriding 
them in the ITCase
+and annotating them with `@Disabled`:
+
+* testSourceMetrics
+* testSavepoint
+* testScaleUp
+* testScaleDown
+* testTaskManagerFailure
+
+Of course we can add our own integration tests cases for example tests on 
limits, tests on low level
+splitting or any test that requires a running backend. But for most cases we 
only need to provide
+Flink test environment classes to configure the ITCase:
+
+### Flink environment
+
+We add this annotated field to our ITCase and we're done
+
+```java
+@TestEnv
+MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+```
+
+### Backend environment
+[Example Cassandra 
TestEnvironment](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java)
+
+To test the connector we need a backend to run the connector against. This 
TestEnvironment 
+provides everything related to the backend: the container, its configuration, 
the session to connect to it, 
+and all the elements bound to the whole test case (table space, initialization 
requests ...)  
+
+We add this annotated field to our ITCase
+
+```java
+@TestExternalSystem
+MyBackendTestEnvironment backendTestEnvironment = new 
MyBackendTestEnvironment();
+```
+
+To integrate with JUnit5 BackendTestEnvironment
+implements 
[TestResource](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html)
+. This environment is scoped to the test suite, so it is where we setup the 
backend and shared resources (session, tablespace, etc...) by
+implementing 
[startup()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#startUp--)
+and 
[tearDown()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#tearDown--)
+methods. For
+that we advise the use of [testContainers](https://www.testcontainers.org/) 
that relies on docker
+images to provide a real backend
+instance (not a mock) that is representative for integration tests. Several 
backends are supported
+out of the box by testContainers. We need to configure test containers that 
way:
+
+* Redirect container output (error and standard output) to Flink logs
+* Set the different timeouts to cope with CI server load
+* Set retrial mechanisms for connection, initialization requests etc... for 
the same reason
+
+### Checkpointing semantics
+
+In big data execution engines, there are 2 levels of guarantee regarding 
source and sinks:
+
+* At least once: upon failure and recovery, some records may be reflected 
multiple times but none
+  will
+  be lost
+* Exactly once: upon failure and recovery, every record will be reflected 
exactly once
+
+By the following code we verify that the source supports exactly once 
semantics:
+
+```java
+@TestSemantics
+CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+```
+
+That being said, we could encounter a problem while running the tests : the 
default assertions in
+the Flink source test framework assume that the data is read in the same order 
it was written. This
+is untrue for most big data backends where ordering is usually not 
deterministic. To support
+unordered checks and still use all the framework provided tests, we need to 
override
+[SourceTestSuiteBase#checkResultWithSemantic](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html#checkResultWithSemantic-org.apache.flink.util.CloseableIterator-java.util.List-org.apache.flink.streaming.api.CheckpointingMode-java.lang.Integer-)
+in out ITCase:
+
+```java
+@Override
+protected void checkResultWithSemantic(
+  CloseableIterator<Pojo> resultIterator,
+  List<List<Pojo>> testData,
+  CheckpointingMode semantic,
+  Integer limit) {
+    if (limit != null) {
+      Runnable runnable =
+      () -> CollectIteratorAssertions.assertUnordered(resultIterator)
+        .withNumRecordsLimit(limit)
+        .matchesRecordsFromSource(testData, semantic);
+      
assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+    } else {
+        CollectIteratorAssertions.assertUnordered(resultIterator)
+                .matchesRecordsFromSource(testData, semantic);
+    }
+}
+```
+
+This is a copy-paste of the parent method where 
_CollectIteratorAssertions.assertOrdered()_
+is
+replaced by _CollectIteratorAssertions.assertUnordered()_.
+
+### Test context
+[Example Cassandra 
TestContext](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java)
+
+The test context provides Flink with means to interact with the backend, like 
inserting test
+data, creating tables or constructing the source. It is scoped to the test 
case (and not to the test
+suite).
+
+It is linked to the ITCase through a factory of TestContext as shown below.
+
+```java
+@TestContext
+TestContextFactory contextFactory = new TestContextFactory(testEnvironment);
+```
+
+TestContext implements 
[DataStreamSourceExternalContext](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html):
+
+* We don't connect to the backend at each test case, so the shared resources 
such as session are
+  created by the backend test environment (test suite scoped). They are then 
passed to the test
+  context by constructor. It is also in the constructor that we initialize 
test case backend
+  resources such as test case table.
+* close() : drop the created test case resources
+* 
[getProducedType()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.html#getProducedType--):
+  specify the test output type of the source such as a test Pojo for example
+* 
[getConnectorJarPaths()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/ExternalContext.html#getConnectorJarPaths--):
+  provide a list of attached jars. Most of the time, we can return an empty
+  list as maven already adds the jars to the test classpath
+* 
[createSource()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html#createSource-org.apache.flink.connector.testframe.external.source.TestingSourceSettings-):
+  here we create the source as a user would have done. It will be provided to 
the
+  test cases by the Flink test framework
+* 
[createSourceSplitDataWriter()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html#createSourceSplitDataWriter-org.apache.flink.connector.testframe.external.source.TestingSourceSettings-):
+  here we create
+  an 
[ExternalSystemSplitDataWriter](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/ExternalSystemSplitDataWriter.html)
+  responsible for
+  writing test data which comes as a list of produced type objects such as 
defined in
+  getProducedType()
+* 
[generateTestData()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html#generateTestData-org.apache.flink.connector.testframe.external.source.TestingSourceSettings-int-long-):
+  produce the list of test data that will be given to the
+  ExternalSystemSplitDataWriter. We must make sure that equals() returns false 
when 2 records of
+  this list belong to different splits. The easier for that is to include the 
split id into the
+  produced type and make sure equals() and hashcode() are properly implemented 
to include this
+  field.
+
+## Contributing the source to Flink
+
+Lately, the Flink community has externalized all the connectors to external 
repositories that are
+sub-repositories of the official Apache Flink repository. This is mainly to 
decouple the release of
+Flink to the release of the connectors. To distribute the created source, we 
need to
+follow [this official wiki 
page](https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development)
+.
+
+## Conclusion
+
+This concludes the series of articles about creating a batch source with the 
new Flink framework.
+This was needed as, apart from the javadocs, the documentation about testing 
is missing for now. I
+hope you enjoyed reading and I hope the Flink community will receive a source 
PR from you soon :) 
\ No newline at end of file

Reply via email to