vmarquez commented on a change in pull request #10546:
URL: https://github.com/apache/beam/pull/10546#discussion_r434318846
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -370,384 +488,16 @@ private CassandraIO() {}
return autoBuild();
}
}
- }
-
- @VisibleForTesting
- static class CassandraSource<T> extends BoundedSource<T> {
- final Read<T> spec;
- final List<String> splitQueries;
- // split source ached size - can't be calculated when already split
- Long estimatedSize;
- private static final String MURMUR3PARTITIONER =
"org.apache.cassandra.dht.Murmur3Partitioner";
-
- CassandraSource(Read<T> spec, List<String> splitQueries) {
- this(spec, splitQueries, null);
- }
-
- private CassandraSource(Read<T> spec, List<String> splitQueries, Long
estimatedSize) {
- this.estimatedSize = estimatedSize;
- this.spec = spec;
- this.splitQueries = splitQueries;
- }
-
- @Override
- public Coder<T> getOutputCoder() {
- return spec.coder();
- }
-
- @Override
- public BoundedReader<T> createReader(PipelineOptions pipelineOptions) {
- return new CassandraReader(this);
- }
-
- @Override
- public List<BoundedSource<T>> split(
- long desiredBundleSizeBytes, PipelineOptions pipelineOptions) {
- try (Cluster cluster =
- getCluster(
- spec.hosts(),
- spec.port(),
- spec.username(),
- spec.password(),
- spec.localDc(),
- spec.consistencyLevel())) {
- if (isMurmur3Partitioner(cluster)) {
- LOG.info("Murmur3Partitioner detected, splitting");
- return splitWithTokenRanges(
- spec, desiredBundleSizeBytes,
getEstimatedSizeBytes(pipelineOptions), cluster);
- } else {
- LOG.warn(
- "Only Murmur3Partitioner is supported for splitting, using a
unique source for "
- + "the read");
- return Collections.singletonList(
- new CassandraIO.CassandraSource<>(spec,
Collections.singletonList(buildQuery(spec))));
- }
- }
- }
-
- private static String buildQuery(Read spec) {
- return (spec.query() == null)
- ? String.format("SELECT * FROM %s.%s", spec.keyspace().get(),
spec.table().get())
- : spec.query().get().toString();
- }
-
- /**
- * Compute the number of splits based on the estimated size and the
desired bundle size, and
- * create several sources.
- */
- private List<BoundedSource<T>> splitWithTokenRanges(
- CassandraIO.Read<T> spec,
- long desiredBundleSizeBytes,
- long estimatedSizeBytes,
- Cluster cluster) {
- long numSplits =
- getNumSplits(desiredBundleSizeBytes, estimatedSizeBytes,
spec.minNumberOfSplits());
- LOG.info("Number of desired splits is {}", numSplits);
-
- SplitGenerator splitGenerator = new
SplitGenerator(cluster.getMetadata().getPartitioner());
- List<BigInteger> tokens =
- cluster.getMetadata().getTokenRanges().stream()
- .map(tokenRange -> new
BigInteger(tokenRange.getEnd().getValue().toString()))
- .collect(Collectors.toList());
- List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits,
tokens);
- LOG.info("{} splits were actually generated", splits.size());
-
- final String partitionKey =
-
cluster.getMetadata().getKeyspace(spec.keyspace().get()).getTable(spec.table().get())
- .getPartitionKey().stream()
- .map(ColumnMetadata::getName)
- .collect(Collectors.joining(","));
-
- List<TokenRange> tokenRanges =
- getTokenRanges(cluster, spec.keyspace().get(), spec.table().get());
- final long estimatedSize =
getEstimatedSizeBytesFromTokenRanges(tokenRanges) / splits.size();
-
- List<BoundedSource<T>> sources = new ArrayList<>();
- for (List<RingRange> split : splits) {
- List<String> queries = new ArrayList<>();
- for (RingRange range : split) {
- if (range.isWrapping()) {
Review comment:
Hm, let me think about this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org