This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new 26112ba [FLINK-32014][doc][hotfix] Add Cassandra source documentation to website (#15)
26112ba is described below
commit 26112bab974011b58198449d00f7aa563329fd72
Author: Etienne Chauchot
AuthorDate: Thu May 11 17:33:36 2023 +0200
[FLINK-32014][doc][hotfix] Add Cassandra source documentation to website (#15)
---
.../docs/connectors/datastream/cassandra.md | 33 ++++++++++++++++++++++
.../docs/connectors/datastream/cassandra.md | 33 ++++++++++++++++++++++
2 files changed, 66 insertions(+)
diff --git a/docs/content.zh/docs/connectors/datastream/cassandra.md b/docs/content.zh/docs/connectors/datastream/cassandra.md
index 4969027..a3aca81 100644
--- a/docs/content.zh/docs/connectors/datastream/cassandra.md
+++ b/docs/content.zh/docs/connectors/datastream/cassandra.md
@@ -45,6 +45,39 @@ There are multiple ways to bring up a Cassandra instance on local machine:
1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
+## Cassandra Source
+Flink provides a [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+bounded source that reads from Cassandra and returns the entities as a `DataStream<Entity>`.
+Each entity is built by the Cassandra object mapper ([MappingManager](https://javadoc.io/static/com.datastax.cassandra/cassandra-driver-mapping/3.11.2/com/datastax/driver/mapping/MappingManager.html))
+from a POJO annotated as described in the Cassandra object mapper documentation.
+
+To use the source, do the following:
+
+```java
+ClusterBuilder clusterBuilder = new ClusterBuilder() {
+    @Override
+    protected Cluster buildCluster(Cluster.Builder builder) {
+        return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT))
+                .withQueryOptions(new QueryOptions().setConsistencyLevel(CL))
+                .withSocketOptions(new SocketOptions()
+                        .setConnectTimeoutMillis(CONNECT_TIMEOUT)
+                        .setReadTimeoutMillis(READ_TIMEOUT))
+                .build();
+    }
+};
+long maxSplitMemorySize = ...; // optional max split size in bytes (minimum 10 MB, defaults to 64 MB if not set)
+CassandraSource<Pojo> cassandraSource = new CassandraSource<>(clusterBuilder,
+        maxSplitMemorySize,
+        Pojo.class,
+        "select ... from KEYSPACE.TABLE ...;",
+        () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});
+DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), "CassandraSource");
+```
+Regarding performance, the source splits the table data as follows:
+`numSplits = tableSize / maxSplitMemorySize`.
+
+If `tableSize` cannot be determined, or if the computation above yields too few splits, the source falls back to `numSplits = parallelism`.
+
## Cassandra Sinks
### Configurations
diff --git a/docs/content/docs/connectors/datastream/cassandra.md b/docs/content/docs/connectors/datastream/cassandra.md
index 25b7381..a4a1a92 100644
--- a/docs/content/docs/connectors/datastream/cassandra.md
+++ b/docs/content/docs/connectors/datastream/cassandra.md
@@ -45,6 +45,39 @@ There are multiple ways to bring up a Cassandra instance on local machine:
1. Follow the instructions from [Cassandra Getting Started page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
2. Launch a container running Cassandra from [Official Docker Repository](https://hub.docker.com/_/cassandra/)
+## Cassandra Source
+Flink provides a [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+bounded source that reads from Cassandra and returns the entities as a `DataStream<Entity>`.
+Each entity is built by the Cassandra object mapper ([MappingManager](https://javadoc.io/static/com.datastax.cassandra/cassandra-driver-mapping/3.11.2/com/datastax/driver/mapping/MappingManager.html))
+from a POJO annotated as described in the Cassandra object mapper documentation.
+
+To use the source, do the following:
+
+```java
+ClusterBuilder clusterBuilder = new ClusterBuilder() {
+    @Override
+    protected Cluster buildCluster(Cluster.Builder builder) {
+        return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT))
+                .withQueryOptions(new QueryOptions().setConsistencyLevel(CL))
+                .withSocketOptions(new SocketOptions()
+                        .setConnectTimeoutMillis(CONNECT_TIMEOUT)
+                        .setReadTimeoutMillis(READ_TIMEOUT))
+                .build();
+    }
+};
+long maxSplitMemorySize = ...; // optional max split size in bytes (minimum 10 MB, defaults to 64 MB if not set)
+CassandraSource<Pojo> cassandraSource = new CassandraSource<>(clusterBuilder,
+        maxSplitMemorySize,
+        Pojo.class,
+        "select ... from KEYSPACE.TABLE ...;",
+        () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});
+DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), "CassandraSource");
+```
+Regarding performance, the source splits the table data as follows:
+`numSplits = tableSize / maxSplitMemorySize`.
+
+If `tableSize` cannot be determined, or if the computation above yields too few splits, the source falls back to `numSplits = parallelism`.
+
## Cassandra Sinks
### Configurations
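The split-count rule added in both "Cassandra Source" sections can be illustrated with a small, self-contained Java sketch. It is a hypothetical illustration, not the connector's implementation: the class `SplitCountSketch` and its methods are invented for this example, a `null` size stands for "not set", a non-positive `tableSize` stands for "size cannot be determined", "too few splits" is read as "fewer splits than the parallelism", 1 MB is taken as 1,000,000 bytes, and a size below the minimum is rejected rather than clamped. Only the formula, the 10 MB minimum, and the 64 MB default come from the documentation above.

```java
// Hypothetical sketch of the documented split-count heuristic; NOT the connector's code.
final class SplitCountSketch {

    // Assumption: "MB" in the documentation means 1,000,000 bytes.
    static final long MIN_SPLIT_MEMORY_SIZE = 10_000_000L;     // documented minimum (10 MB)
    static final long DEFAULT_SPLIT_MEMORY_SIZE = 64_000_000L; // documented default (64 MB)

    // Applies the documented default; null is this sketch's stand-in for "not set".
    // Rejecting values below the minimum (instead of clamping) is an assumption.
    static long effectiveMaxSplitMemorySize(Long requested) {
        if (requested == null) {
            return DEFAULT_SPLIT_MEMORY_SIZE;
        }
        if (requested < MIN_SPLIT_MEMORY_SIZE) {
            throw new IllegalArgumentException(
                    "maxSplitMemorySize must be at least " + MIN_SPLIT_MEMORY_SIZE + " bytes");
        }
        return requested;
    }

    // numSplits = tableSize / maxSplitMemorySize, falling back to parallelism when the
    // size is unknown (tableSize <= 0 here, an assumption) or when the division yields
    // fewer splits than the parallelism (this sketch's reading of "too few").
    static long numSplits(long tableSize, long maxSplitMemorySize, int parallelism) {
        if (tableSize <= 0) {
            return parallelism;
        }
        long computed = tableSize / maxSplitMemorySize; // integer division, rounds down
        return Math.max(computed, parallelism);
    }

    public static void main(String[] args) {
        long maxSize = effectiveMaxSplitMemorySize(null); // 64 MB default
        System.out.println(numSplits(10_000_000_000L, maxSize, 4)); // prints 156
        System.out.println(numSplits(100_000_000L, maxSize, 4));    // prints 4 (1 split < parallelism)
        System.out.println(numSplits(0L, maxSize, 4));              // prints 4 (size unknown)
    }
}
```

Because the division rounds down, a 100 MB table with the 64 MB default yields a single split; with a parallelism of 4 that counts as too few under this sketch's reading, so it returns 4 instead.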