This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3a8a85b [Optimize][Extension] optimize extension datax doriswriter,Remove import doris via csv in Dataxwriter, only support via json (#7568)
3a8a85b is described below
commit 3a8a85b7396dcbe4aabffe5b8bd134e25816a934
Author: weajun <[email protected]>
AuthorDate: Sun Jan 9 13:27:52 2022 +0800
[Optimize][Extension] optimize extension datax doriswriter,Remove import doris via csv in Dataxwriter, only support via json (#7568)
* 1. Remove import doris via csv in Dataxwriter, only support via json;
2. Format Dataxwriter code;
3. Optimize exception handling and reduce multiple output of exception logs;
4. Update the dataxwriter's documentation;
* Delete DorisCsvCodec.java
delete unused file
extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java
* 1. remove `format` config key;
2. Optimize serialization code in DorisJsonCodec class
---
docs/en/extending-doris/datax.md | 4 +-
docs/zh-CN/extending-doris/datax.md | 4 +-
extension/DataX/doriswriter/doc/doriswriter.md | 12 ---
.../plugin/writer/doriswriter/DorisCodec.java | 34 +++---
.../plugin/writer/doriswriter/DorisCsvCodec.java | 52 ---------
.../plugin/writer/doriswriter/DorisFlushBatch.java | 108 ++++++++++---------
.../plugin/writer/doriswriter/DorisJsonCodec.java | 42 ++++----
.../plugin/writer/doriswriter/DorisWriter.java | 117 +++++++++------------
.../writer/doriswriter/DorisWriterEmitter.java | 67 ++++++------
.../datax/plugin/writer/doriswriter/Key.java | 56 +++-------
10 files changed, 194 insertions(+), 302 deletions(-)
diff --git a/docs/en/extending-doris/datax.md b/docs/en/extending-doris/datax.md
index 1776f87..c8762d6 100644
--- a/docs/en/extending-doris/datax.md
+++ b/docs/en/extending-doris/datax.md
@@ -52,7 +52,7 @@ Because the doriswriter plug-in depends on some modules in the DataX code base,
The help doc can be found in `doriswriter/doc`
-2. `init_env.sh`
+2. `init-env.sh`
The script mainly performs the following steps:
@@ -67,7 +67,7 @@ Because the doriswriter plug-in depends on some modules in the DataX code base,
### How to build
-1. Run `init_env.sh`
+1. Run `init-env.sh`
2. Modify the doriswriter code in `DataX/doriswriter` if needed.
3. Build doriswriter
diff --git a/docs/zh-CN/extending-doris/datax.md b/docs/zh-CN/extending-doris/datax.md
index 975fa7e..6c11f5b 100644
--- a/docs/zh-CN/extending-doris/datax.md
+++ b/docs/zh-CN/extending-doris/datax.md
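@@ -52,7 +52,7 @@ The doriswriter plug-in depends on some modules in the DataX code base, and these modules
The doriswriter plug-in help doc is here: `doriswriter/doc`
-2. `init_env.sh`
+2. `init-env.sh`
This script is mainly used to build the DataX development environment. It performs the following steps:
@@ -67,7 +67,7 @@ The doriswriter plug-in depends on some modules in the DataX code base, and these modules
### Build
-1. Run `init_env.sh`
+1. Run `init-env.sh`
2. Modify the code in `DataX/doriswriter` as needed.
3. Build doriswriter: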
diff --git a/extension/DataX/doriswriter/doc/doriswriter.md b/extension/DataX/doriswriter/doc/doriswriter.md
index 1bf9590..a878390 100644
--- a/extension/DataX/doriswriter/doc/doriswriter.md
+++ b/extension/DataX/doriswriter/doc/doriswriter.md
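@@ -164,18 +164,6 @@ DorisWriter imports data through Stream Load, which Doris supports natively; DorisWriter
- Required: No
- Default: `\n`
-* **format**
-
- - Description: the format of the imported data; either json or csv.
- - Required: No
- - Default: `json`
-
-* **columnSeparator**
-
- - Description: the separator between fields when the import format is csv. Multi-byte separators are supported, e.g. '\x01\x02'.
- - Required: No
- - Default: `\t`
-
* **loadProps**
- Description: request parameters for StreamLoad; see the StreamLoad introduction page for details.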
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java
index fd14e56..9a364f4 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java
@@ -1,22 +1,20 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -->
- */
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Column;
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java
deleted file mode 100644
index b7c6f76..0000000
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -->
- */
-package com.alibaba.datax.plugin.writer.doriswriter;
-
-import com.alibaba.datax.common.element.Record;
-
-import java.util.ArrayList;
-import java.util.List;
-
-// Convert DataX data to csv
-public class DorisCsvCodec extends DorisCodec {
-
- private final String columnSeparator;
-
- public DorisCsvCodec(final List<String> fieldNames, String columnSeparator) {
- super(fieldNames);
- this.columnSeparator = columnSeparator;
- }
-
- @Override
- public String serialize(final Record row) {
- if (null == this.fieldNames) {
- return "";
- }
- List<String> list = new ArrayList<>();
-
- for (int i = 0; i < this.fieldNames.size(); i++) {
- Object value = this.convertColumn(row.getColumn(i));
- list.add(value != null ? value.toString() : "\\N");
- }
-
- return String.join(columnSeparator, list);
- }
-
-}
\ No newline at end of file
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
index 7c56fbe..493ddec 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java
@@ -1,60 +1,58 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -->
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
package com.alibaba.datax.plugin.writer.doriswriter;
// A wrapper class to hold a batch of loaded rows
public class DorisFlushBatch {
- private String lineDelimiter;
- private String label;
- private long rows = 0;
- private StringBuilder data = new StringBuilder();
-
- public DorisFlushBatch(String lineDelimiter) {
- this.lineDelimiter = lineDelimiter;
- }
-
- public void setLabel(String label) {
- this.label = label;
- }
-
- public String getLabel() {
- return label;
- }
-
- public long getRows() {
- return rows;
- }
-
- public void putData(String row) {
- if (data.length() > 0) {
- data.append(lineDelimiter);
- }
- data.append(row);
- rows++;
- }
-
- public StringBuilder getData() {
- return data;
- }
-
- public long getSize() {
- return data.length();
- }
+ private String lineDelimiter;
+ private String label;
+ private long rows = 0;
+ private StringBuilder data = new StringBuilder();
+
+ public DorisFlushBatch(String lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public long getRows() {
+ return rows;
+ }
+
+ public void putData(String row) {
+ if (data.length() > 0) {
+ data.append(lineDelimiter);
+ }
+ data.append(row);
+ rows++;
+ }
+
+ public StringBuilder getData() {
+ return data;
+ }
+
+ public long getSize() {
+ return data.length();
+ }
}
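DorisFlushBatch.getSize() returns data.length(), which counts UTF-16 chars rather than encoded bytes, and the writer's batchByteSize counter is likewise accumulated from recordStr.length(). For non-ASCII rows (Chinese text, for example) the payload actually sent to Stream Load is therefore larger than the reported size. A minimal sketch, assuming UTF-8 is the wire encoding, of a batch that tracks both numbers; the class name Utf8FlushBatch is hypothetical and not part of the plugin.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical variant of DorisFlushBatch that also tracks the UTF-8 byte size.
public class Utf8FlushBatch {
    private final String lineDelimiter;
    private final int delimiterBytes;
    private final StringBuilder data = new StringBuilder();
    private long rows = 0;
    private long byteSize = 0;

    public Utf8FlushBatch(String lineDelimiter) {
        this.lineDelimiter = lineDelimiter;
        this.delimiterBytes = lineDelimiter.getBytes(StandardCharsets.UTF_8).length;
    }

    // Appends one serialized row; the delimiter goes only between rows, as in DorisFlushBatch.
    public void putData(String row) {
        if (data.length() > 0) {
            data.append(lineDelimiter);
            byteSize += delimiterBytes;
        }
        data.append(row);
        byteSize += row.getBytes(StandardCharsets.UTF_8).length;
        rows++;
    }

    public long getRows() {
        return rows;
    }

    // Size of the payload once encoded as UTF-8.
    public long getByteSize() {
        return byteSize;
    }

    // Length in UTF-16 chars: what DorisFlushBatch.getSize() reports.
    public long getCharSize() {
        return data.length();
    }

    public String getData() {
        return data.toString();
    }

    public static void main(String[] args) {
        Utf8FlushBatch batch = new Utf8FlushBatch("\n");
        batch.putData("{\"name\":\"doris\"}");
        batch.putData("{\"name\":\"\u4e2d\u6587\"}"); // two CJK characters, 3 UTF-8 bytes each
        System.out.println(batch.getRows() + " rows, " + batch.getCharSize()
                + " chars, " + batch.getByteSize() + " bytes");
    }
}
```

With one ASCII row and one row holding two CJK characters, the char count and the UTF-8 byte count already diverge, which is what matters when a batch is compared against maxBatchByteSize.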
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
index 68a2cbc..8d4568c 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java
@@ -1,22 +1,20 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -->
- */
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
@@ -28,9 +26,11 @@ import java.util.Map;
// Convert DataX data to json
public class DorisJsonCodec extends DorisCodec {
+ private Map<String, Object> rowMap;
public DorisJsonCodec(final List<String> fieldNames) {
super(fieldNames);
+ this.rowMap = new HashMap<>(this.fieldNames.size());
}
@Override
@@ -38,7 +38,8 @@ public class DorisJsonCodec extends DorisCodec {
if (null == this.fieldNames) {
return "";
}
- final Map<String, Object> rowMap = new HashMap<String, Object>(this.fieldNames.size());
+
+ rowMap.clear();
int idx = 0;
for (final String fieldName : this.fieldNames) {
rowMap.put(fieldName, this.convertColumn(row.getColumn(idx)));
@@ -46,5 +47,4 @@ public class DorisJsonCodec extends DorisCodec {
}
return JSON.toJSONString(rowMap);
}
-
-}
\ No newline at end of file
+}
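After this commit DorisJsonCodec keeps a single rowMap per codec instance and clears it for every row instead of allocating a new HashMap each time. A stdlib-only sketch of the same JSON-lines idea, assuming each codec instance is used by one thread at a time (a reused map must not be shared across threads). JsonLineCodec is hypothetical; a LinkedHashMap is swapped in so keys follow column order, and a tiny hand-written encoder replaces fastjson's JSON.toJSONString. It only handles strings, numbers (NaN and Infinity are not handled), booleans and null.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stdlib-only stand-in for DorisJsonCodec (not the plugin's code).
public class JsonLineCodec {
    private final List<String> fieldNames;
    // Reused across rows, as in the commit; assumes single-threaded use.
    private final Map<String, Object> rowMap;

    public JsonLineCodec(List<String> fieldNames) {
        this.fieldNames = fieldNames;
        this.rowMap = new LinkedHashMap<>(fieldNames.size());
    }

    // values.get(i) is the value for fieldNames.get(i).
    public String serialize(List<?> values) {
        rowMap.clear();
        for (int i = 0; i < fieldNames.size(); i++) {
            rowMap.put(fieldNames.get(i), values.get(i));
        }
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : rowMap.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            appendString(sb, e.getKey());
            sb.append(':');
            appendValue(sb, e.getValue());
        }
        return sb.append('}').toString();
    }

    private static void appendValue(StringBuilder sb, Object v) {
        if (v == null) {
            sb.append("null");
        } else if (v instanceof Number || v instanceof Boolean) {
            sb.append(v);
        } else {
            appendString(sb, v.toString());
        }
    }

    // Escapes quotes, backslashes and control characters as JSON requires.
    private static void appendString(StringBuilder sb, String s) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }

    public static void main(String[] args) {
        JsonLineCodec codec = new JsonLineCodec(Arrays.asList("id", "name", "note"));
        System.out.println(codec.serialize(Arrays.asList(1, "doris", null)));
        System.out.println(codec.serialize(Arrays.asList(2, "a\"b", "x\ny")));
    }
}
```

Because every row is written to one line and embedded newlines are escaped, the output stays valid for read_json_by_line, where the line delimiter separates rows.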
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
index be96c5b..71e2d1a 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -1,22 +1,19 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -->
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;
@@ -32,11 +29,9 @@ import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.druid.sql.parser.ParserException;
import com.google.common.base.Strings;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
@@ -49,8 +44,6 @@ public class DorisWriter extends Writer {
}
public static class Task extends com.alibaba.datax.common.spi.Writer.Task {
- private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Task.class);
-
private DorisWriterEmitter dorisWriterEmitter;
private Key keys;
private DorisCodec rowCodec;
@@ -62,11 +55,7 @@ public class DorisWriter extends Writer {
@Override
public void init() {
this.keys = new Key(super.getPluginJobConf());
- if("csv".equalsIgnoreCase(this.keys.getFormat())){
- this.rowCodec = new DorisCsvCodec(this.keys.getColumns(),this.keys.getColumnSeparator());
- }else{
- this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
- }
+ this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
this.dorisWriterEmitter = new DorisWriterEmitter(keys);
}
@@ -77,47 +66,43 @@ public class DorisWriter extends Writer {
@Override
public void startWrite(RecordReceiver recordReceiver) {
String lineDelimiter = this.keys.getLineDelimiter();
- try {
- DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter);
- long batchCount = 0;
- long batchByteSize = 0L;
- Record record;
- // loop to get record from datax
- while ((record = recordReceiver.getFromReader()) != null) {
- // check column size
- if (record.getColumnNumber() != this.keys.getColumns().size()) {
- throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
- String.format("config writer column info error. because the column number of reader is :%s" +
- "and the column number of writer is:%s. please check you datax job config json.",
- record.getColumnNumber(), this.keys.getColumns().size()));
- }
- // codec record
- final String recordStr = this.rowCodec.serialize(record);
-
- // put into buffer
- flushBatch.putData(recordStr);
- batchCount += 1;
- batchByteSize += recordStr.length();
- // trigger buffer
- if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
- // generate doris stream load label
- flush(flushBatch, batchCount, batchByteSize);
- // clear buffer
- batchCount = 0;
- batchByteSize = 0L;
- flushBatch = new DorisFlushBatch(lineDelimiter);
- }
- } // end of while
-
- if (flushBatch.getSize() > 0) {
- flush(flushBatch, batchCount, batchByteSize);
+ DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter);
+ long batchCount = 0;
+ long batchByteSize = 0L;
+ Record record;
+ // loop to get record from datax
+ while ((record = recordReceiver.getFromReader()) != null) {
+ // check column size
+ if (record.getColumnNumber() != this.keys.getColumns().size()) {
+ throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+ String.format("config writer column info error. because the column number of reader is: %s " +
+ "and the column number of writer is: %s. please check your datax job config json.",
+ record.getColumnNumber(), this.keys.getColumns().size()));
}
- } catch (Exception e) {
- throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+ // codec record
+ final String recordStr = this.rowCodec.serialize(record);
+
+ // put into buffer
+ flushBatch.putData(recordStr);
+ batchCount += 1;
+ batchByteSize += recordStr.length();
+ // trigger buffer
+ if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
+ // generate doris stream load label
+ flush(flushBatch);
+ // clear buffer
+ batchCount = 0;
+ batchByteSize = 0L;
+ flushBatch = new DorisFlushBatch(lineDelimiter);
+ }
+ } // end of while
+
+ if (flushBatch.getSize() > 0) {
+ flush(flushBatch);
}
}
- private void flush(DorisFlushBatch flushBatch, long batchCount, long batchByteSize) throws IOException {
+ private void flush(DorisFlushBatch flushBatch) {
final String label = getStreamLoadLabel();
flushBatch.setLabel(label);
dorisWriterEmitter.doStreamLoad(flushBatch);
@@ -271,4 +256,4 @@ public class DorisWriter extends Writer {
}
}
-}
\ No newline at end of file
+}
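startWrite no longer wraps every exception in WRITE_DATA_ERROR: a CONF_ERROR raised for a column-count mismatch now propagates unchanged, and flush() no longer declares IOException. The batching rule stays the same: flush when either the row count or the accumulated size reaches its limit, then flush whatever remains once the reader is exhausted. A sketch of that control flow under stated assumptions: ThresholdBatcher and its BiConsumer sink (standing in for doStreamLoad) are hypothetical, sizes are counted in UTF-8 bytes instead of recordStr.length(), and the label scheme (prefix plus a random UUID) is assumed, because the body of getStreamLoadLabel() is not part of this diff.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

// Hypothetical sketch of the batching loop in DorisWriter.Task.startWrite.
public class ThresholdBatcher {
    private final long maxRows;
    private final long maxBytes;
    private final String labelPrefix;
    // Receives (label, rows) for every flushed batch; stands in for doStreamLoad.
    private final BiConsumer<String, List<String>> sink;

    public ThresholdBatcher(long maxRows, long maxBytes, String labelPrefix,
                            BiConsumer<String, List<String>> sink) {
        this.maxRows = maxRows;
        this.maxBytes = maxBytes;
        this.labelPrefix = labelPrefix;
        this.sink = sink;
    }

    // Assumed label scheme: prefix plus a random UUID, unique per Stream Load.
    private String nextLabel() {
        return labelPrefix + UUID.randomUUID();
    }

    public void run(Iterator<String> records) {
        List<String> batch = new ArrayList<>();
        long batchBytes = 0;
        while (records.hasNext()) {
            String row = records.next();
            batch.add(row);
            batchBytes += row.getBytes(StandardCharsets.UTF_8).length;
            // Flush as soon as either limit is reached, then start a fresh batch.
            if (batch.size() >= maxRows || batchBytes >= maxBytes) {
                sink.accept(nextLabel(), batch);
                batch = new ArrayList<>();
                batchBytes = 0;
            }
        }
        // Flush the remainder once the reader is exhausted.
        if (!batch.isEmpty()) {
            sink.accept(nextLabel(), batch);
        }
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        ThresholdBatcher batcher = new ThresholdBatcher(2, 1024, "datax_doris_writer_",
                (label, rows) -> sizes.add(rows.size()));
        batcher.run(Arrays.asList("r1", "r2", "r3", "r4", "r5").iterator());
        System.out.println(sizes); // [2, 2, 1]
    }
}
```

Starting a new list after each flush mirrors the diff's `flushBatch = new DorisFlushBatch(lineDelimiter)`, so the sink may keep a reference to a flushed batch without it being mutated later.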
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
index 6e9e967..fb4e46b 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
@@ -1,22 +1,19 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -->
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;
@@ -26,7 +23,6 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpRequest;
@@ -72,8 +68,9 @@ public class DorisWriterEmitter {
initHostList();
initRequestConfig();
}
- private void initRequestConfig(){
- requestConfig=RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build();
+
+ private void initRequestConfig() {
+ requestConfig = RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build();
}
// get target host from config
@@ -94,24 +91,29 @@ public class DorisWriterEmitter {
/**
* execute doris stream load
*/
- public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
+ public void doStreamLoad(final DorisFlushBatch flushData) {
long start = System.currentTimeMillis();
final String host = this.getAvailableHost();
if (null == host) {
- throw new IOException("None of the load url can be connected.");
+ throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "None of the load url can be connected.");
}
final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
// do http put request and get response
- final Map<String, Object> loadResult = this.doHttpPut(loadUrl, flushData);
+ final Map<String, Object> loadResult;
+ try {
+ loadResult = this.doHttpPut(loadUrl, flushData);
+ } catch (IOException e) {
+ throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+ }
long cost = System.currentTimeMillis() - start;
LOG.info("StreamLoad response: " + JSON.toJSONString(loadResult) + ", cost(ms): " + cost);
final String keyStatus = "Status";
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
- throw new IOException("Unable to flush data to doris: unknown result status.");
+ throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Unable to flush data to doris: unknown result status.");
}
if (loadResult.get(keyStatus).equals("Fail")) {
- throw new IOException("Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
+ throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
}
}
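doStreamLoad now converts the checked IOException from doHttpPut into a DataXException with WRITE_DATA_ERROR in one place and reports an unknown or failed Status the same way, which is how the commit aims to reduce repeated exception logging. A stdlib-only sketch of that pattern, in which LoadFailedException stands in for DataXException and the HttpPut interface stands in for doHttpPut (both hypothetical). Unlike the diff, the sketch compares with "Fail".equals(status), so a Status key that is present but mapped to null cannot trigger a NullPointerException.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the exception handling in DorisWriterEmitter.doStreamLoad.
public class LoadResultCheck {

    // Stand-in for DataXException: unchecked, with a message and an optional cause.
    public static class LoadFailedException extends RuntimeException {
        public LoadFailedException(String message) {
            super(message);
        }

        public LoadFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for doHttpPut: any call that returns the parsed response or throws IOException.
    @FunctionalInterface
    public interface HttpPut {
        Map<String, Object> put(String url) throws IOException;
    }

    public static Map<String, Object> load(HttpPut http, String url) {
        final Map<String, Object> result;
        try {
            result = http.put(url);
        } catch (IOException e) {
            // Wrap exactly once so callers neither re-wrap nor log the same failure again.
            throw new LoadFailedException("Stream load request failed: " + url, e);
        }
        check(result);
        return result;
    }

    public static void check(Map<String, Object> result) {
        Object status = result == null ? null : result.get("Status");
        if (status == null) {
            throw new LoadFailedException("Unable to flush data to doris: unknown result status.");
        }
        if ("Fail".equals(status)) {
            throw new LoadFailedException("Failed to flush data to doris.\n" + result);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> ok = new HashMap<>();
        ok.put("Status", "Success");
        String url = "http://fe_host:8030/api/db/tbl/_stream_load"; // placeholder URL
        System.out.println(load(u -> ok, url).get("Status"));
        try {
            load(u -> { throw new IOException("connection refused"); }, url);
        } catch (LoadFailedException e) {
            System.out.println("wrapped: " + e.getCause().getMessage());
        }
    }
}
```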
@@ -194,15 +196,10 @@ public class DorisWriterEmitter {
httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.setHeader(HttpHeaders.AUTHORIZATION, this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
httpPut.setHeader("label", flushBatch.getLabel());
- httpPut.setHeader("format", this.keys.getFormat());
+ httpPut.setHeader("format", "json");
httpPut.setHeader("line_delimiter", this.keys.getLineDelimiterDesc());
-
- if ("csv".equalsIgnoreCase(this.keys.getFormat())) {
- httpPut.setHeader("column_separator", this.keys.getColumnSeparatorDesc());
- } else {
- httpPut.setHeader("read_json_by_line", "true");
- httpPut.setHeader("fuzzy_parse", "true");
- }
+ httpPut.setHeader("read_json_by_line", "true");
+ httpPut.setHeader("fuzzy_parse", "true");
// Use ByteArrayEntity instead of StringEntity to handle Chinese correctly
httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().toString().getBytes()));
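With CSV removed, every request now sends format=json together with read_json_by_line=true and fuzzy_parse=true, alongside the label, Expect: 100-continue and Basic auth headers. A sketch that assembles the same header set with JDK classes only; StreamLoadRequest, basicAuth() and body() are hypothetical names, and lineDelimiterDesc is passed through untouched because Key's parseHexReadable() is not shown in this diff. The sketch encodes the body with an explicit UTF-8 charset: the diff's argument-less getBytes() uses the JVM's default charset, which is not UTF-8 on every older JVM.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mirroring the headers DorisWriterEmitter sets after this commit.
public class StreamLoadRequest {

    // Standard HTTP Basic auth value: "Basic " + base64("user:password").
    public static String basicAuth(String user, String password) {
        String token = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(token.getBytes(StandardCharsets.UTF_8));
    }

    // lineDelimiterDesc is passed through as-is, like keys.getLineDelimiterDesc().
    public static Map<String, String> headers(String label, String user, String password,
                                              String lineDelimiterDesc) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("Expect", "100-continue");
        h.put("Authorization", basicAuth(user, password));
        h.put("label", label);
        h.put("format", "json");
        h.put("line_delimiter", lineDelimiterDesc);
        h.put("read_json_by_line", "true");
        h.put("fuzzy_parse", "true");
        return h;
    }

    // Encode explicitly as UTF-8 instead of relying on the default charset.
    public static byte[] body(CharSequence data) {
        return data.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        headers("datax_doris_writer_example", "root", "", "\\n")
                .forEach((k, v) -> System.out.println(k + ": " + v));
        System.out.println(body("{\"k\":\"\u4e2d\"}").length + " body bytes");
    }
}
```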
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
index 79b83c1..cfbef96 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java
@@ -1,22 +1,20 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -->
- */
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.exception.DataXException;
@@ -48,24 +46,18 @@ public class Key implements Serializable {
public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize";
public static final String LABEL_PREFIX = "labelPrefix";
public static final String LINE_DELIMITER = "lineDelimiter";
- public static final String COLUMN_SEPARATOR = "columnSeparator";
- public static final String FORMAT = "format";
public static final String CONNECT_TIMEOUT = "connectTimeout";
private final Configuration options;
- private final String columnSeparatorDesc;
private final String lineDelimiterDesc;
private static final long DEFAULT_MAX_BATCH_ROWS = 50_0000;
private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 100 * 1024 * 1024; // 100MB
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
private static final String DEFAULT_LINE_DELIMITER = "\n";
- private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
- private static final String DEFAULT_FORMAT = "json";
private static final int DEFAULT_CONNECT_TIMEOUT = -1;
public Key(final Configuration options) {
this.options = options;
- this.columnSeparatorDesc = parseHexReadable(this.getColumnSeparator());
this.lineDelimiterDesc = parseHexReadable(this.getLineDelimiter());
}
@@ -134,23 +126,10 @@ public class Key implements Serializable {
return this.options.getString(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
}
- public String getFormat() {
- return this.options.getString(FORMAT, DEFAULT_FORMAT);
- }
-
- public String getColumnSeparator() {
- return this.options.getString(COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR);
- }
-
public int getConnectTimeout() {
return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
}
-
- public String getColumnSeparatorDesc() {
- return columnSeparatorDesc;
- }
-
public String getLineDelimiterDesc() {
return lineDelimiterDesc;
}
@@ -190,4 +169,3 @@ public class Key implements Serializable {
}
}
}
-
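After this commit Key still reads lineDelimiter, labelPrefix, maxBatchByteSize and connectTimeout with a fallback default, while format and columnSeparator are no longer looked up at all. A sketch of that default-lookup pattern with a plain Map in place of DataX's Configuration; WriterOptions is hypothetical, only key names visible in this diff are used, and the constants restate the diff's defaults (100 * 1024 * 1024 is 104,857,600 bytes). In the sketch, a leftover format or columnSeparator entry is simply never read; whether the real plugin warns about such keys is not shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Key: a plain Map replaces DataX's Configuration.
public class WriterOptions {
    static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 100L * 1024 * 1024; // 100 MiB
    static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
    static final String DEFAULT_LINE_DELIMITER = "\n";
    // Negative means "undefined" (system default) in HttpClient's RequestConfig.
    static final int DEFAULT_CONNECT_TIMEOUT = -1;

    private final Map<String, Object> options;

    public WriterOptions(Map<String, Object> options) {
        this.options = options;
    }

    // Returns the configured value, or the default when the key is absent.
    private Object get(String key, Object defaultValue) {
        Object value = options.get(key);
        return value != null ? value : defaultValue;
    }

    public long getBatchByteSize() {
        return ((Number) get("maxBatchByteSize", DEFAULT_MAX_BATCH_BYTE_SIZE)).longValue();
    }

    public String getLabelPrefix() {
        return (String) get("labelPrefix", DEFAULT_LABEL_PREFIX);
    }

    public String getLineDelimiter() {
        return (String) get("lineDelimiter", DEFAULT_LINE_DELIMITER);
    }

    public int getConnectTimeout() {
        return ((Number) get("connectTimeout", DEFAULT_CONNECT_TIMEOUT)).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("connectTimeout", 5000);
        conf.put("format", "csv"); // removed key: no getter reads it any more
        WriterOptions opts = new WriterOptions(conf);
        System.out.println(opts.getConnectTimeout() + " " + opts.getBatchByteSize());
    }
}
```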