echauchot commented on code in PR #643:
URL: https://github.com/apache/flink-web/pull/643#discussion_r1192167315


##########
docs/content/posts/howto-test-batch-source.md:
##########
@@ -0,0 +1,202 @@
+---
+title:  "Howto test a batch source with the new Source framework"
+date: "2023-04-14T08:00:00.000Z"
+authors:
+- echauchot:
+  name: "Etienne Chauchot"
+  twitter: "echauchot"
+
+---
+
+## Introduction
+
+The Flink community has recently designed
+[a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/)
+based on
+[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
+This article is the continuation of the
+[howto create a batch source with the new Source framework article](https://flink.apache.org/2023/04/14/howto-create-batch-source/).
+Now it is time to test the created source! Like the previous article, this one was built while
+implementing the
+[Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca)
+for [Cassandra](https://cassandra.apache.org/_/index.html).
+
+## Unit testing the source
+
+### Testing the serializers
+
+[example Cassandra 
SplitSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java)
+and 
[SplitEnumeratorStateSerializer](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java)
+
+In the previous article, we created
+[serializers](https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers)
+for Split and SplitEnumeratorState. We should now cover them with unit tests. As usual, to test
+serde we create an object, serialize it with the serializer, deserialize it with the same
+serializer, and finally assert that the two objects are equal. Thus, hashCode() and equals() need
+to be implemented for the serialized objects.
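
The round trip described above can be sketched in plain Java. This is only an illustration under stated assumptions: `VersionedSerializer` is a local stand-in that mirrors the shape of Flink's `SimpleVersionedSerializer`, and `RangeSplit` and `RangeSplitSerializer` are hypothetical names, not the classes of the Cassandra connector.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

final class SplitSerdeSketch {

    /** Local stand-in (assumption) mirroring the shape of Flink's SimpleVersionedSerializer. */
    interface VersionedSerializer<T> {
        int getVersion();

        byte[] serialize(T obj) throws IOException;

        T deserialize(int version, byte[] serialized) throws IOException;
    }

    /** Hypothetical split; equals() and hashCode() are what make the final assertion possible. */
    static final class RangeSplit {
        final long start;
        final long end;

        RangeSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof RangeSplit)) {
                return false;
            }
            RangeSplit other = (RangeSplit) o;
            return start == other.start && end == other.end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }
    }

    /** Hypothetical serializer writing the two bounds as longs. */
    static final class RangeSplitSerializer implements VersionedSerializer<RangeSplit> {
        static final int VERSION = 0;

        @Override
        public int getVersion() {
            return VERSION;
        }

        @Override
        public byte[] serialize(RangeSplit split) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeLong(split.start);
                out.writeLong(split.end);
            }
            return bytes.toByteArray();
        }

        @Override
        public RangeSplit deserialize(int version, byte[] serialized) throws IOException {
            if (version != VERSION) {
                throw new IOException("Unsupported version: " + version);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
                long start = in.readLong();
                long end = in.readLong();
                return new RangeSplit(start, end);
            }
        }
    }

    /** Serializes then deserializes with the same serializer; a test asserts input equals output. */
    static RangeSplit roundTrip(RangeSplit split) {
        RangeSplitSerializer serializer = new RangeSplitSerializer();
        try {
            byte[] bytes = serializer.serialize(split);
            return serializer.deserialize(serializer.getVersion(), bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real unit test, the object returned by `roundTrip` would simply be compared with the original using the assertion library of the project.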
+
+### Other unit tests
+
+Of course, we also need to unit test low-level processing, such as query building, or any other
+processing that does not require a running backend.
+
+## Integration testing the source
+
+[example Cassandra SourceITCase
+](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java)
+
+For tests that require a running backend, Flink provides a JUnit5 source test framework. To use it
+we create a class named *ITCase that extends
+[SourceTestSuiteBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html).
+This test suite already provides all the necessary tests (single split, multiple splits, idle
+reader, etc.). It targets both batch and streaming sources, so for our batch source the tests
+below need to be disabled because they only apply to streaming sources. They can be disabled by
+overriding them in the ITCase and annotating them with @Disabled:
+
+* testSourceMetrics
+* testSavepoint
+* testScaleUp
+* testScaleDown
+* testTaskManagerFailure
+
+Of course, we can add our own integration test cases, for example tests on limits, tests on
+low-level splitting, or any test that requires a running backend. But for most cases we only need
+to provide Flink test environment classes to configure the ITCase:
+
+### Flink runtime environment
+
+We add this annotated field to our ITCase and we're done:
+
+`@TestEnv
+MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();
+`
+
+### Backend runtime environment
+
+[example Cassandra 
TestEnvironment](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java)
+
+We add this annotated field to our ITCase:
+
+`@TestExternalSystem
+BackendTestEnvironment backendTestEnvironment = new BackendTestEnvironment();
+`
+
+BackendTestEnvironment implements
+[TestResource](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html).
+This environment is scoped to the test suite, so it is where we set up the backend runtime and
+shared resources (session, tablespace, etc.) by implementing the
+[startUp()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#startUp--)
+and
+[tearDown()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#tearDown--)
+methods. For that we advise using [Testcontainers](https://www.testcontainers.org/), which relies
+on Docker images to provide a real backend instance (not a mock) that is representative for
+integration tests. Several backends are supported out of the box by Testcontainers. We need to
+configure the test containers as follows:
+
+* Redirect container output (error and standard output) to Flink logs
+* Set the different timeouts to cope with CI server load
+* Set up retry mechanisms for connections, initialization requests, etc. for the same reason
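
The retry item of the list above can be sketched with a small helper in plain Java. This is a minimal sketch, not the mechanism used by the Cassandra connector tests: the `RetrySketch` class, its `retry` method and its parameters are hypothetical, and it assumes that the retried action signals a transient failure with an unchecked exception.

```java
import java.time.Duration;
import java.util.function.Supplier;

final class RetrySketch {

    /**
     * Runs the action until it succeeds or maxAttempts is reached, pausing between attempts.
     * The last failure is rethrown when every attempt has failed.
     */
    static <T> T retry(Supplier<T> action, int maxAttempts, Duration pause) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    sleep(pause);
                }
            }
        }
        throw lastFailure;
    }

    private static void sleep(Duration pause) {
        try {
            Thread.sleep(pause.toMillis());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

In a backend environment, such a helper would typically wrap the session creation or the first initialization request issued from the startUp() method, with a pause long enough for a loaded CI server.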
+
+### Checkpointing semantics
+
+In big data execution engines, there are two levels of guarantee regarding sources and sinks:
+
+* At least once: upon failure and recovery, some records may be reflected multiple times but none
+  will be lost
+* Exactly once: upon failure and recovery, every record will be reflected exactly once
+
+With the following code we verify that the source supports exactly-once semantics:
+
+`@TestSemantics
+CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+`
+
+That being said, we could encounter a problem while running the tests: the default assertions in
+the Flink source test framework assume that the data is read in the same order it was written. This
+is untrue for most big data backends, where ordering is usually not deterministic. To support
+unordered checks and still use all the tests provided by the framework, we need to override
+[SourceTestSuiteBase#checkResultWithSemantic](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html#checkResultWithSemantic-org.apache.flink.util.CloseableIterator-java.util.List-org.apache.flink.streaming.api.CheckpointingMode-java.lang.Integer-)
+in our ITCase:
+
+`@Override
+protected void checkResultWithSemantic(
+        CloseableIterator<Pojo> resultIterator,
+        List<List<Pojo>> testData,
+        CheckpointingMode semantic,
+        Integer limit) {
+    if (limit != null) {
+        Runnable runnable =
+                () ->
+                        CollectIteratorAssertions.assertUnordered(resultIterator)
+                                .withNumRecordsLimit(limit)
+                                .matchesRecordsFromSource(testData, semantic);
+
+        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+    } else {
+        CollectIteratorAssertions.assertUnordered(resultIterator)
+                .matchesRecordsFromSource(testData, semantic);
+    }
+}`
+
+This is simply a copy-paste of the parent method where
+_CollectIteratorAssertions.assertOrdered()_ is replaced by
+_CollectIteratorAssertions.assertUnordered()_.
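
To make the idea of an unordered check concrete, here is a simplified plain Java illustration of what each guarantee means once ordering is ignored. It is not the implementation of CollectIteratorAssertions: the `UnorderedMatchSketch` class and its methods are hypothetical, and records are simply compared as multisets.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UnorderedMatchSketch {

    /** Counts how many times each record occurs, which discards ordering. */
    static <T> Map<T, Integer> countOccurrences(List<T> records) {
        Map<T, Integer> counts = new HashMap<>();
        for (T record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }

    /** Exactly once: the same records with the same multiplicities, in any order. */
    static <T> boolean matchesExactlyOnce(List<T> expected, List<T> actual) {
        return countOccurrences(expected).equals(countOccurrences(actual));
    }

    /** At least once: no expected record is missing, duplicates are tolerated. */
    static <T> boolean matchesAtLeastOnce(List<T> expected, List<T> actual) {
        Map<T, Integer> expectedCounts = countOccurrences(expected);
        Map<T, Integer> actualCounts = countOccurrences(actual);
        if (!expectedCounts.keySet().equals(actualCounts.keySet())) {
            return false;
        }
        for (Map.Entry<T, Integer> entry : expectedCounts.entrySet()) {
            if (actualCounts.get(entry.getKey()) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

With this simplified definition, reading a, b, c as c, a, b satisfies both guarantees, whereas reading c, a, b, a only satisfies at least once.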
+
+### Test context
+
+[example Cassandra 
TestContext](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java)
+
+The test context is scoped to the test case, so it is where we do things like creating the test
+table, creating the source, or writing test data.

Review Comment:
   I agree with the rephrasing, but I'd also like to keep the important notion that the
   TestContext is scoped to the test case (whereas the backend environment is scoped to the
   test suite).


