[
https://issues.apache.org/jira/browse/BEAM-9008?focusedWorklogId=640877&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-640877
]
ASF GitHub Bot logged work on BEAM-9008:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Aug/21 23:35
Start Date: 23/Aug/21 23:35
Worklog Time Spent: 10m
Work Description: vmarquez commented on pull request #10546:
URL: https://github.com/apache/beam/pull/10546#issuecomment-904204549
> Can we expect any breaking user API changes?
No breaking API changes.
> Does this PR contain all necessary tests to test a new functionality and corner cases?
Yes, the two tests I added cover both the RingRanges path and the Queries path.
> @vmarquez Did you try to run it on a real distributed cluster with real
data? If yes, could you provide any details on this (if possible)?
Sure! The company I'm working for now has been running this current
version (of my fork) in production for six months. We've used it in various
Dataflow pipelines on a cluster of 5 Scylla servers. For batch runs we've
reached up to ~3M elements per second with hundreds of Dataflow worker
nodes; for streaming we've only used it at a much smaller scale, but we've
been doing up to hundreds of elements per second on two nodes.
> Do we expect any performance degradation (or improvement!) for
CassandraIO.read() since a Read part was mainly rewritten? It would be great
to compare.
For improvements, I think this basically makes some things possible that
just weren't before. If you have billions of rows it's almost untenable to
have to read the entire DB into RAM and then filter when you only want a
subset. With readAll you can programmatically generate thousands or tens of
thousands of queries and send them to readAll instead; a rough sketch of that
pattern follows.
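For illustration, here is a minimal sketch of that usage, assuming the `readAll()` transform added by this PR consumes a `PCollection` of `Read` configurations. The host, keyspace, table, and query strings are purely illustrative; `Event` and `generateDateRange` come from the Jira description (with `Event` assumed to be `Serializable`), and `pipeline` is assumed to be in scope:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

// Build one Read per programmatically generated query (here: one per day),
// then hand them all to readAll() instead of scanning the whole table.
List<CassandraIO.Read<Event>> reads = new ArrayList<>();
for (String day : generateDateRange("2018-01-01", "2019-01-01")) {
  reads.add(
      CassandraIO.<Event>read()
          .withHosts(Collections.singletonList("scylla-host")) // illustrative host
          .withPort(9042)
          .withKeyspace("events_ks")                           // illustrative keyspace
          .withTable("event")
          .withEntity(Event.class)
          .withCoder(SerializableCoder.of(Event.class))
          .withQuery(
              "SELECT * FROM events_ks.event WHERE year_month_day = '" + day + "'"));
}

PCollection<Event> events =
    pipeline
        .apply(Create.of(reads))
        .apply(CassandraIO.<Event>readAll().withCoder(SerializableCoder.of(Event.class)));
```

Each `Read` carries its own query, so the work can be fanned out across many workers instead of funnelling everything through a single bounded-source scan.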
I don't expect any performance degradation (after I make one final change
Ismaël has asked for). Previously, the group of ring ranges being queried was
executed with async calls. That doesn't make a lot of sense, since in essence
it 'undoes' the splitting of the ring ranges (why not just make more splits and
have more sync calls?).
One of the ways we are able to achieve such good performance for large batch
jobs with this current change is being able to linearize the queries and not
overload a specific shard (or core, in Scylla's case).
> Have all the PR's comments been addressed? If not, what is missing?
https://github.com/vmarquez/beam/blob/feature/BEAM-9008/cassandraio_readall/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L418
@iemejia wanted me to flatten the results here to mitigate the change from
async to sync DB calls: the idea being that instead of a single
`processElement` call having a number of ring ranges to query (but async),
we'll have multiple calls to `processElement`, each of which is a single sync
call (see the sketch below).
I agree that is probably most similar to the current behavior and I'm fine
making that change. Happy to hear other thoughts or suggestions.
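To make the intent concrete, a hedged sketch of that flattening step. `ringRanges()` and `withRingRanges(...)` are assumed accessors based on this PR's discussion; their exact names and visibility may differ in the merged code:

```java
import java.util.Collections;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.RingRange;
import org.apache.beam.sdk.transforms.DoFn;

// Expands a Read that carries several ring ranges into one Read per ring
// range, so each downstream processElement() issues exactly one synchronous
// query instead of a batch of async ones.
class SplitRingRangesFn<T> extends DoFn<CassandraIO.Read<T>, CassandraIO.Read<T>> {
  @ProcessElement
  public void processElement(
      @Element CassandraIO.Read<T> read, OutputReceiver<CassandraIO.Read<T>> out) {
    // ringRanges() / withRingRanges() are assumed here; see the note above.
    if (read.ringRanges() == null || read.ringRanges().isEmpty()) {
      out.output(read);
      return;
    }
    for (RingRange range : read.ringRanges()) {
      out.output(read.withRingRanges(Collections.singleton(range)));
    }
  }
}
```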
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 640877)
Time Spent: 18h 10m (was: 18h)
> Add readAll() method to CassandraIO
> -----------------------------------
>
> Key: BEAM-9008
> URL: https://issues.apache.org/jira/browse/BEAM-9008
> Project: Beam
> Issue Type: New Feature
> Components: io-java-cassandra
> Affects Versions: 2.16.0
> Reporter: vincent marquez
> Assignee: vincent marquez
> Priority: P3
> Time Spent: 18h 10m
> Remaining Estimate: 0h
>
> When querying a large Cassandra database, it's often *much* more useful to
> programmatically generate the queries that need to be run rather than reading
> all partitions and attempting some filtering.
> As an example:
> {code:java}
> public class Event {
>   @PartitionKey(0) public UUID accountId;
>   @PartitionKey(1) public String yearMonthDay;
>   @ClusteringKey public UUID eventId;
>   // other data...
> }
> {code}
> If there are ten years' worth of data, you may want to query only one year's
> worth. Here each token range would represent a single 'token' but all events
> for that day.
> {code:java}
> Set<UUID> accounts = getRelevantAccounts();
> Set<String> dateRange = generateDateRange("2018-01-01", "2019-01-01");
> PCollection<TokenRange> tokens = generateTokens(accounts, dateRange);
> {code}
>
> I propose an additional _readAll()_ PTransform that can take a PCollection
> of token ranges and return a PCollection<T> of the corresponding query
> results, as sketched below.
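> A rough usage sketch of the proposal, continuing from the snippet above (the
> readAll() configuration methods shown here are illustrative only):
> {code:java}
> PCollection<Event> events =
>     tokens.apply(
>         CassandraIO.<Event>readAll()
>             .withKeyspace("events_ks")   // illustrative configuration
>             .withTable("event")
>             .withEntity(Event.class));
> {code}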
> *Question: How much code should be in common between both methods?*
> Currently the read connector already groups all partitions into a List of
> Token Ranges, so it would be simple to refactor the current read()-based
> method into a ParDo-based one and have both share the same function.
> Reasons against sharing code between read and readAll
> * Not having the read()-based method return a BoundedSource connector would
> mean losing the ability to know the size of the data returned
> * Currently the CassandraReader executes all the grouped TokenRange queries
> *asynchronously*, which is (maybe?) fine when all that's happening is
> splitting up the partition ranges, but terrible for executing potentially
> millions of queries.
> Reasons _for_ sharing code would be a simplified code base, and that both of
> the above issues would most likely have a negligible performance impact.
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)