[ 
https://issues.apache.org/jira/browse/BEAM-4049?focusedWorklogId=92481&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92481
 ]

ASF GitHub Bot logged work on BEAM-4049:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Apr/18 09:29
            Start Date: 19/Apr/18 09:29
    Worklog Time Spent: 10m 
      Work Description: adejanovski commented on a change in pull request #5112: [BEAM-4049] Improve CassandraIO write throughput by performing async queries
URL: https://github.com/apache/beam/pull/5112#discussion_r182686524
 
 

 ##########
 File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
 ##########
 @@ -351,33 +354,54 @@ public TokenRange(
    */
   protected class WriterImpl<T> implements Writer<T> {
 
+    /**
+     * The threshold of 100 concurrent async queries is a heuristic commonly used
+     * by the Apache Cassandra community. There is no real gain to expect in tuning this value.
+     */
+    private static final int CONCURRENT_ASYNC_QUERIES = 100;
+
     private final CassandraIO.Write<T> spec;
 
     private final Cluster cluster;
     private final Session session;
     private final MappingManager mappingManager;
+    private List<ListenableFuture<Void>> writeFutures;
 
     public WriterImpl(CassandraIO.Write<T> spec) {
       this.spec = spec;
       this.cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(),
           spec.localDc(), spec.consistencyLevel());
       this.session = cluster.connect(spec.keyspace());
       this.mappingManager = new MappingManager(session);
+      this.writeFutures = Lists.newArrayList();
     }
 
     /**
      * Write the entity to the Cassandra instance, using {@link Mapper} obtained with the
-     * {@link MappingManager}. This method use {@link Mapper#save(Object)} method, which is
-     * synchronous. It means the entity is guaranteed to be reliably committed to Cassandra.
+     * {@link MappingManager}. This method use {@link Mapper#saveAsync(Object)} method, which is
+     * asynchronous. Beam will wait for all futures to complete, to guarantee all writes have
+     * succeeded.
      */
     @Override
-    public void write(T entity) {
+    public void write(T entity) throws ExecutionException, InterruptedException {
       Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
-      mapper.save(entity);
+      this.writeFutures.add(mapper.saveAsync(entity));
+      if (this.writeFutures.size() % CONCURRENT_ASYNC_QUERIES == 0) {
 
 Review comment:
   1/ You're right, the modulus is useless here. I'll just check the number of elements.
   
   2/ It's OK, as any cluster should be able to handle tens or hundreds of thousands of writes per second (if not more). The value of 100 is a heuristic that applies some backpressure per client, and the cluster is expected to be hammered far harder than this by concurrent processes.
   The question is then: if many bundles are processed at the same time, how can one limit the load on the cluster? Is there a way to limit concurrency by lowering the number of concurrent bundles, or should we make the number of concurrent async queries configurable?
   I'm still inclined not to make this configurable, as it would add a knob that is specific to Cassandra. I'd rather handle concurrency with "standard" Beam knobs that limit job pressure on external systems.
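
   For illustration, point 1/ could look roughly like the sketch below. This is not the code from the PR: BoundedAsyncWriter, flushCount() and pending() are hypothetical names, and a java.util.concurrent.CompletableFuture supplier stands in for Mapper#saveAsync (which returns a Guava ListenableFuture) so that the example compiles without the Cassandra driver. It also uses join() rather than get(), so failures surface as unchecked exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical stand-in for WriterImpl: caps the number of in-flight async writes. */
class BoundedAsyncWriter<T> {

  // Same heuristic value as in the diff under review.
  private static final int CONCURRENT_ASYNC_QUERIES = 100;

  // Assumption: this function plays the role of Mapper#saveAsync(entity).
  private final Function<T, CompletableFuture<Void>> saveAsync;
  private final List<CompletableFuture<Void>> writeFutures = new ArrayList<>();
  private int flushCount = 0;

  BoundedAsyncWriter(Function<T, CompletableFuture<Void>> saveAsync) {
    this.saveAsync = saveAsync;
  }

  void write(T entity) {
    writeFutures.add(saveAsync.apply(entity));
    // The list is cleared after every wait, so a plain size check is enough
    // and the modulus is not needed.
    if (writeFutures.size() >= CONCURRENT_ASYNC_QUERIES) {
      waitForFuturesToFinish();
    }
  }

  /** Waits for any writes still in flight, e.g. at the end of a bundle. */
  void close() {
    if (!writeFutures.isEmpty()) {
      waitForFuturesToFinish();
    }
  }

  private void waitForFuturesToFinish() {
    for (CompletableFuture<Void> future : writeFutures) {
      // join() rethrows a failed write as an unchecked CompletionException.
      future.join();
    }
    writeFutures.clear();
    flushCount++;
  }

  /** Number of times the writer has blocked on its pending futures. */
  int flushCount() {
    return flushCount;
  }

  /** Number of writes currently tracked as in flight. */
  int pending() {
    return writeFutures.size();
  }
}
```

   With this shape the cap is per writer instance, so the total number of in-flight queries against the cluster still scales with the number of bundles processed concurrently, which is the open question in point 2/.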

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 92481)
    Time Spent: 5h 20m  (was: 5h 10m)

> Improve write throughput of CassandraIO
> ---------------------------------------
>
>                 Key: BEAM-4049
>                 URL: https://issues.apache.org/jira/browse/BEAM-4049
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-cassandra
>    Affects Versions: 2.4.0
>            Reporter: Alexander Dejanovski
>            Assignee: Alexander Dejanovski
>            Priority: Major
>              Labels: performance
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> The CassandraIO currently uses the mapper to perform writes in a synchronous 
> fashion. 
> This implies that writes are serialized and is a very suboptimal way of 
> writing to Cassandra.
> The IO should use the saveAsync() method instead of save() and should wait 
> for completion each time 100 queries are in flight, in order to avoid 
> overwhelming clusters.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
