zentol commented on code in PR #641: URL: https://github.com/apache/flink-web/pull/641#discussion_r1183386313
########## docs/content/posts/2023-04-13-howto-create-batch-source.md: ########## @@ -0,0 +1,280 @@ +--- +title: "Howto create a batch source with the new Source framework" +date: "2023-04-13T08:00:00.000Z" +authors: + +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" + +--- + +## Introduction + +The Flink community has +designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +lately. Some connectors have migrated to this new framework. This article is a how-to for creating a +batch +source using this new framework. It was built while implementing +the [Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). +If you are interested in contributing or migrating connectors, this blog post is for you!. Review Comment: ```suggestion If you are interested in contributing or migrating connectors, this blog post is for you! ``` ########## docs/content/posts/2023-04-13-howto-create-batch-source.md: ########## @@ -0,0 +1,280 @@ +--- +title: "Howto create a batch source with the new Source framework" +date: "2023-04-13T08:00:00.000Z" +authors: + +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" + +--- + +## Introduction + +The Flink community has +designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +lately. Some connectors have migrated to this new framework. This article is a how-to for creating a +batch +source using this new framework. 
It was built while implementing +the [Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). +If you are interested in contributing or migrating connectors, this blog post is for you!. + +## Implementing the source components + +The aim here is not to duplicate the official documentation. For details you should read the +documentation, the javadocs or the Cassandra connector code. The links are above. The goal here is +to give field feedback on how to implement the different components. + +The source architecture is depicted in the diagrams below: + + + + + +### Source + +[example Cassandra Source](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java) Review Comment: ```suggestion [Example: Cassandra Source](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java) ``` The lower-case "example" looks odd. ########## docs/content/posts/2023-04-13-howto-create-batch-source.md: ########## @@ -0,0 +1,280 @@ +--- +title: "Howto create a batch source with the new Source framework" +date: "2023-04-13T08:00:00.000Z" +authors: + +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" + +--- + +## Introduction + +The Flink community has +designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +lately. Some connectors have migrated to this new framework. This article is a how-to for creating a +batch +source using this new framework. 
It was built while implementing +the [Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). +If you are interested in contributing or migrating connectors, this blog post is for you!. + +## Implementing the source components + +The aim here is not to duplicate the official documentation. For details you should read the +documentation, the javadocs or the Cassandra connector code. The links are above. The goal here is +to give field feedback on how to implement the different components. + +The source architecture is depicted in the diagrams below: + + + + + +### Source + +[example Cassandra Source](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java) + +The source interface only does the "glue" between all the other components. Its role is to +instantiate all of them and to define the +source [Boundedness](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Boundedness.html) +. We also do the source configuration +here along with user configuration validation. 
+ +### SourceReader + +[example Cassandra SourceReader](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java) + +As shown in the graphic above, the instances of +the [SourceReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html) ( +which we will call simply readers Review Comment: ```suggestion the [SourceReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html) (which we will call simply readers ``` This caused empty space after the first brace. ########## docs/content/posts/2023-04-13-howto-create-batch-source.md: ########## @@ -0,0 +1,280 @@ +--- +title: "Howto create a batch source with the new Source framework" +date: "2023-04-13T08:00:00.000Z" +authors: + +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" + +--- + +## Introduction + +The Flink community has +designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +lately. Some connectors have migrated to this new framework. This article is a how-to for creating a +batch +source using this new framework. It was built while implementing +the [Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). +If you are interested in contributing or migrating connectors, this blog post is for you!. + +## Implementing the source components + +The aim here is not to duplicate the official documentation. For details you should read the +documentation, the javadocs or the Cassandra connector code. The links are above. 
The goal here is +to give field feedback on how to implement the different components. + +The source architecture is depicted in the diagrams below: + + + + + +### Source + +[example Cassandra Source](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java) + +The source interface only does the "glue" between all the other components. Its role is to +instantiate all of them and to define the +source [Boundedness](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Boundedness.html) +. We also do the source configuration +here along with user configuration validation. + +### SourceReader + +[example Cassandra SourceReader](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java) + +As shown in the graphic above, the instances of +the [SourceReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html) ( +which we will call simply readers +in the continuation of this article) run in parallel in task managers to read the actual data which +is divided into [Splits](#split-and-splitstate). Readers request splits from +the [SplitEnumerator](#splitenumerator-and-splitenumeratorstate) and the resulting splits are +assigned to them in return. + +Flink provides +the [SourceReaderBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html) +implementation that takes care of all the threading. 
Flink also provides a useful extension to +this class for most +cases: [SingleThreadMultiplexSourceReaderBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.html) +. This class has the threading model already configured: +each [SplitReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html) +instance reads splits using one thread (but there are several SplitReader instances that live among +task +managers). + +What we have left to do in the SourceReader class is: + +* Provide a [SplitReader](#splitreader) supplier +* Create a [RecordEmitter](#recordemitter) +* Create the shared resources for the SplitReaders (sessions, etc...). As the SplitReader supplier + is + created in the SourceReader constructor in a super() call, using a SourceReader factory to create + the shared resources and pass them to the supplier is a good idea. 
+* Implement [start()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html#start--): +here we should ask the enumerator for our first split +* Override [close()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#close--) +in SourceReaderBase parent class to free up any created resources (the shared +resources for example) +* Implement [initializedState()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#initializedState-SplitT-) +to create a mutable [SplitState](#split-and-splitstate) from a Split +* Implement [toSplitType()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#toSplitType-java.lang.String-SplitStateT-) +to create a Split from the mutable SplitState +* Implement [onSplitFinished()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#onSplitFinished-java.util.Map-): +here, as it is a batch source (finite data), we should ask the +Enumerator for next split + +### Split and SplitState + +[example Cassandra Split](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java) + +The [SourceSplit](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceSplit.html) +represents a partition of the source data. What defines a split depends on the +backend we are reading from. It could be a _(partition start, partition end)_ tuple or an _(offset, +split size)_ tuple for example. 
+ +In any case, the Split object should be seen as an immutable object: any update to it should be done +on the +associated [SplitState](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html) +. The split state is the one that will be stored inside the Flink +[checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing) +. A checkpoint may happen between 2 fetches for 1 split. So, if we're reading a split, we +must store in the split state the current state of the reading process. This current state needs to +be something serializable (because it will be part of a checkpoint) and something that the backend +source can resume from. That way, in case of failover, the reading could be resumed from where it +was left off. Thus we ensure there will be no duplicates or lost data. For example, if the records +reading order is deterministic in the backend, then the split state can store the number _n_ of +already read records to restart at _n+1_ after failover. + +### SplitEnumerator and SplitEnumeratorState + +[example Cassandra SplitEnumerator](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java) +and [SplitEnumeratorState](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java) + +The [SplitEnumerator](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html) +is responsible for creating the splits and serving them to the readers. 
Whenever +possible, it is preferable to generate the splits lazily, meaning that each time a reader asks the +enumerator for a split, the enumerator generates one on demand and assigns it to the reader. For +that we +implement [SplitEnumerator#handleSplitRequest()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#handleSplitRequest-int-java.lang.String-) +. Lazy splits generation is preferable to +splits discovery, in which we pre-generate all the splits and store them waiting to assign them to +the readers. Indeed, in some situations, the number of splits can be enormous at consume a lot a +memory which could be problematic in case of straggling readers. The framework offers the ability to +act upon reader registration by +implementing [addReader()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#addReader-int-) +but, as we do lazy splits generation, we +have nothing to do there. In some case, generating a split is too costly, so we can pre-generate a Review Comment: ```suggestion have nothing to do there. In some cases, generating a split is too costly, so we can pre-generate a ``` ########## docs/content/posts/2023-04-13-howto-create-batch-source.md: ########## @@ -0,0 +1,280 @@ +--- +title: "Howto create a batch source with the new Source framework" +date: "2023-04-13T08:00:00.000Z" +authors: + +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" + +--- + +## Introduction + +The Flink community has +designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +lately. Some connectors have migrated to this new framework. This article is a how-to for creating a +batch +source using this new framework. 
It was built while implementing +the [Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). +If you are interested in contributing or migrating connectors, this blog post is for you!. + +## Implementing the source components + +The aim here is not to duplicate the official documentation. For details you should read the +documentation, the javadocs or the Cassandra connector code. The links are above. The goal here is +to give field feedback on how to implement the different components. + +The source architecture is depicted in the diagrams below: + + + + + +### Source + +[example Cassandra Source](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java) + +The source interface only does the "glue" between all the other components. Its role is to +instantiate all of them and to define the +source [Boundedness](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Boundedness.html) +. We also do the source configuration +here along with user configuration validation. + +### SourceReader + +[example Cassandra SourceReader](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java) + +As shown in the graphic above, the instances of +the [SourceReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html) ( +which we will call simply readers +in the continuation of this article) run in parallel in task managers to read the actual data which +is divided into [Splits](#split-and-splitstate). 
Readers request splits from +the [SplitEnumerator](#splitenumerator-and-splitenumeratorstate) and the resulting splits are +assigned to them in return. + +Flink provides +the [SourceReaderBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html) +implementation that takes care of all the threading. Flink also provides a useful extension to +this class for most +cases: [SingleThreadMultiplexSourceReaderBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.html) +. This class has the threading model already configured: +each [SplitReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html) +instance reads splits using one thread (but there are several SplitReader instances that live among +task +managers). + +What we have left to do in the SourceReader class is: + +* Provide a [SplitReader](#splitreader) supplier +* Create a [RecordEmitter](#recordemitter) +* Create the shared resources for the SplitReaders (sessions, etc...). As the SplitReader supplier + is + created in the SourceReader constructor in a super() call, using a SourceReader factory to create + the shared resources and pass them to the supplier is a good idea. 
+* Implement [start()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html#start--): +here we should ask the enumerator for our first split +* Override [close()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#close--) +in SourceReaderBase parent class to free up any created resources (the shared +resources for example) +* Implement [initializedState()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#initializedState-SplitT-) +to create a mutable [SplitState](#split-and-splitstate) from a Split +* Implement [toSplitType()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#toSplitType-java.lang.String-SplitStateT-) +to create a Split from the mutable SplitState +* Implement [onSplitFinished()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#onSplitFinished-java.util.Map-): +here, as it is a batch source (finite data), we should ask the +Enumerator for next split + +### Split and SplitState + +[example Cassandra Split](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java) + +The [SourceSplit](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceSplit.html) +represents a partition of the source data. What defines a split depends on the +backend we are reading from. It could be a _(partition start, partition end)_ tuple or an _(offset, +split size)_ tuple for example. 
+ +In any case, the Split object should be seen as an immutable object: any update to it should be done +on the +associated [SplitState](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html) +. The split state is the one that will be stored inside the Flink +[checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing) +. A checkpoint may happen between 2 fetches for 1 split. So, if we're reading a split, we +must store in the split state the current state of the reading process. This current state needs to +be something serializable (because it will be part of a checkpoint) and something that the backend +source can resume from. That way, in case of failover, the reading could be resumed from where it +was left off. Thus we ensure there will be no duplicates or lost data. For example, if the records +reading order is deterministic in the backend, then the split state can store the number _n_ of +already read records to restart at _n+1_ after failover. + +### SplitEnumerator and SplitEnumeratorState + +[example Cassandra SplitEnumerator](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java) +and [SplitEnumeratorState](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java) + +The [SplitEnumerator](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html) +is responsible for creating the splits and serving them to the readers. 
Whenever +possible, it is preferable to generate the splits lazily, meaning that each time a reader asks the +enumerator for a split, the enumerator generates one on demand and assigns it to the reader. For +that we +implement [SplitEnumerator#handleSplitRequest()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#handleSplitRequest-int-java.lang.String-) +. Lazy splits generation is preferable to +splits discovery, in which we pre-generate all the splits and store them waiting to assign them to +the readers. Indeed, in some situations, the number of splits can be enormous at consume a lot a Review Comment: ```suggestion the readers. Indeed, in some situations, the number of splits can be enormous and consume a lot a ``` ########## docs/content/posts/2023-04-13-howto-create-batch-source.md: ########## @@ -0,0 +1,221 @@ +--- +title: "Howto create a batch source with the new Source framework" +date: "2023-04-13T08:00:00.000Z" +authors: +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" +aliases: +- /2023/04/13/howto-create-batch-source.html +--- + +## Introduction + +Flink community has +designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +lately. Some connectors have migrated to this new framework. This article is an howto create a batch +source using this new framework. It was built while implementing +the [Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). I felt it could be useful to people +interested in contributing or migrating connectors. + +## Implementing the source components + +The aim here is not to duplicate the official documentation. 
For details you should read the +documentation, the javadocs or the Cassandra connector code. The links are above. The goal here is +to give field feedback on how to implement the different components. Review Comment: like "don't read the article". I think I'd just remove the entire paragraph? ########## docs/content/posts/2023-04-13-howto-create-batch-source.md: ########## @@ -0,0 +1,280 @@ +--- +title: "Howto create a batch source with the new Source framework" +date: "2023-04-13T08:00:00.000Z" +authors: + +- echauchot: + name: "Etienne Chauchot" + twitter: "echauchot" + +--- + +## Introduction + +The Flink community has +designed [a new Source framework](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/) +based +on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +lately. Some connectors have migrated to this new framework. This article is a how-to for creating a +batch +source using this new framework. It was built while implementing +the [Flink batch source](https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca) +for [Cassandra](https://cassandra.apache.org/_/index.html). +If you are interested in contributing or migrating connectors, this blog post is for you!. + +## Implementing the source components + +The aim here is not to duplicate the official documentation. For details you should read the +documentation, the javadocs or the Cassandra connector code. The links are above. The goal here is +to give field feedback on how to implement the different components. 
+ +The source architecture is depicted in the diagrams below: + + + + + +### Source + +[example Cassandra Source](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java) + +The source interface only does the "glue" between all the other components. Its role is to +instantiate all of them and to define the +source [Boundedness](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Boundedness.html) +. We also do the source configuration +here along with user configuration validation. + +### SourceReader + +[example Cassandra SourceReader](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java) + +As shown in the graphic above, the instances of +the [SourceReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html) ( +which we will call simply readers +in the continuation of this article) run in parallel in task managers to read the actual data which +is divided into [Splits](#split-and-splitstate). Readers request splits from +the [SplitEnumerator](#splitenumerator-and-splitenumeratorstate) and the resulting splits are +assigned to them in return. + +Flink provides +the [SourceReaderBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html) +implementation that takes care of all the threading. Flink also provides a useful extension to +this class for most +cases: [SingleThreadMultiplexSourceReaderBase](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.html) +. 
This class has the threading model already configured: +each [SplitReader](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html) +instance reads splits using one thread (but there are several SplitReader instances that live among +task +managers). + +What we have left to do in the SourceReader class is: + +* Provide a [SplitReader](#splitreader) supplier +* Create a [RecordEmitter](#recordemitter) +* Create the shared resources for the SplitReaders (sessions, etc...). As the SplitReader supplier + is + created in the SourceReader constructor in a super() call, using a SourceReader factory to create + the shared resources and pass them to the supplier is a good idea. +* Implement [start()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html#start--): +here we should ask the enumerator for our first split +* Override [close()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#close--) +in SourceReaderBase parent class to free up any created resources (the shared +resources for example) +* Implement [initializedState()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#initializedState-SplitT-) +to create a mutable [SplitState](#split-and-splitstate) from a Split +* Implement [toSplitType()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#toSplitType-java.lang.String-SplitStateT-) +to create a Split from the mutable SplitState +* Implement [onSplitFinished()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#onSplitFinished-java.util.Map-): +here, as it is a batch source (finite data), we should ask the 
+Enumerator for next split + +### Split and SplitState + +[example Cassandra Split](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java) + +The [SourceSplit](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceSplit.html) +represents a partition of the source data. What defines a split depends on the +backend we are reading from. It could be a _(partition start, partition end)_ tuple or an _(offset, +split size)_ tuple for example. + +In any case, the Split object should be seen as an immutable object: any update to it should be done +on the +associated [SplitState](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html) +. The split state is the one that will be stored inside the Flink +[checkpoints](https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing) +. A checkpoint may happen between 2 fetches for 1 split. So, if we're reading a split, we +must store in the split state the current state of the reading process. This current state needs to +be something serializable (because it will be part of a checkpoint) and something that the backend +source can resume from. That way, in case of failover, the reading could be resumed from where it +was left off. Thus we ensure there will be no duplicates or lost data. For example, if the records +reading order is deterministic in the backend, then the split state can store the number _n_ of +already read records to restart at _n+1_ after failover. Review Comment: ```suggestion was left off. Thus we ensure there will be no duplicates or lost data. 
For example, if the records reading order is deterministic in the backend, then the split state can store the number _n_ of already read records to restart at _n+1_ after failover.
```
whitespace again

##########
docs/content/posts/2023-04-13-howto-create-batch-source.md:
##########
Review Comment:
```suggestion
associated [SplitState](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html). The split state is the one that will be stored inside the Flink
```

##########
docs/content/posts/2023-04-13-howto-create-batch-source.md:
##########
@@ -0,0 +1,280 @@
+### SplitEnumerator and SplitEnumeratorState
+
+[example Cassandra SplitEnumerator](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java)
+and [SplitEnumeratorState](https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java)
+
+The [SplitEnumerator](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html)
+is responsible for creating the splits and serving them to the readers. Whenever
+possible, it is preferable to generate the splits lazily, meaning that each time a reader asks the
+enumerator for a split, the enumerator generates one on demand and assigns it to the reader. For
+that we
+implement [SplitEnumerator#handleSplitRequest()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#handleSplitRequest-int-java.lang.String-)
+. Lazy splits generation is preferable to
+splits discovery, in which we pre-generate all the splits and store them waiting to assign them to
+the readers. Indeed, in some situations, the number of splits can be enormous at consume a lot a
+memory which could be problematic in case of straggling readers. The framework offers the ability to
+act upon reader registration by
+implementing [addReader()](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#addReader-int-)
+but, as we do lazy splits generation, we
+have nothing to do there. In some case, generating a split is too costly, so we can pre-generate a
+batch (not all) of splits to amortize this cost. The number/size of batched splits need to be taken
+into account to avoid consuming too much memory.
+
+Long story short, the tricky part of the source implementation is splitting the source data. The
+good equilibrium to find is not to have too many splits (which could lead to too much memory
+consumption) nor too few (which could lead to sub-optimal parallelism). One good way to meet this
+equilibrium is to evaluate the size of the source data upfront and allow the user to specify the
+maximum memory a split will take. That way they can configure this parameter accordingly to the
+memory
+available on the task managers. This parameter is optional so the source needs to provide a default
+value. Also, the source needs to control that the user provided max-split-size is not too little
+which would
+lead to too many splits. The general rule of thumb is to let the user some freedom but protect him
+from unwanted behavior.
+For these safety measures, rigid thresholds
+don't work well as the source may start to fail when the thresholds are suddenly exceeded. For
+example if we enforce the number of splits is below twice the parallelism, if

Review Comment:
```suggestion
don't work well as the source may start to fail when the thresholds are suddenly exceeded. For
example if we enforce that the number of splits is below twice the parallelism, if
```
