[
https://issues.apache.org/jira/browse/BEAM-4049?focusedWorklogId=93992&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93992
]
ASF GitHub Bot logged work on BEAM-4049:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Apr/18 08:41
Start Date: 23/Apr/18 08:41
Worklog Time Spent: 10m
Work Description: echauchot closed pull request #5112: [BEAM-4049]
Improve CassandraIO write throughput by performing async queries
URL: https://github.com/apache/beam/pull/5112
This is a PR merged from a forked repository.
As GitHub hides the original diff when a foreign pull request (from a fork)
is merged, the diff is reproduced below for the sake of provenance:
diff --git a/pom.xml b/pom.xml
index fd656c45ede..35b85748c9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1951,6 +1951,9 @@
<shadedPattern>
org.apache.${renderedArtifactId}.repackaged.com.google.common
</shadedPattern>
+ <excludes>
+
<exclude>com.google.common.util.concurrent.ListenableFuture</exclude>
+ </excludes>
</relocation>
<relocation>
<pattern>com.google.thirdparty</pattern>
diff --git a/sdks/java/io/cassandra/build.gradle
b/sdks/java/io/cassandra/build.gradle
index 43bc8b3f803..2bd0be3c61a 100644
--- a/sdks/java/io/cassandra/build.gradle
+++ b/sdks/java/io/cassandra/build.gradle
@@ -24,7 +24,7 @@ enableJavaPerformanceTesting()
description = "Apache Beam :: SDKs :: Java :: IO :: Cassandra"
ext.summary = "IO to read and write with Apache Cassandra database"
-def cassandra_version = "3.2.0"
+def cassandra_version = "3.4.0"
dependencies {
compile library.java.guava
diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml
index 8b161d3fc8a..9ec45733d37 100644
--- a/sdks/java/io/cassandra/pom.xml
+++ b/sdks/java/io/cassandra/pom.xml
@@ -31,7 +31,7 @@
<description>IO to read and write with Apache Cassandra
database</description>
<properties>
- <cassandra.driver.version>3.2.0</cassandra.driver.version>
+ <cassandra.driver.version>3.4.0</cassandra.driver.version>
</properties>
<dependencies>
@@ -55,12 +55,13 @@
<dependency>
<groupId>com.datastax.cassandra</groupId>
- <artifactId>cassandra-driver-mapping</artifactId>
+ <artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.driver.version}</version>
</dependency>
+
<dependency>
<groupId>com.datastax.cassandra</groupId>
- <artifactId>cassandra-driver-core</artifactId>
+ <artifactId>cassandra-driver-mapping</artifactId>
<version>${cassandra.driver.version}</version>
</dependency>
diff --git
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 977b995d993..56db599823c 100644
---
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -23,6 +23,7 @@
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
@@ -462,7 +463,8 @@ public void setup() throws Exception {
}
@ProcessElement
- public void processElement(ProcessContext processContext) {
+ public void processElement(ProcessContext processContext)
+ throws ExecutionException, InterruptedException {
T entity = processContext.element();
writer.write(entity);
}
diff --git
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
index 4b6015e2f40..5cac897fc75 100644
---
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
+++
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java
@@ -19,6 +19,7 @@
import java.io.Serializable;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.BoundedSource;
/**
@@ -58,7 +59,7 @@
* This method should be synchronous. It means you have to be sure that
the entity is fully
* stored (and committed) into the Cassandra instance when you exit from
this method.
*/
- void write(T entity);
+ void write(T entity) throws ExecutionException, InterruptedException;
}
diff --git
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
index 08346c4e349..5a52d2cbfa5 100644
---
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
+++
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
@@ -32,12 +32,15 @@
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.BoundedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -351,11 +354,18 @@ public TokenRange(
*/
protected class WriterImpl<T> implements Writer<T> {
+ /**
+ * The threshold of 100 concurrent async queries is a heuristic commonly
used
+ * by the Apache Cassandra community. There is no real gain to expect in
tuning this value.
+ */
+ private static final int CONCURRENT_ASYNC_QUERIES = 100;
+
private final CassandraIO.Write<T> spec;
private final Cluster cluster;
private final Session session;
private final MappingManager mappingManager;
+ private List<ListenableFuture<Void>> writeFutures;
public WriterImpl(CassandraIO.Write<T> spec) {
this.spec = spec;
@@ -363,21 +373,37 @@ public WriterImpl(CassandraIO.Write<T> spec) {
spec.localDc(), spec.consistencyLevel());
this.session = cluster.connect(spec.keyspace());
this.mappingManager = new MappingManager(session);
+ this.writeFutures = Lists.newArrayList();
}
/**
* Write the entity to the Cassandra instance, using {@link Mapper}
obtained with the
- * {@link MappingManager}. This method use {@link Mapper#save(Object)}
method, which is
- * synchronous. It means the entity is guaranteed to be reliably committed
to Cassandra.
+ * {@link MappingManager}. This method uses {@link
Mapper#saveAsync(Object)} method, which is
+ * asynchronous. Beam will wait for all futures to complete, to guarantee
all writes have
+ * succeeded.
*/
@Override
- public void write(T entity) {
+ public void write(T entity) throws ExecutionException,
InterruptedException {
Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass());
- mapper.save(entity);
+ this.writeFutures.add(mapper.saveAsync(entity));
+ if (this.writeFutures.size() == CONCURRENT_ASYNC_QUERIES) {
+ // We reached the max number of allowed in flight queries.
+ // Write methods are synchronous in Beam as stated by the
CassandraService interface,
+ // so we wait for each async query to return before exiting.
+ LOG.debug("Waiting for a batch of {} Cassandra writes to be
executed...",
+ CONCURRENT_ASYNC_QUERIES);
+ waitForFuturesToFinish();
+ this.writeFutures = Lists.newArrayList();
+ }
}
@Override
- public void close() {
+ public void close() throws ExecutionException, InterruptedException {
+ if (this.writeFutures.size() > 0) {
+ // Waiting for the last in flight async queries to return before
finishing the bundle.
+ waitForFuturesToFinish();
+ }
+
if (session != null) {
session.close();
}
@@ -386,6 +412,11 @@ public void close() {
}
}
+ private void waitForFuturesToFinish() throws ExecutionException,
InterruptedException {
+ for (ListenableFuture<Void> future:writeFutures) {
+ future.get();
+ }
+ }
}
@Override
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 93992)
Time Spent: 6.5h (was: 6h 20m)
> Improve write throughput of CassandraIO
> ---------------------------------------
>
> Key: BEAM-4049
> URL: https://issues.apache.org/jira/browse/BEAM-4049
> Project: Beam
> Issue Type: Improvement
> Components: io-java-cassandra
> Affects Versions: 2.4.0
> Reporter: Alexander Dejanovski
> Assignee: Alexander Dejanovski
> Priority: Major
> Labels: performance
> Time Spent: 6.5h
> Remaining Estimate: 0h
>
> CassandraIO currently uses the mapper to perform writes synchronously.
> This means that writes are serialized, which is a very suboptimal way of
> writing to Cassandra.
> The IO should use the saveAsync() method instead of save(), and should wait
> for completion each time 100 queries are in flight, in order to avoid
> overwhelming clusters.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)