This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a8a85b  [Optimize][Extension] optimize extension datax 
doriswriter,Remove import doris via csv in Dataxwriter, only support via json 
(#7568)
3a8a85b is described below

commit 3a8a85b7396dcbe4aabffe5b8bd134e25816a934
Author: weajun <[email protected]>
AuthorDate: Sun Jan 9 13:27:52 2022 +0800

    [Optimize][Extension] optimize extension datax doriswriter,Remove import 
doris via csv in Dataxwriter, only support via json (#7568)
    
    * 1.Remove import doris via csv in Dataxwriter, only support via json;
    2.Format Dataxwriter code;
    3.Optimize exception handling and reduce multiple output of exception logs;
    4.Update the dataxwriter's documentation;
    
    * Delete DorisCsvCodec.java
    
    delete unused file 
extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java
    
    * 1.remove `format` config key;
    2.Optimize serialization code in DorisJsonCodec class
---
 docs/en/extending-doris/datax.md                   |   4 +-
 docs/zh-CN/extending-doris/datax.md                |   4 +-
 extension/DataX/doriswriter/doc/doriswriter.md     |  12 ---
 .../plugin/writer/doriswriter/DorisCodec.java      |  34 +++---
 .../plugin/writer/doriswriter/DorisCsvCodec.java   |  52 ---------
 .../plugin/writer/doriswriter/DorisFlushBatch.java | 108 ++++++++++---------
 .../plugin/writer/doriswriter/DorisJsonCodec.java  |  42 ++++----
 .../plugin/writer/doriswriter/DorisWriter.java     | 117 +++++++++------------
 .../writer/doriswriter/DorisWriterEmitter.java     |  67 ++++++------
 .../datax/plugin/writer/doriswriter/Key.java       |  56 +++-------
 10 files changed, 194 insertions(+), 302 deletions(-)

diff --git a/docs/en/extending-doris/datax.md b/docs/en/extending-doris/datax.md
index 1776f87..c8762d6 100644
--- a/docs/en/extending-doris/datax.md
+++ b/docs/en/extending-doris/datax.md
@@ -52,7 +52,7 @@ Because the doriswriter plug-in depends on some modules in 
the DataX code base,
 
     The help doc can be found in `doriswriter/doc`
 
-2. `init_env.sh`
+2. `init-env.sh`
 
     The script mainly performs the following steps:
 
@@ -67,7 +67,7 @@ Because the doriswriter plug-in depends on some modules in 
the DataX code base,
 
 ### How to build
 
-1. Run `init_env.sh`
+1. Run `init-env.sh`
 2. Modify code of doriswriter in `DataX/doriswriter` if you need.
 3. Build doriswriter
 
diff --git a/docs/zh-CN/extending-doris/datax.md 
b/docs/zh-CN/extending-doris/datax.md
index 975fa7e..6c11f5b 100644
--- a/docs/zh-CN/extending-doris/datax.md
+++ b/docs/zh-CN/extending-doris/datax.md
@@ -52,7 +52,7 @@ doriswriter 插件依赖的 DataX 代码中的一些模块。而这些模块并
 
     doriswriter 插件帮助文档在这里:`doriswriter/doc`
 
-2. `init_env.sh`
+2. `init-env.sh`
 
     这个脚本主要用于构建 DataX 开发环境,他主要进行了以下操作:
     
@@ -67,7 +67,7 @@ doriswriter 插件依赖的 DataX 代码中的一些模块。而这些模块并
 
 ### 编译
 
-1. 运行 `init_env.sh`
+1. 运行 `init-env.sh`
 2. 按需修改 `DataX/doriswriter` 中的代码。
 3. 编译 doriswriter:
 
diff --git a/extension/DataX/doriswriter/doc/doriswriter.md 
b/extension/DataX/doriswriter/doc/doriswriter.md
index 1bf9590..a878390 100644
--- a/extension/DataX/doriswriter/doc/doriswriter.md
+++ b/extension/DataX/doriswriter/doc/doriswriter.md
@@ -164,18 +164,6 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
   - 必选:否
   - 默认值:`\n`
   
-* **format**
-
-  - 描述:导入数据的格式, 可以使是json或者csv。
-  - 必选:否
-  - 默认值:`json`
-  
-* **columnSeparator**
-
-  - 描述:当导入的格式是csv时, 字段之间的分隔符。支持多个字节, 例如'\x01\x02'。
-  - 必选:否
-  - 默认值:`\t`
-
 * **loadProps**
 
   - 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java
index fd14e56..9a364f4 100644
--- 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java
@@ -1,22 +1,20 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
 
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-  -->
- */
 package com.alibaba.datax.plugin.writer.doriswriter;
 
 import com.alibaba.datax.common.element.Column;
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java
deleted file mode 100644
index b7c6f76..0000000
--- 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-  -->
- */
-package com.alibaba.datax.plugin.writer.doriswriter;
-
-import com.alibaba.datax.common.element.Record;
-
-import java.util.ArrayList;
-import java.util.List;
-
-// Convert DataX data to csv
-public class DorisCsvCodec extends DorisCodec {
-
-    private final String columnSeparator;
-
-    public DorisCsvCodec(final List<String> fieldNames, String 
columnSeparator) {
-        super(fieldNames);
-        this.columnSeparator = columnSeparator;
-    }
-
-    @Override
-    public String serialize(final Record row) {
-        if (null == this.fieldNames) {
-            return "";
-        }
-        List<String> list = new ArrayList<>();
-
-        for (int i = 0; i < this.fieldNames.size(); i++) {
-            Object value = this.convertColumn(row.getColumn(i));
-            list.add(value != null ? value.toString() : "\\N");
-        }
-
-        return String.join(columnSeparator, list);
-    }
-
-}
\ No newline at end of file
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
index 7c56fbe..493ddec 100644
--- 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
@@ -1,60 +1,58 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-  -->
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 package com.alibaba.datax.plugin.writer.doriswriter;
 
 // A wrapper class to hold a batch of loaded rows
 public class DorisFlushBatch {
-       private String lineDelimiter;
-       private String label;
-       private long rows = 0;
-       private StringBuilder data = new StringBuilder();
-
-       public DorisFlushBatch(String lineDelimiter) {
-               this.lineDelimiter = lineDelimiter;
-       }
-
-       public void setLabel(String label) {
-               this.label = label;
-       }
-
-       public String getLabel() {
-               return label;
-       }
-
-       public long getRows() {
-               return rows;
-       }
-
-       public void putData(String row) {
-               if (data.length() > 0) {
-                       data.append(lineDelimiter);
-               }
-               data.append(row);
-               rows++;
-       }
-
-       public StringBuilder getData() {
-               return data;
-       }
-
-       public long getSize() {
-               return data.length();
-       }
+    private String lineDelimiter;
+    private String label;
+    private long rows = 0;
+    private StringBuilder data = new StringBuilder();
+
+    public DorisFlushBatch(String lineDelimiter) {
+        this.lineDelimiter = lineDelimiter;
+    }
+
+    public void setLabel(String label) {
+        this.label = label;
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public long getRows() {
+        return rows;
+    }
+
+    public void putData(String row) {
+        if (data.length() > 0) {
+            data.append(lineDelimiter);
+        }
+        data.append(row);
+        rows++;
+    }
+
+    public StringBuilder getData() {
+        return data;
+    }
+
+    public long getSize() {
+        return data.length();
+    }
 }
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
index 68a2cbc..8d4568c 100644
--- 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
@@ -1,22 +1,20 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
 
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-  -->
- */
 package com.alibaba.datax.plugin.writer.doriswriter;
 
 import com.alibaba.datax.common.element.Record;
@@ -28,9 +26,11 @@ import java.util.Map;
 
 // Convert DataX data to json
 public class DorisJsonCodec extends DorisCodec {
+    private Map<String, Object> rowMap;
 
     public DorisJsonCodec(final List<String> fieldNames) {
         super(fieldNames);
+        this.rowMap = new HashMap<>(this.fieldNames.size());
     }
 
     @Override
@@ -38,7 +38,8 @@ public class DorisJsonCodec extends DorisCodec {
         if (null == this.fieldNames) {
             return "";
         }
-        final Map<String, Object> rowMap = new HashMap<String, 
Object>(this.fieldNames.size());
+
+        rowMap.clear();
         int idx = 0;
         for (final String fieldName : this.fieldNames) {
             rowMap.put(fieldName, this.convertColumn(row.getColumn(idx)));
@@ -46,5 +47,4 @@ public class DorisJsonCodec extends DorisCodec {
         }
         return JSON.toJSONString(rowMap);
     }
-
-}
\ No newline at end of file
+}
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
index be96c5b..71e2d1a 100644
--- 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -1,22 +1,19 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-  -->
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
 
 package com.alibaba.datax.plugin.writer.doriswriter;
 
@@ -32,11 +29,9 @@ import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
 import com.alibaba.datax.plugin.rdbms.writer.Constant;
 import com.alibaba.druid.sql.parser.ParserException;
 import com.google.common.base.Strings;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -49,8 +44,6 @@ public class DorisWriter extends Writer {
     }
 
     public static class Task extends com.alibaba.datax.common.spi.Writer.Task {
-        private static final Logger LOG = 
LoggerFactory.getLogger(DorisWriter.Task.class);
-
         private DorisWriterEmitter dorisWriterEmitter;
         private Key keys;
         private DorisCodec rowCodec;
@@ -62,11 +55,7 @@ public class DorisWriter extends Writer {
         @Override
         public void init() {
             this.keys = new Key(super.getPluginJobConf());
-            if("csv".equalsIgnoreCase(this.keys.getFormat())){
-                this.rowCodec = new 
DorisCsvCodec(this.keys.getColumns(),this.keys.getColumnSeparator());
-            }else{
-                this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
-            }
+            this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
             this.dorisWriterEmitter = new DorisWriterEmitter(keys);
         }
 
@@ -77,47 +66,43 @@ public class DorisWriter extends Writer {
         @Override
         public void startWrite(RecordReceiver recordReceiver) {
             String lineDelimiter = this.keys.getLineDelimiter();
-            try {
-                DorisFlushBatch flushBatch = new 
DorisFlushBatch(lineDelimiter);
-                long batchCount = 0;
-                long batchByteSize = 0L;
-                Record record;
-                // loop to get record from datax
-                while ((record = recordReceiver.getFromReader()) != null) {
-                    // check column size
-                    if (record.getColumnNumber() != 
this.keys.getColumns().size()) {
-                        throw 
DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
-                                String.format("config writer column info 
error. because the column number of reader is :%s" +
-                                                "and the column number of 
writer is:%s. please check you datax job config json.",
-                                        record.getColumnNumber(), 
this.keys.getColumns().size()));
-                    }
-                    // codec record
-                    final String recordStr = this.rowCodec.serialize(record);
-
-                    // put into buffer
-                    flushBatch.putData(recordStr);
-                    batchCount += 1;
-                    batchByteSize += recordStr.length();
-                    // trigger buffer
-                    if (batchCount >= this.keys.getBatchRows() || 
batchByteSize >= this.keys.getBatchByteSize()) {
-                        // generate doris stream load label
-                        flush(flushBatch, batchCount, batchByteSize);
-                        // clear buffer
-                        batchCount = 0;
-                        batchByteSize = 0L;
-                        flushBatch = new DorisFlushBatch(lineDelimiter);
-                    }
-                } // end of while
-
-                if (flushBatch.getSize() > 0) {
-                    flush(flushBatch, batchCount, batchByteSize);
+            DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter);
+            long batchCount = 0;
+            long batchByteSize = 0L;
+            Record record;
+            // loop to get record from datax
+            while ((record = recordReceiver.getFromReader()) != null) {
+                // check column size
+                if (record.getColumnNumber() != this.keys.getColumns().size()) 
{
+                    throw 
DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+                            String.format("config writer column info error. 
because the column number of reader is :%s" +
+                                            "and the column number of writer 
is:%s. please check you datax job config json.",
+                                    record.getColumnNumber(), 
this.keys.getColumns().size()));
                 }
-            } catch (Exception e) {
-                throw 
DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+                // codec record
+                final String recordStr = this.rowCodec.serialize(record);
+
+                // put into buffer
+                flushBatch.putData(recordStr);
+                batchCount += 1;
+                batchByteSize += recordStr.length();
+                // trigger buffer
+                if (batchCount >= this.keys.getBatchRows() || batchByteSize >= 
this.keys.getBatchByteSize()) {
+                    // generate doris stream load label
+                    flush(flushBatch);
+                    // clear buffer
+                    batchCount = 0;
+                    batchByteSize = 0L;
+                    flushBatch = new DorisFlushBatch(lineDelimiter);
+                }
+            } // end of while
+
+            if (flushBatch.getSize() > 0) {
+                flush(flushBatch);
             }
         }
 
-        private void flush(DorisFlushBatch flushBatch, long batchCount, long 
batchByteSize) throws IOException {
+        private void flush(DorisFlushBatch flushBatch) {
             final String label = getStreamLoadLabel();
             flushBatch.setLabel(label);
             dorisWriterEmitter.doStreamLoad(flushBatch);
@@ -271,4 +256,4 @@ public class DorisWriter extends Writer {
         }
 
     }
-}
\ No newline at end of file
+}
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
index 6e9e967..fb4e46b 100644
--- 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
@@ -1,22 +1,19 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-  -->
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
 
 package com.alibaba.datax.plugin.writer.doriswriter;
 
@@ -26,7 +23,6 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
 import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpRequest;
@@ -72,8 +68,9 @@ public class DorisWriterEmitter {
         initHostList();
         initRequestConfig();
     }
-    private void initRequestConfig(){
-        
requestConfig=RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build();
+
+    private void initRequestConfig() {
+        requestConfig = 
RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build();
     }
 
     // get target host from config
@@ -94,24 +91,29 @@ public class DorisWriterEmitter {
     /**
      * execute doris stream load
      */
-    public void doStreamLoad(final DorisFlushBatch flushData) throws 
IOException {
+    public void doStreamLoad(final DorisFlushBatch flushData) {
         long start = System.currentTimeMillis();
         final String host = this.getAvailableHost();
         if (null == host) {
-            throw new IOException("None of the load url can be connected.");
+            throw 
DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "None of the 
load url can be connected.");
         }
         final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" 
+ this.keys.getTable() + "/_stream_load";
         // do http put request and get response
-        final Map<String, Object> loadResult = this.doHttpPut(loadUrl, 
flushData);
+        final Map<String, Object> loadResult;
+        try {
+            loadResult = this.doHttpPut(loadUrl, flushData);
+        } catch (IOException e) {
+            throw 
DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+        }
 
         long cost = System.currentTimeMillis() - start;
         LOG.info("StreamLoad response: " + JSON.toJSONString(loadResult) + ", 
cost(ms): " + cost);
         final String keyStatus = "Status";
         if (null == loadResult || !loadResult.containsKey(keyStatus)) {
-            throw new IOException("Unable to flush data to doris: unknown 
result status.");
+            throw 
DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Unable to 
flush data to doris: unknown result status.");
         }
         if (loadResult.get(keyStatus).equals("Fail")) {
-            throw new IOException("Failed to flush data to doris.\n" + 
JSON.toJSONString(loadResult));
+            throw 
DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Failed to 
flush data to doris.\n" + JSON.toJSONString(loadResult));
         }
     }
 
@@ -194,15 +196,10 @@ public class DorisWriterEmitter {
             httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
             httpPut.setHeader(HttpHeaders.AUTHORIZATION, 
this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
             httpPut.setHeader("label", flushBatch.getLabel());
-            httpPut.setHeader("format", this.keys.getFormat());
+            httpPut.setHeader("format", "json");
             httpPut.setHeader("line_delimiter", 
this.keys.getLineDelimiterDesc());
-
-            if ("csv".equalsIgnoreCase(this.keys.getFormat())) {
-                httpPut.setHeader("column_separator", 
this.keys.getColumnSeparatorDesc());
-            } else {
-                httpPut.setHeader("read_json_by_line", "true");
-                httpPut.setHeader("fuzzy_parse", "true");
-            }
+            httpPut.setHeader("read_json_by_line", "true");
+            httpPut.setHeader("fuzzy_parse", "true");
 
             // Use ByteArrayEntity instead of StringEntity to handle Chinese 
correctly
             httpPut.setEntity(new 
ByteArrayEntity(flushBatch.getData().toString().getBytes()));
diff --git 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
index 79b83c1..cfbef96 100644
--- 
a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
+++ 
b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
@@ -1,22 +1,20 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-  -->
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 package com.alibaba.datax.plugin.writer.doriswriter;
 
 import com.alibaba.datax.common.exception.DataXException;
@@ -48,24 +46,18 @@ public class Key implements Serializable {
     public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize";
     public static final String LABEL_PREFIX = "labelPrefix";
     public static final String LINE_DELIMITER = "lineDelimiter";
-    public static final String COLUMN_SEPARATOR = "columnSeparator";
-    public static final String FORMAT = "format";
     public static final String CONNECT_TIMEOUT = "connectTimeout";
     private final Configuration options;
-    private final String columnSeparatorDesc;
     private final String lineDelimiterDesc;
 
     private static final long DEFAULT_MAX_BATCH_ROWS = 50_0000;
     private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 100 * 1024 * 1024; 
// 100MB
     private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
     private static final String DEFAULT_LINE_DELIMITER = "\n";
-    private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
-    private static final String DEFAULT_FORMAT = "json";
     private static final int DEFAULT_CONNECT_TIMEOUT = -1;
 
     public Key(final Configuration options) {
         this.options = options;
-        this.columnSeparatorDesc = parseHexReadable(this.getColumnSeparator());
         this.lineDelimiterDesc = parseHexReadable(this.getLineDelimiter());
     }
 
@@ -134,23 +126,10 @@ public class Key implements Serializable {
         return this.options.getString(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
     }
 
-    public String getFormat() {
-        return this.options.getString(FORMAT, DEFAULT_FORMAT);
-    }
-
-    public String getColumnSeparator() {
-        return this.options.getString(COLUMN_SEPARATOR, 
DEFAULT_COLUMN_SEPARATOR);
-    }
-
     public int getConnectTimeout() {
         return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
     }
 
-
-    public String getColumnSeparatorDesc() {
-        return columnSeparatorDesc;
-    }
-
     public String getLineDelimiterDesc() {
         return lineDelimiterDesc;
     }
@@ -190,4 +169,3 @@ public class Key implements Serializable {
         }
     }
 }
-

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to