[
https://issues.apache.org/jira/browse/BEAM-9008?focusedWorklogId=640878&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-640878
]
ASF GitHub Bot logged work on BEAM-9008:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Aug/21 23:40
Start Date: 23/Aug/21 23:40
Worklog Time Spent: 10m
Work Description: vmarquez edited a comment on pull request #10546:
URL: https://github.com/apache/beam/pull/10546#issuecomment-904204549
> Can we expect any breaking user API changes?
No breaking API changes.
>Does this PR contain all necessary tests to test a new functionality and
corner cases?
Yes, the two tests I added cover readAll both with a supplied RingRange
(specified on the Read<A> passed in) and with queries (also specified on the
Read<A>).
>@vmarquez Did you try to run it on a real distributed cluster with a real
data? If yes, could you provide any details on this (if possible)?
Sure! The company I'm working for now has been running this current
version (of my fork) in production for ~six months. We've used it in various
Dataflow pipelines hitting a cluster of 5 Scylla servers. For batch runs
we've gotten up to ~3M elements per second running with hundreds of Dataflow
worker nodes (not a common occurrence, but it was done!). For streaming we've
only used it on a much smaller scale, but have been doing up to hundreds per
second on two nodes.
>Do we expect any performance degradation (or improvement!) for
CassandraIO.read() since a Read part was mainly rewritten? It would be great to
compare.
For improvements, I think this basically makes some things possible that
just weren't before. If you have billions of rows, it's almost untenable to
have to read the entire DB into RAM and then filter if you only want a subset.
With readAll you can programmatically generate thousands or tens of thousands of
queries (or RingRanges) and send them to readAll instead.
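
As a rough illustration of generating queries programmatically, here is a minimal sketch in plain Java. The class `PartitionQueries`, its method names, and the table name passed in are hypothetical and not part of the PR; the column names `accountId` and `yearMonthDay` are borrowed from the `Event` example in the issue description. It only builds query strings; attaching each one to a Read<A> and sending it to readAll is not shown.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical helper: builds one CQL query string per (account, day) partition. */
final class PartitionQueries {

  /** Returns every day in [startInclusive, endExclusive) as an ISO yyyy-MM-dd string. */
  static List<String> days(LocalDate startInclusive, LocalDate endExclusive) {
    List<String> out = new ArrayList<>();
    for (LocalDate d = startInclusive; d.isBefore(endExclusive); d = d.plusDays(1)) {
      out.add(d.toString());
    }
    return out;
  }

  /** Builds the cross product of accounts and days; the table name is an assumption. */
  static List<String> queries(String table, List<UUID> accounts, List<String> days) {
    List<String> out = new ArrayList<>(accounts.size() * days.size());
    for (UUID account : accounts) {
      for (String day : days) {
        out.add(
            String.format(
                "SELECT * FROM %s WHERE accountId = %s AND yearMonthDay = '%s'",
                table, account, day));
      }
    }
    return out;
  }
}
```

String formatting keeps the sketch short; real code would more likely use bound parameters than build CQL by hand.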
I don't expect any performance degradation (after I make one final change
Ismael has asked for). Previously, the ring ranges in a group were queried
with async calls. This doesn't make a lot of sense to me, since in
essence it 'undoes' the grouping of the ring ranges, imo. I talk about this
under the final change @iemejia asked for.
One of the ways we are able to achieve such good performance for large batch
jobs with this change is by linearizing the queries so that they do
not overload a specific shard (or core, in Scylla's case). Because we can pass
a Set of RingRanges to a single Read<A> that goes to readAll, you can
(with a bit more code not provided here) in essence 'group' multiple Read<A>
'queries' into a single Read<A> query (by deriving the RingRange each query hits).
This is useful if, for instance, you are programmatically generating 10,000
queries and want to group them by the shard they hit for optimal
performance.
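
The grouping idea can be sketched in plain Java as follows. This is not the code from the PR: `RangeGrouping`, `TokenedQuery`, and `groupByRange` are hypothetical names, and the sketch assumes each query's partition token has already been computed elsewhere (the partitioner's hash is not reproduced here). Ranges are assumed to be start-exclusive and identified by their start token, with the lowest tokens wrapping around to the last range on the ring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;

/** Hypothetical sketch: bucket queries by the ring range that owns their token. */
final class RangeGrouping {

  /** A query plus the token of the partition it reads (token computed elsewhere). */
  static final class TokenedQuery {
    final String cql;
    final long token;

    TokenedQuery(String cql, long token) {
      this.cql = cql;
      this.token = token;
    }
  }

  /**
   * Groups queries by range start. A range covers (start, nextStart]; tokens at or
   * below the smallest start wrap around to the range with the largest start.
   */
  static Map<Long, List<String>> groupByRange(
      NavigableSet<Long> rangeStarts, List<TokenedQuery> queries) {
    if (rangeStarts.isEmpty()) {
      throw new IllegalArgumentException("at least one range start is required");
    }
    Map<Long, List<String>> groups = new TreeMap<>();
    for (TokenedQuery q : queries) {
      Long start = rangeStarts.lower(q.token); // greatest start strictly below the token
      if (start == null) {
        start = rangeStarts.last(); // wrap around the ring
      }
      groups.computeIfAbsent(start, k -> new ArrayList<>()).add(q.cql);
    }
    return groups;
  }
}
```

Each resulting bucket could then back a single Read<A>, so that queries hitting the same range run one after another rather than all at once.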
>Have been all PR's comments addressed? If not, what is missing?
https://github.com/vmarquez/beam/blob/feature/BEAM-9008/cassandraio_readall/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L418
@iemejia wanted me to flatten the results here so as to mitigate the change
from async to sync DB calls. The idea is that instead of a single
`processElement` call having a number of ring ranges to query (asynchronously),
we'll have multiple calls to `processElement`, each of which makes a single sync
call.
I agree that is probably most similar to the current behavior and I'm fine
making that change. Happy to hear other thoughts or suggestions.
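
To make the shape of that change concrete, here is a minimal plain-Java model of it. It is only a sketch: `FlattenSketch`, `flatten`, and `processEach` are hypothetical names, the `syncQuery` function stands in for a blocking DB call, and no Beam API is used. In the actual pipeline the per-element loop would be the runner invoking `processElement`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical model of flattening grouped ranges into single-range elements. */
final class FlattenSketch {

  /** Expands grouped elements so that each output element carries exactly one range. */
  static <R> List<R> flatten(Collection<? extends Set<R>> grouped) {
    List<R> out = new ArrayList<>();
    for (Set<R> group : grouped) {
      out.addAll(group);
    }
    return out;
  }

  /** Models the per-element callback: one invocation, one blocking query. */
  static <R, T> List<T> processEach(List<R> elements, Function<R, List<T>> syncQuery) {
    List<T> rows = new ArrayList<>();
    for (R element : elements) {
      rows.addAll(syncQuery.apply(element));
    }
    return rows;
  }
}
```

The rows returned are the same either way; what changes is that parallelism comes from how the flattened elements are distributed rather than from async calls inside one invocation.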
Issue Time Tracking
-------------------
Worklog Id: (was: 640878)
Time Spent: 18h 20m (was: 18h 10m)
> Add readAll() method to CassandraIO
> -----------------------------------
>
> Key: BEAM-9008
> URL: https://issues.apache.org/jira/browse/BEAM-9008
> Project: Beam
> Issue Type: New Feature
> Components: io-java-cassandra
> Affects Versions: 2.16.0
> Reporter: vincent marquez
> Assignee: vincent marquez
> Priority: P3
> Time Spent: 18h 20m
> Remaining Estimate: 0h
>
> When querying a large Cassandra database, it's often *much* more useful to
> programmatically generate the queries that need to be run rather than reading
> all partitions and attempting some filtering.
> As an example:
> {code:java}
> public class Event {
>   @PartitionKey(0) public UUID accountId;
>   @PartitionKey(1) public String yearMonthDay;
>   @ClusteringKey public UUID eventId;
>   // other data...
> }
> {code}
> If there are ten years' worth of data, you may want to query only one year's
> worth. Here each token range would represent one 'token' but all events for
> the day.
> {code:java}
> Set<UUID> accounts = getRelevantAccounts();
> Set<String> dateRange = generateDateRange("2018-01-01", "2019-01-01");
> PCollection<TokenRange> tokens = generateTokens(accounts, dateRange);
> {code}
>
> I propose an additional _readAll()_ PTransform that can take a PCollection
> of token ranges and can return a PCollection<T> of what the query would
> return.
> *Question: How much code should be in common between both methods?*
> Currently the read connector already groups all partitions into a List of
> Token Ranges, so it would be simple to refactor the current read() based
> method to a 'ParDo' based one and have them both share the same function.
> Reasons against sharing code between read and readAll
> * Not having the read based method return a BoundedSource connector would
> mean losing the ability to know the size of the data returned
> * Currently the CassandraReader executes all the grouped TokenRange queries
> *asynchronously* which is (maybe?) fine when all that's happening is
> splitting up all the partition ranges but terrible for executing potentially
> millions of queries.
> Reasons _for_ sharing code would be a simplified code base and that both of
> the above issues would most likely have a negligible performance impact.
>
>
>