[ 
https://issues.apache.org/jira/browse/CASSANDRA-8844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15242064#comment-15242064
 ] 

Joshua McKenzie commented on CASSANDRA-8844:
--------------------------------------------

bq. On recovery, we are going to delete the CDC Commit Logs instead of moving 
them to the CDC Overflow folder; we use ACLSM#deleteUntrackedCommitLogSegment, 
which isn't overridden for the CDC case
Fixed.

bq. Right now, there is no way to avoid getting a failed allocation even if the 
consumer is matching the speed of the CDC overflow logic. CDC keyspaces will 
see at least 250ms of failed allocations after they have written files up to capacity, even 
though the space could have been reclaimed. I suggest that in 
CommitLogSegmentManagerCDC#discard, we also call maybeUpdateCDCSizeCounterAsync 
so that we update the size, but not too quickly
Fixed, though I also augmented that signature to allow for bypassing the sleep 
interval. Rather than forcing a 250ms wait / throttling like we need on 
mutation application, I think it's reasonable to have no sleep on the discard 
path and immediately recover any unknown free space in the counter if a 
consumer is live.
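The throttle-with-bypass behaviour described above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the patch's code: `ThrottledSizeTracker`, `maybeUpdateAsync`, and the `LongSupplier` size source are hypothetical stand-ins for `maybeUpdateCDCSizeCounterAsync` and the CDC directory walk. A CAS on a flag coalesces concurrent requests, a recalculation requested from the mutation path sleeps first, and the discard path passes `bypassSleep = true` to recover freed space immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch; class and method names are illustrative, not Cassandra's API.
class ThrottledSizeTracker {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean recalculating = new AtomicBoolean(false);
    private final AtomicLong cachedSize = new AtomicLong();
    private final LongSupplier sizeCalculator; // assumed stand-in for walking the CDC directory
    private final long throttleMillis;         // e.g. 250 on the mutation path

    ThrottledSizeTracker(LongSupplier sizeCalculator, long throttleMillis) {
        this.sizeCalculator = sizeCalculator;
        this.throttleMillis = throttleMillis;
    }

    /**
     * Schedules a recalculation unless one is already pending; returns null when
     * the request is coalesced into the pending run. bypassSleep = true models the
     * discard path, where freed space should be recovered without delay.
     */
    CompletableFuture<Void> maybeUpdateAsync(boolean bypassSleep) {
        if (!recalculating.compareAndSet(false, true))
            return null;
        return CompletableFuture.runAsync(() -> {
            try {
                if (!bypassSleep)
                    Thread.sleep(throttleMillis); // throttle recalculations requested on the write path
                cachedSize.set(sizeCalculator.getAsLong());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // e.g. shutdown interrupted the sleep
            } finally {
                recalculating.set(false); // always allow the next recalculation
            }
        }, executor);
    }

    long size() { return cachedSize.get(); }

    void shutdown() { executor.shutdownNow(); }

    public static void main(String[] args) {
        ThrottledSizeTracker tracker = new ThrottledSizeTracker(() -> 4096L, 250);
        tracker.maybeUpdateAsync(true).join(); // discard path: no sleep
        System.out.println(tracker.size());    // 4096
        tracker.shutdown();
    }
}
```

Coalescing through the CAS means a burst of allocation failures triggers at most one pending walk of the directory, which is what keeps the throttled path cheap.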

bq. The reset of the recalculating flag in CLSMCDC#updateCDCDirectorySize should happen 
inside of a finally; for example, if we get an IOException, we will never be 
able to recalculate the CDC directory size. If this is intentional, we should 
make sure that we explicitly flag that decision
Good catch. Changed to put the manager wake and CAS in finally so it shouldn't 
be exposed to a potential hang there.
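A stripped-down, synchronous illustration of why the flag reset belongs in a finally block. The names are hypothetical, and where the patch routes the IOException to CommitLog.handleCommitError, this sketch simply rethrows it as an `UncheckedIOException`; the only point is that the flag is cleared on every exit path, so one failed walk cannot block all later recalculations.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the "reset in finally" fix; names are illustrative.
class DirectorySizeRecalculator {
    /** Assumed stand-in for a directory walk that can fail with an IOException. */
    interface SizeSource {
        long compute() throws IOException;
    }

    private final AtomicBoolean recalculating = new AtomicBoolean(false);
    private final SizeSource source;
    private volatile long lastSize = -1;

    DirectorySizeRecalculator(SizeSource source) {
        this.source = source;
    }

    /** Returns false if another recalculation is in progress, true once this call has run one. */
    boolean recalculate() {
        if (!recalculating.compareAndSet(false, true))
            return false;
        try {
            lastSize = source.compute();
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // Without this finally, a single IOException would leave the flag set
            // and every later call would return false forever.
            recalculating.set(false);
        }
    }

    long lastSize() { return lastSize; }

    public static void main(String[] args) {
        DirectorySizeRecalculator r = new DirectorySizeRecalculator(() -> 8192L);
        System.out.println(r.recalculate() + " " + r.lastSize()); // true 8192
    }
}
```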

bq. We aren't actually splitting the space between the regular Commit Log and 
the CDC log, so I'd think we should use the same space for the commit log and 
the CDC log
Not sure I understand here. The data/cdc_overflow and data/cdc are split on 
disk, but not necessarily split as far as us having independent allocation 
space for each directory. Same goes for cdc and commitlog. I'd actually be more 
in favor of allowing tuning of all three rather than glomming cdc w/commitlog. 
Thoughts?

bq. In DropKeyspaceStatement#announceMigration, we should keep the catching of 
the exception as we had before; this check is not sufficient, as it is the same 
as in the validate step. Even though we've passed validation, we could still 
get an exception when we try to update the schema
Reverted. I dislike the flow of the code in this method and I'm fairly sure 
{{ifExists && oldKsm == null}} better reflects the logical intent of what we 
were trying for before (ConfigurationException on non-existent w/ifExists is 
ok), but I concede the point that the new code isn't strictly necessary in 
terms of this patch and is also subtly behaviorally different.

bq. The FileUtils.createDirectory calls should be in the checks for cdc being 
empty; right now, it only works if saved_caches hasn't been specified
Not sure I follow. It's also in DatabaseDescriptor.createAllDirectories. Could 
you clarify the context of this point a bit?

bq. In Parser.g, do we need to use anything in the value of the map? or can we 
just use a null value?
Done

bq. In Config.java, the change in name from 
commitlog_max_compression_buffers_in_pool to 
commitlog_max_compression_buffers_per_pool isn't compatible for users who used 
that option; we need a NEWS entry for it
Keeping, noted in NEWS.txt. This was an undocumented variable in the .yaml so I 
suspect overrides are limited in the wild. Also added a more formal NEWS.txt 
entry and a CHANGES.txt entry for CDC as a feature.

bq. In PropertyDefinitions#getSet, can we just use the keySet instead of 
creating a new HashSet for it?
Fixed. Missed the forest for the trees while implementing that one.

bq. In AbstractCommitLogSegmentManager#start, we should include the type in the 
name of the Thread so that we can tell whether the thread is for the standard 
CL or the CDC CL
Added.

bq. Would be good to add a flag to CommitLogReadErrorReason to tell whether the 
error is recoverable or not; this would explain whether we will check the 
return value or not in CR#readMutation
I augmented the enum names to indicate which are recoverable and which are not 
and extended the interface to support that. I didn't like having those 2 
concepts (recoverable and unrecoverable errors) living in the same method since 
it was rather misleading to have a "shouldStopOnX" with a caller that didn't 
care about your return. In the case of the CommitLogReplayer, it will continue 
to pass that into a single method for logical purposes, but subsequent 
implementers can take more granular action.
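One way to read the split described above: each reason carries a recoverable bit, recoverable errors go to a callback whose boolean answer is honoured, and unrecoverable ones go to a void callback. The enum constants and method names below are illustrative assumptions rather than the exact names in the patch; `FunnellingHandler` mimics the replayer routing both callbacks into a single method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: enum values and method names are illustrative, not the patch's exact API.
enum ReadErrorReason {
    RECOVERABLE_DESCRIPTOR_ERROR(true),
    MUTATION_ERROR(true),
    UNRECOVERABLE_DESCRIPTOR_ERROR(false),
    UNRECOVERABLE_UNKNOWN_ERROR(false);

    final boolean recoverable;

    ReadErrorReason(boolean recoverable) { this.recoverable = recoverable; }
}

class ReadError extends Exception {
    final ReadErrorReason reason;

    ReadError(ReadErrorReason reason, String message) {
        super(message);
        this.reason = reason;
    }
}

interface ReadErrorHandler {
    /** Recoverable errors: the answer matters (true = stop reading this segment). */
    boolean shouldStopOnRecoverableError(ReadError e);

    /** Unrecoverable errors: reading stops regardless, so there is nothing to return. */
    void onUnrecoverableError(ReadError e);
}

class SegmentReader {
    /** Routes an error to the matching callback; returns true if reading may continue. */
    static boolean dispatch(ReadErrorHandler handler, ReadError e) {
        if (e.reason.recoverable)
            return !handler.shouldStopOnRecoverableError(e);
        handler.onUnrecoverableError(e);
        return false;
    }
}

/** Replayer-style handler that funnels both callbacks into one method. */
class FunnellingHandler implements ReadErrorHandler {
    final List<String> seen = new ArrayList<>();

    public boolean shouldStopOnRecoverableError(ReadError e) { return handle(e); }

    public void onUnrecoverableError(ReadError e) { handle(e); }

    private boolean handle(ReadError e) {
        seen.add(e.reason + ": " + e.getMessage());
        return false; // keep going past recoverable errors
    }

    public static void main(String[] args) {
        FunnellingHandler h = new FunnellingHandler();
        System.out.println(SegmentReader.dispatch(h, new ReadError(ReadErrorReason.MUTATION_ERROR, "bad cell")));
        System.out.println(SegmentReader.dispatch(h, new ReadError(ReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR, "?")));
    }
}
```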

bq. Not sure if there is a reason to keep MutationInitiator, it serves a 
similar role to the new ICommitLogReadHandler
Similar, but different enough (hijacking the futures operations for mocking in 
tests) that I'd prefer leaving that to a future effort if we choose to go that 
route.

bq. Don't understand the immediate use case in the comment above 
ICommitLogReadHandler#prepReader
Removed comment - had that as a rough example of what it could be used for, but 
people can dig around and figure that out for themselves so we don't couple the 
comment in the interface w/an implementation detail. Check the code and usage 
of globalPosition in CommitLogReplayer.java for context.

bq. In KeyspaceParams, we should combine those two constructors and just use 
the create() calls where the 2 parameter case is used
Needed a 2 param .create() method that took ReplicationParams. Went ahead and 
created one of those so we have a single ctor, though I'm kind of neutral on 
the topic.
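A sketch of the single-constructor shape, assuming a plain `String` in place of ReplicationParams and a `cdcDatacenters` set for the new parameter (both simplifications, not the real field types): every `create()` overload funnels into one private constructor, with the two-parameter form defaulting to no CDC datacenters.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical sketch: a String stands in for ReplicationParams; the field set is illustrative.
final class KeyspaceParamsSketch {
    final boolean durableWrites;
    final String replication;
    final Set<String> cdcDatacenters;

    // The single constructor every factory funnels through.
    private KeyspaceParamsSketch(boolean durableWrites, String replication, Set<String> cdcDatacenters) {
        this.durableWrites = durableWrites;
        this.replication = replication;
        this.cdcDatacenters = Collections.unmodifiableSet(cdcDatacenters);
    }

    // Two-parameter form: no CDC datacenters.
    static KeyspaceParamsSketch create(boolean durableWrites, String replication) {
        return create(durableWrites, replication, Collections.emptySet());
    }

    static KeyspaceParamsSketch create(boolean durableWrites, String replication, Set<String> cdcDatacenters) {
        return new KeyspaceParamsSketch(durableWrites, replication, cdcDatacenters);
    }

    public static void main(String[] args) {
        System.out.println(create(true, "SimpleStrategy").cdcDatacenters); // []
    }
}
```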

bq. The KeyspaceParams#validate is a "best-effort", since we can do things like 
change the topology on one side of a split and change the CDC DCs on the other
As we discussed offline, since we don't re-validate after reconciliation of a 
partition event, this comes down to a) don't make schema changes during a 
partition event, and b) if you fat-finger the CDC datacenter name while 
performing this update during a time window you shouldn't, well, there's only 
so much we can do. It's trivial to alter and re-create the cdc_datacenters 
param so I say we just acknowledge this here on the ticket and move on.

bq. The comment in CommitLogSegmentManagerCDCTest#testCDCFunctionality about 
the directory structure should also be in the test .yaml
Good point, and done.

bq. In cassandra.yaml above cdc_overflow_directory, should be /data/cdc_overflow
fixed

bq. potentailly -> potentially in cassandra.yaml
fixed

bq. CommitLogTest#testRecovery(byte[]) doesn't look used
It's not, but that's code from CASSANDRA-6018 so I'm not sure whether or not 
Jason had plans to use that. Wasn't added by this patch nor touched by it, so 
going to keep it out of this ticket.

bq. In AbstractCommitLogSegmentManager#awaitManagementTasksCompletion, we could add 
a new job and use a signal to wake the current thread. Since jobs run in order, 
once that job is done, all previous jobs will have been completed
That's already noted in the comment by Jason. Didn't want to scope creep CDC 
effort.
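The reviewer's barrier-task idea can be sketched like this, assuming management tasks run on a single-threaded, FIFO executor (`ManagementTaskQueue` is a hypothetical name). The barrier only proves that tasks submitted before it have finished; work enqueued afterwards, including follow-up tasks that an earlier task submits, is not covered.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the barrier-task idea; names are illustrative.
class ManagementTaskQueue {
    // Single worker thread: tasks run one at a time in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void submit(Runnable task) { executor.execute(task); }

    /** Enqueues a barrier task and blocks until it runs; by then every earlier task is done. */
    void awaitCompletion() {
        CountDownLatch barrier = new CountDownLatch(1);
        executor.execute(barrier::countDown);
        try {
            barrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while awaiting management tasks", e);
        }
    }

    void shutdown() { executor.shutdown(); }

    public static void main(String[] args) {
        ManagementTaskQueue queue = new ManagementTaskQueue();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 5; i++)
            queue.submit(done::incrementAndGet);
        queue.awaitCompletion();
        System.out.println(done.get()); // 5
        queue.shutdown();
    }
}
```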

bq. The comment at the end of ACLSM#forceRecycleAll mentions that the method has a 
return value, but it is void
Predates CDC, but fixed.

bq. In CommitLog#recoverSegmentManager, the comment above 
manager.allocatingFrom() refers to Standard, but the code works for both Standard and CDC

bq. We should annotate CommitLog#resetUnsafe, stopUnsafe, and restartUnsafe as 
VisibleForTesting
fixed

bq. CommitLogReader#ALL_MUTATIONS should be public, possibly with 
VisibleForTesting since we refer to it in the javadocs for the public method 
CommitLogReader#readCommitLogSegment
Made public, and not necessary to annotate as VisibleForTesting since scope 
wasn't increased to accommodate unit tests. We could also keep it private and 
expose another signature that accepts a CommitLogSegmentPosition without a 
mutationLimit, but for now I'd prefer to keep from having exposed methods that 
aren't used by anyone.

bq. CommitLogReader#readCommitLogSegment should be VisibleForTesting
Added though with minor hesitation. I partially relaxed scope on these methods 
with the notion that future CDC Consumers will write to the CommitLogReader 
interface and will thus need them.

bq. There are a lot of params in javadocs that aren't properly defined, we 
should remove them
If you're referring to the ones I left blank with a hyphen, I basically skipped 
definition of anything that was completely self-identifying to avoid 
redundancy. My approach is that these serve as extra meta-info for IDEs during 
development as I don't know of anyone generating an actual set of API 
documentation from the Javadoc. That being said, I've tidied it up, commented 
the ones that were missing, and normalized the spacing.

bq. In CommitLogReader#readMutation, can replace the catch statement with 
invalidMutations.computeIfAbsent(ex.cfId, id -> new 
AtomicInteger()).incrementAndGet();
This code was copied over from CommitLogReplayer and I wasn't (and still am 
not) really looking to clean up / refactor the guts of the functionality of the 
initial implementation.
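For reference, the two counting styles give the same counts in single-threaded use. The map of table id to `AtomicInteger` below is an assumption about the shape of invalidMutations, and `countVerbose` is a generic get/put version rather than the exact code copied from CommitLogReplayer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch comparing two ways of counting invalid mutations per table id.
class InvalidMutationCounter {
    // Explicit get/put form.
    static void countVerbose(Map<UUID, AtomicInteger> counts, UUID cfId) {
        AtomicInteger i = counts.get(cfId);
        if (i == null) {
            i = new AtomicInteger(1);
            counts.put(cfId, i);
        } else {
            i.incrementAndGet();
        }
    }

    // The one-liner suggested in review.
    static void countCompact(Map<UUID, AtomicInteger> counts, UUID cfId) {
        counts.computeIfAbsent(cfId, id -> new AtomicInteger()).incrementAndGet();
    }

    public static void main(String[] args) {
        Map<UUID, AtomicInteger> counts = new HashMap<>();
        UUID table = new UUID(0L, 1L);
        countCompact(counts, table);
        countCompact(counts, table);
        System.out.println(counts.get(table).get()); // 2
    }
}
```

One practical difference: on a ConcurrentHashMap, computeIfAbsent performs the check-and-insert atomically, whereas the get/put form can lose an increment under concurrent callers.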

bq. Split the CommitLogReplayer constructor on multiple lines
Done, though it barely violated the 120 char constraint. :)

bq. Comments in CommitLogReplayer#construct refer to replay position by 
abbreviation; they should be updated to CLSP
Fixed.

bq. There are a lot of variable names rp which should be updated because of the 
ReplayPosition rename
I think I got them all now. Don't quote me on that.

bq. There is a comment above CommitLogSegment.Allocation that no longer 
applies; the logic was removed in CASSANDRA-7515, but the comment was not 
updated
Removed.

bq. In CommitLogSegmentManagerCDC#updateCDCDirectorySize, IEXception should be 
IOException
InterruptedException is to deal with a shutdown interrupting the thread sleep. 
The inner caught IOException is handled by CommitLog.handleCommitError so 
IOException will never propagate up.

bq. Above CLSMCDC#atCapacity, typo: submit should be submitted
fixed.

bq. CommitLogSegmentReader#EncryptedSegmenter and 
CommitLogSegmentReader#CompressedSegmenter should share the same constructor 
signature
Fixed.

bq. Interfaces don't normally start with 'I': ICommitLogReadHandler
{{IAsyncCallback}}, {{IAuthenticator}}, {{IBitSet}}, {{ICache}}, 
{{ICompressedFile}}... I could keep going. :) Snarkiness aside, the 
aforementioned don't constitute the "normal" majority of our interfaces and, 
upon further reflection, it's a tautology. Changed.

bq. Typo in DirectorySizeBench#setUp; "div size" should be "dir size"
Not a typo. 8192/32, "div size". Refined comment to make that more clear.

bq. In CDCStatementTest, both in testAlterAddBadCDCNts and in 
testDropWithCDCFails, we need to make sure that we call fail if we reach the 
end of the try, since we're expecting an exception
Funny you should mention that, since one of the tests was a: missing CDC 
entirely in the statement, and b: trying to catch the wrong kind of exception. 
Fixed.

bq. In CommitLogReaderTest and CommitLogSegmentManagerCDCTest, we should be 
using the junit Assert
Fixed.

bq. In CommitLogSegmentManagerCDCTest#testCDCWriteTimeout, we can just use fail 
at the end of the {{ try { for}} block, and remove the pass = true in the catch
Fixed.
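The expected-exception pattern requested for CDCStatementTest and CommitLogSegmentManagerCDCTest can be sketched as below. To stay runnable without JUnit on the classpath, `fail()` here is a local stand-in for org.junit.Assert.fail(String), which likewise throws AssertionError; `write` and its capacity check are hypothetical placeholders for the operation under test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the expected-exception test pattern.
class ExpectedExceptionPattern {
    // Local stand-in for org.junit.Assert.fail(String).
    static void fail(String message) { throw new AssertionError(message); }

    // Illustrative operation under test: rejects writes once capacity is reached.
    static void write(List<Integer> log, int capacity, int value) {
        if (log.size() >= capacity)
            throw new IllegalStateException("CDC space exhausted");
        log.add(value);
    }

    static void testWriteRejectedAtCapacity(int capacity) {
        List<Integer> log = new ArrayList<>();
        try {
            for (int i = 0; i < 100; i++)
                write(log, capacity, i);
            // Reaching this line means nothing was thrown: fail explicitly, no "pass" flag needed.
            fail("Expected IllegalStateException after " + capacity + " writes");
        } catch (IllegalStateException expected) {
            // Expected path. fail()'s AssertionError is not an IllegalStateException,
            // so it propagates instead of being swallowed here.
        }
    }

    public static void main(String[] args) {
        testWriteRejectedAtCapacity(10); // passes: the 11th write throws
        try {
            testWriteRejectedAtCapacity(1000); // 100 writes never reach capacity, so fail() fires
        } catch (AssertionError e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The catch clause must name the specific expected exception; catching Throwable or Error would also swallow the AssertionError from fail() and silently turn the test green.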

[~blambov]: Thanks for the feedback. I'll start working through that tomorrow, 
along with your update from earlier today [~carlyeks].

> Change Data Capture (CDC)
> -------------------------
>
>                 Key: CASSANDRA-8844
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-8844
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Coordination, Local Write-Read Paths
>            Reporter: Tupshin Harper
>            Assignee: Joshua McKenzie
>            Priority: Critical
>             Fix For: 3.x
>
>
> "In databases, change data capture (CDC) is a set of software design patterns 
> used to determine (and track) the data that has changed so that action can be 
> taken using the changed data. Also, Change data capture (CDC) is an approach 
> to data integration that is based on the identification, capture and delivery 
> of the changes made to enterprise data sources."
> -Wikipedia
> As Cassandra is increasingly being used as the Source of Record (SoR) for 
> mission critical data in large enterprises, it is increasingly being called 
> upon to act as the central hub of traffic and data flow to other systems. In 
> order to try to address the general need, we (cc [~brianmhess]) propose 
> implementing a simple data logging mechanism to enable per-table CDC patterns.
> h2. The goals:
> # Use CQL as the primary ingestion mechanism, in order to leverage its 
> Consistency Level semantics, and in order to treat it as the single 
> reliable/durable SoR for the data.
> # To provide a mechanism for implementing good and reliable 
> (deliver-at-least-once with possible mechanisms for deliver-exactly-once) 
> continuous semi-realtime feeds of mutations going into a Cassandra cluster.
> # To eliminate the developmental and operational burden on users so that they 
> don't have to do dual writes to other systems.
> # For users that are currently doing batch export from a Cassandra system, 
> give them the opportunity to make that realtime with a minimum of coding.
> h2. The mechanism:
> We propose a durable logging mechanism that functions similarly to a commitlog, 
> with the following nuances:
> - Takes place on every node, not just the coordinator, so RF number of copies 
> are logged.
> - Separate log per table.
> - Per-table configuration. Only tables that are specified as CDC_LOG would do 
> any logging.
> - Per DC. We are trying to keep the complexity to a minimum to make this an 
> easy enhancement, but most likely use cases would prefer to only implement 
> CDC logging in one (or a subset) of the DCs that are being replicated to
> - In the critical path of ConsistencyLevel acknowledgment. Just as with the 
> commitlog, failure to write to the CDC log should fail that node's write. If 
> that means the requested consistency level was not met, then clients *should* 
> experience UnavailableExceptions.
> - Be written in a Row-centric manner such that it is easy for consumers to 
> reconstitute rows atomically.
> - Written in a simple format designed to be consumed *directly* by daemons 
> written in non-JVM languages
> h2. Nice-to-haves
> I strongly suspect that the following features will be asked for, but I also 
> believe that they can be deferred for a subsequent release, and to gauge 
> actual interest.
> - Multiple logs per table. This would make it easy to have multiple 
> "subscribers" to a single table's changes. A workaround would be to create a 
> forking daemon listener, but that's not a great answer.
> - Log filtering. Being able to apply filters, including UDF-based filters 
> would make Cassandra a much more versatile feeder into other systems, and 
> again, reduce complexity that would otherwise need to be built into the 
> daemons.
> h2. Format and Consumption
> - Cassandra would only write to the CDC log, and never delete from it. 
> - Cleaning up consumed logfiles would be the client daemon's responsibility.
> - Logfile size should probably be configurable.
> - Logfiles should be named with a predictable naming schema, making it 
> trivial to process them in order.
> - Daemons should be able to checkpoint their work, and resume from where they 
> left off. This means they would have to leave some file artifact in the CDC 
> log's directory.
> - A sophisticated daemon should be able to be written that could 
> -- Catch up, in written-order, even when it is multiple logfiles behind in 
> processing
> -- Be able to continuously "tail" the most recent logfile and get 
> low-latency (ms?) access to the data as it is written.
> h2. Alternate approach
> In order to make consuming a change log easy and efficient to do with low 
> latency, the following could supplement the approach outlined above
> - Instead of writing to a logfile, by default, Cassandra could expose a 
> socket for a daemon to connect to, and from which it could pull each row.
> - Cassandra would have a limited buffer for storing rows, should the listener 
> become backlogged, but it would immediately spill to disk in that case, never 
> incurring large in-memory costs.
> h2. Additional consumption possibility
> With all of the above, still relevant:
> - instead (or in addition to) using the other logging mechanisms, use CQL 
> transport itself as a logger.
> - Extend the CQL protocol slightly so that rows of data can be returned to a 
> listener that didn't explicitly make a query, but instead registered itself 
> with Cassandra as a listener for a particular event type, and in this case, 
> the event type would be anything that would otherwise go to a CDC log.
> - If there is no listener for the event type associated with that log, or if 
> that listener gets backlogged, the rows will again spill to the persistent 
> storage.
> h2. Possible Syntax
> {code:sql}
> CREATE TABLE ... WITH CDC LOG
> {code}
> Pros: No syntax extensions
> Cons: doesn't make it easy to capture the various permutations (I'm happy to 
> be proven wrong) of per-dc logging. Also, the hypothetical multiple logs per 
> table would break this
> {code:sql}
> CREATE CDC_LOG mylog ON mytable WHERE MyUdf(mycol1, mycol2) = 5 with 
> DCs={'dc1','dc3'}
> {code}
> Pros: Expressive and allows for easy DDL management of all aspects of CDC
> Cons: Syntax additions. Added complexity, partly for features that might not 
> be implemented
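The "Format and Consumption" requirements in the description above (predictable names, in-order catch-up, checkpointing) could be met by a consumer along these lines. This is a sketch under assumptions: the CommitLog-<version>-<segmentId>.log naming and the idea of checkpointing the last fully consumed segment id are illustrative, not a format the ticket specifies. Parsing the id numerically matters, since a plain string sort would put segment 1000 before 999.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical consumer-side sketch; the file naming scheme is an assumption.
class CdcConsumerSketch {
    // Assumed naming: CommitLog-<version>-<segmentId>.log with increasing segment ids.
    private static final Pattern NAME = Pattern.compile("CommitLog-(\\d+)-(\\d+)\\.log");

    static long segmentId(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches())
            throw new IllegalArgumentException("unexpected file: " + fileName);
        return Long.parseLong(m.group(2));
    }

    /** Segments still to process, oldest first, given the last fully consumed segment id. */
    static List<String> pending(List<String> fileNames, long checkpointedSegmentId) {
        List<String> out = new ArrayList<>();
        for (String f : fileNames)
            if (NAME.matcher(f).matches() && segmentId(f) > checkpointedSegmentId)
                out.add(f); // skips already-consumed segments and non-log artifacts
        out.sort(Comparator.comparingLong(CdcConsumerSketch::segmentId)); // numeric, not lexical
        return out;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
                "CommitLog-6-1000.log", "CommitLog-6-100.log", "CommitLog-6-999.log", "cdc.checkpoint");
        System.out.println(pending(files, 100L)); // [CommitLog-6-999.log, CommitLog-6-1000.log]
    }
}
```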



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
