This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f6023d8a5b [sample](flink-connector) add full db sync v2 (#12090)
f6023d8a5b is described below

commit f6023d8a5b7764ab6e1635a637ec4b22aedf7ca5
Author: wudi <[email protected]>
AuthorDate: Fri Aug 26 16:13:01 2022 +0800

    [sample](flink-connector) add full db sync v2 (#12090)
    
    * add db sync v2
---
 samples/doris-demo/flink-demo-v1.1/pom.xml         |   5 +
 .../doris/demo/flink/dbsync/DatabaseFullSync.java  | 194 +++++++++++++++++++++
 .../apache/doris/demo/flink/dbsync/JdbcUtil.java   |  84 +++++++++
 3 files changed, 283 insertions(+)

diff --git a/samples/doris-demo/flink-demo-v1.1/pom.xml 
b/samples/doris-demo/flink-demo-v1.1/pom.xml
index 44a7f6304e..156483165b 100644
--- a/samples/doris-demo/flink-demo-v1.1/pom.xml
+++ b/samples/doris-demo/flink-demo-v1.1/pom.xml
@@ -104,6 +104,11 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web_${scala.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
 
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
new file mode 100644
index 0000000000..417543df49
--- /dev/null
+++ 
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.dbsync;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+/***
+ *
+ * Synchronize the full database through flink cdc
+ *
+ */
+public class DatabaseFullSync {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DatabaseFullSync.class);
+    private static String HOST = "127.0.0.1";
+    private static String MYSQL_PASSWD = "password";
+    private static int MYSQL_PORT = 3306;
+    private static int DORIS_PORT = 8030;
+    private static String MYSQL_USER = "root";
+
+
+    private static String SYNC_DB = "test";
+    private static String SYNC_TBLS = "test.*";
+    private static String TARGET_DORIS_DB = "test";
+
+    public static void main(String[] args) throws Exception {
+        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+            .hostname(HOST)
+            .port(MYSQL_PORT)
+            .databaseList(SYNC_DB) // set captured database
+            .tableList(SYNC_TBLS) // set captured table
+            .username(MYSQL_USER)
+            .password(MYSQL_PASSWD)
+            .deserializer(new JsonDebeziumDeserializationSchema()) // converts 
SourceRecord to JSON String
+            .build();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        // enable checkpoint
+        env.enableCheckpointing(10000);
+
+        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, 
WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
+
+        //get table list
+        List<String> tableList = getTableList();
+        LOG.info("sync table list:{}",tableList);
+        for(String tbl : tableList){
+            SingleOutputStreamOperator<String> filterStream = 
filterTableData(cdcSource, tbl);
+            SingleOutputStreamOperator<String> cleanStream = 
clean(filterStream);
+            DorisSink dorisSink = buildDorisSink(tbl);
+            cleanStream.sinkTo(dorisSink).name("sink " + tbl);
+        }
+        env.execute("Full Database Sync ");
+    }
+
+    /**
+     * Get real data
+     * {
+     *     "before":null,
+     *     "after":{
+     *         "id":1,
+     *         "name":"zhangsan-1",
+     *         "age":18
+     *     },
+     *     "source":{
+     *         "db":"test",
+     *         "table":"test_1",
+     *         ...
+     *     },
+     *     "op":"c",
+     *     ...
+     * }
+     * */
+    private static SingleOutputStreamOperator<String> 
clean(SingleOutputStreamOperator<String> source) {
+        return source.flatMap(new FlatMapFunction<String,String>(){
+            @Override
+            public void flatMap(String row, Collector<String> out) throws 
Exception {
+                try{
+                    JSONObject rowJson = JSON.parseObject(row);
+                    String op = rowJson.getString("op");
+                    //history,insert,update
+                    if(Arrays.asList("r","c","u").contains(op)){
+                        
out.collect(rowJson.getJSONObject("after").toJSONString());
+                    }else{
+                        LOG.info("filter other op:{}",op);
+                    }
+                }catch (Exception ex){
+                    LOG.warn("filter other format binlog:{}",row);
+                }
+            }
+        });
+    }
+
+    /**
+     * Divide according to tablename
+     * */
+    private static SingleOutputStreamOperator<String> 
filterTableData(DataStreamSource<String> source, String table) {
+        return source.filter(new FilterFunction<String>() {
+            @Override
+            public boolean filter(String row) throws Exception {
+                try {
+                    JSONObject rowJson = JSON.parseObject(row);
+                    JSONObject source = rowJson.getJSONObject("source");
+                    String tbl = source.getString("table");
+                    return table.equals(tbl);
+                }catch (Exception ex){
+                    ex.printStackTrace();
+                    return false;
+                }
+            }
+        });
+    }
+
+    /**
+     * Get all MySQL tables that need to be synchronized
+     * */
+    private static List<String> getTableList() {
+        List<String> tables = new ArrayList<>();
+        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM 
information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
+        List<JSONObject> tableList = JdbcUtil.executeQuery(HOST, MYSQL_PORT, 
MYSQL_USER, MYSQL_PASSWD, sql);
+        for(JSONObject jsob : tableList){
+            String schemaName = jsob.getString("TABLE_SCHEMA");
+            String tblName = jsob.getString("TABLE_NAME");
+            String schemaTbl = schemaName  + "." + tblName;
+            if(schemaTbl.matches(SYNC_TBLS)){
+                tables.add(tblName);
+            }
+        }
+        return tables;
+    }
+
+    /**
+     * create doris sink
+     * */
+    public static DorisSink buildDorisSink(String table){
+        DorisSink.Builder<String> builder = DorisSink.builder();
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes(HOST + ":" + DORIS_PORT)
+            .setTableIdentifier(TARGET_DORIS_DB + "." + table)
+            .setUsername("root")
+            .setPassword("");
+
+        Properties pro = new Properties();
+        //json data format
+        pro.setProperty("format", "json");
+        pro.setProperty("read_json_by_line", "true");
+        DorisExecutionOptions executionOptions = 
DorisExecutionOptions.builder()
+            .setLabelPrefix("label-" + table + UUID.randomUUID()) //streamload 
label prefix,
+            .setStreamLoadProp(pro).build();
+
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
+            .setDorisExecutionOptions(executionOptions)
+            .setSerializer(new SimpleStringSerializer()) //serialize according 
to string
+            .setDorisOptions(dorisBuilder.build());
+
+        return builder.build();
+    }
+}
diff --git 
a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/JdbcUtil.java
 
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/JdbcUtil.java
new file mode 100644
index 0000000000..8d1c7c1b12
--- /dev/null
+++ 
b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/JdbcUtil.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.dbsync;
+
+import com.alibaba.fastjson.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class JdbcUtil {
+
+    static {
+        try {
+            Class.forName("com.mysql.jdbc.Driver");
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);
+
+    public static void main(String[] args) throws SQLException {
+    }
+
+    public static List<JSONObject> executeQuery(String hostUrl, int port, 
String user, String password, String sql){
+        List<JSONObject> beJson = new ArrayList<>();
+        String connectionUrl = 
String.format("jdbc:mysql://%s:%s/",hostUrl,port);
+        Connection con = null;
+        try {
+            con = DriverManager.getConnection(connectionUrl,user,password);
+            PreparedStatement ps = con.prepareStatement(sql);
+            ResultSet rs = ps.executeQuery();
+            beJson = resultSetToJson(rs);
+        } catch (SQLException e) {
+            e.printStackTrace();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                con.close();
+            } catch (Exception e) {
+            }
+        }
+        return beJson;
+    }
+
+    private static List<JSONObject> resultSetToJson(ResultSet rs) throws 
SQLException {
+        List<JSONObject> list = new ArrayList<>();
+        ResultSetMetaData metaData = rs.getMetaData();
+        int columnCount = metaData.getColumnCount();
+        while (rs.next()) {
+            JSONObject jsonObj = new JSONObject();
+            for (int i = 1; i <= columnCount; i++) {
+                String columnName =metaData.getColumnLabel(i);
+                String value = rs.getString(columnName);
+                jsonObj.put(columnName, value);
+            }
+            list.add(jsonObj);
+        }
+        return list;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to