This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
     new 26112ba  [FLINK-32014][doc][hotfix] Add Cassandra source documentation 
to website (#15)
26112ba is described below

commit 26112bab974011b58198449d00f7aa563329fd72
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu May 11 17:33:36 2023 +0200

    [FLINK-32014][doc][hotfix] Add Cassandra source documentation to website 
(#15)
---
 .../docs/connectors/datastream/cassandra.md        | 33 ++++++++++++++++++++++
 .../docs/connectors/datastream/cassandra.md        | 33 ++++++++++++++++++++++
 2 files changed, 66 insertions(+)

diff --git a/docs/content.zh/docs/connectors/datastream/cassandra.md 
b/docs/content.zh/docs/connectors/datastream/cassandra.md
index 4969027..a3aca81 100644
--- a/docs/content.zh/docs/connectors/datastream/cassandra.md
+++ b/docs/content.zh/docs/connectors/datastream/cassandra.md
@@ -45,6 +45,39 @@ There are multiple ways to bring up a Cassandra instance on 
local machine:
 1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
 2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
+## Cassandra Source
+Flink provides a 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+bounded source to read from Cassandra and return a collection of entities as 
`DataStream<Entity>`.
+An entity is built by Cassandra mapper 
([MappingManager](https://javadoc.io/static/com.datastax.cassandra/cassandra-driver-mapping/3.11.2/com/datastax/driver/mapping/MappingManager.html))
+based on a POJO containing annotations (as described in Cassandra object 
mapper).
+
+To use the source, do the following:
+
+```java
+ClusterBuilder clusterBuilder = new ClusterBuilder() {    
+    @Override    
+    protected Cluster buildCluster(Cluster.Builder builder) {      
+        return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST,PORT))
+                      .withQueryOptions(new 
QueryOptions().setConsistencyLevel(CL))                    
+                      .withSocketOptions(new SocketOptions()                   
+                      .setConnectTimeoutMillis(CONNECT_TIMEOUT)                
    
+                      .setReadTimeoutMillis(READ_TIMEOUT))                    
+                      .build();    
+    }
+};  
+long maxSplitMemorySize = ... //optional max split size in bytes minimum is 
10MB. If not set, maxSplitMemorySize = 64 MB
+Source cassandraSource = new CassandraSource(clusterBuilder, 
+                                             maxSplitMemorySize, 
+                                             Pojo.class, 
+                                             "select ... from KEYSPACE.TABLE 
...;",
+                                             () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});    
+DataStream<Pojo> stream = env.fromSource(cassandraSource, 
WatermarkStrategy.noWatermarks(),  "CassandraSource");
+```
+Regarding performances, the source splits table data like this:
+`numSplits = tableSize/maxSplitMemorySize`.
+
+If tableSize cannot be determined or previous numSplits computation makes too 
few splits, it falls back to `numSplits = parallelism`
+
 ## Cassandra Sinks
 
 ### Configurations
diff --git a/docs/content/docs/connectors/datastream/cassandra.md 
b/docs/content/docs/connectors/datastream/cassandra.md
index 25b7381..a4a1a92 100644
--- a/docs/content/docs/connectors/datastream/cassandra.md
+++ b/docs/content/docs/connectors/datastream/cassandra.md
@@ -45,6 +45,39 @@ There are multiple ways to bring up a Cassandra instance on 
local machine:
 1. Follow the instructions from [Cassandra Getting Started 
page](http://cassandra.apache.org/doc/latest/getting_started/index.html).
 2. Launch a container running Cassandra from [Official Docker 
Repository](https://hub.docker.com/_/cassandra/)
 
+## Cassandra Source
+Flink provides a 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
+bounded source to read from Cassandra and return a collection of entities as 
`DataStream<Entity>`.
+An entity is built by Cassandra mapper 
([MappingManager](https://javadoc.io/static/com.datastax.cassandra/cassandra-driver-mapping/3.11.2/com/datastax/driver/mapping/MappingManager.html))
+based on a POJO containing annotations (as described in Cassandra object 
mapper).
+
+To use the source, do the following:
+
+```java
+ClusterBuilder clusterBuilder = new ClusterBuilder() {    
+    @Override    
+    protected Cluster buildCluster(Cluster.Builder builder) {      
+        return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST,PORT))
+                      .withQueryOptions(new 
QueryOptions().setConsistencyLevel(CL))                    
+                      .withSocketOptions(new SocketOptions()                   
+                      .setConnectTimeoutMillis(CONNECT_TIMEOUT)                
    
+                      .setReadTimeoutMillis(READ_TIMEOUT))                    
+                      .build();    
+    }
+};  
+long maxSplitMemorySize = ... //optional max split size in bytes minimum is 
10MB. If not set, maxSplitMemorySize = 64 MB
+Source cassandraSource = new CassandraSource(clusterBuilder, 
+                                             maxSplitMemorySize, 
+                                             Pojo.class, 
+                                             "select ... from KEYSPACE.TABLE 
...;",
+                                             () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});    
+DataStream<Pojo> stream = env.fromSource(cassandraSource, 
WatermarkStrategy.noWatermarks(),  "CassandraSource");
+```
+Regarding performances, the source splits table data like this: 
+`numSplits = tableSize/maxSplitMemorySize`. 
+
+If tableSize cannot be determined or previous numSplits computation makes too 
few splits, it falls back to `numSplits = parallelism`
+
 ## Cassandra Sinks
 
 ### Configurations

Reply via email to