[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16184156#comment-16184156 ]
ASF GitHub Bot commented on FLINK-7632:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4696#discussion_r141614919
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,77 +96,189 @@ Note that enabling this feature will have an adverse impact on latency.
<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list.</p>
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra sink guarantees at-least-once delivery of action requests to the C* instance.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: However, the current Cassandra sink implementation does not flush pending mutations before a checkpoint is triggered. Thus, some in-flight mutations might not be replayed when the job recovers.</p>
+
+For more details, see the [checkpointing docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ site.baseurl }}/dev/connectors/guarantees.html).
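+
+Checkpointing is enabled on the `StreamExecutionEnvironment` before the sink is built; a minimal sketch (the 5000 ms interval is an illustrative value, not a recommendation):
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// draw a checkpoint of the pipeline state every 5 seconds,
+// which bounds how much the Cassandra sink may replay on recovery
+env.enableCheckpointing(5000);
+{% endhighlight %}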
-#### Example
+## Examples
+
+The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For the general use of those streaming data types, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for POJO and Tuple data types respectively.
+
+In all these examples, we assume that the keyspace `example` and table `wordcount` have been created.
+
+<div class="codetabs" markdown="1">
+<div data-lang="CQL" markdown="1">
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+ word text,
+ count bigint,
+ PRIMARY KEY(word)
+ );
+{% endhighlight %}
+</div>
+</div>
+
+### Cassandra Sink Example for Streaming Tuple Data Type
+While storing results of the Java/Scala Tuple data type in a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to the database. With the upsert query cached as a `PreparedStatement`, the Cassandra connector internally converts each Tuple element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-CassandraSink.addSink(input)
- .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
- .setClusterBuilder(new ClusterBuilder() {
- @Override
- public Cluster buildCluster(Cluster.Builder builder) {
- return builder.addContactPoint("127.0.0.1").build();
- }
- })
- .build();
+// get the execution environment
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+ .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+ @Override
+        public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
+ // normalize and split the line
+ String[] words = value.toLowerCase().split("\\s");
+
+ // emit the pairs
+ for (String word : words) {
+                // do not accept empty words, since word is defined as the primary key in the C* table
+ if (!word.isEmpty()) {
+ out.collect(new Tuple2<String, Long>(word, 1L));
+ }
+ }
+ }
+ })
+ .keyBy(0)
+ .timeWindow(Time.seconds(5))
+    .sum(1);
+
+CassandraSink.addSink(result)
+    .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
+ .setHost("127.0.0.1")
+ .build();
{% endhighlight %}
</div>
+
<div data-lang="scala" markdown="1">
{% highlight scala %}
-CassandraSink.addSink(input)
- .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
- .setClusterBuilder(new ClusterBuilder() {
- override def buildCluster(builder: Cluster.Builder): Cluster = {
- builder.addContactPoint("127.0.0.1").build()
- }
- })
+val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+// get input data by connecting to the socket
+val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
+
+// parse the data, group it, window it, and aggregate the counts
+val result: DataStream[(String, Long)] = text
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ .flatMap(_.toLowerCase.split("\\s"))
+ .filter(_.nonEmpty)
+ .map((_, 1L))
+ // group by the tuple field "0" and sum up tuple field "1"
+ .keyBy(0)
+ .timeWindow(Time.seconds(5))
+ .sum(1)
+
+CassandraSink.addSink(result)
+ .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
+ .setHost("127.0.0.1")
.build()
+
+result.print().setParallelism(1)
{% endhighlight %}
</div>
+
</div>
-The Cassandra sinks support both tuples and POJO's that use DataStax annotations.
-Flink automatically detects which type of input is used.
-Example for such a Pojo:
+### Cassandra Sink Example for Streaming POJO Data Type
+This example streams a POJO data type and stores the same POJO entity back to Cassandra. In addition, this POJO implementation needs to follow the [DataStax Java Driver Manual](http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/) to annotate the class, as each field of this entity is mapped to an associated column of the designated table using the DataStax Java Driver `com.datastax.driver.mapping.Mapper` class.
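+
+A minimal sketch of such an annotated POJO, assuming the `example.wordcount` table defined above (the class and field names are chosen here to match its columns):
+
+{% highlight java %}
+@Table(keyspace = "example", name = "wordcount")
+public class WordCount {
+
+    @Column(name = "word")
+    private String word = "";
+
+    @Column(name = "count")
+    private long count = 0;
+
+    // the object mapper requires a no-argument constructor and accessors
+    public WordCount() {}
+
+    public String getWord() { return word; }
+    public void setWord(String word) { this.word = word; }
+
+    public long getCount() { return count; }
+    public void setCount(long count) { this.count = count; }
+}
+{% endhighlight %}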
--- End diff --
> to annotate the class **as each field of this entity is mapped to an associated column of the designated table using the DataStax Java Driver `com.datastax.driver.mapping.Mapper` class.**
> Better documentation and examples on C* sink usage for Pojo and Tuples data types
> ---------------------------------------------------------------------------------
>
> Key: FLINK-7632
> URL: https://issues.apache.org/jira/browse/FLINK-7632
> Project: Flink
> Issue Type: Sub-task
> Components: Cassandra Connector, Documentation
> Reporter: Michael Fong
> Assignee: Michael Fong
>
> Cassandra sink supports Pojo and Java Tuple data types. We should improve
> documentation on its usage as well as some concrete / meaningful examples for
> both cases.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)