Repository: beam Updated Branches: refs/heads/master 3cc4ff6d7 -> c189d5c0e
[BEAM-245] Add CassandraIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b0bb3dc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b0bb3dc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b0bb3dc Branch: refs/heads/master Commit: 0b0bb3dc8d18b7d78780dbd39705e16a8aae028e Parents: 3cc4ff6 Author: Jean-Baptiste Onofré <[email protected]> Authored: Tue Mar 28 16:46:37 2017 +0200 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Wed Jun 7 07:40:05 2017 +0200 ---------------------------------------------------------------------- sdks/java/io/cassandra/pom.xml | 113 ++++ .../beam/sdk/io/cassandra/CassandraIO.java | 510 +++++++++++++++++++ .../beam/sdk/io/cassandra/CassandraService.java | 66 +++ .../sdk/io/cassandra/CassandraServiceImpl.java | 398 +++++++++++++++ .../beam/sdk/io/cassandra/package-info.java | 22 + .../beam/sdk/io/cassandra/CassandraIOIT.java | 254 +++++++++ .../beam/sdk/io/cassandra/CassandraIOTest.java | 279 ++++++++++ .../io/cassandra/CassandraServiceImplTest.java | 138 +++++ .../sdk/io/cassandra/CassandraTestDataSet.java | 153 ++++++ .../sdk/io/common/IOTestPipelineOptions.java | 10 + sdks/java/io/pom.xml | 1 + 11 files changed, 1944 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml new file mode 100644 index 0000000..8249f57 --- /dev/null +++ b/sdks/java/io/cassandra/pom.xml @@ -0,0 +1,113 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-parent</artifactId> + <version>2.1.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-io-cassandra</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: Cassandra</name> + <description>IO to read and write with Apache Cassandra database</description> + + <properties> + <cassandra.driver.version>3.2.0</cassandra.driver.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-mapping</artifactId> + <version>${cassandra.driver.version}</version> + </dependency> + <dependency> + <groupId>com.datastax.cassandra</groupId> + 
<artifactId>cassandra-driver-core</artifactId> + <version>${cassandra.driver.version}</version> + </dependency> + + <!-- compile dependencies --> + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java new file mode 100644 index 0000000..b6f4ef6 --- /dev/null +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cassandra; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; + +import java.util.List; + +import javax.annotation.Nullable; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An IO to read from Apache Cassandra. + * + * <h3>Reading from Apache Cassandra</h3> + * + * <p>{@code CassandraIO} provides a source to read and returns a bounded collection of + * entities as {@code PCollection<Entity>}. 
An entity is built by Cassandra mapper + * ({@code com.datastax.driver.mapping.EntityMapper}) based on a + * POJO containing annotations (as described at + * http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/creating/). + * + * <p>The following example illustrates various options for configuring the IO: + * + * <pre>{@code + * pipeline.apply(CassandraIO.<Person>read() + * .withHosts(Arrays.asList("host1", "host2")) + * .withPort(9042) + * .withKeyspace("beam") + * .withTable("Person") + * .withEntity(Person.class) + * .withCoder(SerializableCoder.of(Person.class))); + * // above options are the minimum set, returns PCollection<Person> + * + * }</pre> + * + * <h3>Writing to Apache Cassandra</h3> + * + * <p>{@code CassandraIO} provides a sink to write a collection of entities to Apache Cassandra. + * + * <p>The following example illustrates various options for configuring the IO write: + * + * <pre>{@code + * pipeline + * .apply(...) // provides a PCollection<Person> where Person is an entity + * .apply(CassandraIO.<Person>write() + * .withHosts(Arrays.asList("host1", "host2")) + * .withPort(9042) + * .withKeyspace("beam") + * .withEntity(Person.class)); + * }</pre> + */ +@Experimental +public class CassandraIO { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraIO.class); + + private CassandraIO() {} + + /** + * Provide a {@link Read} {@link PTransform} to read data from a Cassandra database. + */ + public static <T> Read<T> read() { + return new AutoValue_CassandraIO_Read.Builder<T>().build(); + } + + /** + * Provide a {@link Write} {@link PTransform} to write data to a Cassandra database. + */ + public static <T> Write<T> write() { + return new AutoValue_CassandraIO_Write.Builder<T>().build(); + } + + /** + * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more + * information on usage and configuration. 
+ */ + @AutoValue + public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { + + @Nullable abstract List<String> hosts(); + @Nullable abstract Integer port(); + @Nullable abstract String keyspace(); + @Nullable abstract String table(); + @Nullable abstract Class<T> entity(); + @Nullable abstract Coder<T> coder(); + @Nullable abstract String username(); + @Nullable abstract String password(); + @Nullable abstract String localDc(); + @Nullable abstract String consistencyLevel(); + @Nullable abstract CassandraService<T> cassandraService(); + abstract Builder<T> builder(); + + /** + * Specify the hosts of the Apache Cassandra instances. + */ + public Read<T> withHosts(List<String> hosts) { + checkArgument(hosts != null, "CassandraIO.read().withHosts(hosts) called with null hosts"); + checkArgument(!hosts.isEmpty(), "CassandraIO.read().withHosts(hosts) called with empty " + + "hosts list"); + return builder().setHosts(hosts).build(); + } + + /** + * Specify the port number of the Apache Cassandra instances. + */ + public Read<T> withPort(int port) { + checkArgument(port > 0, "CassandraIO.read().withPort(port) called with invalid port " + + "number (%d)", port); + return builder().setPort(port).build(); + } + + /** + * Specify the Cassandra keyspace where to read data. + */ + public Read<T> withKeyspace(String keyspace) { + checkArgument(keyspace != null, "CassandraIO.read().withKeyspace(keyspace) called with " + + "null keyspace"); + return builder().setKeyspace(keyspace).build(); + } + + /** + * Specify the Cassandra table where to read data. + */ + public Read<T> withTable(String table) { + checkArgument(table != null, "CassandraIO.read().withTable(table) called with null table"); + return builder().setTable(table).build(); + } + + /** + * Specify the entity class (annotated POJO). The {@link CassandraIO} will read the data and + * convert the data as entity instances. 
The {@link PCollection} resulting from the read will + * contains entity elements. + */ + public Read<T> withEntity(Class<T> entity) { + checkArgument(entity != null, "CassandraIO.read().withEntity(entity) called with null " + + "entity"); + return builder().setEntity(entity).build(); + } + + /** + * Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. + */ + public Read<T> withCoder(Coder<T> coder) { + checkArgument(coder != null, "CassandraIO.read().withCoder(coder) called with null coder"); + return builder().setCoder(coder).build(); + } + + /** + * Specify the username for authentication. + */ + public Read<T> withUsername(String username) { + checkArgument(username != null, "CassandraIO.read().withUsername(username) called with " + + "null username"); + return builder().setUsername(username).build(); + } + + /** + * Specify the password for authentication. + */ + public Read<T> withPassword(String password) { + checkArgument(password != null, "CassandraIO.read().withPassword(password) called with " + + "null password"); + return builder().setPassword(password).build(); + } + + /** + * Specify the local DC used for the load balancing. + */ + public Read<T> withLocalDc(String localDc) { + checkArgument(localDc != null, "CassandraIO.read().withLocalDc(localDc) called with null " + + "localDc"); + return builder().setLocalDc(localDc).build(); + } + + public Read<T> withConsistencyLevel(String consistencyLevel) { + checkArgument(consistencyLevel != null, "CassandraIO.read().withConsistencyLevel" + + "(consistencyLevel) called with null consistencyLevel"); + return builder().setConsistencyLevel(consistencyLevel).build(); + } + + /** + * Specify an instance of {@link CassandraService} used to connect and read from Cassandra + * database. 
+ */ + public Read<T> withCassandraService(CassandraService<T> cassandraService) { + checkArgument(cassandraService != null, "CassandraIO.read().withCassandraService(service)" + + " called with null service"); + return builder().setCassandraService(cassandraService).build(); + } + + @Override + public PCollection<T> expand(PBegin input) { + return input.apply(org.apache.beam.sdk.io.Read.from( + new CassandraSource<T>(this, null))); + } + + @Override + public void validate(PipelineOptions pipelineOptions) { + checkState(hosts() != null || cassandraService() != null, + "CassandraIO.read() requires a list of hosts to be set via withHosts(hosts) or a " + + "Cassandra service to be set via withCassandraService(service)"); + checkState(port() != null || cassandraService() != null, "CassandraIO.read() requires a " + + "valid port number to be set via withPort(port) or a Cassandra service to be set via " + + "withCassandraService(service)"); + checkState(keyspace() != null, "CassandraIO.read() requires a keyspace to be set via " + + "withKeyspace(keyspace)"); + checkState(table() != null, "CassandraIO.read() requires a table to be set via " + + "withTable(table)"); + checkState(entity() != null, "CassandraIO.read() requires an entity to be set via " + + "withEntity(entity)"); + checkState(coder() != null, "CassandraIO.read() requires a coder to be set via " + + "withCoder(coder)"); + } + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setHosts(List<String> hosts); + abstract Builder<T> setPort(Integer port); + abstract Builder<T> setKeyspace(String keyspace); + abstract Builder<T> setTable(String table); + abstract Builder<T> setEntity(Class<T> entity); + abstract Builder<T> setCoder(Coder<T> coder); + abstract Builder<T> setUsername(String username); + abstract Builder<T> setPassword(String password); + abstract Builder<T> setLocalDc(String localDc); + abstract Builder<T> setConsistencyLevel(String consistencyLevel); + abstract Builder<T> 
setCassandraService(CassandraService<T> cassandraService); + abstract Read<T> build(); + } + + /** + * Helper function to either get a fake/mock Cassandra service provided by + * {@link #withCassandraService(CassandraService)} or creates and returns an implementation + * of a concrete Cassandra service dealing with a Cassandra instance. + */ + @VisibleForTesting + CassandraService<T> getCassandraService() { + if (cassandraService() != null) { + return cassandraService(); + } + return new CassandraServiceImpl<>(); + } + + } + + @VisibleForTesting + static class CassandraSource<T> extends BoundedSource<T> { + + protected final Read<T> spec; + protected final String splitQuery; + + CassandraSource(Read<T> spec, + String splitQuery) { + this.spec = spec; + this.splitQuery = splitQuery; + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return spec.coder(); + } + + @Override + public void validate() { + spec.validate(null); + } + + @Override + public BoundedReader<T> createReader(PipelineOptions pipelineOptions) { + return spec.getCassandraService().createReader(this); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception { + return spec.getCassandraService().getEstimatedSizeBytes(spec); + } + + @Override + public List<BoundedSource<T>> split(long desiredBundleSizeBytes, + PipelineOptions pipelineOptions) { + return spec.getCassandraService() + .split(spec, desiredBundleSizeBytes); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + if (spec.hosts() != null) { + builder.add(DisplayData.item("hosts", spec.hosts().toString())); + } + if (spec.port() != null) { + builder.add(DisplayData.item("port", spec.port())); + } + builder.addIfNotNull(DisplayData.item("keyspace", spec.keyspace())); + builder.addIfNotNull(DisplayData.item("table", spec.table())); + builder.addIfNotNull(DisplayData.item("username", spec.username())); + 
builder.addIfNotNull(DisplayData.item("localDc", spec.localDc())); + builder.addIfNotNull(DisplayData.item("consistencyLevel", spec.consistencyLevel())); + } + } + + /** + * A {@link PTransform} to write into Apache Cassandra. See {@link CassandraIO} for details on + * usage and configuration. + */ + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { + + @Nullable abstract List<String> hosts(); + @Nullable abstract Integer port(); + @Nullable abstract String keyspace(); + @Nullable abstract Class<T> entity(); + @Nullable abstract String username(); + @Nullable abstract String password(); + @Nullable abstract String localDc(); + @Nullable abstract String consistencyLevel(); + @Nullable abstract CassandraService<T> cassandraService(); + abstract Builder<T> builder(); + + /** + * Specify the Cassandra instance hosts where to write data. + */ + public Write<T> withHosts(List<String> hosts) { + checkArgument(hosts != null, "CassandraIO.write().withHosts(hosts) called with null hosts"); + checkArgument(!hosts.isEmpty(), "CassandraIO.write().withHosts(hosts) called with empty " + + "hosts list"); + return builder().setHosts(hosts).build(); + } + + /** + * Specify the Cassandra instance port number where to write data. + */ + public Write<T> withPort(int port) { + checkArgument(port > 0, "CassandraIO.write().withPort(port) called with invalid port " + + "number (%d)", port); + return builder().setPort(port).build(); + } + + /** + * Specify the Cassandra keyspace where to write data. + */ + public Write<T> withKeyspace(String keyspace) { + checkArgument(keyspace != null, "CassandraIO.write().withKeyspace(keyspace) called with " + + "null keyspace"); + return builder().setKeyspace(keyspace).build(); + } + + /** + * Specify the entity class in the input {@link PCollection}. The {@link CassandraIO} will + * map this entity to the Cassandra table thanks to the annotations. 
+ */ + public Write<T> withEntity(Class<T> entity) { + checkArgument(entity != null, "CassandraIO.write().withEntity(entity) called with null " + + "entity"); + return builder().setEntity(entity).build(); + } + + /** + * Specify the username used for authentication. + */ + public Write<T> withUsername(String username) { + checkArgument(username != null, "CassandraIO.write().withUsername(username) called with " + + "null username"); + return builder().setUsername(username).build(); + } + + /** + * Specify the password used for authentication. + */ + public Write<T> withPassword(String password) { + checkArgument(password != null, "CassandraIO.write().withPassword(password) called with " + + "null password"); + return builder().setPassword(password).build(); + } + + /** + * Specify the local DC used by the load balancing policy. + */ + public Write<T> withLocalDc(String localDc) { + checkArgument(localDc != null, "CassandraIO.write().withLocalDc(localDc) called with null" + + " localDc"); + return builder().setLocalDc(localDc).build(); + } + + public Write<T> withConsistencyLevel(String consistencyLevel) { + checkArgument(consistencyLevel != null, "CassandraIO.write().withConsistencyLevel" + + "(consistencyLevel) called with null consistencyLevel"); + return builder().setConsistencyLevel(consistencyLevel).build(); + } + + /** + * Specify the {@link CassandraService} used to connect and write into the Cassandra database. 
+ */ + public Write<T> withCassandraService(CassandraService<T> cassandraService) { + checkArgument(cassandraService != null, "CassandraIO.write().withCassandraService" + + "(service) called with null service"); + return builder().setCassandraService(cassandraService).build(); + } + + @Override + public void validate(PipelineOptions pipelineOptions) { + checkState(hosts() != null || cassandraService() != null, + "CassandraIO.write() requires a list of hosts to be set via withHosts(hosts) or a " + + "Cassandra service to be set via withCassandraService(service)"); + checkState(port() != null || cassandraService() != null, "CassandraIO.write() requires a " + + "valid port number to be set via withPort(port) or a Cassandra service to be set via " + + "withCassandraService(service)"); + checkState(keyspace() != null, "CassandraIO.write() requires a keyspace to be set via " + + "withKeyspace(keyspace)"); + checkState(entity() != null, "CassandraIO.write() requires an entity to be set via " + + "withEntity(entity)"); + } + + @Override + public PDone expand(PCollection<T> input) { + input.apply(ParDo.of(new WriteFn<T>(this))); + return PDone.in(input.getPipeline()); + } + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setHosts(List<String> hosts); + abstract Builder<T> setPort(Integer port); + abstract Builder<T> setKeyspace(String keyspace); + abstract Builder<T> setEntity(Class<T> entity); + abstract Builder<T> setUsername(String username); + abstract Builder<T> setPassword(String password); + abstract Builder<T> setLocalDc(String localDc); + abstract Builder<T> setConsistencyLevel(String consistencyLevel); + abstract Builder<T> setCassandraService(CassandraService<T> cassandraService); + abstract Write<T> build(); + } + + /** + * Helper function to either get a fake/mock Cassandra service provided by + * {@link #withCassandraService(CassandraService)} or creates and returns an implementation + * of a concrete Cassandra service dealing 
with a Cassandra instance. + */ + @VisibleForTesting + CassandraService<T> getCassandraService() { + if (cassandraService() != null) { + return cassandraService(); + } + return new CassandraServiceImpl<>(); + } + + } + + private static class WriteFn<T> extends DoFn<T, Void> { + + private final Write<T> spec; + private CassandraService.Writer writer; + + public WriteFn(Write<T> spec) { + this.spec = spec; + } + + @Setup + public void setup() throws Exception { + writer = spec.getCassandraService().createWriter(spec); + } + + @ProcessElement + public void processElement(ProcessContext processContext) { + T entity = processContext.element(); + writer.write(entity); + } + + @Teardown + public void teardown() throws Exception { + writer.close(); + writer = null; + } + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java new file mode 100644 index 0000000..5071762 --- /dev/null +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraService.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cassandra; + +import java.io.Serializable; +import java.util.List; + +import org.apache.beam.sdk.io.BoundedSource; + +/** + * An interface for real or fake implementations of Cassandra. + */ +public interface CassandraService<T> extends Serializable { + + /** + * Returns a {@link org.apache.beam.sdk.io.BoundedSource.BoundedReader} that will read from + * Cassandra using the spec from + * {@link org.apache.beam.sdk.io.cassandra.CassandraIO.CassandraSource}. + */ + BoundedSource.BoundedReader<T> createReader(CassandraIO.CassandraSource<T> source); + + /** + * Returns an estimation of the size that could be read. + */ + long getEstimatedSizeBytes(CassandraIO.Read<T> spec); + + /** + * Split a table read into several sources. + */ + List<BoundedSource<T>> split(CassandraIO.Read<T> spec, + long desiredBundleSizeBytes); + + /** + * Create a {@link Writer} that writes entities into the Cassandra instance. + */ + Writer createWriter(CassandraIO.Write<T> spec) throws Exception; + + /** + * Writer for an entity. + */ + interface Writer<T> extends AutoCloseable { + + /** + * This method should be synchronous. It means you have to be sure that the entity is fully + * stored (and committed) into the Cassandra instance when you exit from this method. 
+ */ + void write(T entity); + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java new file mode 100644 index 0000000..63c8ef4 --- /dev/null +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PlainTextAuthProvider; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.beam.sdk.io.BoundedSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of the {@link CassandraService} that actually use a Cassandra instance. 
+ */ +public class CassandraServiceImpl<T> implements CassandraService<T> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraServiceImpl.class); + + private static final long MIN_TOKEN = Long.MIN_VALUE; + private static final long MAX_TOKEN = Long.MAX_VALUE; + private static final BigInteger TOTAL_TOKEN_COUNT = + BigInteger.valueOf(MAX_TOKEN).subtract(BigInteger.valueOf(MIN_TOKEN)); + + private class CassandraReaderImpl<T> extends BoundedSource.BoundedReader<T> { + + private final CassandraIO.CassandraSource<T> source; + + private Cluster cluster; + private Session session; + private ResultSet resultSet; + private Iterator<T> iterator; + private T current; + + public CassandraReaderImpl(CassandraIO.CassandraSource<T> source) { + this.source = source; + } + + @Override + public boolean start() throws IOException { + LOG.debug("Starting Cassandra reader"); + cluster = getCluster(source.spec.hosts(), source.spec.port(), source.spec.username(), + source.spec.password(), source.spec.localDc(), source.spec.consistencyLevel()); + session = cluster.connect(); + LOG.debug("Query: " + source.splitQuery); + resultSet = session.execute(source.splitQuery); + + final MappingManager mappingManager = new MappingManager(session); + Mapper mapper = mappingManager.mapper(source.spec.entity()); + iterator = mapper.map(resultSet).iterator(); + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (iterator.hasNext()) { + current = iterator.next(); + return true; + } + current = null; + return false; + } + + @Override + public void close() { + LOG.debug("Closing Cassandra reader"); + if (session != null) { + session.close(); + } + if (cluster != null) { + cluster.close(); + } + } + + @Override + public T getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current; + } + + @Override + public CassandraIO.CassandraSource<T> getCurrentSource() { + return source; + } 
+ + } + + @Override + public CassandraReaderImpl<T> createReader(CassandraIO.CassandraSource<T> source) { + return new CassandraReaderImpl<>(source); + } + + @Override + public long getEstimatedSizeBytes(CassandraIO.Read<T> spec) { + try (Cluster cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), + spec.localDc(), spec.consistencyLevel())) { + if (isMurmur3Partitioner(cluster)) { + try { + List<TokenRange> tokenRanges = getTokenRanges(cluster, + spec.keyspace(), + spec.table()); + return getEstimatedSizeBytes(tokenRanges); + } catch (Exception e) { + LOG.warn("Can't estimate the size", e); + return 0L; + } + } else { + LOG.warn("Only Murmur3 partitioner is supported, can't estimate the size"); + return 0L; + } + } + } + + /** + * Actually estimate the size of the data to read on the cluster, based on the given token + * ranges to address. + */ + @VisibleForTesting + protected static long getEstimatedSizeBytes(List<TokenRange> tokenRanges) { + long size = 0L; + for (TokenRange tokenRange : tokenRanges) { + size = size + tokenRange.meanPartitionSize * tokenRange.partitionCount; + } + return Math.round(size / getRingFraction(tokenRanges)); + } + + @Override + public List<BoundedSource<T>> split(CassandraIO.Read<T> spec, + long desiredBundleSizeBytes) { + try (Cluster cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), + spec.localDc(), spec.consistencyLevel())) { + if (isMurmur3Partitioner(cluster)) { + LOG.info("Murmur3Partitioner detected, splitting"); + return split(spec, desiredBundleSizeBytes, getEstimatedSizeBytes(spec)); + } else { + LOG.warn("Only Murmur3Partitioner is supported for splitting, using an unique source for " + + "the read"); + String splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString(); + List<BoundedSource<T>> sources = new ArrayList<>(); + sources.add(new CassandraIO.CassandraSource<T>(spec, splitQuery)); + return sources; + } + } + } + + /** + * Compute 
the number of splits based on the estimated size and the desired bundle size, and + * create several sources. + */ + @VisibleForTesting + protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec, + long desiredBundleSizeBytes, + long estimatedSizeBytes) { + long numSplits = 1; + List<BoundedSource<T>> sourceList = new ArrayList<>(); + if (desiredBundleSizeBytes > 0) { + numSplits = estimatedSizeBytes / desiredBundleSizeBytes; + } + if (numSplits <= 0) { + LOG.warn("Number of splits is less than 0 ({}), fallback to 1", numSplits); + numSplits = 1; + } + + LOG.info("Number of splits is {}", numSplits); + + double startRange = MIN_TOKEN; + double endRange = MAX_TOKEN; + double startToken, endToken; + + endToken = startRange; + double incrementValue = endRange - startRange / numSplits; + String splitQuery; + if (numSplits == 1) { + // we have an unique split + splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString(); + sourceList.add(new CassandraIO.CassandraSource<T>(spec, splitQuery)); + } else { + // we have more than one split + for (int i = 0; i < numSplits; i++) { + startToken = endToken; + endToken = (i == numSplits) ? endRange : (startToken + incrementValue); + Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where(); + if (i > 0) { + builder = builder.and(QueryBuilder.gte("token($pk)", startToken)); + } + if (i < (numSplits - 1)) { + builder = builder.and(QueryBuilder.lt("token($pk)", endToken)); + } + sourceList.add(new CassandraIO.CassandraSource(spec, builder.toString())); + } + } + return sourceList; + } + + /** + * Get a Cassandra cluster using hosts and port. 
+ */ + private Cluster getCluster(List<String> hosts, int port, String username, String password, + String localDc, String consistencyLevel) { + Cluster.Builder builder = Cluster.builder() + .addContactPoints(hosts.toArray(new String[0])) + .withPort(port); + + if (username != null) { + builder.withAuthProvider(new PlainTextAuthProvider(username, password)); + } + + if (localDc != null) { + builder.withLoadBalancingPolicy( + new TokenAwarePolicy(new DCAwareRoundRobinPolicy.Builder().withLocalDc(localDc).build())); + } else { + builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())); + } + + if (consistencyLevel != null) { + builder.withQueryOptions( + new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel))); + } + + return builder.build(); + } + + /** + * Gets the list of token ranges that a table occupies on a give Cassandra node. + * + * <p>NB: This method is compatible with Cassandra 2.1.5 and greater. + */ + private static List<TokenRange> getTokenRanges(Cluster cluster, String keyspace, String table) { + try (Session session = cluster.newSession()) { + ResultSet resultSet = + session.execute( + "SELECT range_start, range_end, partitions_count, mean_partition_size FROM " + + "system.size_estimates WHERE keyspace_name = ? AND table_name = ?", + keyspace, + table); + + ArrayList<TokenRange> tokenRanges = new ArrayList<>(); + for (Row row : resultSet) { + TokenRange tokenRange = + new TokenRange( + row.getLong("partitions_count"), + row.getLong("mean_partition_size"), + row.getLong("range_start"), + row.getLong("range_end")); + tokenRanges.add(tokenRange); + } + // The table may not contain the estimates yet + // or have partitions_count and mean_partition_size fields = 0 + // if the data was just inserted and the amount of data in the table was small. + // This is very common situation during tests, + // when we insert a few rows and immediately query them. 
+ // However, for tiny data sets the lack of size estimates is not a problem at all, + // because we don't want to split tiny data anyways. + // Therefore, we're not issuing a warning if the result set was empty + // or mean_partition_size and partitions_count = 0. + return tokenRanges; + } + } + + /** + * Compute the percentage of token addressed compared with the whole tokens in the cluster. + */ + @VisibleForTesting + protected static double getRingFraction(List<TokenRange> tokenRanges) { + double ringFraction = 0; + for (TokenRange tokenRange : tokenRanges) { + ringFraction = ringFraction + (distance(tokenRange.rangeStart, tokenRange.rangeEnd) + .doubleValue() / TOTAL_TOKEN_COUNT.doubleValue()); + } + return ringFraction; + } + + /** + * Measure distance between two tokens. + */ + @VisibleForTesting + protected static BigInteger distance(long left, long right) { + if (right > left) { + return BigInteger.valueOf(right).subtract(BigInteger.valueOf(left)); + } else { + return BigInteger.valueOf(right).subtract(BigInteger.valueOf(left)).add(TOTAL_TOKEN_COUNT); + } + } + + /** + * Check if the current partitioner is the Murmur3 (default in Cassandra version newer than 2). + */ + @VisibleForTesting + protected static boolean isMurmur3Partitioner(Cluster cluster) { + return cluster.getMetadata().getPartitioner() + .equals("org.apache.cassandra.dht.Murmur3Partitioner"); + } + + /** + * Represent a token range in Cassandra instance, wrapping the partition count, size and token + * range. 
+ */ + @VisibleForTesting + protected static class TokenRange { + private final long partitionCount; + private final long meanPartitionSize; + private final long rangeStart; + private final long rangeEnd; + + public TokenRange( + long partitionCount, long meanPartitionSize, long rangeStart, long + rangeEnd) { + this.partitionCount = partitionCount; + this.meanPartitionSize = meanPartitionSize; + this.rangeStart = rangeStart; + this.rangeEnd = rangeEnd; + } + } + + /** + * Writer storing an entity into Apache Cassandra database. + */ + protected class WriterImpl<T> implements Writer<T> { + + private final CassandraIO.Write<T> spec; + + private final Cluster cluster; + private final Session session; + private final MappingManager mappingManager; + + public WriterImpl(CassandraIO.Write<T> spec) { + this.spec = spec; + this.cluster = getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), + spec.localDc(), spec.consistencyLevel()); + this.session = cluster.connect(spec.keyspace()); + this.mappingManager = new MappingManager(session); + } + + /** + * Write the entity to the Cassandra instance, using {@link Mapper} obtained with the + * {@link MappingManager}. This method use {@link Mapper#save(Object)} method, which is + * synchronous. It means the entity is guaranteed to be reliably committed to Cassandra. 
+ */ + @Override + public void write(T entity) { + Mapper<T> mapper = (Mapper<T>) mappingManager.mapper(entity.getClass()); + mapper.save(entity); + } + + @Override + public void close() { + if (session != null) { + session.close(); + } + if (cluster != null) { + cluster.close(); + } + } + + } + + @Override + public Writer createWriter(CassandraIO.Write<T> spec) { + return new WriterImpl(spec); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java new file mode 100644 index 0000000..6659b62 --- /dev/null +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms for reading and writing from Apache Cassandra database. 
+ */ +package org.apache.beam.sdk.io.cassandra; http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java new file mode 100644 index 0000000..e67d305 --- /dev/null +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.cassandra; + +import static org.junit.Assert.assertEquals; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.PartitionKey; +import com.datastax.driver.mapping.annotations.Table; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SerializableMatcher; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * A test of {@link CassandraIO} on a concrete and independent Cassandra instance. + * + * <p>This test requires a running Cassandra instance, and the test dataset must exists. 
+ * + * <p>You can run this test directly using Maven with: + * + * <pre>{@code + * mvn -e -Pio-it verify -pl sdks/java/io/cassandra -DintegrationTestPipelineOptions='[ + * "--cassandraHost=1.2.3.4", + * "--cassandraPort=9042"]' + * }</pre> + */ +@RunWith(JUnit4.class) +public class CassandraIOIT implements Serializable { + + private static IOTestPipelineOptions options; + + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void setup() throws Exception { + PipelineOptionsFactory.register(IOTestPipelineOptions.class); + options = TestPipeline.testingPipelineOptions() + .as(IOTestPipelineOptions.class); + } + + @AfterClass + public static void tearDown() { + // cleanup the write table + CassandraTestDataSet.cleanUpDataTable(options); + } + + @Test + public void testRead() throws Exception { + PCollection<Scientist> output = pipeline.apply(CassandraIO.<Scientist>read() + .withHosts(Collections.singletonList(options.getCassandraHost())) + .withPort(options.getCassandraPort()) + .withKeyspace(CassandraTestDataSet.KEYSPACE) + .withTable(CassandraTestDataSet.TABLE_READ_NAME) + .withEntity(Scientist.class) + .withCoder(SerializableCoder.of(Scientist.class))); + + PAssert.thatSingleton(output.apply("Count scientist", Count.<Scientist>globally())) + .isEqualTo(1000L); + + PCollection<KV<String, Integer>> mapped = + output.apply( + MapElements.via( + new SimpleFunction<Scientist, KV<String, Integer>>() { + public KV<String, Integer> apply(Scientist scientist) { + KV<String, Integer> kv = KV.of(scientist.name, scientist.id); + return kv; + } + } + ) + ); + PAssert.that(mapped.apply("Count occurrences per scientist", Count.<String, Integer>perKey())) + .satisfies( + new SerializableFunction<Iterable<KV<String, Long>>, Void>() { + @Override + public Void apply(Iterable<KV<String, Long>> input) { + for (KV<String, Long> element : input) { + assertEquals(element.getKey(), 1000 / 10, element.getValue().longValue()); + } + return 
null; + } + } + ); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testWrite() throws Exception { + IOTestPipelineOptions options = + TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class); + + options.setOnSuccessMatcher( + new CassandraMatcher( + CassandraTestDataSet.getCluster(options), + CassandraTestDataSet.TABLE_WRITE_NAME)); + + TestPipeline.convertToArgs(options); + + ArrayList<ScientistForWrite> data = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + ScientistForWrite scientist = new ScientistForWrite(); + scientist.id = i; + scientist.name = "Name " + i; + data.add(scientist); + } + + pipeline + .apply(Create.of(data)) + .apply(CassandraIO.<ScientistForWrite>write() + .withHosts(Collections.singletonList(options.getCassandraHost())) + .withPort(options.getCassandraPort()) + .withKeyspace(CassandraTestDataSet.KEYSPACE) + .withEntity(ScientistForWrite.class)); + + pipeline.run().waitUntilFinish(); + } + + /** + * Simple matcher. + */ + public class CassandraMatcher extends TypeSafeMatcher<PipelineResult> + implements SerializableMatcher<PipelineResult> { + + private String tableName; + private Cluster cluster; + + public CassandraMatcher(Cluster cluster, String tableName) { + this.cluster = cluster; + this.tableName = tableName; + } + + @Override + protected boolean matchesSafely(PipelineResult pipelineResult) { + pipelineResult.waitUntilFinish(); + Session session = cluster.connect(); + ResultSet result = session.execute("select id,name from " + CassandraTestDataSet.KEYSPACE + + "." + tableName); + List<Row> rows = result.all(); + if (rows.size() != 1000) { + return false; + } + for (Row row : rows) { + if (!row.getString("name").matches("Name.*")) { + return false; + } + } + return true; + } + + @Override + public void describeTo(Description description) { + description.appendText("Expected Cassandra record pattern is (Name.*)"); + } + } + + /** + * Simple Cassandra entity representing a scientist. 
Used for read test. + */ + @Table(name = CassandraTestDataSet.TABLE_READ_NAME, keyspace = CassandraTestDataSet.KEYSPACE) + public static class Scientist implements Serializable { + + @PartitionKey + @Column(name = "id") + private final int id; + + @Column(name = "name") + private final String name; + + public Scientist() { + this(0, ""); + } + + public Scientist(int id) { + this(0, ""); + } + + public Scientist(int id, String name) { + this.id = id; + this.name = name; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + } + + /** + * Simple Cassandra entity representing a scientist, used for write test. + */ + @Table(name = CassandraTestDataSet.TABLE_WRITE_NAME, keyspace = CassandraTestDataSet.KEYSPACE) + public class ScientistForWrite implements Serializable { + + @PartitionKey + @Column(name = "id") + public Integer id; + + @Column(name = "name") + public String name; + + public String toString() { + return id + ":" + name; + } + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java new file mode 100644 index 0000000..cfd78d2 --- /dev/null +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cassandra; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; + +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.Table; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Tests of {@link CassandraIO}. 
*/ +public class CassandraIOTest implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraIOTest.class); + + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testEstimatedSizeBytes() throws Exception { + final FakeCassandraService service = new FakeCassandraService(); + service.load(); + + PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); + CassandraIO.Read spec = CassandraIO.<Scientist>read().withCassandraService(service); + CassandraIO.CassandraSource source = new CassandraIO.CassandraSource( + spec, + null); + long estimatedSizeBytes = source.getEstimatedSizeBytes(pipelineOptions); + // the size is the sum of the bytes size of the String representation of a scientist in the map + assertEquals(113890, estimatedSizeBytes); + } + + @Test + public void testRead() throws Exception { + FakeCassandraService service = new FakeCassandraService(); + service.load(); + + PCollection<Scientist> output = pipeline.apply(CassandraIO + .<Scientist>read() + .withCassandraService(service) + .withKeyspace("beam") + .withTable("scientist") + .withCoder(SerializableCoder.of(Scientist.class)) + .withEntity(Scientist.class) + ); + + PAssert.thatSingleton(output.apply("Count", Count.<Scientist>globally())) + .isEqualTo(10000L); + + PCollection<KV<String, Integer>> mapped = + output.apply( + MapElements.via( + new SimpleFunction<Scientist, KV<String, Integer>>() { + public KV<String, Integer> apply(Scientist scientist) { + return KV.of(scientist.name, scientist.id); + } + })); + PAssert.that(mapped.apply("Count occurrences per scientist", Count.<String, Integer>perKey())) + .satisfies( + new SerializableFunction<Iterable<KV<String, Long>>, Void>() { + @Override + public Void apply(Iterable<KV<String, Long>> input) { + for (KV<String, Long> element : input) { + assertEquals(element.getKey(), 1000, element.getValue().longValue()); + } + return null; + } + }); + + pipeline.run(); + } + + 
@Test + public void testWrite() throws Exception { + FakeCassandraService service = new FakeCassandraService(); + + ArrayList<Scientist> data = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + Scientist scientist = new Scientist(); + scientist.id = i; + scientist.name = "Name " + i; + data.add(scientist); + } + + pipeline + .apply(Create.of(data)) + .apply(CassandraIO.<Scientist>write().withCassandraService(service) + .withKeyspace("beam") + .withEntity(Scientist.class)); + pipeline.run(); + + assertEquals(service.getTable().size(), 1000); + for (Scientist scientist : service.getTable().values()) { + assertTrue(scientist.name.matches("Name (\\d*)")); + } + } + + /** + * A {@link CassandraService} implementation that stores the entity in memory. + */ + private static class FakeCassandraService implements CassandraService<Scientist> { + + private static final Map<Integer, Scientist> table = new ConcurrentHashMap<>(); + + public void load() { + table.clear(); + String[] scientists = { + "Lovelace", + "Franklin", + "Meitner", + "Hopper", + "Curie", + "Faraday", + "Newton", + "Bohr", + "Galilei", + "Maxwell" + }; + for (int i = 0; i < 10000; i++) { + int index = i % scientists.length; + Scientist scientist = new Scientist(); + scientist.id = i; + scientist.name = scientists[index]; + table.put(scientist.id, scientist); + } + } + + public Map<Integer, Scientist> getTable() { + return table; + } + + @Override + public FakeCassandraReader createReader(CassandraIO.CassandraSource source) { + return new FakeCassandraReader(source); + } + + static class FakeCassandraReader extends BoundedSource.BoundedReader { + + private final CassandraIO.CassandraSource source; + + private Iterator<Scientist> iterator; + private Scientist current; + + public FakeCassandraReader(CassandraIO.CassandraSource source) { + this.source = source; + } + + @Override + public boolean start() throws IOException { + iterator = table.values().iterator(); + return advance(); + } + + @Override + 
public boolean advance() throws IOException { + if (iterator.hasNext()) { + current = iterator.next(); + return true; + } + current = null; + return false; + } + + @Override + public void close() { + iterator = null; + current = null; + } + + @Override + public Scientist getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current; + } + + @Override + public CassandraIO.CassandraSource getCurrentSource() { + return this.source; + } + + } + + @Override + public long getEstimatedSizeBytes(CassandraIO.Read spec) { + long size = 0L; + for (Scientist scientist : table.values()) { + size = size + scientist.toString().getBytes().length; + } + return size; + } + + @Override + public List<BoundedSource<Scientist>> split(CassandraIO.Read spec, + long desiredBundleSizeBytes) { + List<BoundedSource<Scientist>> sources = new ArrayList<>(); + sources.add(new CassandraIO.CassandraSource<Scientist>(spec, null)); + return sources; + } + + static class FakeCassandraWriter implements Writer<Scientist> { + + @Override + public void write(Scientist scientist) { + table.put(scientist.id, scientist); + } + + @Override + public void close() { + // nothing to do + } + + } + + @Override + public FakeCassandraWriter createWriter(CassandraIO.Write<Scientist> spec) { + return new FakeCassandraWriter(); + } + + } + + /** Simple Cassandra entity used in test. 
*/ + @Table(name = "scientist", keyspace = "beam") + public static class Scientist implements Serializable { + + @Column(name = "person_name") + public String name; + + @Column(name = "person_id") + public int id; + + public String toString() { + return id + ":" + name; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java new file mode 100644 index 0000000..6a68e90 --- /dev/null +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImplTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.cassandra; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metadata; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests on {@link CassandraServiceImplTest}. + */ +public class CassandraServiceImplTest { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraServiceImplTest.class); + + private static final String MURMUR3_PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner"; + + private Cluster createClusterMock() { + Metadata metadata = Mockito.mock(Metadata.class); + Mockito.when(metadata.getPartitioner()).thenReturn(MURMUR3_PARTITIONER); + Cluster cluster = Mockito.mock(Cluster.class); + Mockito.when(cluster.getMetadata()).thenReturn(metadata); + return cluster; + } + + @Test + public void testValidPartitioner() throws Exception { + assertTrue(CassandraServiceImpl.isMurmur3Partitioner(createClusterMock())); + } + + @Test + public void testDistance() throws Exception { + BigInteger distance = CassandraServiceImpl.distance(10L, 100L); + assertEquals(BigInteger.valueOf(90), distance); + + distance = CassandraServiceImpl.distance(100L, 10L); + assertEquals(new BigInteger("18446744073709551525"), distance); + } + + @Test + public void testRingFraction() throws Exception { + // simulate a first range taking "half" of the available tokens + List<CassandraServiceImpl.TokenRange> tokenRanges = new ArrayList<>(); + tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1, Long.MIN_VALUE, 0)); + assertEquals(0.5, CassandraServiceImpl.getRingFraction(tokenRanges), 0); + + // add a second range to cover all tokens available + tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1, 0, Long.MAX_VALUE)); + assertEquals(1.0, 
CassandraServiceImpl.getRingFraction(tokenRanges), 0); + } + + @Test + public void testEstimatedSizeBytes() throws Exception { + List<CassandraServiceImpl.TokenRange> tokenRanges = new ArrayList<>(); + // one partition containing all tokens, the size is actually the size of the partition + tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, Long.MIN_VALUE, Long.MAX_VALUE)); + assertEquals(1000, CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges)); + + // one partition with half of the tokens, we estimate the size to the double of this partition + tokenRanges = new ArrayList<>(); + tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, Long.MIN_VALUE, 0)); + assertEquals(2000, CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges)); + + // we have three partitions covering all tokens, the size is the sum of partition size * + // partition count + tokenRanges = new ArrayList<>(); + tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, Long.MIN_VALUE, -3)); + tokenRanges.add(new CassandraServiceImpl.TokenRange(1, 1000, -2, 10000)); + tokenRanges.add(new CassandraServiceImpl.TokenRange(2, 3000, 10001, Long.MAX_VALUE)); + assertEquals(8000, CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges)); + } + + @Test + public void testThreeSplits() throws Exception { + CassandraServiceImpl service = new CassandraServiceImpl(); + CassandraIO.Read spec = CassandraIO.read().withKeyspace("beam").withTable("test"); + List<CassandraIO.CassandraSource> sources = service.split(spec, 50, 150); + assertEquals(3, sources.size()); + assertTrue(sources.get(0).splitQuery.matches("SELECT \\* FROM beam.test WHERE token\\" + + "(\\$pk\\)<(.*)")); + assertTrue(sources.get(1).splitQuery.matches("SELECT \\* FROM beam.test WHERE token\\" + + "(\\$pk\\)>=(.*) AND token\\(\\$pk\\)<(.*)")); + assertTrue(sources.get(2).splitQuery.matches("SELECT \\* FROM beam.test WHERE token\\" + + "(\\$pk\\)>=(.*)")); + } + + @Test + public void testTwoSplits() throws Exception { + 
CassandraServiceImpl service = new CassandraServiceImpl(); + CassandraIO.Read spec = CassandraIO.read().withKeyspace("beam").withTable("test"); + List<CassandraIO.CassandraSource> sources = service.split(spec, 50, 100); + assertEquals(2, sources.size()); + LOG.info("TOKEN: " + ((double) Long.MAX_VALUE / 2)); + LOG.info(sources.get(0).splitQuery); + LOG.info(sources.get(1).splitQuery); + assertEquals("SELECT * FROM beam.test WHERE token($pk)<" + ((double) Long.MAX_VALUE / 2) + ";", + sources.get(0).splitQuery); + assertEquals("SELECT * FROM beam.test WHERE token($pk)>=" + ((double) Long.MAX_VALUE / 2) + + ";", + sources.get(1).splitQuery); + } + + @Test + public void testUniqueSplit() throws Exception { + CassandraServiceImpl service = new CassandraServiceImpl(); + CassandraIO.Read spec = CassandraIO.read().withKeyspace("beam").withTable("test"); + List<CassandraIO.CassandraSource> sources = service.split(spec, 100, 100); + assertEquals(1, sources.size()); + assertEquals("SELECT * FROM beam.test;", sources.get(0).splitQuery); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java new file mode 100644 index 0000000..461f5ea --- /dev/null +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; + +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manipulates test data used by the {@link CassandraIO} tests. + * + * <p>This is independent from the tests so that for read tests it can be run separately after + * data store creation rather than every time (which can be more fragile). + */ +public class CassandraTestDataSet { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraTestDataSet.class); + + /** + * Use this to create the read tables before IT read tests. + * + * <p>To invoke this class, you can use this command line: + * (run from the cassandra root directory) + * mvn test-compile exec:java -Dexec.mainClass=org.apache.beam.sdk.io.cassandra + * .CassandraTestDataSet \ + * -Dexec.args="--cassandraHost=localhost --cassandraPort=9042 \ + * -Dexec.classpathScope=test + * @param args Please pass options from IOTestPipelineOptions used for connection to Cassandra as + * shown above. 
+ */ + public static void main(String[] args) { + PipelineOptionsFactory.register(IOTestPipelineOptions.class); + IOTestPipelineOptions options = + PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class); + + createDataTable(options); + } + + public static final String KEYSPACE = "BEAM"; + public static final String TABLE_READ_NAME = "BEAM_READ_TEST"; + public static final String TABLE_WRITE_NAME = "BEAM_WRITE_TEST"; + + public static void createDataTable(IOTestPipelineOptions options) { + createTable(options, TABLE_READ_NAME); + insertTestData(options, TABLE_READ_NAME); + createTable(options, TABLE_WRITE_NAME); + } + + public static Cluster getCluster(IOTestPipelineOptions options) { + return Cluster.builder() + .addContactPoint(options.getCassandraHost()) + .withPort(options.getCassandraPort()) + .build(); + } + + private static void createTable(IOTestPipelineOptions options, String tableName) { + Cluster cluster = null; + Session session = null; + try { + cluster = getCluster(options); + session = cluster.connect(); + + LOG.info("Create {} keyspace if not exists", KEYSPACE); + session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH REPLICATION = " + + "{'class':'SimpleStrategy', 'replication_factor':3};"); + + session.execute("USE " + KEYSPACE); + + LOG.info("Create {} table if not exists", tableName); + session.execute("CREATE TABLE IF NOT EXISTS " + tableName + "(id int, name text, PRIMARY " + + "KEY(id))"); + } finally { + if (session != null) { + session.close(); + } + if (cluster != null) { + cluster.close(); + } + } + } + + private static void insertTestData(IOTestPipelineOptions options, String tableName) { + Cluster cluster = null; + Session session = null; + try { + cluster = getCluster(options); + session = cluster.connect(); + + LOG.info("Insert test dataset"); + String[] scientists = { + "Lovelace", + "Franklin", + "Meitner", + "Hopper", + "Curie", + "Faraday", + "Newton", + "Bohr", + "Galilei", + "Maxwell" + }; + for (int 
i = 0; i < 1000; i++) { + int index = i % scientists.length; + session.execute("INSERT INTO " + KEYSPACE + "." + tableName + "(id, name) values(" + + i + ",'" + scientists[index] + "');"); + } + } finally { + if (session != null) { + session.close(); + } + if (cluster != null) { + cluster.close(); + } + } + } + + public static void cleanUpDataTable(IOTestPipelineOptions options) { + Cluster cluster = null; + Session session = null; + try { + cluster = getCluster(options); + session = cluster.connect(); + session.execute("TRUNCATE TABLE " + KEYSPACE + "." + TABLE_WRITE_NAME); + } finally { + if (session != null) { + session.close(); + } + if (cluster != null) { + cluster.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index d3915c9..387fd22 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -76,4 +76,14 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { Integer getElasticsearchTcpPort(); void setElasticsearchTcpPort(Integer value); + @Description("Host for Cassandra server (host name/ip address)") + @Default.String("cassandra-host") + String getCassandraHost(); + void setCassandraHost(String host); + + @Description("Port for Cassandra server") + @Default.Integer(7001) + Integer getCassandraPort(); + void setCassandraPort(Integer port); + } http://git-wip-us.apache.org/repos/asf/beam/blob/0b0bb3dc/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 9657612..94fc6a7 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -78,6 +78,7 @@ <module>mongodb</module> <module>mqtt</module> <module>xml</module> + <module>cassandra</module> </modules> <profiles>
