[
https://issues.apache.org/jira/browse/BEAM-6079?focusedWorklogId=166913&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166913
]
ASF GitHub Bot logged work on BEAM-6079:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Nov/18 15:14
Start Date: 16/Nov/18 15:14
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #7064:
[BEAM-6079] Add ability for CassandraIO to delete data
URL: https://github.com/apache/beam/pull/7064#discussion_r234239515
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
##########
@@ -471,4 +471,83 @@ private void waitForFuturesToFinish() throws ExecutionException, InterruptedExce
public Writer createWriter(CassandraIO.Write<T> spec) {
return new WriterImpl(spec);
}
+
+ /** Deleter removing an entity from the Apache Cassandra database. */
+ protected class DeleterImpl implements Deleter<T> {
+ /**
+ * The threshold of 100 concurrent async queries is a heuristic commonly used by the Apache
+ * Cassandra community. There is no real gain to expect in tuning this value.
+ */
+ private static final int CONCURRENT_ASYNC_QUERIES = 100;
+
+ private final CassandraIO.Delete<T> spec;
+
+ private final Cluster cluster;
+ private final Session session;
+ private final MappingManager mappingManager;
+ private List<ListenableFuture<Void>> deleteFutures;
+
+ DeleterImpl(CassandraIO.Delete<T> spec) {
+ this.spec = spec;
+ this.cluster =
+ getCluster(
+ spec.hosts(),
+ spec.port(),
+ spec.username(),
+ spec.password(),
+ spec.localDc(),
+ spec.consistencyLevel());
+ this.session = cluster.connect(spec.keyspace());
+ this.mappingManager = new MappingManager(session);
+ this.deleteFutures = new ArrayList<>();
+ }
+
+ /**
+ * Deletes the entity from the Cassandra instance, using a {@link Mapper} obtained from the
+ * {@link MappingManager}. This method uses {@link Mapper#deleteAsync(Object)}, which is
+ * asynchronous. Beam will wait for all futures to complete, to guarantee all deletes have
+ * succeeded.
+ */
+ @Override
+ public void delete(T entity) throws ExecutionException, InterruptedException {
+ Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
+ this.deleteFutures.add(mapper.deleteAsync(entity));
Review comment:
Similar to the previous comment: do you think we could make the current Writer
extensible for this case? I have the impression this line is the only
difference between write and delete, or am I missing something?
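The reviewer's suggestion (a single writer whose only varying part is the per-entity async call) combined with the bounded waiting implied by CONCURRENT_ASYNC_QUERIES and waitForFuturesToFinish could look roughly like the sketch below. It is a hypothetical, self-contained illustration, not the code proposed in the PR: `AsyncMutator`, `mutate` and `pendingCount` are invented names, and `java.util.concurrent.CompletableFuture` stands in for Guava's `ListenableFuture` and the DataStax `Mapper` so that it compiles with the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Hypothetical sketch: one mutator shared by "write" and "delete". The only varying
 * part is the async operation applied to each entity.
 */
class AsyncMutator<T> implements AutoCloseable {
  // Same heuristic bound as CONCURRENT_ASYNC_QUERIES in the diff.
  static final int CONCURRENT_ASYNC_QUERIES = 100;

  // Stand-in for a Mapper#saveAsync or Mapper#deleteAsync call (assumption).
  private final Function<T, CompletableFuture<Void>> operation;
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();

  AsyncMutator(Function<T, CompletableFuture<Void>> operation) {
    this.operation = operation;
  }

  /** Starts the async operation; blocks once the in-flight batch reaches the bound. */
  void mutate(T entity) {
    pending.add(operation.apply(entity));
    if (pending.size() >= CONCURRENT_ASYNC_QUERIES) {
      waitForFuturesToFinish();
    }
  }

  /**
   * Waits for every in-flight future. join() throws an unchecked CompletionException
   * if any operation failed, so failures surface instead of being dropped.
   */
  void waitForFuturesToFinish() {
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    pending.clear();
  }

  int pendingCount() {
    return pending.size();
  }

  /** Waits for the last, possibly partial, batch. */
  @Override
  public void close() {
    waitForFuturesToFinish();
  }
}
```

With the real driver, a writer would pass `entity -> mapper.saveAsync(entity)` and a deleter `entity -> mapper.deleteAsync(entity)` (both return `ListenableFuture<Void>` in the DataStax 3.x object mapper), assuming a small adapter from `ListenableFuture` to the future type used here. The write and delete paths would then differ only in that lambda. Using `join()` instead of `get()` is a choice of this sketch that keeps checked exceptions out of the signatures; the original `waitForFuturesToFinish` declares `ExecutionException` and `InterruptedException`.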
Issue Time Tracking
-------------------
Worklog Id: (was: 166913)
Time Spent: 20m (was: 10m)
> Add ability for CassandraIO to delete data
> ------------------------------------------
>
> Key: BEAM-6079
> URL: https://issues.apache.org/jira/browse/BEAM-6079
> Project: Beam
> Issue Type: Improvement
> Components: io-java-cassandra
> Reporter: Fabien Rousseau
> Assignee: Fabien Rousseau
> Priority: Minor
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Currently, it's possible to read & write data using CassandraIO.
> It would be nice to be able to delete data using CassandraIO.delete().
>
> I can provide a PR.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)