[ 
https://issues.apache.org/jira/browse/KUDU-3290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356981#comment-17356981
 ] 

Andrew Wong edited comment on KUDU-3290 at 6/3/21, 11:54 PM:
-------------------------------------------------------------

I'm curious what the use case is for the data stored in Kafka. Is it meant to 
be a physical backup of all ops that are sent to Kudu? A couple of other 
approaches come to mind that don't go as deep into the weeds of Kudu as 
what's outlined here, so I'm curious if they would fit your needs:
 * Periodically run a differential scan in Kudu, similar to an incremental 
backup, but storing the results in Kafka. Kudu's differential backup allows 
users to supply two (relatively recent) timestamps and Kudu will return the 
logical changes to the table that happened between those timestamps. This 
doesn't work that well if the goal is to get every individual mutation between 
the two timestamps, since diff scans summarize the changes between the two 
timestamps. However, if the process performing this ran frequently enough (e.g. 
every few seconds), near-real-time replication may not be out of the 
picture. 
[Here's|https://github.com/apache/kudu/blob/master/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala#L78]
 an example of Spark using this kind of scan.
 * With whatever ingestion tool is currently writing to Kudu, also write to 
Kafka. Usually we see pipelines built with Spark or NiFi or StreamSets that 
ingest to Kudu – if you have such a pipeline, duplicating the pipeline into 
Kafka would be another relatively quick solution, though with the caveat that 
failures of either system may need to be worked around.
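
To make the diff-scan trade-off in the first bullet concrete, here is a toy in-memory model, not the Kudu client API. `Mutation`, `diff_scan`, and `windows` are hypothetical names, and the half-open `(start, end]` window is an assumption made for the sketch. It only illustrates that a wide window collapses several mutations of one key into a single net change, while narrow windows recover more of the individual steps.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Optional, Tuple


@dataclass(frozen=True)
class Mutation:
    ts: int                # commit timestamp (toy integer clock)
    key: str
    value: Optional[str]   # None models a delete


def diff_scan(history: List[Mutation], start_ts: int, end_ts: int) -> Dict[str, Optional[str]]:
    """Net change per key for mutations with start_ts < ts <= end_ts.

    Later mutations overwrite earlier ones, so intermediate values are lost.
    """
    net: Dict[str, Optional[str]] = {}
    for m in sorted(history, key=lambda m: m.ts):
        if start_ts < m.ts <= end_ts:
            net[m.key] = m.value
    return net


def windows(start_ts: int, end_ts: int, step: int) -> Iterator[Tuple[int, int]]:
    """Yield consecutive (lo, hi] windows that cover (start_ts, end_ts]."""
    lo = start_ts
    while lo < end_ts:
        hi = min(lo + step, end_ts)
        yield lo, hi
        lo = hi


history = [
    Mutation(1, "a", "v1"),
    Mutation(2, "a", "v2"),
    Mutation(3, "b", "x"),
    Mutation(5, "a", None),
]

# One wide window: only the net effect per key survives.
coarse = [diff_scan(history, lo, hi) for lo, hi in windows(0, 6, 6)]

# Narrow windows: each individual mutation shows up in its own window.
fine = [diff_scan(history, lo, hi) for lo, hi in windows(0, 6, 1)]
```

In this model the wide window reports only that `a` ended up deleted and `b` was written, whereas the one-tick windows surface `v1`, `v2`, and the delete separately. A real poller would publish each window's result to Kafka and persist the last `hi` it completed.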

One concern I would have about the built-in Raft replication to an external 
system is the fact that Kudu Raft replicates all writes before applying the 
operations to the underlying data stores. This means that we may replicate 
operations that fail because a row already exists, and that is only caught 
after replication. Depending on how you are using the Kafka replica, I'm not 
sure what the best way to handle this is. Perhaps that's okay for your usage, 
but I can see it being confusing to replicate row operations that failed in Kudu.
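
The replicate-before-apply concern can be sketched with a toy model. `ToyTablet` and its fields are hypothetical and do not correspond to Kudu classes; the sketch assumes only what is described above, namely that an op enters the replicated log before the row-level check that may reject it.

```python
from typing import Dict, List, Tuple

Op = Tuple[str, str, str]  # (kind, key, value); kind is "insert" or "update"


class ToyTablet:
    """Toy tablet that replicates every op before trying to apply it."""

    def __init__(self) -> None:
        self.replicated_log: List[Op] = []   # what an external replica would see
        self.apply_results: List[bool] = []  # per-op outcome, known only after apply
        self.rows: Dict[str, str] = {}

    def write(self, op: Op) -> bool:
        # Step 1: the op is replicated unconditionally.
        self.replicated_log.append(op)
        # Step 2: the row-level check happens only at apply time.
        kind, key, value = op
        if kind == "insert":
            ok = key not in self.rows   # fails if the row already exists
        else:
            ok = key in self.rows       # update fails if the row is missing
        if ok:
            self.rows[key] = value
        self.apply_results.append(ok)
        return ok


tablet = ToyTablet()
tablet.write(("insert", "k", "1"))
tablet.write(("insert", "k", "2"))  # rejected at apply: row already exists
tablet.write(("update", "j", "9"))  # rejected at apply: row does not exist

# A consumer that blindly replays the replicated log diverges from the tablet.
naive: Dict[str, str] = {}
for _kind, key, value in tablet.replicated_log:
    naive[key] = value

# A consumer that also knows each op's apply result stays consistent.
filtered: Dict[str, str] = {}
for (_kind, key, value), ok in zip(tablet.replicated_log, tablet.apply_results):
    if ok:
        filtered[key] = value
```

Here the naive consumer ends up with rows the tablet never accepted, so an external replica would need some way to learn per-op apply results, or it would have to tolerate seeing failed operations.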


> Implement Replicate table's data to Kafka(or other Storage System)
> ------------------------------------------------------------------
>
>                 Key: KUDU-3290
>                 URL: https://issues.apache.org/jira/browse/KUDU-3290
>             Project: Kudu
>          Issue Type: New Feature
>          Components: tserver
>            Reporter: shenxingwuying
>            Priority: Critical
>
> h1. background & problem
> We use Kudu to store user profile data. Because of business requirements 
> (exchanging and sharing data among multi-tenant users, which is reasonable 
> in our application scenario), we need to replicate data from one system to 
> another. We picked Kafka as the destination storage system because of our 
> company's current architecture.
> At this time, we have two ideas for solving this.
> h1. two replication schemes
> Generally, a Raft group has three replicas: one leader and two followers. 
> We'll add a replica whose role is Learner. The Learner only receives all the 
> data and does not participate in leader election.
> The learner replica's state machine will be a plugin system, e.g.:
>  # We can support KuduEngine, which is just a data backup, like MongoDB's 
> hidden replica.
>  # We can write to a third-party storage system, such as Kafka or any other 
> system we need. Then we can replicate data to another system using its 
> client.
> Paxos has a learner role, which only receives data. We need such a role for 
> the new membership.
> But in Kudu, Learner is already used for the replica that is copying 
> (recovering) a tablet. Maybe we need a new role name; for now, we still use 
> Learner to represent the new role. (We should think over a new role name.)
> In our application scenario, we will replicate data to Kafka, and I will 
> explain the method.
> h2. Learner replication
>  # Add a new replica role, which we might call learner, because Paxos has a 
> learner role that only receives data. We need such a role for the new 
> membership. But in Kudu, Learner is already used for the replica that is 
> copying (recovering) a tablet. Maybe we need a new role name; for now, we 
> still use Learner to represent the new role. (We should think over a new 
> role name.)
>  # The voters' safepoint for cleaning obsolete WAL is min(leader's max WAL 
> sequence number, followers' max WAL sequence number, learner's max WAL 
> sequence number).
>  # The learner is not a voter and does not participate in elections.
>  # Raft can replicate data to the learner.
>  # The learner's apply process is just like a Raft follower's: the logs 
> before the committed index are replicated to Kafka, and once Kafka responds 
> OK, the apply index increases.
>  # We need a Kafka client; it will be added to Kudu as an option, maybe as a 
> compile option.
>  # When a kudu-tserver is decommissioned or corrupted, the learner must move 
> to a new kudu-tserver. So the leader should save the learner's apply OpId 
> and replicate it to the followers, to cover learner failover when the 
> leader is down.
>  # The leader must save the learner's apply OpId and replicate it to the 
> followers, so that learner recovery loses no data when the leader is down. 
> If the leader does not save the applyIndex, the learner may lose data.
>  # Followers save the learner's applyIndex and term, because a follower may 
> become leader.
>  # When the load balancer is running, we should support moving the learner 
> to another kudu-tserver.
>  # The table should have a switch option that determines whether the Raft 
> group has a learner; it can be set when creating the table.
>  # Supporting alter table to add learners may be an idea, but we need to 
> solve the base data migration problem.
>  # Base data migration. The simple but costly approach: when the learner's 
> max_OpId < committed_OpId (maybe data was lost, or maybe we altered an 
> existing table to add learner replication), we can trigger a full scan at 
> the timestamp, replicate the data to the learner, and then resume the 
> appendEntries flow.
>  # Kudu does not support split and merge, so we do not discuss it now. If 
> Kudu supports split or merge, we can implement it using 12; of course, we 
> could use a better method.
>  # If we need this function, our cluster should have at least 4 tservers.
> If Kafka fails or the topic does not exist, the learner will stop 
> replicating the WAL, which will occupy more disk space. If the learner is 
> lost or corrupted, it can recover from the leader. We need to make sure of 
> the safepoint.
> h2. Leader replication
> We can replicate data to Kafka or any other storage system directly from 
> the leader.
>  # We need not add a role, but since the destination is Kafka, one of 
> PeerManager's peers is different from the others, which adds complexity.
>  # Reuse the leader's WAL, which saves outbound network bandwidth compared 
> to the method above.
>  # All replicas should maintain a point that saves the apply OpId at Kafka.
>  # The safepoint for cleaning obsolete WAL is min(voters' max WAL sequence 
> number, applyIndex at Kafka), which the leader saves.
>  # Any leader transfer must recover the apply OpId at Kafka.
>  # We need a Kafka client; it will be added to Kudu as an option, maybe as a 
> compile option.
>  # If Kafka or the Kafka topic fails, print an error log and hold the WAL.
>  # Some processes are the same as in Learner replication, such as base data 
> replication.
>  # If we need this function, our cluster should have at least 3 tservers.
> If Kafka fails or the topic does not exist, the leader will stop 
> replicating the WAL to Kafka, which will occupy more disk space. We need to 
> make sure of the safepoint.
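
The two WAL safepoint rules in the quoted proposal can be written out as a small sketch. The function names and the sample numbers are hypothetical and are not taken from Kudu's log GC code; the sketch only restates the `min(...)` rules and shows why a stalled Kafka sink pins the WAL.

```python
from typing import Sequence


def learner_scheme_safepoint(leader_max: int,
                             follower_maxes: Sequence[int],
                             learner_max: int) -> int:
    """Learner replication: min over leader, followers, and the learner."""
    return min([leader_max, *follower_maxes, learner_max])


def leader_scheme_safepoint(voter_maxes: Sequence[int],
                            kafka_apply_index: int) -> int:
    """Leader replication: min over all voters and the apply index at Kafka."""
    return min([*voter_maxes, kafka_apply_index])


def retained_entries(latest_index: int, safepoint: int) -> int:
    """WAL entries that cannot be cleaned yet under a given safepoint."""
    return latest_index - safepoint


# Healthy case: the learner keeps up, so the slowest follower sets the bound.
healthy = learner_scheme_safepoint(120, [118, 115], 117)

# Kafka is down: the learner's index stops at 40 and pins the safepoint there.
stalled = learner_scheme_safepoint(120, [118, 115], 40)

# Same stall under the leader scheme, with the Kafka apply index stuck at 40.
stalled_leader = leader_scheme_safepoint([120, 118, 115], 40)
```

With these sample numbers, `retained_entries(120, healthy)` is 5 while `retained_entries(120, stalled)` is 80, which is the disk-space growth both schemes warn about when Kafka or the topic is unavailable.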



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
