yuangjiang commented on issue #963:
URL: 
https://github.com/apache/incubator-seatunnel/issues/963#issuecomment-1023908340


   Spark CDC is feasible; you only need to define a Spark source to do a simple 
test. CDC can be developed based on Debezium,
   similar to this:
   /**
    * Streaming source provider registered under the short name "debezium".
    *
    * A caller-supplied schema is mandatory: both [[sourceSchema]] and
    * [[createSource]] throw a `NoSuchElementException` when `schema` is `None`.
    */
   class DefaultSource extends StreamSourceProvider with DataSourceRegister with Logging {

     /**
      * Reports the name and schema of this source.
      *
      * @param sqlContext   not used by this implementation
      * @param schema       user-supplied schema; must be defined
      * @param providerName not used by this implementation
      * @param parameters   not used by this implementation
      * @return the pair of `shortName()` and the supplied schema
      * @throws NoSuchElementException if `schema` is empty
      */
     override def sourceSchema(
         sqlContext: SQLContext,
         schema: Option[StructType],
         providerName: String,
         parameters: Map[String, String]): (String, StructType) =
       (shortName(), requireSchema(schema))

     /**
      * Builds a [[DebeziumSource]] backed by a freshly initialised engine.
      *
      * @param sqlContext   handed to the created source
      * @param metadataPath not used by this implementation
      * @param schema       user-supplied schema; must be defined
      * @param providerName not used by this implementation
      * @param parameters   source options; copied into a `java.util.Map` and
      *                     passed to `SparkDebeziumEngine.EngineInit`
      * @return the new source
      * @throws NoSuchElementException if `schema` is empty
      */
     override def createSource(
         sqlContext: SQLContext,
         metadataPath: String,
         schema: Option[StructType],
         providerName: String,
         parameters: Map[String, String]): Source = {
       // Validate before touching the engine, so that a request which cannot
       // succeed does not construct or initialise any engine objects.
       val userSchema = requireSchema(schema)

       val debeziumOffset = new DebeziumOffset
       val handover = new Handover
       val changeConsumer = new DebeziumChangeConsumer(handover)
       val debeziumEngine = new SparkDebeziumEngine(debeziumOffset, handover, changeConsumer)

       // EngineInit is given a java.util.Map, so copy the Scala options over.
       val javaParameters: java.util.Map[String, String] = new java.util.HashMap[String, String]()
       parameters.foreach { case (key, value) => javaParameters.put(key, value) }
       debeziumEngine.EngineInit(javaParameters)

       new DebeziumSource(sqlContext, userSchema, debeziumOffset, debeziumEngine)
     }

     /** Name under which this source is registered. */
     override def shortName(): String = "debezium"

     /**
      * Unwraps the user-supplied schema. Throws the same exception type as
      * `Option.get`, but with a message that names the missing input.
      */
     private def requireSchema(schema: Option[StructType]): StructType =
       schema.getOrElse(
         throw new NoSuchElementException(
           s"The '${shortName()}' source requires a user-specified schema, but none was provided"))
   }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to