[ 
https://issues.apache.org/jira/browse/BEAM-4049?focusedWorklogId=90873&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90873
 ]

ASF GitHub Bot logged work on BEAM-4049:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/18 16:34
            Start Date: 13/Apr/18 16:34
    Worklog Time Spent: 10m 
      Work Description: adejanovski commented on a change in pull request 
#5112: [BEAM-4049] Improve CassandraIO write throughput by performing async 
queries
URL: https://github.com/apache/beam/pull/5112#discussion_r181443693
 
 

 ##########
 File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
 ##########
 @@ -373,11 +378,22 @@ public WriterImpl(CassandraIO.Write<T> spec) {
     @Override
     public void write(T entity) {
       Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
-      mapper.save(entity);
+      this.writeFutures.add(mapper.saveAsync(entity));
+      if (this.writeFutures.size() % CONCURRENT_ASYNC_QUERIES == 0) {
+        // We reached the max number of allowed in flight queries
+        LOG.debug("Waiting for a batch of 100 Cassandra writes to be 
executed...");
+        Futures.successfulAsList(this.writeFutures);
 
 Review comment:
   Yes, we definitely should and successfulAsList() won't do this properly.
   
   I'll loop over the futures instead and call `.get()` so that we can throw 
the exceptions that would be triggered.
   What's the way of dealing with exceptions in Beam IOs ? Should I log the 
error and re-throw the exception or just not catch it and let it propagate up 
the stack ?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 90873)
    Time Spent: 50m  (was: 40m)

> Improve write throughput of CassandraIO
> ---------------------------------------
>
>                 Key: BEAM-4049
>                 URL: https://issues.apache.org/jira/browse/BEAM-4049
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-cassandra
>    Affects Versions: 2.4.0
>            Reporter: Alexander Dejanovski
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>              Labels: performance
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The CassandraIO currently uses the mapper to perform writes in a synchronous 
> fashion. 
> This implies that writes are serialized and is a very suboptimal way of 
> writing to Cassandra.
> The IO should use the saveAsync() method instead of save() and should wait 
> for completion each time 100 queries are in flight, in order to avoid 
> overwhelming clusters.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to