This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c33321f  [Feature][DataX] Implementation Datax doriswriter plugin 
(#6107)
c33321f is described below

commit c33321ff42d98527c382d092b0e8c946f39a7ccf
Author: huzk <[email protected]>
AuthorDate: Thu Jul 8 09:33:02 2021 +0800

    [Feature][DataX] Implementation Datax doriswriter plugin (#6107)
---
 extension/DataX/doriswriter/doc/doriswriter.md     | 147 ++++++++++++
 extension/DataX/doriswriter/pom.xml                | 104 ++++++++
 .../doriswriter/src/main/assembly/package.xml      |  35 +++
 .../plugin/writer/doriswriter/DorisFlushBatch.java |  47 ++++
 .../plugin/writer/doriswriter/DorisJsonCodec.java  |  88 +++++++
 .../plugin/writer/doriswriter/DorisWriter.java     | 261 +++++++++++++++++++++
 .../writer/doriswriter/DorisWriterEmitter.java     | 177 ++++++++++++++
 .../datax/plugin/writer/doriswriter/Key.java       | 123 ++++++++++
 .../doriswriter/src/main/resources/plugin.json     |   6 +
 .../src/main/resources/plugin_job_template.json    |  15 ++
 10 files changed, 1003 insertions(+)

diff --git a/extension/DataX/doriswriter/doc/doriswriter.md 
b/extension/DataX/doriswriter/doc/doriswriter.md
new file mode 100644
index 0000000..e58eff2
--- /dev/null
+++ b/extension/DataX/doriswriter/doc/doriswriter.md
@@ -0,0 +1,147 @@
+# DorisWriter 插件文档
+
+## 1 快速介绍
+DorisWriter支持将大批量数据写入Doris中。
+
+## 2 实现原理
+DorisWriter 通过 Doris 原生支持的 Stream load 方式导入数据:DorisWriter 会将 `reader` 读取的数据缓存在内存中,拼接成 Json 文本,然后批量导入至 Doris。
+
+## 3 功能说明
+
+### 3.1 配置样例
+
+这里是一份从Stream读取数据后导入至Doris的配置文件。
+
+```
+{
+    "job": {
+        "setting": {
+            "speed": {
+                "channel": 1
+            },
+            "errorLimit": {
+                "record": 0,
+                "percentage": 0
+            }
+        },
+        "content": [
+            {
+                "reader": {
+                    "name": "streamreader",
+                    "parameter": {
+                        "column": [
+                            {
+                                "value": "皮蛋1",
+                                "type": "string"
+                            },
+                            {
+                                "value": "皮蛋2",
+                                "type": "string"
+                            },
+                            {
+                                "value": "111",
+                                "type": "long"
+                            },
+                            {
+                                "value": "222",
+                                "type": "long"
+                            }
+                        ],
+                        "sliceRecordCount": 100
+                    }
+                },
+                "writer": {
+                    "name": "doriswriter",
+                    "parameter": {
+                        "username": "dxx",
+                        "password": "123456",
+                        "database": "test",
+                        "table": "datax_test",
+                        "column": [
+                            "k1",
+                            "k2",
+                            "v1",
+                            "v2"
+                        ],
+                        "preSql": [],
+                        "postSql": [],
+                        "jdbcUrl": "jdbc:mysql://10.93.6.247:9030/",
+                        "beLoadUrl": [
+                            "10.93.6.167:8041"
+                        ],
+                        "loadProps": {
+                        }
+                    }
+                }
+            }
+        ]
+    }
+}
+```
+
+
+
+### 3.2 参数说明
+
+* **username**
+
+  - 描述:访问Doris数据库的用户名
+  - 必选:是
+  - 默认值:无
+
+* **password**
+
+  - 描述:访问Doris数据库的密码
+  - 必选:是
+  - 默认值:无
+
+* **database**
+
+  - 描述:访问Doris表的数据库名称。
+  - 必选:是
+  - 默认值:无
+
+* **table**
+
+  - 描述:访问Doris表的表名称。
+  - 必选:是
+  - 默认值:无
+
+* **beLoadUrl**
+
+  - 描述:Doris BE的地址用于Stream load,可以为多个BE地址,形如`BE_ip:Be_webserver_port`。
+  - 必选:是
+  - 默认值:无
+
+* **column**
+
+  - 描述:目的表**需要写入数据**的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
+  - 必选:是
+  - 默认值:无
+
+* **preSql**
+
+  - 描述:写入数据到目的表前,会先执行这里的标准语句。
+  - 必选:否
+  - 默认值:无
+
+* **postSql**
+
+  - 描述:写入数据到目的表后,会执行这里的标准语句。
+  - 必选:否
+  - 默认值:无
+
+* **jdbcUrl**
+
+  - 描述:目的数据库的 JDBC 连接信息,用于执行`preSql`及`postSql`。
+  - 必选:否
+  - 默认值:无
+
+* **loadProps**
+
+  - 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。
+  - 必选:否
+  - 默认值:无
+
+
+
diff --git a/extension/DataX/doriswriter/pom.xml 
b/extension/DataX/doriswriter/pom.xml
new file mode 100644
index 0000000..f4b4ea5
--- /dev/null
+++ b/extension/DataX/doriswriter/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>datax-all</artifactId>
+        <groupId>com.alibaba.datax</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>doriswriter</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>plugin-rdbms-util</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.driver.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.3</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <!-- compiler plugin -->
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+
+            <!-- assembly plugin -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/extension/DataX/doriswriter/src/main/assembly/package.xml 
b/extension/DataX/doriswriter/src/main/assembly/package.xml
new file mode 100644
index 0000000..4edb15a
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/doriswriter</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>doriswriter-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/doriswriter</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/doriswriter/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
\ No newline at end of file
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
new file mode 100644
index 0000000..7a9638d
--- /dev/null
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
@@ -0,0 +1,47 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+ */
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import java.util.List;
+
+/**
+ * Immutable holder for one batch of serialized rows together with the label
+ * and byte size that were computed for it.
+ */
+public class DorisFlushBatch
+{
+       private final String label;
+       private final Long bytes;
+       private final List<String> rows;
+
+       /**
+        * @param label label attached to this batch
+        * @param bytes byte size reported for this batch
+        * @param rows  serialized rows; the list is stored by reference (no copy is made),
+        *              so later changes to the caller's list are visible through {@link #getRows()}
+        */
+       public DorisFlushBatch(final String label, final Long bytes, final List<String> rows) {
+               this.label = label;
+               this.bytes = bytes;
+               this.rows = rows;
+       }
+
+       /** @return the label given at construction time */
+       public String getLabel() {
+               return this.label;
+       }
+
+       /** @return the byte size given at construction time */
+       public Long getBytes() {
+               return this.bytes;
+       }
+
+       /** @return the very list instance given at construction time */
+       public List<String> getRows() {
+               return this.rows;
+       }
+}
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
new file mode 100644
index 0000000..c73a8d5
--- /dev/null
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
@@ -0,0 +1,88 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+ */
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.DateColumn;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.time.DateFormatUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+/**
+ * Serializes a DataX {@link Record} into a JSON object string whose keys are
+ * the configured field names, taken in order.
+ */
+public class DorisJsonCodec {
+
+    /** Time zone id used when formatting date columns (hard-coded). */
+    private static final String TIME_ZONE_ID = "GMT+8";
+    private static final TimeZone TIME_ZONE = TimeZone.getTimeZone(TIME_ZONE_ID);
+
+    private final List<String> fieldNames;
+
+    /**
+     * @param fieldNames names used as JSON keys; the i-th name is paired with the i-th column of a record
+     */
+    public DorisJsonCodec(final List<String> fieldNames) {
+        this.fieldNames = fieldNames;
+    }
+
+    /**
+     * Converts one record into a JSON string.
+     *
+     * @param row record whose columns are read by index, one per field name
+     * @return the JSON text produced by fastjson for the field-name/value map,
+     *         or an empty string when no field names were configured
+     */
+    public String serialize(final Record row) {
+        if (null == this.fieldNames) {
+            return "";
+        }
+        final Map<String, Object> rowMap = new HashMap<String, Object>(this.fieldNames.size());
+        int idx = 0;
+        for (final String fieldName : this.fieldNames) {
+            rowMap.put(fieldName, this.columnConvert2String(row.getColumn(idx)));
+            ++idx;
+        }
+        return JSON.toJSONString(rowMap);
+    }
+
+
+    /**
+     * Converts a DataX column value to its string form.
+     *
+     * @param col column to convert
+     * @return {@code null} for a null raw value or an unrecognized date sub type;
+     *         the long value as text for BOOL columns; a formatted date/time text
+     *         for DATE columns; {@code col.asString()} for every other type
+     */
+    private String columnConvert2String(final Column col) {
+        if (null == col.getRawData()) {
+            return null;
+        }
+        if (Column.Type.BOOL == col.getType()) {
+            // booleans are emitted through their long representation
+            return String.valueOf(col.asLong());
+        }
+        if (Column.Type.DATE != col.getType()) {
+            return col.asString();
+        }
+        // DATE columns: the pattern depends on the date sub type
+        final DateColumn.DateType type = ((DateColumn) col).getSubType();
+        if (type == DateColumn.DateType.DATE) {
+            return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", TIME_ZONE);
+        }
+        if (type == DateColumn.DateType.TIME) {
+            return DateFormatUtils.format(col.asDate(), "HH:mm:ss", TIME_ZONE);
+        }
+        if (type == DateColumn.DateType.DATETIME) {
+            return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", TIME_ZONE);
+        }
+        return null;
+    }
+
+
+}
\ No newline at end of file
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
new file mode 100644
index 0000000..5148dcd
--- /dev/null
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -0,0 +1,261 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+ */
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
+import com.alibaba.datax.plugin.rdbms.writer.Constant;
+import com.alibaba.druid.sql.parser.ParserException;
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+public class DorisWriter extends Writer {
+    public DorisWriter() {
+    }
+
+    public static class Task extends com.alibaba.datax.common.spi.Writer.Task {
+        private static final Logger LOG = 
LoggerFactory.getLogger(DorisWriter.Task.class);
+
+        private DorisWriterEmitter dorisWriterEmitter;
+        private Key keys;
+        private DorisJsonCodec rowCodec;
+
+
+        public Task() {
+        }
+
+        @Override
+        public void init() {
+            this.keys = new Key(super.getPluginJobConf());
+            this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
+            this.dorisWriterEmitter = new DorisWriterEmitter(keys);
+        }
+
+        @Override
+        public void prepare() {
+        }
+
+        @Override
+        public void startWrite(RecordReceiver recordReceiver) {
+            try {
+                List<String> buffer = new ArrayList<>();
+                int batchCount = 0;
+                long batchByteSize = 0L;
+                Record record;
+                // loop to get record from datax
+                while ((record = recordReceiver.getFromReader()) != null) {
+                    // check column size
+                    if (record.getColumnNumber() != 
this.keys.getColumns().size()) {
+                        throw 
DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+                                String.format("config writer column info 
error. because  the column number of  reader is  :%s and the column number of 
writer is:%s . please check you datax job config json.", 
record.getColumnNumber(), this.keys.getColumns().size()));
+                    }
+                    // codec record
+                    final String recordStr = this.rowCodec.serialize(record);
+                    // put into buffer
+                    buffer.add(recordStr);
+                    batchCount += 1;
+                    batchByteSize += recordStr.getBytes().length;
+                    // trigger buffer
+                    if (batchCount >= this.keys.getBatchRows() || 
batchByteSize >= this.keys.getBatchByteSize()) {
+                        // generate doris stream load label
+                        final String label = getStreamLoadLabel();
+                        LOG.debug(String.format("Doris buffer Sinking 
triggered: rows[%d] label[%s].", batchCount, label));
+                        final DorisFlushBatch flushBatch = new 
DorisFlushBatch(label, batchByteSize, buffer);
+                        dorisWriterEmitter.doStreamLoad(flushBatch);
+                        // clear buffer
+                        batchCount = 0;
+                        batchByteSize = 0L;
+                        buffer.clear();
+                    }
+                }
+                if (buffer.size() > 0) {
+                    final DorisFlushBatch flushBatch = new 
DorisFlushBatch(getStreamLoadLabel(), batchByteSize, buffer);
+                    dorisWriterEmitter.doStreamLoad(flushBatch);
+                }
+
+            } catch (Exception e) {
+                throw 
DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+            }
+        }
+
+        private String getStreamLoadLabel() {
+            return "datax_doris_writer_" + UUID.randomUUID().toString();
+        }
+
+        @Override
+        public void post() {
+
+        }
+
+        @Override
+        public void destroy() {
+        }
+
+        @Override
+        public boolean supportFailOver() {
+            return false;
+        }
+    }
+
+    /**
+     * Job side of the writer: validates the configured pre/post SQL, executes it
+     * before and after the write phase, and provides the per-task configurations.
+     */
+    public static class Job extends com.alibaba.datax.common.spi.Writer.Job {
+        private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Job.class);
+        private Configuration originalConfig = null;
+        private Key keys;
+
+        public Job() {
+        }
+
+        /** Reads the plugin job configuration and runs the configuration pretreatment. */
+        @Override
+        public void init() {
+            this.originalConfig = super.getPluginJobConf();
+            this.keys = new Key(super.getPluginJobConf());
+            this.keys.doPretreatment();
+        }
+
+        /** Re-initializes the job and checks that every pre and post SQL statement parses. */
+        @Override
+        public void preCheck() {
+            this.init();
+            this.preCheckPrePareSQL(this.keys);
+            this.preCheckPostSQL(this.keys);
+        }
+
+        /** Executes the rendered pre SQL statements, if any, over a JDBC connection. */
+        @Override
+        public void prepare() {
+            String username = this.keys.getUsername();
+            String password = this.keys.getPassword();
+            String jdbcUrl = this.keys.getJdbcUrl();
+            List<String> renderedPreSqls = this.renderPreOrPostSqls(this.keys.getPreSqlList(), this.keys.getTable());
+            if (!renderedPreSqls.isEmpty()) {
+                Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
+                try {
+                    LOG.info("prepare execute preSqls:[{}]. doris jdbc url为:{}.", String.join(";", renderedPreSqls), jdbcUrl);
+                    this.executeSqls(conn, renderedPreSqls);
+                } finally {
+                    // close the connection even when a statement fails
+                    DBUtil.closeDBResources(null, null, conn);
+                }
+            }
+
+        }
+
+        /**
+         * @param mandatoryNumber number of task configurations to produce
+         * @return a list holding the original configuration object {@code mandatoryNumber} times
+         */
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
+
+            for (int i = 0; i < mandatoryNumber; ++i) {
+                configurations.add(this.originalConfig);
+            }
+
+            return configurations;
+        }
+
+        /** Executes the rendered post SQL statements, if any, over a JDBC connection. */
+        @Override
+        public void post() {
+            String username = this.keys.getUsername();
+            String password = this.keys.getPassword();
+            String jdbcUrl = this.keys.getJdbcUrl();
+            List<String> renderedPostSqls = this.renderPreOrPostSqls(this.keys.getPostSqlList(), this.keys.getTable());
+            if (!renderedPostSqls.isEmpty()) {
+                Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
+                try {
+                    LOG.info("prepare execute postSqls:[{}]. doris jdbc url为:{}.", String.join(";", renderedPostSqls), jdbcUrl);
+                    this.executeSqls(conn, renderedPostSqls);
+                } finally {
+                    // close the connection even when a statement fails
+                    DBUtil.closeDBResources(null, null, conn);
+                }
+            }
+
+        }
+
+        @Override
+        public void destroy() {
+        }
+
+        /**
+         * Substitutes the table-name placeholder in each non-empty statement.
+         *
+         * @param preOrPostSqls configured statements, may be {@code null}
+         * @param tableName     value inserted for the placeholder
+         * @return the rendered statements; empty when the input is {@code null} or has no non-empty entry
+         */
+        private List<String> renderPreOrPostSqls(final List<String> preOrPostSqls, final String tableName) {
+            if (null == preOrPostSqls) {
+                return Collections.emptyList();
+            }
+            final List<String> renderedSqls = new ArrayList<>();
+            for (final String sql : preOrPostSqls) {
+                if (!Strings.isNullOrEmpty(sql)) {
+                    renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
+                }
+            }
+            return renderedSqls;
+        }
+
+        /**
+         * Runs the statements one after another on a single statement object.
+         * The connection is left open; only the statement is closed here.
+         *
+         * @param conn open connection to run on
+         * @param sqls statements to execute in order
+         */
+        private void executeSqls(final Connection conn, final List<String> sqls) {
+            Statement stmt = null;
+            // remembered so that a failure can be reported with the statement that caused it
+            String currentSql = null;
+            try {
+                stmt = conn.createStatement();
+                for (String s : sqls) {
+                    currentSql = s;
+                    DBUtil.executeSqlWithoutResultSet(stmt, s);
+                }
+            } catch (Exception e) {
+                throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
+            } finally {
+                DBUtil.closeDBResources(null, stmt, null);
+            }
+        }
+
+        /** Checks that every rendered pre SQL statement passes {@code DBUtil.sqlValid}. */
+        private void preCheckPrePareSQL(final Key keys) {
+            final String table = keys.getTable();
+            final List<String> preSqls = keys.getPreSqlList();
+            final List<String> renderedPreSqls = renderPreOrPostSqls(preSqls, table);
+            if (!renderedPreSqls.isEmpty()) {
+                LOG.info("prepare check preSqls:[{}].", String.join(";", renderedPreSqls));
+                for (final String sql : renderedPreSqls) {
+                    try {
+                        DBUtil.sqlValid(sql, DataBaseType.MySql);
+                    } catch (ParserException e) {
+                        throw RdbmsException.asPreSQLParserException(DataBaseType.MySql, e, sql);
+                    }
+                }
+            }
+        }
+
+        /** Checks that every rendered post SQL statement passes {@code DBUtil.sqlValid}. */
+        private void preCheckPostSQL(final Key keys) {
+            final String table = keys.getTable();
+            final List<String> postSqls = keys.getPostSqlList();
+            final List<String> renderedPostSqls = renderPreOrPostSqls(postSqls, table);
+            if (!renderedPostSqls.isEmpty()) {
+                LOG.info("prepare check postSqls:[{}].", String.join(";", renderedPostSqls));
+                for (final String sql : renderedPostSqls) {
+                    try {
+                        DBUtil.sqlValid(sql, DataBaseType.MySql);
+                    } catch (ParserException e) {
+                        throw RdbmsException.asPostSQLParserException(DataBaseType.MySql, e, sql);
+                    }
+                }
+            }
+        }
+
+    }
+}
\ No newline at end of file
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
new file mode 100644
index 0000000..70792e3
--- /dev/null
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
@@ -0,0 +1,177 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+ */
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.UUID;
+
+public class DorisWriterEmitter {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DorisWriterEmitter.class);
+       ;
+       private final Key keys;
+       private int pos;
+
+
+       public DorisWriterEmitter(final Key keys) {
+               this.keys = keys;
+       }
+
+
+       /**
+        * Sends one batch to Doris through the stream load HTTP endpoint and
+        * checks the status reported in the response.
+        *
+        * @param flushData batch to send; its rows are merged into a single request body
+        * @throws IOException when no host in `beLoadUrl` is reachable, when the response
+        *                     carries no usable "Status" value, or when the status is "Fail"
+        */
+       public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
+               final String host = this.getAvailableHost();
+               if (null == host) {
+                       throw new IOException("None of the host in `beLoadUrl` could be connected.");
+               }
+               final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
+               LOG.info(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
+               // do http put request
+               final Map<String, Object> loadResult = this.doHttpPut(loadUrl, flushData.getLabel(), this.mergeRows(flushData.getRows()));
+               // get response
+               final String keyStatus = "Status";
+               // a missing key and a key mapped to null are both treated as "unknown status",
+               // so the comparison below can never hit a null reference
+               final Object status = (null == loadResult) ? null : loadResult.get(keyStatus);
+               if (null == status) {
+                       throw new IOException("Unable to flush data to doris: unknown result status.");
+               }
+               LOG.info("StreamLoad response:\n" + JSON.toJSONString(loadResult));
+               if ("Fail".equals(status)) {
+                       throw new IOException("Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
+               }
+       }
+
+       /**
+        * Picks a reachable BE host. The scan starts at the position used by the
+        * previous call and wraps around, so every configured host is tried at
+        * most once per call.
+        *
+        * @return the first reachable host prefixed with "http://", or {@code null}
+        *         when the host list is null/empty or no host accepts a connection
+        */
+       private String getAvailableHost() {
+               final List<String> hostList = this.keys.getBeLoadUrlList();
+               if (null == hostList || hostList.isEmpty()) {
+                       return null;
+               }
+               final int hostCount = hostList.size();
+               if (this.pos >= hostCount) {
+                       this.pos = 0;
+               }
+               for (int attempt = 0; attempt < hostCount; ++attempt) {
+                       final String host = "http://" + hostList.get(this.pos);
+                       if (this.tryHttpConnection(host)) {
+                               // keep pos on the working host so the next call starts here
+                               return host;
+                       }
+                       this.pos = (this.pos + 1) % hostCount;
+               }
+               return null;
+       }
+
+       /**
+        * Probes a host by opening an HTTP connection with a one second connect timeout.
+        *
+        * @param host URL text to probe
+        * @return {@code true} when the connection could be established, {@code false}
+        *         on any exception (which is logged as a warning)
+        */
+       private boolean tryHttpConnection(final String host) {
+               HttpURLConnection co = null;
+               try {
+                       final URL url = new URL(host);
+                       co = (HttpURLConnection) url.openConnection();
+                       co.setConnectTimeout(1000);
+                       co.connect();
+                       return true;
+               } catch (Exception e) {
+                       // the exception is passed as the trailing argument so the logger records it
+                       // as the throwable; it must not have a placeholder of its own
+                       LOG.warn("Failed to connect to address:{}", host, e);
+                       return false;
+               } finally {
+                       // release the connection on the failure path as well
+                       if (null != co) {
+                               co.disconnect();
+                       }
+               }
+       }
+
+       /**
+        * Concatenates the rows into one bracketed, comma-separated payload.
+        *
+        * @param rows already serialized rows, joined in iteration order
+        * @return the UTF-8 bytes of {@code [row1,row2,...]}; {@code []} for an empty list
+        */
+       private byte[] mergeRows(final List<String> rows) {
+               final String payload = "[" + String.join(",", rows) + "]";
+               return payload.getBytes(StandardCharsets.UTF_8);
+       }
+
+       /**
+        * Sends one batch with an HTTP PUT and returns the parsed response.
+        * <p>
+        * Headers taken from {@code loadProps} are applied first; the fixed headers set
+        * afterwards (label, format, strip_outer_array, ...) replace any same-named entry.
+        *
+        * @param loadUrl URL the request is sent to
+        * @param label   value of the {@code label} header
+        * @param data    request body
+        * @return the response body parsed into a map, or {@code null} when the status code
+        *         is not 200, the response has no body, or the body is not a JSON object
+        * @throws IOException if executing the request or closing the client fails
+        */
+       private Map<String, Object> doHttpPut(final String loadUrl, final String label, final byte[] data) throws IOException {
+               LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
+               // Report every HTTP method as redirectable so that the PUT itself follows redirects.
+               final HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
+                       @Override
+                       protected boolean isRedirectable(final String method) {
+                               return true;
+                       }
+               });
+               try (final CloseableHttpClient httpclient = httpClientBuilder.build()) {
+                       final HttpPut httpPut = new HttpPut(loadUrl);
+                       final List<String> cols = this.keys.getColumns();
+                       if (null != cols && !cols.isEmpty()) {
+                               httpPut.setHeader("columns", String.join(",", cols));
+                       }
+                       // put loadProps to http header
+                       final Map<String, Object> loadProps = this.keys.getLoadProps();
+                       if (null != loadProps) {
+                               for (final Map.Entry<String, Object> entry : loadProps.entrySet()) {
+                                       httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
+                               }
+                       }
+                       httpPut.setHeader("Expect", "100-continue");
+                       httpPut.setHeader("label", label);
+                       httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
+                       httpPut.setHeader("Authorization", this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
+                       httpPut.setHeader("format", "json");
+                       httpPut.setHeader("strip_outer_array", "true");
+                       httpPut.setEntity(new ByteArrayEntity(data));
+                       httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+                       try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) {
+                               final int code = resp.getStatusLine().getStatusCode();
+                               if (HttpStatus.SC_OK != code) {
+                                       LOG.warn("Request failed with code:{}", code);
+                                       return null;
+                               }
+                               final HttpEntity respEntity = resp.getEntity();
+                               if (null == respEntity) {
+                                       LOG.warn("Request failed with empty response.");
+                                       return null;
+                               }
+                               // Fall back to UTF-8 when the response does not declare a charset.
+                               final String body = EntityUtils.toString(respEntity, StandardCharsets.UTF_8);
+                               final Object parsed = JSON.parse(body);
+                               if (!(parsed instanceof Map)) {
+                                       // Route a non-object body through the same null path as the other
+                                       // failures instead of failing on the cast below.
+                                       LOG.warn("Request failed with unexpected response body: {}", body);
+                                       return null;
+                               }
+                               @SuppressWarnings("unchecked")
+                               final Map<String, Object> result = (Map<String, Object>) parsed;
+                               return result;
+                       }
+               }
+       }
+
+       /**
+        * Builds the value of an HTTP Basic {@code Authorization} header.
+        *
+        * @param username user name placed before the colon
+        * @param password password placed after the colon
+        * @return {@code "Basic "} followed by the Base64 encoding of {@code username:password}
+        */
+       private String getBasicAuthHeader(final String username, final String password) {
+               final String auth = username + ":" + password;
+               // Encode with an explicit charset so the header does not depend on the platform default.
+               return "Basic " + Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+       }
+
+
+}
\ No newline at end of file
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
new file mode 100644
index 0000000..f670e59
--- /dev/null
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
@@ -0,0 +1,123 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+ */
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Typed view over the doriswriter job {@link Configuration}: exposes the option
+ * names as constants, reads each option, and validates the required ones.
+ */
+public class Key implements Serializable
+{
+    public static final String JDBC_URL = "jdbcUrl";
+    public static final String DATABASE = "database";
+    public static final String TABLE = "table";
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+    public static final String BE_LOAD_URL = "beLoadUrl";
+    public static final String COLUMN = "column";
+    public static final String PRE_SQL = "preSql";
+    public static final String POST_SQL = "postSql";
+    public static final String LOAD_PROPS = "loadProps";
+    public static final String MAX_BATCH_ROWS = "maxBatchRows";
+    public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize";
+
+    /** Row limit used when {@code maxBatchRows} is not configured. */
+    private static final int DEFAULT_MAX_BATCH_ROWS = 500000;
+    /** Byte limit used when {@code maxBatchByteSize} is not configured (90 MiB = 94371840). */
+    private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 90L * 1024 * 1024;
+
+    private final Configuration options;
+
+    /**
+     * @param options the writer configuration to read from
+     */
+    public Key(final Configuration options) {
+        this.options = options;
+    }
+
+    /**
+     * Validates the configuration: required options first, then the format of
+     * every {@code beLoadUrl} entry.
+     */
+    public void doPretreatment() {
+        this.validateRequired();
+        this.validateStreamLoadUrl();
+    }
+
+    /** @return the {@code jdbcUrl} option */
+    public String getJdbcUrl() {
+        return this.options.getString(JDBC_URL);
+    }
+
+    /** @return the {@code database} option */
+    public String getDatabase() {
+        return this.options.getString(DATABASE);
+    }
+
+    /** @return the {@code table} option */
+    public String getTable() {
+        return this.options.getString(TABLE);
+    }
+
+    /** @return the {@code username} option */
+    public String getUsername() {
+        return this.options.getString(USERNAME);
+    }
+
+    /** @return the {@code password} option */
+    public String getPassword() {
+        return this.options.getString(PASSWORD);
+    }
+
+    /** @return the {@code beLoadUrl} entries */
+    public List<String> getBeLoadUrlList() {
+        return this.options.getList(BE_LOAD_URL, String.class);
+    }
+
+    /** @return the {@code column} entries */
+    public List<String> getColumns() {
+        return this.options.getList(COLUMN, String.class);
+    }
+
+    /** @return the {@code preSql} statements */
+    public List<String> getPreSqlList() {
+        return this.options.getList(PRE_SQL, String.class);
+    }
+
+    /** @return the {@code postSql} statements */
+    public List<String> getPostSqlList() {
+        return this.options.getList(POST_SQL, String.class);
+    }
+
+    /** @return the {@code loadProps} map */
+    public Map<String, Object> getLoadProps() {
+        return this.options.getMap(LOAD_PROPS);
+    }
+
+    /** @return the configured {@code maxBatchRows}, or 500000 when it is absent */
+    public int getBatchRows() {
+        final Integer rows = this.options.getInt(MAX_BATCH_ROWS);
+        return (null == rows) ? DEFAULT_MAX_BATCH_ROWS : rows;
+    }
+
+    /** @return the configured {@code maxBatchByteSize}, or 94371840 when it is absent */
+    public long getBatchByteSize() {
+        final Long size = this.options.getLong(MAX_BATCH_BYTE_SIZE);
+        return (null == size) ? DEFAULT_MAX_BATCH_BYTE_SIZE : size;
+    }
+
+    /**
+     * Ensures at least one {@code beLoadUrl} entry exists and that each entry
+     * contains a colon-separated host and port part.
+     */
+    private void validateStreamLoadUrl() {
+        final List<String> urlList = this.getBeLoadUrlList();
+        // A missing or empty list would otherwise pass here and only fail once data is flushed.
+        if (null == urlList || urlList.isEmpty()) {
+            throw invalidBeLoadUrl();
+        }
+        for (final String host : urlList) {
+            if (null == host || host.split(":").length < 2) {
+                throw invalidBeLoadUrl();
+            }
+        }
+    }
+
+    /** Creates the configuration error reported for a malformed {@code beLoadUrl} option. */
+    private static DataXException invalidBeLoadUrl() {
+        return DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "beLoadUrl的格式不正确,请输入 `be_ip:be_http_port;be_ip:be_http_port`。");
+    }
+
+    /** Fails with {@code REQUIRED_VALUE} when any mandatory option is missing. */
+    private void validateRequired() {
+        final String[] requiredOptionKeys = new String[] { USERNAME, PASSWORD, DATABASE, TABLE, COLUMN, BE_LOAD_URL };
+        for (final String optionKey : requiredOptionKeys) {
+            this.options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
+        }
+    }
+
+
+}
\ No newline at end of file
diff --git a/extension/DataX/doriswriter/src/main/resources/plugin.json 
b/extension/DataX/doriswriter/src/main/resources/plugin.json
new file mode 100644
index 0000000..9d2ad49
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+  "name": "doriswriter",
+  "class": "com.alibaba.datax.plugin.writer.doriswriter.DorisWriter",
+  "description": "",
+  "developer": ""
+}
diff --git 
a/extension/DataX/doriswriter/src/main/resources/plugin_job_template.json 
b/extension/DataX/doriswriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 0000000..152f1ee
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,15 @@
+{
+  "name": "doriswriter",
+  "parameter": {
+    "username": "",
+    "password": "",
+    "database": "",
+    "table": "",
+    "column": [],
+    "preSql": [],
+    "postSql": [],
+    "jdbcUrl": "",
+    "beLoadUrl": [],
+    "loadProps": {}
+  }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to