iemejia commented on a change in pull request #10546:
URL: https://github.com/apache/beam/pull/10546#discussion_r503788939
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -128,11 +126,18 @@
private CassandraIO() {}
+ private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
Review comment:
Please move this one inside of `SplitFn`.
You would probably need a hack like:
```
PCollection<Read<T>> split =
    (PCollection<Read<T>>) input.apply("Split", ParDo.of(new SplitFn()));
return split
    .apply("Reshuffle", Reshuffle.viaRandomKey())
    .apply("Read", ParDo.of(new ReadFn<>()));
```
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
checkArgument(entity() != null, "withEntity() is required");
checkArgument(coder() != null, "withCoder() is required");
- return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+ ReadAll<T> readAll = CassandraIO.readAll();
+
+ return input
+ .apply(Create.of(this))
+ .apply("Split", ParDo.of(new SplitFn()))
+ .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+ .apply("ReadAll", readAll.withCoder(this.coder()));
+ }
+
+ private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+ @ProcessElement
+ public void process(
+ @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+ getRingRanges(read)
+ .forEach(
+ rr ->
+ outputReceiver.output(
+ CassandraIO.<T>read()
+ .withRingRanges(new HashSet<>(rr))
+ .withCoder(coder())
+ .withConsistencyLevel(consistencyLevel())
+ .withEntity(entity())
+ .withHosts(hosts())
+ .withKeyspace(keyspace())
+ .withLocalDc(localDc())
+ .withPort(port())
+ .withPassword(password())
+ .withQuery(query())
+ .withTable(table())
+ .withUsername(username())
+ .withMapperFactoryFn(mapperFactoryFn())));
+ }
+
+ Stream<Set<RingRange>> getRingRanges(Read<T> read) {
Review comment:
Return a `Set` instead (see comment above).
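A minimal sketch of the suggested change (hypothetical; `splitGenerator`, `splitCount` and `tokens` come from the existing `getRingRanges` body, and the result is simply materialized instead of streamed):
```
Set<Set<RingRange>> getRingRanges(Read<T> read) {
  // ... same split computation as today ...
  return splitGenerator.generateSplits(splitCount, tokens).stream()
      .map(ranges -> (Set<RingRange>) new HashSet<>(ranges))
      .collect(Collectors.toSet());
}
```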
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -143,6 +148,36 @@ private CassandraIO() {}
return Write.<T>builder(MutationType.DELETE).build();
}
+ /** Get a Cassandra cluster using hosts and port. */
+ static Cluster getCluster(
Review comment:
Why do we have a different getCluster method than the one used for the Read, and why were the socket options removed there? I think we could maybe move this one out into a sort of CassandraUtils package, but that's optional. However, the use of the same method should not be changed by this PR if there is not a strong reason to do so.
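For illustration, a minimal sketch of one shared method that keeps the socket options (assuming the driver's `SocketOptions`; auth and localDc handling elided to keep the sketch short):
```
static Cluster getCluster(
    ValueProvider<List<String>> hosts,
    ValueProvider<Integer> port,
    ValueProvider<Integer> connectTimeout,
    ValueProvider<Integer> readTimeout) {
  SocketOptions socketOptions = new SocketOptions();
  if (connectTimeout != null) {
    socketOptions.setConnectTimeoutMillis(connectTimeout.get());
  }
  if (readTimeout != null) {
    socketOptions.setReadTimeoutMillis(readTimeout.get());
  }
  // Single implementation reused by Read, ReadAll and Write.
  return Cluster.builder()
      .addContactPoints(hosts.get().toArray(new String[0]))
      .withPort(port.get())
      .withSocketOptions(socketOptions)
      .build();
}
```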
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.cassandra.CassandraIO.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ReadFn<T> extends DoFn<Read<T>, T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class);
+
+ private transient Cluster cluster;
+
+ private transient Session session;
+
+ private transient Read<T> lastRead;
+
+ @ProcessElement
+ public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) {
+ Session session = getSession(read);
+ Mapper<T> mapper = read.mapperFactoryFn().apply(session);
+ String partitionKey =
+ cluster.getMetadata().getKeyspace(read.keyspace().get()).getTable(read.table().get())
+ .getPartitionKey().stream()
+ .map(ColumnMetadata::getName)
+ .collect(Collectors.joining(","));
+
+ String query = generateRangeQuery(read, partitionKey, read.ringRanges() != null);
+ PreparedStatement preparedStatement = session.prepare(query);
+ Set<RingRange> ringRanges =
+ read.ringRanges() == null ? Collections.<RingRange>emptySet() : read.ringRanges().get();
+
+ for (RingRange rr : ringRanges) {
+ Token startToken = cluster.getMetadata().newToken(rr.getStart().toString());
+ Token endToken = cluster.getMetadata().newToken(rr.getEnd().toString());
+ ResultSet rs =
+ session.execute(preparedStatement.bind().setToken(0, startToken).setToken(1, endToken));
+ Iterator<T> iter = mapper.map(rs);
+ while (iter.hasNext()) {
+ T n = iter.next();
+ receiver.output(n);
+ }
+ }
+
+ if (read.ringRanges() == null) {
+ ResultSet rs = session.execute(preparedStatement.bind());
+ Iterator<T> iter = mapper.map(rs);
+ while (iter.hasNext()) {
+ receiver.output(iter.next());
+ }
+ }
+ }
+
+ @Teardown
+ public void teardown() {
+ if (session != null) {
+ this.session.close();
+ }
+ if (cluster != null) {
+ this.cluster.close();
+ }
+ }
+
+ private Session getSession(Read<T> read) {
+ if (cluster == null || !reuseCluster(this.lastRead, read)) {
+ this.cluster =
+ CassandraIO.getCluster(
+ read.hosts(),
+ read.port(),
+ read.username(),
+ read.password(),
+ read.localDc(),
+ read.consistencyLevel());
+ }
+ if (session == null || !reuseSession(lastRead, read)) {
+ this.session = this.cluster.connect(read.keyspace().get());
+ }
+ this.lastRead = read;
+ return this.session;
+ }
+
+ private static <T> boolean reuseCluster(Read<T> readA, Read<T> readB) {
+ return readA != null
+ && readA.hosts().get().equals(readB.hosts().get())
+ && readA.port().get().equals(readB.port().get())
+ && ((readA.username() != null && readA.username().equals(readB.username()))
+ || (readA.username() == null && readB.username() == null))
+ && ((readA.consistencyLevel() != null
+ && readA.consistencyLevel().equals(readB.consistencyLevel()))
+ || (readA.consistencyLevel() == null && readB.consistencyLevel() == null))
+ && ((readA.localDc() != null && readA.localDc().equals(readB.consistencyLevel()))
+ || (readA.localDc() == null && readB.localDc() == null));
+ }
+
+ // TODO: Unit test
+ private static <T> boolean reuseSession(Read<T> readA, Read<T> readB) {
+ return (readA.keyspace() != null && readA.keyspace().equals(readB.keyspace()))
+ || (readA.keyspace() == null && readB.keyspace() == null);
+ }
+
+ /*
+ private static String generateRangeQuery(Read<?> spec, String partitionKey) {
Review comment:
Remove if unused
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -423,396 +550,20 @@ private CassandraIO() {}
}
}
- @VisibleForTesting
- static class CassandraSource<T> extends BoundedSource<T> {
- final Read<T> spec;
- final List<String> splitQueries;
- // split source cached size - can't be calculated when already split
- Long estimatedSize;
- private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
-
- CassandraSource(Read<T> spec, List<String> splitQueries) {
- this(spec, splitQueries, null);
- }
-
- private CassandraSource(Read<T> spec, List<String> splitQueries, Long
estimatedSize) {
- this.estimatedSize = estimatedSize;
- this.spec = spec;
- this.splitQueries = splitQueries;
- }
-
- @Override
- public Coder<T> getOutputCoder() {
- return spec.coder();
- }
-
- @Override
- public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
- return new CassandraReader(this);
- }
-
- @Override
- public List<BoundedSource<T>> split(
- long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
- try (Cluster cluster =
- getCluster(
- spec.hosts(),
- spec.port(),
- spec.username(),
- spec.password(),
- spec.localDc(),
- spec.consistencyLevel(),
- spec.connectTimeout(),
- spec.readTimeout())) {
- if (isMurmur3Partitioner(cluster)) {
- LOG.info("Murmur3Partitioner detected, splitting");
- return splitWithTokenRanges(
- spec, desiredBundleSizeBytes,
getEstimatedSizeBytes(pipelineOptions), cluster);
- } else {
- LOG.warn(
- "Only Murmur3Partitioner is supported for splitting, using a
unique source for "
- + "the read");
- return Collections.singletonList(
- new CassandraIO.CassandraSource<>(spec,
Collections.singletonList(buildQuery(spec))));
- }
- }
- }
-
- private static String buildQuery(Read spec) {
- return (spec.query() == null)
- ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(),
spec.table().get())
- : spec.query().get().toString();
- }
-
- /**
- * Compute the number of splits based on the estimated size and the
desired bundle size, and
- * create several sources.
- */
- private List<BoundedSource<T>> splitWithTokenRanges(
- CassandraIO.Read<T> spec,
- long desiredBundleSizeBytes,
- long estimatedSizeBytes,
- Cluster cluster) {
- long numSplits =
- getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes,
spec.minNumberOfSplits());
- LOG.info("Number of desired splits is {}", numSplits);
-
- SplitGenerator splitGenerator = new
SplitGenerator(cluster.getMetadata().getPartitioner());
- List<BigInteger> tokens =
- cluster.getMetadata().getTokenRanges().stream()
- .map(tokenRange -> new
BigInteger(tokenRange.getEnd().getValue().toString()))
- .collect(Collectors.toList());
- List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits,
tokens);
- LOG.info("{} splits were actually generated", splits.size());
-
- final String partitionKey =
-
cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
- .getPartitionKey().stream()
- .map(ColumnMetadata::getName)
- .collect(Collectors.joining(","));
-
- List<TokenRange> tokenRanges =
- getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
- final long estimatedSize =
getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();
-
- List<BoundedSource<T>> sources = new ArrayList<>();
- for (List<RingRange> split : splits) {
- List<String> queries = new ArrayList<>();
- for (RingRange range : split) {
- if (range.isWrapping()) {
- // A wrapping range is one that overlaps from the end of the
partitioner range and its
- // start (ie : when the start token of the split is greater than
the end token)
- // We need to generate two queries here : one that goes from the
start token to the end
- // of
- // the partitioner range, and the other from the start of the
partitioner range to the
- // end token of the split.
- queries.add(generateRangeQuery(spec, partitionKey,
range.getStart(), null));
- // Generation of the second query of the wrapping range
- queries.add(generateRangeQuery(spec, partitionKey, null,
range.getEnd()));
- } else {
- queries.add(generateRangeQuery(spec, partitionKey,
range.getStart(), range.getEnd()));
- }
- }
- sources.add(new CassandraIO.CassandraSource<>(spec, queries,
estimatedSize));
- }
- return sources;
- }
-
- private static String generateRangeQuery(
- Read spec, String partitionKey, BigInteger rangeStart, BigInteger
rangeEnd) {
- final String rangeFilter =
- Joiner.on(" AND ")
- .skipNulls()
- .join(
- rangeStart == null
- ? null
- : String.format("(token(%s) >= %d)", partitionKey,
rangeStart),
- rangeEnd == null
- ? null
- : String.format("(token(%s) < %d)", partitionKey,
rangeEnd));
- final String query =
- (spec.query() == null)
- ? buildQuery(spec) + " WHERE " + rangeFilter
- : buildQuery(spec) + " AND " + rangeFilter;
- LOG.debug("CassandraIO generated query : {}", query);
- return query;
- }
-
- private static long getNumSplits(
- long desiredBundleSizeBytes,
- long estimatedSizeBytes,
- @Nullable ValueProvider<Integer> minNumberOfSplits) {
- long numSplits =
- desiredBundleSizeBytes > 0 ? (estimatedSizeBytes /
desiredBundleSizeBytes) : 1;
- if (numSplits <= 0) {
- LOG.warn("Number of splits is less than 0 ({}), fallback to 1",
numSplits);
- numSplits = 1;
- }
- return minNumberOfSplits != null ? Math.max(numSplits,
minNumberOfSplits.get()) : numSplits;
- }
-
- /**
- * Returns the cached estimate for the split or, if missing, calculates the size of the whole
- * table. Highly inaccurate if a query is specified.
- *
- * @param pipelineOptions
- * @return
- */
- @Override
- public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
- if (estimatedSize != null) {
- return estimatedSize;
- } else {
- try (Cluster cluster =
- getCluster(
- spec.hosts(),
- spec.port(),
- spec.username(),
- spec.password(),
- spec.localDc(),
- spec.consistencyLevel(),
- spec.connectTimeout(),
- spec.readTimeout())) {
- if (isMurmur3Partitioner(cluster)) {
- try {
- List<TokenRange> tokenRanges =
- getTokenRanges(cluster, spec.keyspace().get(),
spec.table().get());
- this.estimatedSize =
getEstimatedSizeBytesFromTokenRanges(tokenRanges);
- return this.estimatedSize;
- } catch (Exception e) {
- LOG.warn("Can't estimate the size", e);
- return 0L;
- }
- } else {
- LOG.warn("Only Murmur3 partitioner is supported, can't estimate
the size");
- return 0L;
- }
- }
- }
- }
-
- @VisibleForTesting
- static long getEstimatedSizeBytesFromTokenRanges(List<TokenRange>
tokenRanges) {
- long size = 0L;
- for (TokenRange tokenRange : tokenRanges) {
- size = size + tokenRange.meanPartitionSize * tokenRange.partitionCount;
- }
- return Math.round(size / getRingFraction(tokenRanges));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- if (spec.hosts() != null) {
- builder.add(DisplayData.item("hosts", spec.hosts().toString()));
- }
- if (spec.port() != null) {
- builder.add(DisplayData.item("port", spec.port()));
- }
- builder.addIfNotNull(DisplayData.item("keyspace", spec.keyspace()));
- builder.addIfNotNull(DisplayData.item("table", spec.table()));
- builder.addIfNotNull(DisplayData.item("username", spec.username()));
- builder.addIfNotNull(DisplayData.item("localDc", spec.localDc()));
- builder.addIfNotNull(DisplayData.item("consistencyLevel",
spec.consistencyLevel()));
- }
- // ------------- CASSANDRA SOURCE UTIL METHODS ---------------//
-
- /**
- * Gets the list of token ranges that a table occupies on a given Cassandra node.
- *
- * <p>NB: This method is compatible with Cassandra 2.1.5 and greater.
- */
- private static List<TokenRange> getTokenRanges(Cluster cluster, String
keyspace, String table) {
- try (Session session = cluster.newSession()) {
- ResultSet resultSet =
- session.execute(
- "SELECT range_start, range_end, partitions_count,
mean_partition_size FROM "
- + "system.size_estimates WHERE keyspace_name = ? AND
table_name = ?",
- keyspace,
- table);
-
- ArrayList<TokenRange> tokenRanges = new ArrayList<>();
- for (Row row : resultSet) {
- TokenRange tokenRange =
- new TokenRange(
- row.getLong("partitions_count"),
- row.getLong("mean_partition_size"),
- new BigInteger(row.getString("range_start")),
- new BigInteger(row.getString("range_end")));
- tokenRanges.add(tokenRange);
- }
- // The table may not contain the estimates yet
- // or have partitions_count and mean_partition_size fields = 0
- // if the data was just inserted and the amount of data in the table
was small.
- // This is very common situation during tests,
- // when we insert a few rows and immediately query them.
- // However, for tiny data sets the lack of size estimates is not a
problem at all,
- // because we don't want to split tiny data anyways.
- // Therefore, we're not issuing a warning if the result set was empty
- // or mean_partition_size and partitions_count = 0.
- return tokenRanges;
- }
- }
-
- /** Compute the percentage of token addressed compared with the whole
tokens in the cluster. */
- @VisibleForTesting
- static double getRingFraction(List<TokenRange> tokenRanges) {
- double ringFraction = 0;
- for (TokenRange tokenRange : tokenRanges) {
- ringFraction =
- ringFraction
- + (distance(tokenRange.rangeStart,
tokenRange.rangeEnd).doubleValue()
- /
SplitGenerator.getRangeSize(MURMUR3PARTITIONER).doubleValue());
- }
- return ringFraction;
- }
-
- /**
- * Check if the current partitioner is the Murmur3 (default in Cassandra
version newer than 2).
- */
- @VisibleForTesting
- static boolean isMurmur3Partitioner(Cluster cluster) {
- return MURMUR3PARTITIONER.equals(cluster.getMetadata().getPartitioner());
- }
-
- /** Measure distance between two tokens. */
- @VisibleForTesting
- static BigInteger distance(BigInteger left, BigInteger right) {
- return (right.compareTo(left) > 0)
- ? right.subtract(left)
- :
right.subtract(left).add(SplitGenerator.getRangeSize(MURMUR3PARTITIONER));
- }
-
- /**
- * Represent a token range in Cassandra instance, wrapping the partition
count, size and token
- * range.
- */
- @VisibleForTesting
- static class TokenRange {
- private final long partitionCount;
- private final long meanPartitionSize;
- private final BigInteger rangeStart;
- private final BigInteger rangeEnd;
-
- TokenRange(
- long partitionCount, long meanPartitionSize, BigInteger rangeStart,
BigInteger rangeEnd) {
- this.partitionCount = partitionCount;
- this.meanPartitionSize = meanPartitionSize;
- this.rangeStart = rangeStart;
- this.rangeEnd = rangeEnd;
- }
- }
-
- private class CassandraReader extends BoundedSource.BoundedReader<T> {
- private final CassandraIO.CassandraSource<T> source;
- private Cluster cluster;
- private Session session;
- private Iterator<T> iterator;
- private T current;
-
- CassandraReader(CassandraSource<T> source) {
- this.source = source;
- }
-
- @Override
- public boolean start() {
- LOG.debug("Starting Cassandra reader");
- cluster =
- getCluster(
- source.spec.hosts(),
- source.spec.port(),
- source.spec.username(),
- source.spec.password(),
- source.spec.localDc(),
- source.spec.consistencyLevel(),
- source.spec.connectTimeout(),
- source.spec.readTimeout());
- session = cluster.connect(source.spec.keyspace().get());
- LOG.debug("Queries: " + source.splitQueries);
- List<ResultSetFuture> futures = new ArrayList<>();
- for (String query : source.splitQueries) {
- futures.add(session.executeAsync(query));
- }
-
- final Mapper<T> mapper = getMapper(session, source.spec.entity());
-
- for (ResultSetFuture result : futures) {
- if (iterator == null) {
- iterator = mapper.map(result.getUninterruptibly());
- } else {
- iterator = Iterators.concat(iterator,
mapper.map(result.getUninterruptibly()));
- }
- }
-
- return advance();
- }
-
- @Override
- public boolean advance() {
- if (iterator.hasNext()) {
- current = iterator.next();
- return true;
- }
- current = null;
- return false;
- }
-
- @Override
- public void close() {
- LOG.debug("Closing Cassandra reader");
- if (session != null) {
- session.close();
- }
- if (cluster != null) {
- cluster.close();
- }
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (current == null) {
- throw new NoSuchElementException();
- }
- return current;
- }
-
- @Override
- public CassandraIO.CassandraSource<T> getCurrentSource() {
- return source;
- }
-
- private Mapper<T> getMapper(Session session, Class<T> enitity) {
- return source.spec.mapperFactoryFn().apply(session);
- }
- }
- }
-
/** Specify the mutation type: either write or delete. */
public enum MutationType {
WRITE,
DELETE
}
+ /**
+ * Check if the current partitioner is Murmur3 (the default in Cassandra versions newer than 2).
+ */
+ @VisibleForTesting
+ static boolean isMurmur3Partitioner(Cluster cluster) {
Review comment:
Can we move this one into `SplitFn` too, since it is not used anywhere else?
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
checkArgument(entity() != null, "withEntity() is required");
checkArgument(coder() != null, "withCoder() is required");
- return input.apply(org.apache.beam.sdk.io.Read.from(new
CassandraSource<>(this, null)));
+ ReadAll<T> readAll = CassandraIO.readAll();
+
+ return input
+ .apply(Create.of(this))
+ .apply("Split", ParDo.of(new SplitFn()))
Review comment:
Can you please move this Split into the ReadAll expand method? When RingRanges are not specified, as in the `read()` case, we should split on the `ReadAll` expansion. Of course, if RingRanges are specified, we would probably skip recalculating them.
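A rough sketch of how that could look (hypothetical; assumes a static `SplitFn<T>` whose `getRingRanges` only computes splits when the incoming `Read` carries no RingRanges):
```
@Override
public PCollection<T> expand(PCollection<Read<T>> input) {
  checkArgument(coder() != null, "withCoder() is required");
  return input
      // SplitFn passes Reads with explicit RingRanges through unchanged.
      .apply("Split", ParDo.of(new SplitFn<T>()))
      .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
      .apply("Reshuffle", Reshuffle.viaRandomKey())
      .apply("Read", ParDo.of(new ReadFn<>()))
      .setCoder(coder());
}
```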
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1281,4 +1032,44 @@ private void waitForFuturesToFinish() throws
ExecutionException, InterruptedExce
}
}
}
+
+ /**
+ * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+ * information on usage and configuration.
+ */
+ @AutoValue
+ public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+ @Nullable
+ abstract Coder<T> coder();
+
+ abstract ReadAll.Builder<T> builder();
Review comment:
Remove `ReadAll.`
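That is:
```
abstract Builder<T> builder();
```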
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1281,4 +1032,44 @@ private void waitForFuturesToFinish() throws
ExecutionException, InterruptedExce
}
}
}
+
+ /**
+ * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+ * information on usage and configuration.
+ */
+ @AutoValue
+ public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+ @Nullable
+ abstract Coder<T> coder();
+
+ abstract ReadAll.Builder<T> builder();
+
+ /** Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. */
+ public ReadAll<T> withCoder(Coder<T> coder) {
+ checkArgument(coder != null, "coder can not be null");
+ return builder().setCoder(coder).build();
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<Read<T>> input) {
+ checkArgument(coder() != null, "withCoder() is required");
+ return input
+ .apply("Reshuffle", Reshuffle.viaRandomKey())
+ .apply("Read", ParDo.of(new ReadFn<>()))
+ .setCoder(this.coder());
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
Review comment:
Move this up before the `withCoder` method
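So the class would read (a sketch; Builder members assumed from the usual AutoValue pattern and the existing `withCoder` body):
```
abstract Builder<T> builder();

@AutoValue.Builder
abstract static class Builder<T> {
  abstract Builder<T> setCoder(Coder<T> coder);

  abstract ReadAll<T> build();
}

/** Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. */
public ReadAll<T> withCoder(Coder<T> coder) {
  checkArgument(coder != null, "coder can not be null");
  return builder().setCoder(coder).build();
}
```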
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
checkArgument(entity() != null, "withEntity() is required");
checkArgument(coder() != null, "withCoder() is required");
- return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+ ReadAll<T> readAll = CassandraIO.readAll();
+
+ return input
+ .apply(Create.of(this))
+ .apply("Split", ParDo.of(new SplitFn()))
+ .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+ .apply("ReadAll", readAll.withCoder(this.coder()));
+ }
+
+ private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+ @ProcessElement
+ public void process(
+ @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+ getRingRanges(read)
+ .forEach(
+ rr ->
+ outputReceiver.output(
+ CassandraIO.<T>read()
+ .withRingRanges(new HashSet<>(rr))
+ .withCoder(coder())
+ .withConsistencyLevel(consistencyLevel())
+ .withEntity(entity())
+ .withHosts(hosts())
+ .withKeyspace(keyspace())
+ .withLocalDc(localDc())
+ .withPort(port())
+ .withPassword(password())
+ .withQuery(query())
+ .withTable(table())
+ .withUsername(username())
+ .withMapperFactoryFn(mapperFactoryFn())));
+ }
+
+ Stream<Set<RingRange>> getRingRanges(Read<T> read) {
+ if (read.ringRanges() == null || read.ringRanges().get() == null) {
+ try (Cluster cluster =
+ getCluster(
+ read.hosts(),
+ read.port(),
+ read.username(),
+ read.password(),
+ read.localDc(),
+ read.consistencyLevel())) {
+ if (isMurmur3Partitioner(cluster)) {
+ LOG.info("Murmur3Partitioner detected, splitting");
+ Integer splitCount;
+ if (read.minNumberOfSplits() != null && read.minNumberOfSplits().get() != null) {
+ splitCount = read.minNumberOfSplits().get();
+ } else {
+ splitCount = cluster.getMetadata().getAllHosts().size();
+ }
+ List<BigInteger> tokens =
+ cluster.getMetadata().getTokenRanges().stream()
+ .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
+ .collect(Collectors.toList());
+ SplitGenerator splitGenerator =
+ new SplitGenerator(cluster.getMetadata().getPartitioner());
+
+ return splitGenerator.generateSplits(splitCount, tokens).stream()
+ .map(l -> new HashSet<>(l));
+
+ } else {
+ LOG.warn(
+ "Only Murmur3Partitioner is supported for splitting, using
an unique source for "
+ + "the read");
+ String partitioner = cluster.getMetadata().getPartitioner();
+ RingRange totalRingRange =
+ RingRange.of(
+ SplitGenerator.getRangeMin(partitioner),
+ SplitGenerator.getRangeMax(partitioner));
+ return Collections.<Set<RingRange>>singleton(
Review comment:
If we change the return type, I suppose we can make this return simpler.
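For example, with `getRingRanges` returning `Set<Set<RingRange>>`, this branch could collapse to (sketch):
```
return Collections.singleton(Collections.singleton(totalRingRange));
```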
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1170,4 +898,44 @@ private void waitForFuturesToFinish() throws
ExecutionException, InterruptedExce
}
}
}
+
+ /**
+ * A {@link PTransform} to read data from Apache Cassandra. See {@link CassandraIO} for more
+ * information on usage and configuration.
+ */
+ @AutoValue
+ public abstract static class ReadAll<T> extends PTransform<PCollection<Read<T>>, PCollection<T>> {
+
+ @Nullable
+ abstract Coder<T> coder();
+
+ abstract ReadAll.Builder<T> builder();
+
+ /** Specify the {@link Coder} used to serialize the entity in the {@link PCollection}. */
+ public ReadAll<T> withCoder(Coder<T> coder) {
Review comment:
Right, not so beautiful, but we have to live with this :+1:
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.cassandra.CassandraIO.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ReadFn<T> extends DoFn<Read<T>, T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class);
+
+ private transient Cluster cluster;
+
+ private transient Session session;
+
+ private transient Read<T> lastRead;
+
+ @ProcessElement
+ public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) {
+ Session session = getSession(read);
+ Mapper<T> mapper = read.mapperFactoryFn().apply(session);
+ String partitionKey =
+ cluster.getMetadata().getKeyspace(read.keyspace().get()).getTable(read.table().get())
+ .getPartitionKey().stream()
+ .map(ColumnMetadata::getName)
+ .collect(Collectors.joining(","));
+
+ String query = generateRangeQuery(read, partitionKey, read.ringRanges() != null);
+ PreparedStatement preparedStatement = session.prepare(query);
+ Set<RingRange> ringRanges =
+ read.ringRanges() == null ? Collections.<RingRange>emptySet() : read.ringRanges().get();
+
+ for (RingRange rr : ringRanges) {
+ Token startToken = cluster.getMetadata().newToken(rr.getStart().toString());
+ Token endToken = cluster.getMetadata().newToken(rr.getEnd().toString());
+ ResultSet rs =
+ session.execute(preparedStatement.bind().setToken(0, startToken).setToken(1, endToken));
+ Iterator<T> iter = mapper.map(rs);
+ while (iter.hasNext()) {
+ T n = iter.next();
+ receiver.output(n);
+ }
+ }
+
+ if (read.ringRanges() == null) {
+ ResultSet rs = session.execute(preparedStatement.bind());
+ Iterator<T> iter = mapper.map(rs);
+ while (iter.hasNext()) {
+ receiver.output(iter.next());
+ }
+ }
+ }
+
+ @Teardown
+ public void teardown() {
+ if (session != null) {
+ this.session.close();
+ }
+ if (cluster != null) {
+ this.cluster.close();
+ }
+ }
+
+ private Session getSession(Read<T> read) {
+ if (cluster == null || !reuseCluster(this.lastRead, read)) {
+ this.cluster =
+ CassandraIO.getCluster(
+ read.hosts(),
+ read.port(),
+ read.username(),
+ read.password(),
+ read.localDc(),
+ read.consistencyLevel());
+ }
+ if (session == null || !reuseSession(lastRead, read)) {
+ this.session = this.cluster.connect(read.keyspace().get());
+ }
+ this.lastRead = read;
+ return this.session;
+ }
+
+ private static <T> boolean reuseCluster(Read<T> readA, Read<T> readB) {
+ return readA != null
Review comment:
This is still a bit messy. Don't you think we could get something similar by using connection pooling, delegating this complexity to the driver (where it should reside)?
https://docs.datastax.com/en/developer/java-driver/2.1/manual/pooling/
Note that I am not familiar with this, but after a quick look it seems worth investigating.
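For illustration, a hypothetical sketch of leaning on the driver's built-in pool instead of hand-rolled `Cluster`/`Session` caching (API names from the 2.1 driver docs linked above; the host and pool sizes are placeholders):
```
PoolingOptions poolingOptions = new PoolingOptions();
// Keep a small pool of connections per local Cassandra host.
poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, 1);
poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, 4);

Cluster cluster = Cluster.builder()
    .addContactPoint("127.0.0.1") // placeholder contact point
    .withPoolingOptions(poolingOptions)
    .build();
```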
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
checkArgument(entity() != null, "withEntity() is required");
checkArgument(coder() != null, "withCoder() is required");
- return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+ ReadAll<T> readAll = CassandraIO.readAll();
+
+ return input
+ .apply(Create.of(this))
+ .apply("Split", ParDo.of(new SplitFn()))
+ .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+ .apply("ReadAll", readAll.withCoder(this.coder()));
+ }
+
+ private class SplitFn extends DoFn<Read<T>, Read<T>> {
+
+ @ProcessElement
+ public void process(
+ @Element CassandraIO.Read<T> read, OutputReceiver<Read<T>> outputReceiver) {
+ getRingRanges(read)
Review comment:
I would prefer to assign the result of `getRingRanges` to a `Set` variable, to help ease potential debugging in the future.
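Something like (hypothetical; `buildReadWithRingRanges` stands in for the existing builder chain above):
```
// Materialize first so the computed splits are easy to inspect/log.
Set<Set<RingRange>> ringRanges = getRingRanges(read);
for (Set<RingRange> rr : ringRanges) {
  outputReceiver.output(buildReadWithRingRanges(read, rr));
}
```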
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/RingRange.java
##########
@@ -55,4 +58,9 @@ public boolean isWrapping() {
public String toString() {
return String.format("(%s,%s]", start.toString(), end.toString());
}
+
+ public static RingRange fromEncodedKey(Metadata metadata, ByteBuffer... bb) {
Review comment:
Any reason not to do this? It looks like a convenience method that, if you depend on it somewhere else (outside of Beam), you could easily move there.
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -373,7 +419,86 @@ private CassandraIO() {}
checkArgument(entity() != null, "withEntity() is required");
checkArgument(coder() != null, "withCoder() is required");
- return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+ ReadAll<T> readAll = CassandraIO.readAll();
+
+ return input
+ .apply(Create.of(this))
+ .apply("Split", ParDo.of(new SplitFn()))
+ .setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {}))
+ .apply("ReadAll", readAll.withCoder(this.coder()));
+ }
+
+ private class SplitFn extends DoFn<Read<T>, Read<T>> {
Review comment:
`private static class SplitFn<T>`
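A sketch of what that implies (hypothetical): a static nested class cannot touch the enclosing `Read` instance, so all configuration must come from the incoming element, and `getRingRanges` would need to become static too (or take everything it needs as parameters):
```
private static class SplitFn<T> extends DoFn<Read<T>, Read<T>> {
  @ProcessElement
  public void process(@Element Read<T> read, OutputReceiver<Read<T>> out) {
    // Derive everything from the element; emit one Read per ring-range set.
    getRingRanges(read).forEach(rr -> out.output(read.withRingRanges(new HashSet<>(rr))));
  }
}
```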
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -370,384 +488,16 @@ private CassandraIO() {}
return autoBuild();
}
}
- }
-
- @VisibleForTesting
- static class CassandraSource<T> extends BoundedSource<T> {
- final Read<T> spec;
- final List<String> splitQueries;
- // split source cached size - can't be calculated when already split
- Long estimatedSize;
- private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";
-
- CassandraSource(Read<T> spec, List<String> splitQueries) {
- this(spec, splitQueries, null);
- }
-
- private CassandraSource(Read<T> spec, List<String> splitQueries, Long
estimatedSize) {
- this.estimatedSize = estimatedSize;
- this.spec = spec;
- this.splitQueries = splitQueries;
- }
-
- @Override
- public Coder<T> getOutputCoder() {
- return spec.coder();
- }
-
- @Override
- public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
- return new CassandraReader(this);
- }
-
- @Override
- public List<BoundedSource<T>> split(
- long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
- try (Cluster cluster =
- getCluster(
- spec.hosts(),
- spec.port(),
- spec.username(),
- spec.password(),
- spec.localDc(),
- spec.consistencyLevel())) {
- if (isMurmur3Partitioner(cluster)) {
- LOG.info("Murmur3Partitioner detected, splitting");
- return splitWithTokenRanges(
- spec, desiredBundleSizeBytes,
getEstimatedSizeBytes(pipelineOptions), cluster);
- } else {
- LOG.warn(
- "Only Murmur3Partitioner is supported for splitting, using a
unique source for "
- + "the read");
- return Collections.singletonList(
- new CassandraIO.CassandraSource<>(spec,
Collections.singletonList(buildQuery(spec))));
- }
- }
- }
-
- private static String buildQuery(Read spec) {
- return (spec.query() == null)
- ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(),
spec.table().get())
- : spec.query().get().toString();
- }
-
- /**
- * Compute the number of splits based on the estimated size and the
desired bundle size, and
- * create several sources.
- */
- private List<BoundedSource<T>> splitWithTokenRanges(
- CassandraIO.Read<T> spec,
- long desiredBundleSizeBytes,
- long estimatedSizeBytes,
- Cluster cluster) {
- long numSplits =
- getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes,
spec.minNumberOfSplits());
- LOG.info("Number of desired splits is {}", numSplits);
-
- SplitGenerator splitGenerator = new
SplitGenerator(cluster.getMetadata().getPartitioner());
- List<BigInteger> tokens =
- cluster.getMetadata().getTokenRanges().stream()
- .map(tokenRange -> new
BigInteger(tokenRange.getEnd().getValue().toString()))
- .collect(Collectors.toList());
- List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits,
tokens);
- LOG.info("{} splits were actually generated", splits.size());
-
- final String partitionKey =
-
cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
- .getPartitionKey().stream()
- .map(ColumnMetadata::getName)
- .collect(Collectors.joining(","));
-
- List<TokenRange> tokenRanges =
- getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
- final long estimatedSize =
getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();
-
- List<BoundedSource<T>> sources = new ArrayList<>();
- for (List<RingRange> split : splits) {
- List<String> queries = new ArrayList<>();
- for (RingRange range : split) {
- if (range.isWrapping()) {
Review comment:
What is the take on this? More concretely, I would really like us to keep testing the corner cases of splitting that were covered by the removed test methods, like `testEstimatedSizeBytesFromTokenRanges` and the others.
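For reference, the kind of corner case meant here, modeled loosely on the removed `testEstimatedSizeBytesFromTokenRanges` (a hypothetical sketch, not the original test; assumes same-package access to the removed `CassandraSource.TokenRange` helpers):
```
@Test
public void testEstimatedSizeBytesFromTokenRanges() {
  List<CassandraSource.TokenRange> tokenRanges = new ArrayList<>();
  // A single range covering the whole Murmur3 ring: the ring fraction is ~1,
  // so the estimate should be meanPartitionSize * partitionCount.
  tokenRanges.add(
      new CassandraSource.TokenRange(
          1000, 1000, BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(Long.MAX_VALUE)));
  assertEquals(1000 * 1000, CassandraSource.getEstimatedSizeBytesFromTokenRanges(tokenRanges));
}
```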
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]