echauchot commented on code in PR #641:
URL: https://github.com/apache/flink-web/pull/641#discussion_r1183597414


##########
docs/content/posts/2023-04-13-howto-create-batch-source.md:
##########
@@ -0,0 +1,221 @@
+---
+title:  "How to create a batch source with the new Source framework"
+date: "2023-04-13T08:00:00.000Z"
+authors:
+- echauchot:
+  name: "Etienne Chauchot"
+  twitter: "echauchot"
+aliases:
+- /2023/04/13/howto-create-batch-source.html
+---
+
+## Introduction
+
+The Flink community has recently
+designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/)
+based on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
+Some connectors have already migrated to this new framework. This article is a how-to for creating
+a batch source using this new framework. It was built while implementing
+the [Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca)
+for [Cassandra](https://cassandra.apache.org/_/index.html). I felt it could be useful to people
+interested in contributing to or migrating connectors.
+
+## Implementing the source components
+
+The aim here is not to duplicate the official documentation: for details you should read the
+documentation, the javadocs or the Cassandra connector code linked above. The goal here is to give
+field feedback on how to implement the different components.
+
+The source architecture is depicted in the diagrams below:
+
+![](/img/blog/2023-04-13-howto-create-batch-source/source_components.svg)
+
+![](/img/blog/2023-04-13-howto-create-batch-source/source_reader.svg)
+
+### Source
+[example Cassandra Source](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java)
+
+The source interface only acts as the "glue" between all the other components. Its role is to
+instantiate all of them and to define the
+source [Boundedness](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Boundedness.html).
+We also do the source configuration here, along with user configuration validation.
+
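+As an illustration, here is a minimal sketch of what such a "glue" class can look like. All the
+`My*` names (`MySource`, `MySplit`, `MyEnumeratorState`, serializers, etc.) are placeholders for
+this article, not actual connector classes:
+
+```java
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/** Glue class: instantiates the other components and defines the boundedness. */
+public class MySource implements Source<MyRecord, MySplit, MyEnumeratorState> {
+
+    @Override
+    public Boundedness getBoundedness() {
+        // batch source: the input is finite
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<MyRecord, MySplit> createReader(SourceReaderContext context) {
+        // a factory can create the resources shared by the SplitReaders here
+        return MySourceReaderFactory.create(context);
+    }
+
+    @Override
+    public SplitEnumerator<MySplit, MyEnumeratorState> createEnumerator(
+            SplitEnumeratorContext<MySplit> context) {
+        return new MySplitEnumerator(context, new MyEnumeratorState());
+    }
+
+    @Override
+    public SplitEnumerator<MySplit, MyEnumeratorState> restoreEnumerator(
+            SplitEnumeratorContext<MySplit> context, MyEnumeratorState state) {
+        // recreate the enumerator from the state stored in the checkpoint
+        return new MySplitEnumerator(context, state);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<MySplit> getSplitSerializer() {
+        return new MySplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<MyEnumeratorState> getEnumeratorCheckpointSerializer() {
+        return new MyEnumeratorStateSerializer();
+    }
+}
+```
+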
+### SourceReader
+[example Cassandra SourceReader](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java)
+
+As shown in the graphic above, the instances of
+the [SourceReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html)
+(which we will simply call readers in the rest of this article) run in parallel in task managers
+to read the actual data, which is divided into [Splits](#split-and-splitstate). Readers request
+splits from the [SplitEnumerator](#splitenumerator-and-splitenumeratorstate) and the resulting
+splits are assigned to them in return.
+
+Luckily for us, Flink provides
+the [SourceReaderBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html)
+implementation that takes care of the synchronization between the readers and the main thread.
+Flink also provides a useful extension to this class for most
+cases: [SingleThreadMultiplexSourceReaderBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.html).
+This class spares us from specifying the threading model, as it is already configured to read
+splits with one thread using
+one [SplitReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html).
+This is not harmful to the performance of the source, as the SourceReaders run in parallel across
+the task managers.
+
+What we have left to do in the SourceReader class is (see the sketch after this list):
+* Provide a [SplitReader](#splitreader) supplier
+* Create a [RecordEmitter](#recordemitter)
+* Create the shared resources for the SplitReaders (sessions, etc.). As the SplitReader supplier
+is created in the SourceReader constructor in a super() call, using a SourceReader factory to
+create the shared resources and pass them to the supplier seems a good idea.
+* Implement [start()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html#start--):
+here we should ask the enumerator for our first split
+* Override [close()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#close--)
+in the SourceReaderBase parent class to free up any created resources (the shared resources, for
+example)
+* Implement [initializedState()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#initializedState-SplitT-)
+to create a mutable [SplitState](#split-and-splitstate) from a Split
+* Implement [toSplitType()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#toSplitType-java.lang.String-SplitStateT-)
+to create a Split from the mutable SplitState
+* Implement [onSplitFinished()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#onSplitFinished-java.util.Map-):
+here, as it is a batch source (finite data), we should ask the Enumerator for the next split
+
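+Putting this list together, a reader based on SingleThreadMultiplexSourceReaderBase boils down to
+something like the sketch below. Again, all `My*` names are placeholders, and the shared resources
+are reduced to a single hypothetical `MySession` object created beforehand by a reader factory:
+
+```java
+import java.util.Map;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+
+public class MySourceReader
+        extends SingleThreadMultiplexSourceReaderBase<MyRecord, MyRecord, MySplit, MySplitState> {
+
+    public MySourceReader(
+            SourceReaderContext context, Configuration config, MySession sharedSession) {
+        // the SplitReader supplier is passed in this super() call, hence the factory
+        // creating the shared session beforehand and handing it over here
+        super(() -> new MySplitReader(sharedSession), new MyRecordEmitter(), config, context);
+    }
+
+    @Override
+    public void start() {
+        // ask the enumerator for our first split
+        context.sendSplitRequest();
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, MySplitState> finishedSplitIds) {
+        // batch source (finite data): ask the enumerator for the next split
+        context.sendSplitRequest();
+    }
+
+    @Override
+    protected MySplitState initializedState(MySplit split) {
+        // create a mutable SplitState from the immutable Split
+        return new MySplitState(split);
+    }
+
+    @Override
+    protected MySplit toSplitType(String splitId, MySplitState splitState) {
+        // fold the reading progress back into an immutable Split
+        return splitState.toSplit();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        // free the shared resources created by the factory here
+    }
+}
+```
+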
+### Split and SplitState
+[example Cassandra Split](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java)
+
+The [SourceSplit](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceSplit.html)
+represents a partition of the source data. What defines a split depends on the backend we are
+reading from. It could be a _(partition start, partition end)_ tuple or an _(offset, split size)_
+tuple for example.
+
+In any case, the Split object should be seen as an immutable object: any update to it should be
+done on the associated
+[SplitState](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html).
+The split state is the one that will be stored inside the Flink
+[checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing).
+A checkpoint may happen between two fetches for one split. So, if we're reading a split, we must
+store in the split state the current state of the reading process. This current state needs to be
+something serializable (because it will be part of a checkpoint) and something that the backend
+source can resume from. That way, in case of failover, the reading could be resumed from where it
+was left off, which ensures there will be no duplicate or lost data. For example, if the record
+reading order is deterministic in the backend, then the split state can store the number _n_ of
+already read records to restart at _n+1_ after failover.
+
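+For illustration, a minimal split/state pair could look like the sketch below, assuming a
+_(partition start, partition end)_ style split and the _n already read records_ resume information
+from the example above (placeholder classes, not the actual Cassandra ones):
+
+```java
+import org.apache.flink.api.connector.source.SourceSplit;
+
+/** Immutable description of a partition of the source data. */
+public class MySplit implements SourceSplit {
+
+    private final long partitionStart;
+    private final long partitionEnd;
+    // resume information: number of records already read, 0 for a fresh split
+    private final long recordsAlreadyRead;
+
+    public MySplit(long partitionStart, long partitionEnd, long recordsAlreadyRead) {
+        this.partitionStart = partitionStart;
+        this.partitionEnd = partitionEnd;
+        this.recordsAlreadyRead = recordsAlreadyRead;
+    }
+
+    @Override
+    public String splitId() {
+        return partitionStart + "-" + partitionEnd;
+    }
+
+    public long partitionStart() { return partitionStart; }
+
+    public long partitionEnd() { return partitionEnd; }
+
+    public long recordsAlreadyRead() { return recordsAlreadyRead; }
+}
+
+/** Mutable counterpart of the split: tracks the reading progress between two fetches. */
+class MySplitState {
+
+    private final MySplit split;
+    private long recordsAlreadyRead;
+
+    MySplitState(MySplit split) {
+        this.split = split;
+        this.recordsAlreadyRead = split.recordsAlreadyRead();
+    }
+
+    void markRecordRead() {
+        recordsAlreadyRead++;
+    }
+
+    MySplit toSplit() {
+        // the returned split ends up in a checkpoint, so reading can restart at n+1
+        return new MySplit(split.partitionStart(), split.partitionEnd(), recordsAlreadyRead);
+    }
+}
+```
+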
+### SplitEnumerator and SplitEnumeratorState
+[example Cassandra SplitEnumerator](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java)
+and [SplitEnumeratorState](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java)
+
+The [SplitEnumerator](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html)
+is responsible for creating the splits and serving them to the readers. Whenever possible, it is
+preferable to generate the splits lazily, meaning that each time a reader asks the enumerator for
+a split, the enumerator generates one on demand and assigns it to the reader. For that we
+implement [SplitEnumerator#handleSplitRequest()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#handleSplitRequest-int-java.lang.String-).
+Lazy splits generation is preferable to splits discovery, in which we pre-generate all the splits
+and store them while waiting to assign them to the readers. Indeed, in some situations, the number
+of splits can be enormous and consume a lot of memory, which could be problematic in case of
+straggling readers. The framework offers the ability to act upon reader registration by
+implementing [addReader()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#addReader-int-)
+but, as we do lazy splits generation, we have nothing to do there. In some cases, generating a
+split is too costly, so we can pre-generate a batch of splits (not all of them) to amortize this
+cost. The number/size of batched splits needs to be taken into account to avoid consuming too much
+memory.
+
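+As a sketch of this lazy generation, a minimal enumerator could look like the following. The
+`generateNextSplit()` and `addSplitsBack()` helpers on the state are hypothetical, and the state
+handling itself is detailed further below:
+
+```java
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+public class MySplitEnumerator implements SplitEnumerator<MySplit, MyEnumeratorState> {
+
+    private final SplitEnumeratorContext<MySplit> context;
+    // created in the constructor, mutated upon split generation, returned in snapshotState()
+    private final MyEnumeratorState state;
+
+    public MySplitEnumerator(SplitEnumeratorContext<MySplit> context, MyEnumeratorState state) {
+        this.context = context;
+        this.state = state;
+    }
+
+    @Override
+    public void start() {
+        // no splits discovery: splits are generated lazily in handleSplitRequest()
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        // hypothetical helper: generate the next split on demand and update the state
+        MySplit split = state.generateNextSplit();
+        if (split == null) {
+            // no source data left to read
+            context.signalNoMoreSplits(subtaskId);
+        } else {
+            context.assignSplit(split, subtaskId);
+        }
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        // nothing to do upon reader registration, as we generate splits lazily
+    }
+
+    @Override
+    public void addSplitsBack(List<MySplit> splits, int subtaskId) {
+        // re-buffer the splits of a failed reader so that they can be re-assigned
+        state.addSplitsBack(splits);
+    }
+
+    @Override
+    public MyEnumeratorState snapshotState(long checkpointId) {
+        return state;
+    }
+
+    @Override
+    public void close() {
+        // free any resources used for splits generation
+    }
+}
+```
+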
+Long story short, the tricky part of the source implementation is splitting the source data. The
+equilibrium to find is to have neither too many splits (which could lead to too much memory
+consumption) nor too few (which could lead to sub-optimal parallelism). One good way to meet this
+equilibrium is to evaluate the size of the source data upfront and to allow the user to specify
+the maximum memory a split will take. That way, users can configure this parameter according to
+the memory available on the task managers. Of course, the size of the source data needs to be
+evaluated without reading the actual data. For the Cassandra connector it was
+done [like this](https://echauchot.blogspot.com/2023/03/cassandra-evaluate-table-size-without.html).
+The general rule of thumb regarding splitting is to give the user some freedom but protect them
+from unwanted behavior: if they want to process a 1GB table in 1MB splits with 1 subtask, they
+should be free to do so. But, as we allow the user to set a max split size, we should still ensure
+it is not too small (to avoid having too many splits), and we must also provide a default maximum.
+For these safety measures, rigid thresholds don't work well, as the source may start to fail when
+the thresholds are suddenly exceeded.
+
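+As an illustration of such soft safety measures, a hypothetical sizing helper (not part of the
+framework, with an assumed 10 MB floor) could look like this:
+
+```java
+/**
+ * Hypothetical sizing helper: derives the number of splits from an upfront estimation of the
+ * source data size and the user-provided max split memory.
+ */
+private static long decideOnNumSplits(
+        long estimatedDataSizeBytes, long userMaxSplitMemoryBytes, int parallelism) {
+    // assumption: a hard-coded floor instead of a rigid failure threshold
+    final long minSplitMemoryBytes = 10 * 1024 * 1024; // 10 MB
+    if (estimatedDataSizeBytes <= 0) {
+        // the size could not be evaluated: fall back to one split per subtask
+        return parallelism;
+    }
+    // soft safety measure: silently raise a too-small user value instead of failing
+    long splitMemory = Math.max(userMaxSplitMemoryBytes, minSplitMemoryBytes);
+    // at least one split per subtask so that no reader stays idle
+    return Math.max(parallelism, estimatedDataSizeBytes / splitMemory);
+}
+```
+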
+Another important topic is state. If the job manager fails, the split enumerator needs to recover.
+For that, as for the split, we need to provide a state for the enumerator (that will be part of a
+checkpoint) and return it
+when [SplitEnumerator#snapshotState()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#snapshotState-long-)
+is called. Here also, the SplitEnumerator is to be considered immutable: any update to the state
+of the SplitEnumerator must be done in the associated
+[SplitEnumeratorState](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html).
+So, we don't create the state when snapshotState() is called but rather in the enumerator's
+constructor, and we return it in snapshotState(). The state

Review Comment:
   ah, just saw you approved the PR, so I guess this is resolved


