This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 2ffc9b8 [improvement] json load by line (#23)
2ffc9b8 is described below
commit 2ffc9b8a37ffbd05d33334df434001d18d5b177a
Author: madong <[email protected]>
AuthorDate: Fri Apr 8 09:29:40 2022 +0800
[improvement] json load by line (#23)
---
.../java/org/apache/doris/flink/sink/DorisSink.java | 4 +++-
.../apache/doris/flink/sink/writer/DorisStreamLoad.java | 17 ++---------------
.../apache/doris/flink/table/DorisDynamicTableSink.java | 3 +--
.../java/org/apache/doris/flink/DorisSinkExample.java | 2 ++
.../org/apache/doris/flink/DorisSinkSQLExample.java | 4 +++-
.../org/apache/doris/flink/DorisSourceSinkExample.java | 4 ++--
.../doris/flink/sink/writer/TestDorisStreamLoad.java | 2 +-
7 files changed, 14 insertions(+), 22 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index 0c0da3f..efb530e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -126,10 +126,12 @@ public class DorisSink<IN> implements Sink<IN,
DorisCommittable, DorisWriterStat
public DorisSink<IN> build() {
Preconditions.checkNotNull(dorisOptions);
- Preconditions.checkNotNull(dorisReadOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
Preconditions.checkNotNull(serializer);
EscapeHandler.handleEscape(dorisExecutionOptions.getStreamLoadProp());
+ if(dorisReadOptions == null) {
+ dorisReadOptions = DorisReadOptions.builder().build();
+ }
return new DorisSink<>(dorisOptions, dorisReadOptions,
dorisExecutionOptions, serializer);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index be9573f..7849559 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -58,6 +58,7 @@ import static
org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+import static
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static
org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
/**
@@ -70,8 +71,6 @@ public class DorisStreamLoad implements Serializable {
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN =
"http://%s/api/%s/%s/_stream_load";
private static final String ABORT_URL_PATTERN =
"http://%s/api/%s/_stream_load_2pc";
- private static final byte[] JSON_ARRAY_START =
"[".getBytes(StandardCharsets.UTF_8);
- private static final byte[] JSON_ARRAY_END =
"]".getBytes(StandardCharsets.UTF_8);
private static final String JOB_EXIST_FINISHED = "FINISHED";
private String loadUrlStr;
@@ -86,7 +85,6 @@ public class DorisStreamLoad implements Serializable {
private Future<CloseableHttpResponse> pendingLoadFuture;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
- private final String format;
private boolean loadBatchFirstRecord;
public DorisStreamLoad(String hostPort,
@@ -109,12 +107,7 @@ public class DorisStreamLoad implements Serializable {
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new
ExecutorThreadFactory("stream-load-upload"));
this.recordStream = new RecordStream(executionOptions.getBufferSize(),
executionOptions.getBufferCount());
- this.format =
executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY, CSV);
- if (JSON.equals(format)) {
- lineDelimiter = ",".getBytes();
- } else {
- lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
"\n").getBytes();
- }
+ lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT).getBytes();
loadBatchFirstRecord = true;
}
@@ -219,9 +212,6 @@ public class DorisStreamLoad implements Serializable {
}
public RespContent stopLoad() throws IOException{
- if (JSON.equals(format)) {
- recordStream.write(JSON_ARRAY_END);
- }
recordStream.endInput();
LOG.info("stream load stopped.");
Preconditions.checkState(pendingLoadFuture != null);
@@ -261,9 +251,6 @@ public class DorisStreamLoad implements Serializable {
LOG.warn(err, e);
throw e;
}
- if (JSON.equals(format)) {
- recordStream.write(JSON_ARRAY_START);
- }
}
private void abortTransaction(long txnID) throws Exception {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 81e42ad..53d727f 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -77,8 +77,7 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
Properties loadProperties = executionOptions.getStreamLoadProp();
- boolean deletable = RestService.isUniqueKeyType(options, readOptions,
LOG) && executionOptions.getDeletable();
-
+ boolean deletable = RestService.isUniqueKeyType(options, readOptions,
LOG) || executionOptions.getDeletable();
if (!loadProperties.containsKey(COLUMNS_KEY)) {
String[] fieldNames = tableSchema.getFieldNames();
Preconditions.checkState(fieldNames != null && fieldNames.length >
0);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
index e71b85b..9c459b7 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
@@ -21,6 +21,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
@@ -40,6 +41,7 @@ public class DorisSinkExample {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.enableCheckpointing(10000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
Time.milliseconds(30000)));
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
index 8965e97..87da571 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
@@ -16,6 +16,7 @@
// under the License.
package org.apache.doris.flink;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,6 +32,7 @@ public class DorisSinkSQLExample {
public static void main(String[] args) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
List<Tuple2<String, Integer>> data = new ArrayList<>();
@@ -53,7 +55,7 @@ public class DorisSinkSQLExample {
" 'sink.buffer-count' = '4',\n" +
" 'sink.buffer-size' = '4086'," +
" 'sink.label-prefix' = 'doris_label',\n" +
- " 'sink.properties.strip_outer_array' = 'true'\n" +
+ " 'sink.properties.read_json_by_line' = 'true'\n" +
")");
tEnv.executeSql("INSERT INTO doris_test_sink select name,age from
doris_test");
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index c4ce1a5..60524c8 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -55,8 +55,8 @@ public class DorisSourceSinkExample {
" 'table.identifier' = 'db.table',\n" +
" 'username' = 'root',\n" +
" 'password' = '',\n" +
- " 'sink.batch.size' = '3',\n" +
- " 'sink.max-retries' = '2'\n" +
+ " 'sink.properties.format' = 'csv',\n" +
+ " 'sink.label-prefix' = 'doris_csv_table'\n" +
")");
tEnv.executeSql("INSERT INTO doris_test_sink select
name,age,price,sale from doris_test");
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index d1c6276..b401a5b 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -106,7 +106,7 @@ public class TestDorisStreamLoad {
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "json");
executionOptions = OptionUtils.buildExecutionOptional(properties);
- byte[] expectBuffer = "[{\"id\": 1},{\"id\":
2}]".getBytes(StandardCharsets.UTF_8);
+ byte[] expectBuffer = "{\"id\": 1}\n{\"id\":
2}".getBytes(StandardCharsets.UTF_8);
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse preCommitResponse =
HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]