[
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15288709#comment-15288709
]
ASF GitHub Bot commented on FLINK-3311:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1771#discussion_r63675917
--- Diff:
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
---
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
/**
 * CheckpointCommitter that saves information about completed checkpoints
 * within a separate table in a Cassandra database.
 *
 * <p>Entries are in the form: | operator_id | subtask_id | last_completed_checkpoint |
 */
public class CassandraCommitter extends CheckpointCommitter {
	// Factory for the Cassandra Cluster; cleaned with ClosureCleaner in the
	// constructor so it stays serializable when the committer is shipped.
	private ClusterBuilder builder;
	// Driver handles; transient because they are (re)created in
	// createResource()/open() and are not serializable.
	private transient Cluster cluster;
	private transient Session session;

	// Keyspace holding the bookkeeping table; overridable via the two-arg constructor.
	private String keySpace = "flink_auxiliary";
	// Table-name prefix; the job ID is appended in setJobId() to keep jobs separate.
	private String table = "checkpoints_";

	// Prepared in open(); transient since PreparedStatement is bound to a session.
	private transient PreparedStatement deleteStatement;
	private transient PreparedStatement updateStatement;
	private transient PreparedStatement selectStatement;
+ public CassandraCommitter(ClusterBuilder builder) {
+ this.builder = builder;
+ ClosureCleaner.clean(builder, true);
+ }
+
	/**
	 * Creates a committer that stores its bookkeeping data in the given
	 * keyspace instead of the default "flink_auxiliary".
	 *
	 * @param builder  builder for the Cassandra cluster to connect to
	 * @param keySpace keyspace that will contain the checkpoint table
	 */
	public CassandraCommitter(ClusterBuilder builder, String keySpace) {
		this(builder);
		this.keySpace = keySpace;
	}
+
+ /**
+ * Internally used to set the job ID after instantiation.
+ *
+ * @param id
+ * @throws Exception
+ */
+ public void setJobId(String id) throws Exception {
+ super.setJobId(id);
+ table += id;
+ }
+
	/**
	 * Generates the necessary tables to store information.
	 *
	 * <p>Creates the keyspace (SimpleStrategy, replication factor 3) and the
	 * checkpoint table if they do not already exist, then tears the connection
	 * down again; the long-lived session is established later in open().
	 *
	 * @throws Exception if connecting to the cluster or executing the DDL fails
	 */
	@Override
	public void createResource() throws Exception {
		cluster = builder.getCluster();
		session = cluster.connect();

		session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace));
		session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table));

		// Best-effort cleanup: close failures are logged but not propagated so
		// they cannot mask a successful resource creation.
		try {
			session.close();
		} catch (Exception e) {
			LOG.error("Error while closing session.", e);
		}
		try {
			cluster.close();
		} catch (Exception e) {
			LOG.error("Error while closing cluster.", e);
		}
	}
+
+ @Override
+ public void open() throws Exception {
+ if (builder == null) {
+ throw new RuntimeException("No ClusterBuilder was
set.");
+ }
+ cluster = builder.getCluster();
+ session = cluster.connect();
+
+ deleteStatement = session.prepare(String.format("DELETE FROM
%s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId,
subtaskId));
+ updateStatement = session.prepare(String.format("UPDATE %s.%s
set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table,
operatorId, subtaskId));
+ selectStatement = session.prepare(String.format("SELECT
checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table,
operatorId, subtaskId));
+
+ session.execute(String.format("INSERT INTO %s.%s (sink_id,
sub_id, checkpoint_id) values ('%s', %d, " + -1 + ");", keySpace, table,
operatorId, subtaskId));
+ }
+
	/**
	 * Removes this subtask's bookkeeping row and shuts the connection down.
	 *
	 * @throws Exception never thrown directly; close failures are only logged
	 */
	@Override
	public void close() throws Exception {
		// Fire-and-forget delete of this subtask's row; the session.close()
		// below is expected to let in-flight requests finish.
		// NOTE(review): confirm the driver does not drop the pending async
		// delete during shutdown — if it can, an execute() would be safer.
		session.executeAsync(deleteStatement.bind());
		try {
			session.close();
		} catch (Exception e) {
			LOG.error("Error while closing session.", e);
		}
		try {
			cluster.close();
		} catch (Exception e) {
			LOG.error("Error while closing cluster.", e);
		}
	}
+
+ @Override
+ public void commitCheckpoint(long checkpointID) {
+ session.execute(updateStatement.bind(checkpointID));
--- End diff --
What happens if the `execute` call fails? Can we check the `ResultSet` for
the success of the operation? If the commit did not succeed, then I think we
should throw an exception.
> Add a connector for streaming data into Cassandra
> -------------------------------------------------
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Robert Metzger
> Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance
> (for the tests):
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard
> java types to the respective cassandra types.
> In addition, it seems that there is an object mapper from datastax to store
> POJOs in Cassandra (there are annotations for defining the primary key and
> types)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)